1use std::collections::HashMap;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use futures::lock::{Mutex, MutexGuard};
26use itertools::Itertools;
27
28use super::namespace_state::NamespaceState;
29use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
30use crate::runtime::Runtime;
31use crate::spec::{TableMetadata, TableMetadataBuilder};
32use crate::table::Table;
33use crate::{
34 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35 TableCommit, TableCreation, TableIdent,
36};
37
38pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";
40
41const LOCATION: &str = "location";
43
44#[derive(Debug)]
46pub struct MemoryCatalogBuilder {
47 config: MemoryCatalogConfig,
48 storage_factory: Option<Arc<dyn StorageFactory>>,
49 runtime: Option<Runtime>,
50}
51
52impl Default for MemoryCatalogBuilder {
53 fn default() -> Self {
54 Self {
55 config: MemoryCatalogConfig {
56 name: None,
57 warehouse: "".to_string(),
58 props: HashMap::new(),
59 },
60 storage_factory: None,
61 runtime: None,
62 }
63 }
64}
65
66impl CatalogBuilder for MemoryCatalogBuilder {
67 type C = MemoryCatalog;
68
69 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
70 self.storage_factory = Some(storage_factory);
71 self
72 }
73
74 fn with_runtime(mut self, runtime: Runtime) -> Self {
75 self.runtime = Some(runtime);
76 self
77 }
78
79 fn load(
80 mut self,
81 name: impl Into<String>,
82 props: HashMap<String, String>,
83 ) -> impl Future<Output = Result<Self::C>> + Send {
84 self.config.name = Some(name.into());
85
86 if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
87 self.config.warehouse = props
88 .get(MEMORY_CATALOG_WAREHOUSE)
89 .cloned()
90 .unwrap_or_default()
91 }
92
93 self.config.props = props
95 .into_iter()
96 .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
97 .collect();
98
99 let result = {
100 if self.config.name.is_none() {
101 Err(Error::new(
102 ErrorKind::DataInvalid,
103 "Catalog name is required",
104 ))
105 } else if self.config.warehouse.is_empty() {
106 Err(Error::new(
107 ErrorKind::DataInvalid,
108 "Catalog warehouse is required",
109 ))
110 } else {
111 let runtime = self.runtime.unwrap_or_else(Runtime::current);
112 MemoryCatalog::new(self.config, self.storage_factory, runtime)
113 }
114 };
115
116 std::future::ready(result)
117 }
118}
119
120#[derive(Clone, Debug)]
121pub(crate) struct MemoryCatalogConfig {
122 name: Option<String>,
123 warehouse: String,
124 props: HashMap<String, String>,
125}
126
127#[derive(Debug)]
129pub struct MemoryCatalog {
130 root_namespace_state: Mutex<NamespaceState>,
131 file_io: FileIO,
132 warehouse_location: String,
133 runtime: Runtime,
134}
135
136impl MemoryCatalog {
137 fn new(
139 config: MemoryCatalogConfig,
140 storage_factory: Option<Arc<dyn StorageFactory>>,
141 runtime: Runtime,
142 ) -> Result<Self> {
143 let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory));
145
146 Ok(Self {
147 root_namespace_state: Mutex::new(NamespaceState::default()),
148 file_io: FileIOBuilder::new(factory).with_props(config.props).build(),
149 warehouse_location: config.warehouse,
150 runtime,
151 })
152 }
153
154 async fn load_table_from_locked_state(
156 &self,
157 table_ident: &TableIdent,
158 root_namespace_state: &MutexGuard<'_, NamespaceState>,
159 ) -> Result<Table> {
160 let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
161 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
162
163 Table::builder()
164 .identifier(table_ident.clone())
165 .metadata(metadata)
166 .metadata_location(metadata_location.to_string())
167 .file_io(self.file_io.clone())
168 .runtime(self.runtime.clone())
169 .build()
170 }
171}
172
173#[async_trait]
174impl Catalog for MemoryCatalog {
175 async fn list_namespaces(
177 &self,
178 maybe_parent: Option<&NamespaceIdent>,
179 ) -> Result<Vec<NamespaceIdent>> {
180 let root_namespace_state = self.root_namespace_state.lock().await;
181
182 match maybe_parent {
183 None => {
184 let namespaces = root_namespace_state
185 .list_top_level_namespaces()
186 .into_iter()
187 .map(|str| NamespaceIdent::new(str.to_string()))
188 .collect_vec();
189
190 Ok(namespaces)
191 }
192 Some(parent_namespace_ident) => {
193 let namespaces = root_namespace_state
194 .list_namespaces_under(parent_namespace_ident)?
195 .into_iter()
196 .map(|name| {
197 let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
198 names.push(name.to_string());
199 NamespaceIdent::from_vec(names)
200 })
201 .collect::<Result<Vec<_>>>()?;
202
203 Ok(namespaces)
204 }
205 }
206 }
207
208 async fn create_namespace(
210 &self,
211 namespace_ident: &NamespaceIdent,
212 properties: HashMap<String, String>,
213 ) -> Result<Namespace> {
214 let mut root_namespace_state = self.root_namespace_state.lock().await;
215
216 root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
217 let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
218
219 Ok(namespace)
220 }
221
222 async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
224 let root_namespace_state = self.root_namespace_state.lock().await;
225
226 let namespace = Namespace::with_properties(
227 namespace_ident.clone(),
228 root_namespace_state
229 .get_properties(namespace_ident)?
230 .clone(),
231 );
232
233 Ok(namespace)
234 }
235
236 async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
238 let guarded_namespaces = self.root_namespace_state.lock().await;
239
240 Ok(guarded_namespaces.namespace_exists(namespace_ident))
241 }
242
243 async fn update_namespace(
249 &self,
250 namespace_ident: &NamespaceIdent,
251 properties: HashMap<String, String>,
252 ) -> Result<()> {
253 let mut root_namespace_state = self.root_namespace_state.lock().await;
254
255 root_namespace_state.replace_properties(namespace_ident, properties)
256 }
257
258 async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
260 let mut root_namespace_state = self.root_namespace_state.lock().await;
261
262 root_namespace_state.remove_existing_namespace(namespace_ident)
263 }
264
265 async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
267 let root_namespace_state = self.root_namespace_state.lock().await;
268
269 let table_names = root_namespace_state.list_tables(namespace_ident)?;
270 let table_idents = table_names
271 .into_iter()
272 .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
273 .collect_vec();
274
275 Ok(table_idents)
276 }
277
278 async fn create_table(
280 &self,
281 namespace_ident: &NamespaceIdent,
282 table_creation: TableCreation,
283 ) -> Result<Table> {
284 let mut root_namespace_state = self.root_namespace_state.lock().await;
285
286 let table_name = table_creation.name.clone();
287 let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
288
289 let (table_creation, location) = match table_creation.location.clone() {
290 Some(location) => (table_creation, location),
291 None => {
292 let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
293 let location_prefix = match namespace_properties.get(LOCATION) {
294 Some(namespace_location) => namespace_location.clone(),
295 None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
296 };
297
298 let location = format!("{}/{}", location_prefix, table_ident.name());
299
300 let new_table_creation = TableCreation {
301 location: Some(location.clone()),
302 ..table_creation
303 };
304
305 (new_table_creation, location)
306 }
307 };
308
309 let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
310 .build()?
311 .metadata;
312 let metadata_location = MetadataLocation::new_with_metadata(location, &metadata);
313
314 metadata.write_to(&self.file_io, &metadata_location).await?;
315
316 root_namespace_state.insert_new_table(&table_ident, metadata_location.to_string())?;
317
318 Table::builder()
319 .file_io(self.file_io.clone())
320 .metadata_location(metadata_location.to_string())
321 .metadata(metadata)
322 .identifier(table_ident)
323 .runtime(self.runtime.clone())
324 .build()
325 }
326
327 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
329 let root_namespace_state = self.root_namespace_state.lock().await;
330
331 self.load_table_from_locked_state(table_ident, &root_namespace_state)
332 .await
333 }
334
335 async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
337 let mut root_namespace_state = self.root_namespace_state.lock().await;
338
339 root_namespace_state.remove_existing_table(table_ident)?;
340 Ok(())
341 }
342
343 async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> {
344 let table_info = self.load_table(table_ident).await?;
345 self.drop_table(table_ident).await?;
346 crate::catalog::utils::drop_table_data(
347 table_info.file_io(),
348 table_info.metadata(),
349 table_info.metadata_location(),
350 )
351 .await
352 }
353
354 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
356 let root_namespace_state = self.root_namespace_state.lock().await;
357
358 root_namespace_state.table_exists(table_ident)
359 }
360
361 async fn rename_table(
363 &self,
364 src_table_ident: &TableIdent,
365 dst_table_ident: &TableIdent,
366 ) -> Result<()> {
367 let mut root_namespace_state = self.root_namespace_state.lock().await;
368
369 let mut new_root_namespace_state = root_namespace_state.clone();
370 let metadata_location = new_root_namespace_state
371 .get_existing_table_location(src_table_ident)?
372 .clone();
373 new_root_namespace_state.remove_existing_table(src_table_ident)?;
374 new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
375 *root_namespace_state = new_root_namespace_state;
376
377 Ok(())
378 }
379
380 async fn register_table(
381 &self,
382 table_ident: &TableIdent,
383 metadata_location: String,
384 ) -> Result<Table> {
385 let mut root_namespace_state = self.root_namespace_state.lock().await;
386 root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
387
388 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
389
390 Table::builder()
391 .file_io(self.file_io.clone())
392 .metadata_location(metadata_location)
393 .metadata(metadata)
394 .identifier(table_ident.clone())
395 .runtime(self.runtime.clone())
396 .build()
397 }
398
399 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
401 let mut root_namespace_state = self.root_namespace_state.lock().await;
402
403 let current_table = self
404 .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
405 .await?;
406
407 let staged_table = commit.apply(current_table)?;
409
410 let metadata_location =
412 MetadataLocation::from_str(staged_table.metadata_location_result()?)?;
413 staged_table
414 .metadata()
415 .write_to(staged_table.file_io(), &metadata_location)
416 .await?;
417
418 let updated_table = root_namespace_state.commit_table_update(staged_table)?;
420
421 Ok(updated_table)
422 }
423}
424
425#[cfg(test)]
426pub(crate) mod tests {
427 use std::collections::HashSet;
428 use std::hash::Hash;
429 use std::iter::FromIterator;
430 use std::vec;
431
432 use regex::Regex;
433 use tempfile::TempDir;
434
435 use super::*;
436 use crate::io::FileIO;
437 use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
438 use crate::test_utils::test_runtime;
439 use crate::transaction::{ApplyTransactionAction, Transaction};
440
441 fn temp_path() -> String {
442 let temp_dir = TempDir::new().unwrap();
443 temp_dir.path().to_str().unwrap().to_string()
444 }
445
446 pub(crate) async fn new_memory_catalog() -> impl Catalog {
447 let warehouse_location = temp_path();
448 MemoryCatalogBuilder::default()
449 .load(
450 "memory",
451 HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
452 )
453 .await
454 .unwrap()
455 }
456
457 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
458 let _ = catalog
459 .create_namespace(namespace_ident, HashMap::new())
460 .await
461 .unwrap();
462 }
463
464 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
465 for namespace_ident in namespace_idents {
466 let _ = create_namespace(catalog, namespace_ident).await;
467 }
468 }
469
470 fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
471 HashSet::from_iter(vec)
472 }
473
474 fn simple_table_schema() -> Schema {
475 Schema::builder()
476 .with_fields(vec![
477 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
478 ])
479 .build()
480 .unwrap()
481 }
482
483 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
484 catalog
485 .create_table(
486 &table_ident.namespace,
487 TableCreation::builder()
488 .name(table_ident.name().into())
489 .schema(simple_table_schema())
490 .build(),
491 )
492 .await
493 .unwrap()
494 }
495
496 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
497 for table_ident in table_idents {
498 create_table(catalog, table_ident).await;
499 }
500 }
501
502 async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
503 let namespace_ident = NamespaceIdent::new("abc".into());
504 create_namespace(catalog, &namespace_ident).await;
505
506 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
507 create_table(catalog, &table_ident).await
508 }
509
510 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
511 assert_eq!(table.identifier(), expected_table_ident);
512
513 let metadata = table.metadata();
514
515 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
516
517 let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
518 .with_spec_id(0)
519 .build()
520 .unwrap();
521
522 assert_eq!(
523 metadata
524 .partition_specs_iter()
525 .map(|p| p.as_ref())
526 .collect_vec(),
527 vec![&expected_partition_spec]
528 );
529
530 let expected_sorted_order = SortOrder::builder()
531 .with_order_id(0)
532 .with_fields(vec![])
533 .build(expected_schema)
534 .unwrap();
535
536 assert_eq!(
537 metadata
538 .sort_orders_iter()
539 .map(|s| s.as_ref())
540 .collect_vec(),
541 vec![&expected_sorted_order]
542 );
543
544 assert_eq!(metadata.properties(), &HashMap::new());
545
546 assert!(!table.readonly());
547 }
548
549 const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
550
551 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
552 let actual = table.metadata_location().unwrap().to_string();
553 let regex = Regex::new(regex_str).unwrap();
554 assert!(
555 regex.is_match(&actual),
556 "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
557 )
558 }
559
560 #[tokio::test]
561 async fn test_list_namespaces_returns_empty_vector() {
562 let catalog = new_memory_catalog().await;
563
564 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
565 }
566
567 #[tokio::test]
568 async fn test_list_namespaces_returns_single_namespace() {
569 let catalog = new_memory_catalog().await;
570 let namespace_ident = NamespaceIdent::new("abc".into());
571 create_namespace(&catalog, &namespace_ident).await;
572
573 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
574 namespace_ident
575 ]);
576 }
577
578 #[tokio::test]
579 async fn test_list_namespaces_returns_multiple_namespaces() {
580 let catalog = new_memory_catalog().await;
581 let namespace_ident_1 = NamespaceIdent::new("a".into());
582 let namespace_ident_2 = NamespaceIdent::new("b".into());
583 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
584
585 assert_eq!(
586 to_set(catalog.list_namespaces(None).await.unwrap()),
587 to_set(vec![namespace_ident_1, namespace_ident_2])
588 );
589 }
590
591 #[tokio::test]
592 async fn test_list_namespaces_returns_only_top_level_namespaces() {
593 let catalog = new_memory_catalog().await;
594 let namespace_ident_1 = NamespaceIdent::new("a".into());
595 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
596 let namespace_ident_3 = NamespaceIdent::new("b".into());
597 create_namespaces(&catalog, &vec![
598 &namespace_ident_1,
599 &namespace_ident_2,
600 &namespace_ident_3,
601 ])
602 .await;
603
604 assert_eq!(
605 to_set(catalog.list_namespaces(None).await.unwrap()),
606 to_set(vec![namespace_ident_1, namespace_ident_3])
607 );
608 }
609
610 #[tokio::test]
611 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
612 let catalog = new_memory_catalog().await;
613 let namespace_ident_1 = NamespaceIdent::new("a".into());
614 let namespace_ident_2 = NamespaceIdent::new("b".into());
615 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
616
617 assert_eq!(
618 catalog
619 .list_namespaces(Some(&namespace_ident_1))
620 .await
621 .unwrap(),
622 vec![]
623 );
624 }
625
626 #[tokio::test]
627 async fn test_list_namespaces_returns_namespace_under_parent() {
628 let catalog = new_memory_catalog().await;
629 let namespace_ident_1 = NamespaceIdent::new("a".into());
630 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
631 let namespace_ident_3 = NamespaceIdent::new("c".into());
632 create_namespaces(&catalog, &vec![
633 &namespace_ident_1,
634 &namespace_ident_2,
635 &namespace_ident_3,
636 ])
637 .await;
638
639 assert_eq!(
640 to_set(catalog.list_namespaces(None).await.unwrap()),
641 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
642 );
643
644 assert_eq!(
645 catalog
646 .list_namespaces(Some(&namespace_ident_1))
647 .await
648 .unwrap(),
649 vec![namespace_ident_2]
650 );
651 }
652
653 #[tokio::test]
654 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
655 let catalog = new_memory_catalog().await;
656 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
657 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
658 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
659 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
660 let namespace_ident_5 = NamespaceIdent::new("b".into());
661 create_namespaces(&catalog, &vec![
662 &namespace_ident_1,
663 &namespace_ident_2,
664 &namespace_ident_3,
665 &namespace_ident_4,
666 &namespace_ident_5,
667 ])
668 .await;
669
670 assert_eq!(
671 to_set(
672 catalog
673 .list_namespaces(Some(&namespace_ident_1))
674 .await
675 .unwrap()
676 ),
677 to_set(vec![
678 namespace_ident_2,
679 namespace_ident_3,
680 namespace_ident_4,
681 ])
682 );
683 }
684
685 #[tokio::test]
686 async fn test_namespace_exists_returns_false() {
687 let catalog = new_memory_catalog().await;
688 let namespace_ident = NamespaceIdent::new("a".into());
689 create_namespace(&catalog, &namespace_ident).await;
690
691 assert!(
692 !catalog
693 .namespace_exists(&NamespaceIdent::new("b".into()))
694 .await
695 .unwrap()
696 );
697 }
698
699 #[tokio::test]
700 async fn test_namespace_exists_returns_true() {
701 let catalog = new_memory_catalog().await;
702 let namespace_ident = NamespaceIdent::new("a".into());
703 create_namespace(&catalog, &namespace_ident).await;
704
705 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
706 }
707
708 #[tokio::test]
709 async fn test_create_namespace_with_empty_properties() {
710 let catalog = new_memory_catalog().await;
711 let namespace_ident = NamespaceIdent::new("a".into());
712
713 assert_eq!(
714 catalog
715 .create_namespace(&namespace_ident, HashMap::new())
716 .await
717 .unwrap(),
718 Namespace::new(namespace_ident.clone())
719 );
720
721 assert_eq!(
722 catalog.get_namespace(&namespace_ident).await.unwrap(),
723 Namespace::with_properties(namespace_ident, HashMap::new())
724 );
725 }
726
727 #[tokio::test]
728 async fn test_create_namespace_with_properties() {
729 let catalog = new_memory_catalog().await;
730 let namespace_ident = NamespaceIdent::new("abc".into());
731
732 let mut properties: HashMap<String, String> = HashMap::new();
733 properties.insert("k".into(), "v".into());
734
735 assert_eq!(
736 catalog
737 .create_namespace(&namespace_ident, properties.clone())
738 .await
739 .unwrap(),
740 Namespace::with_properties(namespace_ident.clone(), properties.clone())
741 );
742
743 assert_eq!(
744 catalog.get_namespace(&namespace_ident).await.unwrap(),
745 Namespace::with_properties(namespace_ident, properties)
746 );
747 }
748
749 #[tokio::test]
750 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
751 let catalog = new_memory_catalog().await;
752 let namespace_ident = NamespaceIdent::new("a".into());
753 create_namespace(&catalog, &namespace_ident).await;
754
755 assert_eq!(
756 catalog
757 .create_namespace(&namespace_ident, HashMap::new())
758 .await
759 .unwrap_err()
760 .to_string(),
761 format!(
762 "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
763 &namespace_ident
764 )
765 );
766
767 assert_eq!(
768 catalog.get_namespace(&namespace_ident).await.unwrap(),
769 Namespace::with_properties(namespace_ident, HashMap::new())
770 );
771 }
772
773 #[tokio::test]
774 async fn test_create_nested_namespace() {
775 let catalog = new_memory_catalog().await;
776 let parent_namespace_ident = NamespaceIdent::new("a".into());
777 create_namespace(&catalog, &parent_namespace_ident).await;
778
779 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
780
781 assert_eq!(
782 catalog
783 .create_namespace(&child_namespace_ident, HashMap::new())
784 .await
785 .unwrap(),
786 Namespace::new(child_namespace_ident.clone())
787 );
788
789 assert_eq!(
790 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
791 Namespace::with_properties(child_namespace_ident, HashMap::new())
792 );
793 }
794
795 #[tokio::test]
796 async fn test_create_deeply_nested_namespace() {
797 let catalog = new_memory_catalog().await;
798 let namespace_ident_a = NamespaceIdent::new("a".into());
799 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
800 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
801
802 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
803
804 assert_eq!(
805 catalog
806 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
807 .await
808 .unwrap(),
809 Namespace::new(namespace_ident_a_b_c.clone())
810 );
811
812 assert_eq!(
813 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
814 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
815 );
816 }
817
818 #[tokio::test]
819 async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
820 let catalog = new_memory_catalog().await;
821
822 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
823
824 assert_eq!(
825 catalog
826 .create_namespace(&nested_namespace_ident, HashMap::new())
827 .await
828 .unwrap_err()
829 .to_string(),
830 format!(
831 "NamespaceNotFound => No such namespace: {:?}",
832 NamespaceIdent::new("a".into())
833 )
834 );
835
836 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
837 }
838
839 #[tokio::test]
840 async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
841 {
842 let catalog = new_memory_catalog().await;
843
844 let namespace_ident_a = NamespaceIdent::new("a".into());
845 create_namespace(&catalog, &namespace_ident_a).await;
846
847 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
848
849 assert_eq!(
850 catalog
851 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
852 .await
853 .unwrap_err()
854 .to_string(),
855 format!(
856 "NamespaceNotFound => No such namespace: {:?}",
857 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
858 )
859 );
860
861 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
862 namespace_ident_a.clone()
863 ]);
864
865 assert_eq!(
866 catalog
867 .list_namespaces(Some(&namespace_ident_a))
868 .await
869 .unwrap(),
870 vec![]
871 );
872 }
873
874 #[tokio::test]
875 async fn test_get_namespace() {
876 let catalog = new_memory_catalog().await;
877 let namespace_ident = NamespaceIdent::new("abc".into());
878
879 let mut properties: HashMap<String, String> = HashMap::new();
880 properties.insert("k".into(), "v".into());
881 let _ = catalog
882 .create_namespace(&namespace_ident, properties.clone())
883 .await
884 .unwrap();
885
886 assert_eq!(
887 catalog.get_namespace(&namespace_ident).await.unwrap(),
888 Namespace::with_properties(namespace_ident, properties)
889 )
890 }
891
892 #[tokio::test]
893 async fn test_get_nested_namespace() {
894 let catalog = new_memory_catalog().await;
895 let namespace_ident_a = NamespaceIdent::new("a".into());
896 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
897 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
898
899 assert_eq!(
900 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
901 Namespace::with_properties(namespace_ident_a_b, HashMap::new())
902 );
903 }
904
905 #[tokio::test]
906 async fn test_get_deeply_nested_namespace() {
907 let catalog = new_memory_catalog().await;
908 let namespace_ident_a = NamespaceIdent::new("a".into());
909 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
910 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
911 create_namespaces(&catalog, &vec![
912 &namespace_ident_a,
913 &namespace_ident_a_b,
914 &namespace_ident_a_b_c,
915 ])
916 .await;
917
918 assert_eq!(
919 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
920 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
921 );
922 }
923
924 #[tokio::test]
925 async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
926 let catalog = new_memory_catalog().await;
927 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
928
929 let non_existent_namespace_ident = NamespaceIdent::new("b".into());
930 assert_eq!(
931 catalog
932 .get_namespace(&non_existent_namespace_ident)
933 .await
934 .unwrap_err()
935 .to_string(),
936 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
937 )
938 }
939
940 #[tokio::test]
941 async fn test_update_namespace() {
942 let catalog = new_memory_catalog().await;
943 let namespace_ident = NamespaceIdent::new("abc".into());
944 create_namespace(&catalog, &namespace_ident).await;
945
946 let mut new_properties: HashMap<String, String> = HashMap::new();
947 new_properties.insert("k".into(), "v".into());
948
949 catalog
950 .update_namespace(&namespace_ident, new_properties.clone())
951 .await
952 .unwrap();
953
954 assert_eq!(
955 catalog.get_namespace(&namespace_ident).await.unwrap(),
956 Namespace::with_properties(namespace_ident, new_properties)
957 )
958 }
959
960 #[tokio::test]
961 async fn test_update_nested_namespace() {
962 let catalog = new_memory_catalog().await;
963 let namespace_ident_a = NamespaceIdent::new("a".into());
964 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
965 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
966
967 let mut new_properties = HashMap::new();
968 new_properties.insert("k".into(), "v".into());
969
970 catalog
971 .update_namespace(&namespace_ident_a_b, new_properties.clone())
972 .await
973 .unwrap();
974
975 assert_eq!(
976 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
977 Namespace::with_properties(namespace_ident_a_b, new_properties)
978 );
979 }
980
981 #[tokio::test]
982 async fn test_update_deeply_nested_namespace() {
983 let catalog = new_memory_catalog().await;
984 let namespace_ident_a = NamespaceIdent::new("a".into());
985 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
986 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
987 create_namespaces(&catalog, &vec![
988 &namespace_ident_a,
989 &namespace_ident_a_b,
990 &namespace_ident_a_b_c,
991 ])
992 .await;
993
994 let mut new_properties = HashMap::new();
995 new_properties.insert("k".into(), "v".into());
996
997 catalog
998 .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
999 .await
1000 .unwrap();
1001
1002 assert_eq!(
1003 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1004 Namespace::with_properties(namespace_ident_a_b_c, new_properties)
1005 );
1006 }
1007
1008 #[tokio::test]
1009 async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
1010 let catalog = new_memory_catalog().await;
1011 create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
1012
1013 let non_existent_namespace_ident = NamespaceIdent::new("def".into());
1014 assert_eq!(
1015 catalog
1016 .update_namespace(&non_existent_namespace_ident, HashMap::new())
1017 .await
1018 .unwrap_err()
1019 .to_string(),
1020 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1021 )
1022 }
1023
1024 #[tokio::test]
1025 async fn test_drop_namespace() {
1026 let catalog = new_memory_catalog().await;
1027 let namespace_ident = NamespaceIdent::new("abc".into());
1028 create_namespace(&catalog, &namespace_ident).await;
1029
1030 catalog.drop_namespace(&namespace_ident).await.unwrap();
1031
1032 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1033 }
1034
1035 #[tokio::test]
1036 async fn test_drop_nested_namespace() {
1037 let catalog = new_memory_catalog().await;
1038 let namespace_ident_a = NamespaceIdent::new("a".into());
1039 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1040 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1041
1042 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1043
1044 assert!(
1045 !catalog
1046 .namespace_exists(&namespace_ident_a_b)
1047 .await
1048 .unwrap()
1049 );
1050
1051 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1052 }
1053
1054 #[tokio::test]
1055 async fn test_drop_deeply_nested_namespace() {
1056 let catalog = new_memory_catalog().await;
1057 let namespace_ident_a = NamespaceIdent::new("a".into());
1058 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1059 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1060 create_namespaces(&catalog, &vec![
1061 &namespace_ident_a,
1062 &namespace_ident_a_b,
1063 &namespace_ident_a_b_c,
1064 ])
1065 .await;
1066
1067 catalog
1068 .drop_namespace(&namespace_ident_a_b_c)
1069 .await
1070 .unwrap();
1071
1072 assert!(
1073 !catalog
1074 .namespace_exists(&namespace_ident_a_b_c)
1075 .await
1076 .unwrap()
1077 );
1078
1079 assert!(
1080 catalog
1081 .namespace_exists(&namespace_ident_a_b)
1082 .await
1083 .unwrap()
1084 );
1085
1086 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1087 }
1088
1089 #[tokio::test]
1090 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1091 let catalog = new_memory_catalog().await;
1092
1093 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1094 assert_eq!(
1095 catalog
1096 .drop_namespace(&non_existent_namespace_ident)
1097 .await
1098 .unwrap_err()
1099 .to_string(),
1100 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1101 )
1102 }
1103
1104 #[tokio::test]
1105 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1106 let catalog = new_memory_catalog().await;
1107 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1108
1109 let non_existent_namespace_ident =
1110 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1111 assert_eq!(
1112 catalog
1113 .drop_namespace(&non_existent_namespace_ident)
1114 .await
1115 .unwrap_err()
1116 .to_string(),
1117 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1118 )
1119 }
1120
1121 #[tokio::test]
1122 async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1123 let catalog = new_memory_catalog().await;
1124 let namespace_ident_a = NamespaceIdent::new("a".into());
1125 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1126 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1127
1128 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1129
1130 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1131
1132 assert!(
1133 !catalog
1134 .namespace_exists(&namespace_ident_a_b)
1135 .await
1136 .unwrap()
1137 );
1138 }
1139
1140 #[tokio::test]
1141 async fn test_create_table_with_location() {
1142 let tmp_dir = TempDir::new().unwrap();
1143 let catalog = new_memory_catalog().await;
1144 let namespace_ident = NamespaceIdent::new("a".into());
1145 create_namespace(&catalog, &namespace_ident).await;
1146
1147 let table_name = "abc";
1148 let location = tmp_dir.path().to_str().unwrap().to_string();
1149 let table_creation = TableCreation::builder()
1150 .name(table_name.into())
1151 .location(location.clone())
1152 .schema(simple_table_schema())
1153 .build();
1154
1155 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1156
1157 assert_table_eq(
1158 &catalog
1159 .create_table(&namespace_ident, table_creation)
1160 .await
1161 .unwrap(),
1162 &expected_table_ident,
1163 &simple_table_schema(),
1164 );
1165
1166 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1167
1168 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1169
1170 assert!(
1171 table
1172 .metadata_location()
1173 .unwrap()
1174 .to_string()
1175 .starts_with(&location)
1176 )
1177 }
1178
1179 #[tokio::test]
1180 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1181 let warehouse_location = temp_path();
1182 let catalog = MemoryCatalogBuilder::default()
1183 .load(
1184 "memory",
1185 HashMap::from([(
1186 MEMORY_CATALOG_WAREHOUSE.to_string(),
1187 warehouse_location.clone(),
1188 )]),
1189 )
1190 .await
1191 .unwrap();
1192
1193 let namespace_ident = NamespaceIdent::new("a".into());
1194 let mut namespace_properties = HashMap::new();
1195 let namespace_location = temp_path();
1196 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1197 catalog
1198 .create_namespace(&namespace_ident, namespace_properties)
1199 .await
1200 .unwrap();
1201
1202 let table_name = "tbl1";
1203 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1204 let expected_table_metadata_location_regex =
1205 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1206
1207 let table = catalog
1208 .create_table(
1209 &namespace_ident,
1210 TableCreation::builder()
1211 .name(table_name.into())
1212 .schema(simple_table_schema())
1213 .build(),
1215 )
1216 .await
1217 .unwrap();
1218 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1219 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1220
1221 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1222 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1223 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1224 }
1225
1226 #[tokio::test]
1227 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1228 {
1229 let warehouse_location = temp_path();
1230 let catalog = MemoryCatalogBuilder::default()
1231 .load(
1232 "memory",
1233 HashMap::from([(
1234 MEMORY_CATALOG_WAREHOUSE.to_string(),
1235 warehouse_location.clone(),
1236 )]),
1237 )
1238 .await
1239 .unwrap();
1240
1241 let namespace_ident = NamespaceIdent::new("a".into());
1242 let mut namespace_properties = HashMap::new();
1243 let namespace_location = temp_path();
1244 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1245 catalog
1246 .create_namespace(&namespace_ident, namespace_properties)
1247 .await
1248 .unwrap();
1249
1250 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1251 let mut nested_namespace_properties = HashMap::new();
1252 let nested_namespace_location = temp_path();
1253 nested_namespace_properties
1254 .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1255 catalog
1256 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1257 .await
1258 .unwrap();
1259
1260 let table_name = "tbl1";
1261 let expected_table_ident =
1262 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1263 let expected_table_metadata_location_regex = format!(
1264 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1265 );
1266
1267 let table = catalog
1268 .create_table(
1269 &nested_namespace_ident,
1270 TableCreation::builder()
1271 .name(table_name.into())
1272 .schema(simple_table_schema())
1273 .build(),
1275 )
1276 .await
1277 .unwrap();
1278 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1279 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1280
1281 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1282 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1283 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1284 }
1285
1286 #[tokio::test]
1287 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1288 {
1289 let warehouse_location = temp_path();
1290 let catalog = MemoryCatalogBuilder::default()
1291 .load(
1292 "memory",
1293 HashMap::from([(
1294 MEMORY_CATALOG_WAREHOUSE.to_string(),
1295 warehouse_location.clone(),
1296 )]),
1297 )
1298 .await
1299 .unwrap();
1300
1301 let namespace_ident = NamespaceIdent::new("a".into());
1302 let namespace_properties = HashMap::new();
1304 catalog
1305 .create_namespace(&namespace_ident, namespace_properties)
1306 .await
1307 .unwrap();
1308
1309 let table_name = "tbl1";
1310 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1311 let expected_table_metadata_location_regex =
1312 format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1313
1314 let table = catalog
1315 .create_table(
1316 &namespace_ident,
1317 TableCreation::builder()
1318 .name(table_name.into())
1319 .schema(simple_table_schema())
1320 .build(),
1322 )
1323 .await
1324 .unwrap();
1325 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1326 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1327
1328 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1329 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1330 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1331 }
1332
1333 #[tokio::test]
1334 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1335 {
1336 let warehouse_location = temp_path();
1337 let catalog = MemoryCatalogBuilder::default()
1338 .load(
1339 "memory",
1340 HashMap::from([(
1341 MEMORY_CATALOG_WAREHOUSE.to_string(),
1342 warehouse_location.clone(),
1343 )]),
1344 )
1345 .await
1346 .unwrap();
1347
1348 let namespace_ident = NamespaceIdent::new("a".into());
1349 catalog
1350 .create_namespace(&namespace_ident, HashMap::new())
1352 .await
1353 .unwrap();
1354
1355 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1356 catalog
1357 .create_namespace(&nested_namespace_ident, HashMap::new())
1359 .await
1360 .unwrap();
1361
1362 let table_name = "tbl1";
1363 let expected_table_ident =
1364 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1365 let expected_table_metadata_location_regex = format!(
1366 "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1367 );
1368
1369 let table = catalog
1370 .create_table(
1371 &nested_namespace_ident,
1372 TableCreation::builder()
1373 .name(table_name.into())
1374 .schema(simple_table_schema())
1375 .build(),
1377 )
1378 .await
1379 .unwrap();
1380 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1381 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1382
1383 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1384 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1385 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1386 }
1387
1388 #[tokio::test]
1389 async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1390 {
1391 let catalog = MemoryCatalogBuilder::default()
1392 .load("memory", HashMap::from([]))
1393 .await;
1394
1395 assert!(catalog.is_err());
1396 assert_eq!(
1397 catalog.unwrap_err().to_string(),
1398 "DataInvalid => Catalog warehouse is required"
1399 );
1400 }
1401
1402 #[tokio::test]
1403 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1404 let catalog = new_memory_catalog().await;
1405 let namespace_ident = NamespaceIdent::new("a".into());
1406 create_namespace(&catalog, &namespace_ident).await;
1407 let table_name = "tbl1";
1408 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1409 create_table(&catalog, &table_ident).await;
1410
1411 let tmp_dir = TempDir::new().unwrap();
1412 let location = tmp_dir.path().to_str().unwrap().to_string();
1413
1414 assert_eq!(
1415 catalog
1416 .create_table(
1417 &namespace_ident,
1418 TableCreation::builder()
1419 .name(table_name.into())
1420 .schema(simple_table_schema())
1421 .location(location)
1422 .build()
1423 )
1424 .await
1425 .unwrap_err()
1426 .to_string(),
1427 format!(
1428 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1429 &table_ident
1430 )
1431 );
1432 }
1433
1434 #[tokio::test]
1435 async fn test_list_tables_returns_empty_vector() {
1436 let catalog = new_memory_catalog().await;
1437 let namespace_ident = NamespaceIdent::new("a".into());
1438 create_namespace(&catalog, &namespace_ident).await;
1439
1440 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1441 }
1442
1443 #[tokio::test]
1444 async fn test_list_tables_returns_a_single_table() {
1445 let catalog = new_memory_catalog().await;
1446 let namespace_ident = NamespaceIdent::new("n1".into());
1447 create_namespace(&catalog, &namespace_ident).await;
1448
1449 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1450 create_table(&catalog, &table_ident).await;
1451
1452 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1453 table_ident
1454 ]);
1455 }
1456
1457 #[tokio::test]
1458 async fn test_list_tables_returns_multiple_tables() {
1459 let catalog = new_memory_catalog().await;
1460 let namespace_ident = NamespaceIdent::new("n1".into());
1461 create_namespace(&catalog, &namespace_ident).await;
1462
1463 let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1464 let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1465 let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1466
1467 assert_eq!(
1468 to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1469 to_set(vec![table_ident_1, table_ident_2])
1470 );
1471 }
1472
1473 #[tokio::test]
1474 async fn test_list_tables_returns_tables_from_correct_namespace() {
1475 let catalog = new_memory_catalog().await;
1476 let namespace_ident_1 = NamespaceIdent::new("n1".into());
1477 let namespace_ident_2 = NamespaceIdent::new("n2".into());
1478 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1479
1480 let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1481 let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1482 let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1483 let _ = create_tables(&catalog, vec![
1484 &table_ident_1,
1485 &table_ident_2,
1486 &table_ident_3,
1487 ])
1488 .await;
1489
1490 assert_eq!(
1491 to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1492 to_set(vec![table_ident_1, table_ident_2])
1493 );
1494
1495 assert_eq!(
1496 to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1497 to_set(vec![table_ident_3])
1498 );
1499 }
1500
1501 #[tokio::test]
1502 async fn test_list_tables_returns_table_under_nested_namespace() {
1503 let catalog = new_memory_catalog().await;
1504 let namespace_ident_a = NamespaceIdent::new("a".into());
1505 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1506 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1507
1508 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1509 create_table(&catalog, &table_ident).await;
1510
1511 assert_eq!(
1512 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1513 vec![table_ident]
1514 );
1515 }
1516
1517 #[tokio::test]
1518 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1519 let catalog = new_memory_catalog().await;
1520
1521 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1522
1523 assert_eq!(
1524 catalog
1525 .list_tables(&non_existent_namespace_ident)
1526 .await
1527 .unwrap_err()
1528 .to_string(),
1529 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1530 );
1531 }
1532
1533 #[tokio::test]
1534 async fn test_drop_table() {
1535 let catalog = new_memory_catalog().await;
1536 let namespace_ident = NamespaceIdent::new("n1".into());
1537 create_namespace(&catalog, &namespace_ident).await;
1538 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1539 create_table(&catalog, &table_ident).await;
1540
1541 catalog.drop_table(&table_ident).await.unwrap();
1542 }
1543
1544 #[tokio::test]
1545 async fn test_drop_table_drops_table_under_nested_namespace() {
1546 let catalog = new_memory_catalog().await;
1547 let namespace_ident_a = NamespaceIdent::new("a".into());
1548 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1549 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1550
1551 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1552 create_table(&catalog, &table_ident).await;
1553
1554 catalog.drop_table(&table_ident).await.unwrap();
1555
1556 assert_eq!(
1557 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1558 vec![]
1559 );
1560 }
1561
1562 #[tokio::test]
1563 async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1564 let catalog = new_memory_catalog().await;
1565
1566 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1567 let non_existent_table_ident =
1568 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1569
1570 assert_eq!(
1571 catalog
1572 .drop_table(&non_existent_table_ident)
1573 .await
1574 .unwrap_err()
1575 .to_string(),
1576 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1577 );
1578 }
1579
1580 #[tokio::test]
1581 async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1582 let catalog = new_memory_catalog().await;
1583 let namespace_ident = NamespaceIdent::new("n1".into());
1584 create_namespace(&catalog, &namespace_ident).await;
1585
1586 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1587
1588 assert_eq!(
1589 catalog
1590 .drop_table(&non_existent_table_ident)
1591 .await
1592 .unwrap_err()
1593 .to_string(),
1594 format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1595 );
1596 }
1597
1598 #[tokio::test]
1599 async fn test_table_exists_returns_true() {
1600 let catalog = new_memory_catalog().await;
1601 let namespace_ident = NamespaceIdent::new("n1".into());
1602 create_namespace(&catalog, &namespace_ident).await;
1603 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1604 create_table(&catalog, &table_ident).await;
1605
1606 assert!(catalog.table_exists(&table_ident).await.unwrap());
1607 }
1608
1609 #[tokio::test]
1610 async fn test_table_exists_returns_false() {
1611 let catalog = new_memory_catalog().await;
1612 let namespace_ident = NamespaceIdent::new("n1".into());
1613 create_namespace(&catalog, &namespace_ident).await;
1614 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1615
1616 assert!(
1617 !catalog
1618 .table_exists(&non_existent_table_ident)
1619 .await
1620 .unwrap()
1621 );
1622 }
1623
1624 #[tokio::test]
1625 async fn test_table_exists_under_nested_namespace() {
1626 let catalog = new_memory_catalog().await;
1627 let namespace_ident_a = NamespaceIdent::new("a".into());
1628 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1629 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1630
1631 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1632 create_table(&catalog, &table_ident).await;
1633
1634 assert!(catalog.table_exists(&table_ident).await.unwrap());
1635
1636 let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1637 assert!(
1638 !catalog
1639 .table_exists(&non_existent_table_ident)
1640 .await
1641 .unwrap()
1642 );
1643 }
1644
1645 #[tokio::test]
1646 async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1647 let catalog = new_memory_catalog().await;
1648
1649 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1650 let non_existent_table_ident =
1651 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1652
1653 assert_eq!(
1654 catalog
1655 .table_exists(&non_existent_table_ident)
1656 .await
1657 .unwrap_err()
1658 .to_string(),
1659 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1660 );
1661 }
1662
1663 #[tokio::test]
1664 async fn test_rename_table_in_same_namespace() {
1665 let catalog = new_memory_catalog().await;
1666 let namespace_ident = NamespaceIdent::new("n1".into());
1667 create_namespace(&catalog, &namespace_ident).await;
1668 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1669 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1670 create_table(&catalog, &src_table_ident).await;
1671
1672 catalog
1673 .rename_table(&src_table_ident, &dst_table_ident)
1674 .await
1675 .unwrap();
1676
1677 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1678 dst_table_ident
1679 ],);
1680 }
1681
1682 #[tokio::test]
1683 async fn test_rename_table_across_namespaces() {
1684 let catalog = new_memory_catalog().await;
1685 let src_namespace_ident = NamespaceIdent::new("a".into());
1686 let dst_namespace_ident = NamespaceIdent::new("b".into());
1687 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1688 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1689 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1690 create_table(&catalog, &src_table_ident).await;
1691
1692 catalog
1693 .rename_table(&src_table_ident, &dst_table_ident)
1694 .await
1695 .unwrap();
1696
1697 assert_eq!(
1698 catalog.list_tables(&src_namespace_ident).await.unwrap(),
1699 vec![],
1700 );
1701
1702 assert_eq!(
1703 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1704 vec![dst_table_ident],
1705 );
1706 }
1707
1708 #[tokio::test]
1709 async fn test_rename_table_src_table_is_same_as_dst_table() {
1710 let catalog = new_memory_catalog().await;
1711 let namespace_ident = NamespaceIdent::new("n1".into());
1712 create_namespace(&catalog, &namespace_ident).await;
1713 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1714 create_table(&catalog, &table_ident).await;
1715
1716 catalog
1717 .rename_table(&table_ident, &table_ident)
1718 .await
1719 .unwrap();
1720
1721 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1722 table_ident
1723 ],);
1724 }
1725
1726 #[tokio::test]
1727 async fn test_rename_table_across_nested_namespaces() {
1728 let catalog = new_memory_catalog().await;
1729 let namespace_ident_a = NamespaceIdent::new("a".into());
1730 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1731 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1732 create_namespaces(&catalog, &vec![
1733 &namespace_ident_a,
1734 &namespace_ident_a_b,
1735 &namespace_ident_a_b_c,
1736 ])
1737 .await;
1738
1739 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1740 create_tables(&catalog, vec![&src_table_ident]).await;
1741
1742 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1743 catalog
1744 .rename_table(&src_table_ident, &dst_table_ident)
1745 .await
1746 .unwrap();
1747
1748 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1749
1750 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1751 }
1752
1753 #[tokio::test]
1754 async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1755 let catalog = new_memory_catalog().await;
1756
1757 let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1758 let src_table_ident =
1759 TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1760
1761 let dst_namespace_ident = NamespaceIdent::new("n2".into());
1762 create_namespace(&catalog, &dst_namespace_ident).await;
1763 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1764
1765 assert_eq!(
1766 catalog
1767 .rename_table(&src_table_ident, &dst_table_ident)
1768 .await
1769 .unwrap_err()
1770 .to_string(),
1771 format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1772 );
1773 }
1774
1775 #[tokio::test]
1776 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1777 let catalog = new_memory_catalog().await;
1778 let src_namespace_ident = NamespaceIdent::new("n1".into());
1779 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1780 create_namespace(&catalog, &src_namespace_ident).await;
1781 create_table(&catalog, &src_table_ident).await;
1782
1783 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1784 let dst_table_ident =
1785 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1786 assert_eq!(
1787 catalog
1788 .rename_table(&src_table_ident, &dst_table_ident)
1789 .await
1790 .unwrap_err()
1791 .to_string(),
1792 format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1793 );
1794 }
1795
1796 #[tokio::test]
1797 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1798 let catalog = new_memory_catalog().await;
1799 let namespace_ident = NamespaceIdent::new("n1".into());
1800 create_namespace(&catalog, &namespace_ident).await;
1801 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1802 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1803
1804 assert_eq!(
1805 catalog
1806 .rename_table(&src_table_ident, &dst_table_ident)
1807 .await
1808 .unwrap_err()
1809 .to_string(),
1810 format!("TableNotFound => No such table: {src_table_ident:?}"),
1811 );
1812 }
1813
1814 #[tokio::test]
1815 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1816 let catalog = new_memory_catalog().await;
1817 let namespace_ident = NamespaceIdent::new("n1".into());
1818 create_namespace(&catalog, &namespace_ident).await;
1819 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1820 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1821 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1822
1823 assert_eq!(
1824 catalog
1825 .rename_table(&src_table_ident, &dst_table_ident)
1826 .await
1827 .unwrap_err()
1828 .to_string(),
1829 format!(
1830 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1831 &dst_table_ident
1832 ),
1833 );
1834 }
1835
1836 #[tokio::test]
1837 async fn test_register_table() {
1838 let catalog = new_memory_catalog().await;
1840 let namespace_ident = NamespaceIdent::new("test_namespace".into());
1841 create_namespace(&catalog, &namespace_ident).await;
1842
1843 let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1845 create_table(&catalog, &source_table_ident).await;
1846
1847 let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1849 let metadata_location = source_table.metadata_location().unwrap().to_string();
1850
1851 let register_table_ident =
1853 TableIdent::new(namespace_ident.clone(), "register_table".into());
1854 let registered_table = catalog
1855 .register_table(®ister_table_ident, metadata_location.clone())
1856 .await
1857 .unwrap();
1858
1859 assert_eq!(registered_table.identifier(), ®ister_table_ident);
1861
1862 assert_eq!(
1864 registered_table.metadata_location().unwrap().to_string(),
1865 metadata_location
1866 );
1867
1868 assert!(catalog.table_exists(®ister_table_ident).await.unwrap());
1870
1871 let loaded_table = catalog.load_table(®ister_table_ident).await.unwrap();
1873 assert_eq!(loaded_table.identifier(), ®ister_table_ident);
1874 assert_eq!(
1875 loaded_table.metadata_location().unwrap().to_string(),
1876 metadata_location
1877 );
1878 }
1879
1880 #[tokio::test]
1881 async fn test_update_table() {
1882 let catalog = new_memory_catalog().await;
1883
1884 let table = create_table_with_namespace(&catalog).await;
1885
1886 assert!(!table.metadata().properties().contains_key("key"));
1888
1889 tokio::time::sleep(std::time::Duration::from_millis(2)).await;
1893
1894 let tx = Transaction::new(&table);
1896 let updated_table = tx
1897 .update_table_properties()
1898 .set("key".to_string(), "value".to_string())
1899 .apply(tx)
1900 .unwrap()
1901 .commit(&catalog)
1902 .await
1903 .unwrap();
1904
1905 assert_eq!(
1906 updated_table.metadata().properties().get("key").unwrap(),
1907 "value"
1908 );
1909
1910 assert_eq!(table.identifier(), updated_table.identifier());
1911 assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1912 assert_ne!(table.metadata_location(), updated_table.metadata_location());
1913
1914 assert!(
1915 table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1916 );
1917 }
1918
1919 #[tokio::test]
1920 async fn test_update_table_fails_if_table_doesnt_exist() {
1921 let catalog = new_memory_catalog().await;
1922
1923 let namespace_ident = NamespaceIdent::new("a".into());
1924 create_namespace(&catalog, &namespace_ident).await;
1925
1926 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1928 let table = build_table(table_ident);
1929
1930 let tx = Transaction::new(&table);
1931 let err = tx
1932 .update_table_properties()
1933 .set("key".to_string(), "value".to_string())
1934 .apply(tx)
1935 .unwrap()
1936 .commit(&catalog)
1937 .await
1938 .unwrap_err();
1939 assert_eq!(err.kind(), ErrorKind::TableNotFound);
1940 }
1941
1942 fn build_table(ident: TableIdent) -> Table {
1943 let file_io = FileIO::new_with_fs();
1944
1945 let temp_dir = TempDir::new().unwrap();
1946 let location = temp_dir.path().to_str().unwrap().to_string();
1947
1948 let table_creation = TableCreation::builder()
1949 .name(ident.name().to_string())
1950 .schema(simple_table_schema())
1951 .location(location)
1952 .build();
1953 let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1954 .unwrap()
1955 .build()
1956 .unwrap()
1957 .metadata;
1958
1959 Table::builder()
1960 .identifier(ident)
1961 .metadata(metadata)
1962 .file_io(file_io)
1963 .runtime(test_runtime())
1964 .build()
1965 .unwrap()
1966 }
1967}