1use std::collections::HashMap;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use futures::lock::{Mutex, MutexGuard};
26use itertools::Itertools;
27
28use super::namespace_state::NamespaceState;
29use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
30use crate::spec::{TableMetadata, TableMetadataBuilder};
31use crate::table::Table;
32use crate::{
33 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
34 TableCommit, TableCreation, TableIdent,
35};
36
/// Property key under which the warehouse location is passed to
/// `MemoryCatalogBuilder::load`.
pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";

/// Namespace property key consulted by `create_table` to pick the location
/// prefix of a table that is created without an explicit location.
const LOCATION: &str = "location";
42
/// Builder for [`MemoryCatalog`].
#[derive(Debug)]
pub struct MemoryCatalogBuilder {
    /// Configuration that is filled in by `load` and handed to the catalog.
    config: MemoryCatalogConfig,
    /// Optional storage factory; when `None`, `MemoryCatalog::new` falls back
    /// to `MemoryStorageFactory`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
49
50impl Default for MemoryCatalogBuilder {
51 fn default() -> Self {
52 Self {
53 config: MemoryCatalogConfig {
54 name: None,
55 warehouse: "".to_string(),
56 props: HashMap::new(),
57 },
58 storage_factory: None,
59 }
60 }
61}
62
63impl CatalogBuilder for MemoryCatalogBuilder {
64 type C = MemoryCatalog;
65
66 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
67 self.storage_factory = Some(storage_factory);
68 self
69 }
70
71 fn load(
72 mut self,
73 name: impl Into<String>,
74 props: HashMap<String, String>,
75 ) -> impl Future<Output = Result<Self::C>> + Send {
76 self.config.name = Some(name.into());
77
78 if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
79 self.config.warehouse = props
80 .get(MEMORY_CATALOG_WAREHOUSE)
81 .cloned()
82 .unwrap_or_default()
83 }
84
85 self.config.props = props
87 .into_iter()
88 .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
89 .collect();
90
91 let result = {
92 if self.config.name.is_none() {
93 Err(Error::new(
94 ErrorKind::DataInvalid,
95 "Catalog name is required",
96 ))
97 } else if self.config.warehouse.is_empty() {
98 Err(Error::new(
99 ErrorKind::DataInvalid,
100 "Catalog warehouse is required",
101 ))
102 } else {
103 MemoryCatalog::new(self.config, self.storage_factory)
104 }
105 };
106
107 std::future::ready(result)
108 }
109}
110
/// Configuration collected by `MemoryCatalogBuilder` before a catalog is built.
#[derive(Clone, Debug)]
pub(crate) struct MemoryCatalogConfig {
    /// Catalog name; set by `MemoryCatalogBuilder::load`.
    name: Option<String>,
    /// Warehouse location; `load` rejects an empty value.
    warehouse: String,
    /// Remaining properties, passed to the `FileIOBuilder` in `MemoryCatalog::new`.
    props: HashMap<String, String>,
}
117
/// Catalog implementation that keeps its namespace and table registry in memory.
#[derive(Debug)]
pub struct MemoryCatalog {
    /// Namespace/table registry, guarded by an async mutex.
    root_namespace_state: Mutex<NamespaceState>,
    /// File IO used to read and write table metadata.
    file_io: FileIO,
    /// Prefix used for table locations when neither the table creation nor the
    /// namespace properties supply one.
    warehouse_location: String,
}
125
126impl MemoryCatalog {
127 fn new(
129 config: MemoryCatalogConfig,
130 storage_factory: Option<Arc<dyn StorageFactory>>,
131 ) -> Result<Self> {
132 let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory));
134
135 Ok(Self {
136 root_namespace_state: Mutex::new(NamespaceState::default()),
137 file_io: FileIOBuilder::new(factory).with_props(config.props).build(),
138 warehouse_location: config.warehouse,
139 })
140 }
141
142 async fn load_table_from_locked_state(
144 &self,
145 table_ident: &TableIdent,
146 root_namespace_state: &MutexGuard<'_, NamespaceState>,
147 ) -> Result<Table> {
148 let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
149 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
150
151 Table::builder()
152 .identifier(table_ident.clone())
153 .metadata(metadata)
154 .metadata_location(metadata_location.to_string())
155 .file_io(self.file_io.clone())
156 .build()
157 }
158}
159
160#[async_trait]
161impl Catalog for MemoryCatalog {
162 async fn list_namespaces(
164 &self,
165 maybe_parent: Option<&NamespaceIdent>,
166 ) -> Result<Vec<NamespaceIdent>> {
167 let root_namespace_state = self.root_namespace_state.lock().await;
168
169 match maybe_parent {
170 None => {
171 let namespaces = root_namespace_state
172 .list_top_level_namespaces()
173 .into_iter()
174 .map(|str| NamespaceIdent::new(str.to_string()))
175 .collect_vec();
176
177 Ok(namespaces)
178 }
179 Some(parent_namespace_ident) => {
180 let namespaces = root_namespace_state
181 .list_namespaces_under(parent_namespace_ident)?
182 .into_iter()
183 .map(|name| {
184 let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
185 names.push(name.to_string());
186 NamespaceIdent::from_vec(names)
187 })
188 .collect::<Result<Vec<_>>>()?;
189
190 Ok(namespaces)
191 }
192 }
193 }
194
195 async fn create_namespace(
197 &self,
198 namespace_ident: &NamespaceIdent,
199 properties: HashMap<String, String>,
200 ) -> Result<Namespace> {
201 let mut root_namespace_state = self.root_namespace_state.lock().await;
202
203 root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
204 let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
205
206 Ok(namespace)
207 }
208
209 async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
211 let root_namespace_state = self.root_namespace_state.lock().await;
212
213 let namespace = Namespace::with_properties(
214 namespace_ident.clone(),
215 root_namespace_state
216 .get_properties(namespace_ident)?
217 .clone(),
218 );
219
220 Ok(namespace)
221 }
222
223 async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
225 let guarded_namespaces = self.root_namespace_state.lock().await;
226
227 Ok(guarded_namespaces.namespace_exists(namespace_ident))
228 }
229
230 async fn update_namespace(
236 &self,
237 namespace_ident: &NamespaceIdent,
238 properties: HashMap<String, String>,
239 ) -> Result<()> {
240 let mut root_namespace_state = self.root_namespace_state.lock().await;
241
242 root_namespace_state.replace_properties(namespace_ident, properties)
243 }
244
245 async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
247 let mut root_namespace_state = self.root_namespace_state.lock().await;
248
249 root_namespace_state.remove_existing_namespace(namespace_ident)
250 }
251
252 async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
254 let root_namespace_state = self.root_namespace_state.lock().await;
255
256 let table_names = root_namespace_state.list_tables(namespace_ident)?;
257 let table_idents = table_names
258 .into_iter()
259 .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
260 .collect_vec();
261
262 Ok(table_idents)
263 }
264
265 async fn create_table(
267 &self,
268 namespace_ident: &NamespaceIdent,
269 table_creation: TableCreation,
270 ) -> Result<Table> {
271 let mut root_namespace_state = self.root_namespace_state.lock().await;
272
273 let table_name = table_creation.name.clone();
274 let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
275
276 let (table_creation, location) = match table_creation.location.clone() {
277 Some(location) => (table_creation, location),
278 None => {
279 let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
280 let location_prefix = match namespace_properties.get(LOCATION) {
281 Some(namespace_location) => namespace_location.clone(),
282 None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
283 };
284
285 let location = format!("{}/{}", location_prefix, table_ident.name());
286
287 let new_table_creation = TableCreation {
288 location: Some(location.clone()),
289 ..table_creation
290 };
291
292 (new_table_creation, location)
293 }
294 };
295
296 let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
297 .build()?
298 .metadata;
299 let metadata_location = MetadataLocation::new_with_metadata(location, &metadata);
300
301 metadata.write_to(&self.file_io, &metadata_location).await?;
302
303 root_namespace_state.insert_new_table(&table_ident, metadata_location.to_string())?;
304
305 Table::builder()
306 .file_io(self.file_io.clone())
307 .metadata_location(metadata_location.to_string())
308 .metadata(metadata)
309 .identifier(table_ident)
310 .build()
311 }
312
313 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
315 let root_namespace_state = self.root_namespace_state.lock().await;
316
317 self.load_table_from_locked_state(table_ident, &root_namespace_state)
318 .await
319 }
320
321 async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
323 let mut root_namespace_state = self.root_namespace_state.lock().await;
324
325 root_namespace_state.remove_existing_table(table_ident)?;
326 Ok(())
327 }
328
329 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
331 let root_namespace_state = self.root_namespace_state.lock().await;
332
333 root_namespace_state.table_exists(table_ident)
334 }
335
336 async fn rename_table(
338 &self,
339 src_table_ident: &TableIdent,
340 dst_table_ident: &TableIdent,
341 ) -> Result<()> {
342 let mut root_namespace_state = self.root_namespace_state.lock().await;
343
344 let mut new_root_namespace_state = root_namespace_state.clone();
345 let metadata_location = new_root_namespace_state
346 .get_existing_table_location(src_table_ident)?
347 .clone();
348 new_root_namespace_state.remove_existing_table(src_table_ident)?;
349 new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
350 *root_namespace_state = new_root_namespace_state;
351
352 Ok(())
353 }
354
355 async fn register_table(
356 &self,
357 table_ident: &TableIdent,
358 metadata_location: String,
359 ) -> Result<Table> {
360 let mut root_namespace_state = self.root_namespace_state.lock().await;
361 root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
362
363 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
364
365 Table::builder()
366 .file_io(self.file_io.clone())
367 .metadata_location(metadata_location)
368 .metadata(metadata)
369 .identifier(table_ident.clone())
370 .build()
371 }
372
373 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
375 let mut root_namespace_state = self.root_namespace_state.lock().await;
376
377 let current_table = self
378 .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
379 .await?;
380
381 let staged_table = commit.apply(current_table)?;
383
384 let metadata_location =
386 MetadataLocation::from_str(staged_table.metadata_location_result()?)?;
387 staged_table
388 .metadata()
389 .write_to(staged_table.file_io(), &metadata_location)
390 .await?;
391
392 let updated_table = root_namespace_state.commit_table_update(staged_table)?;
394
395 Ok(updated_table)
396 }
397}
398
399#[cfg(test)]
400pub(crate) mod tests {
401 use std::collections::HashSet;
402 use std::hash::Hash;
403 use std::iter::FromIterator;
404 use std::vec;
405
406 use regex::Regex;
407 use tempfile::TempDir;
408
409 use super::*;
410 use crate::io::FileIO;
411 use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
412 use crate::transaction::{ApplyTransactionAction, Transaction};
413
414 fn temp_path() -> String {
415 let temp_dir = TempDir::new().unwrap();
416 temp_dir.path().to_str().unwrap().to_string()
417 }
418
419 pub(crate) async fn new_memory_catalog() -> impl Catalog {
420 let warehouse_location = temp_path();
421 MemoryCatalogBuilder::default()
422 .load(
423 "memory",
424 HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
425 )
426 .await
427 .unwrap()
428 }
429
430 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
431 let _ = catalog
432 .create_namespace(namespace_ident, HashMap::new())
433 .await
434 .unwrap();
435 }
436
437 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
438 for namespace_ident in namespace_idents {
439 let _ = create_namespace(catalog, namespace_ident).await;
440 }
441 }
442
/// Collects `vec` into a `HashSet`, discarding duplicates and ordering.
fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
    vec.into_iter().collect()
}
446
447 fn simple_table_schema() -> Schema {
448 Schema::builder()
449 .with_fields(vec![
450 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
451 ])
452 .build()
453 .unwrap()
454 }
455
456 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
457 catalog
458 .create_table(
459 &table_ident.namespace,
460 TableCreation::builder()
461 .name(table_ident.name().into())
462 .schema(simple_table_schema())
463 .build(),
464 )
465 .await
466 .unwrap()
467 }
468
469 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
470 for table_ident in table_idents {
471 create_table(catalog, table_ident).await;
472 }
473 }
474
475 async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
476 let namespace_ident = NamespaceIdent::new("abc".into());
477 create_namespace(catalog, &namespace_ident).await;
478
479 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
480 create_table(catalog, &table_ident).await
481 }
482
483 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
484 assert_eq!(table.identifier(), expected_table_ident);
485
486 let metadata = table.metadata();
487
488 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
489
490 let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
491 .with_spec_id(0)
492 .build()
493 .unwrap();
494
495 assert_eq!(
496 metadata
497 .partition_specs_iter()
498 .map(|p| p.as_ref())
499 .collect_vec(),
500 vec![&expected_partition_spec]
501 );
502
503 let expected_sorted_order = SortOrder::builder()
504 .with_order_id(0)
505 .with_fields(vec![])
506 .build(expected_schema)
507 .unwrap();
508
509 assert_eq!(
510 metadata
511 .sort_orders_iter()
512 .map(|s| s.as_ref())
513 .collect_vec(),
514 vec![&expected_sorted_order]
515 );
516
517 assert_eq!(metadata.properties(), &HashMap::new());
518
519 assert!(!table.readonly());
520 }
521
/// Regex fragment matching a lowercase, hyphenated UUID (8-4-4-4-12 hex digits).
const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
523
524 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
525 let actual = table.metadata_location().unwrap().to_string();
526 let regex = Regex::new(regex_str).unwrap();
527 assert!(
528 regex.is_match(&actual),
529 "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
530 )
531 }
532
533 #[tokio::test]
534 async fn test_list_namespaces_returns_empty_vector() {
535 let catalog = new_memory_catalog().await;
536
537 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
538 }
539
540 #[tokio::test]
541 async fn test_list_namespaces_returns_single_namespace() {
542 let catalog = new_memory_catalog().await;
543 let namespace_ident = NamespaceIdent::new("abc".into());
544 create_namespace(&catalog, &namespace_ident).await;
545
546 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
547 namespace_ident
548 ]);
549 }
550
551 #[tokio::test]
552 async fn test_list_namespaces_returns_multiple_namespaces() {
553 let catalog = new_memory_catalog().await;
554 let namespace_ident_1 = NamespaceIdent::new("a".into());
555 let namespace_ident_2 = NamespaceIdent::new("b".into());
556 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
557
558 assert_eq!(
559 to_set(catalog.list_namespaces(None).await.unwrap()),
560 to_set(vec![namespace_ident_1, namespace_ident_2])
561 );
562 }
563
564 #[tokio::test]
565 async fn test_list_namespaces_returns_only_top_level_namespaces() {
566 let catalog = new_memory_catalog().await;
567 let namespace_ident_1 = NamespaceIdent::new("a".into());
568 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
569 let namespace_ident_3 = NamespaceIdent::new("b".into());
570 create_namespaces(&catalog, &vec![
571 &namespace_ident_1,
572 &namespace_ident_2,
573 &namespace_ident_3,
574 ])
575 .await;
576
577 assert_eq!(
578 to_set(catalog.list_namespaces(None).await.unwrap()),
579 to_set(vec![namespace_ident_1, namespace_ident_3])
580 );
581 }
582
583 #[tokio::test]
584 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
585 let catalog = new_memory_catalog().await;
586 let namespace_ident_1 = NamespaceIdent::new("a".into());
587 let namespace_ident_2 = NamespaceIdent::new("b".into());
588 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
589
590 assert_eq!(
591 catalog
592 .list_namespaces(Some(&namespace_ident_1))
593 .await
594 .unwrap(),
595 vec![]
596 );
597 }
598
599 #[tokio::test]
600 async fn test_list_namespaces_returns_namespace_under_parent() {
601 let catalog = new_memory_catalog().await;
602 let namespace_ident_1 = NamespaceIdent::new("a".into());
603 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
604 let namespace_ident_3 = NamespaceIdent::new("c".into());
605 create_namespaces(&catalog, &vec![
606 &namespace_ident_1,
607 &namespace_ident_2,
608 &namespace_ident_3,
609 ])
610 .await;
611
612 assert_eq!(
613 to_set(catalog.list_namespaces(None).await.unwrap()),
614 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
615 );
616
617 assert_eq!(
618 catalog
619 .list_namespaces(Some(&namespace_ident_1))
620 .await
621 .unwrap(),
622 vec![namespace_ident_2]
623 );
624 }
625
626 #[tokio::test]
627 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
628 let catalog = new_memory_catalog().await;
629 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
630 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
631 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
632 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
633 let namespace_ident_5 = NamespaceIdent::new("b".into());
634 create_namespaces(&catalog, &vec![
635 &namespace_ident_1,
636 &namespace_ident_2,
637 &namespace_ident_3,
638 &namespace_ident_4,
639 &namespace_ident_5,
640 ])
641 .await;
642
643 assert_eq!(
644 to_set(
645 catalog
646 .list_namespaces(Some(&namespace_ident_1))
647 .await
648 .unwrap()
649 ),
650 to_set(vec![
651 namespace_ident_2,
652 namespace_ident_3,
653 namespace_ident_4,
654 ])
655 );
656 }
657
658 #[tokio::test]
659 async fn test_namespace_exists_returns_false() {
660 let catalog = new_memory_catalog().await;
661 let namespace_ident = NamespaceIdent::new("a".into());
662 create_namespace(&catalog, &namespace_ident).await;
663
664 assert!(
665 !catalog
666 .namespace_exists(&NamespaceIdent::new("b".into()))
667 .await
668 .unwrap()
669 );
670 }
671
672 #[tokio::test]
673 async fn test_namespace_exists_returns_true() {
674 let catalog = new_memory_catalog().await;
675 let namespace_ident = NamespaceIdent::new("a".into());
676 create_namespace(&catalog, &namespace_ident).await;
677
678 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
679 }
680
681 #[tokio::test]
682 async fn test_create_namespace_with_empty_properties() {
683 let catalog = new_memory_catalog().await;
684 let namespace_ident = NamespaceIdent::new("a".into());
685
686 assert_eq!(
687 catalog
688 .create_namespace(&namespace_ident, HashMap::new())
689 .await
690 .unwrap(),
691 Namespace::new(namespace_ident.clone())
692 );
693
694 assert_eq!(
695 catalog.get_namespace(&namespace_ident).await.unwrap(),
696 Namespace::with_properties(namespace_ident, HashMap::new())
697 );
698 }
699
700 #[tokio::test]
701 async fn test_create_namespace_with_properties() {
702 let catalog = new_memory_catalog().await;
703 let namespace_ident = NamespaceIdent::new("abc".into());
704
705 let mut properties: HashMap<String, String> = HashMap::new();
706 properties.insert("k".into(), "v".into());
707
708 assert_eq!(
709 catalog
710 .create_namespace(&namespace_ident, properties.clone())
711 .await
712 .unwrap(),
713 Namespace::with_properties(namespace_ident.clone(), properties.clone())
714 );
715
716 assert_eq!(
717 catalog.get_namespace(&namespace_ident).await.unwrap(),
718 Namespace::with_properties(namespace_ident, properties)
719 );
720 }
721
722 #[tokio::test]
723 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
724 let catalog = new_memory_catalog().await;
725 let namespace_ident = NamespaceIdent::new("a".into());
726 create_namespace(&catalog, &namespace_ident).await;
727
728 assert_eq!(
729 catalog
730 .create_namespace(&namespace_ident, HashMap::new())
731 .await
732 .unwrap_err()
733 .to_string(),
734 format!(
735 "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
736 &namespace_ident
737 )
738 );
739
740 assert_eq!(
741 catalog.get_namespace(&namespace_ident).await.unwrap(),
742 Namespace::with_properties(namespace_ident, HashMap::new())
743 );
744 }
745
746 #[tokio::test]
747 async fn test_create_nested_namespace() {
748 let catalog = new_memory_catalog().await;
749 let parent_namespace_ident = NamespaceIdent::new("a".into());
750 create_namespace(&catalog, &parent_namespace_ident).await;
751
752 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
753
754 assert_eq!(
755 catalog
756 .create_namespace(&child_namespace_ident, HashMap::new())
757 .await
758 .unwrap(),
759 Namespace::new(child_namespace_ident.clone())
760 );
761
762 assert_eq!(
763 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
764 Namespace::with_properties(child_namespace_ident, HashMap::new())
765 );
766 }
767
768 #[tokio::test]
769 async fn test_create_deeply_nested_namespace() {
770 let catalog = new_memory_catalog().await;
771 let namespace_ident_a = NamespaceIdent::new("a".into());
772 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
773 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
774
775 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
776
777 assert_eq!(
778 catalog
779 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
780 .await
781 .unwrap(),
782 Namespace::new(namespace_ident_a_b_c.clone())
783 );
784
785 assert_eq!(
786 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
787 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
788 );
789 }
790
791 #[tokio::test]
792 async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
793 let catalog = new_memory_catalog().await;
794
795 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
796
797 assert_eq!(
798 catalog
799 .create_namespace(&nested_namespace_ident, HashMap::new())
800 .await
801 .unwrap_err()
802 .to_string(),
803 format!(
804 "NamespaceNotFound => No such namespace: {:?}",
805 NamespaceIdent::new("a".into())
806 )
807 );
808
809 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
810 }
811
812 #[tokio::test]
813 async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
814 {
815 let catalog = new_memory_catalog().await;
816
817 let namespace_ident_a = NamespaceIdent::new("a".into());
818 create_namespace(&catalog, &namespace_ident_a).await;
819
820 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
821
822 assert_eq!(
823 catalog
824 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
825 .await
826 .unwrap_err()
827 .to_string(),
828 format!(
829 "NamespaceNotFound => No such namespace: {:?}",
830 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
831 )
832 );
833
834 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
835 namespace_ident_a.clone()
836 ]);
837
838 assert_eq!(
839 catalog
840 .list_namespaces(Some(&namespace_ident_a))
841 .await
842 .unwrap(),
843 vec![]
844 );
845 }
846
847 #[tokio::test]
848 async fn test_get_namespace() {
849 let catalog = new_memory_catalog().await;
850 let namespace_ident = NamespaceIdent::new("abc".into());
851
852 let mut properties: HashMap<String, String> = HashMap::new();
853 properties.insert("k".into(), "v".into());
854 let _ = catalog
855 .create_namespace(&namespace_ident, properties.clone())
856 .await
857 .unwrap();
858
859 assert_eq!(
860 catalog.get_namespace(&namespace_ident).await.unwrap(),
861 Namespace::with_properties(namespace_ident, properties)
862 )
863 }
864
865 #[tokio::test]
866 async fn test_get_nested_namespace() {
867 let catalog = new_memory_catalog().await;
868 let namespace_ident_a = NamespaceIdent::new("a".into());
869 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
870 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
871
872 assert_eq!(
873 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
874 Namespace::with_properties(namespace_ident_a_b, HashMap::new())
875 );
876 }
877
878 #[tokio::test]
879 async fn test_get_deeply_nested_namespace() {
880 let catalog = new_memory_catalog().await;
881 let namespace_ident_a = NamespaceIdent::new("a".into());
882 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
883 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
884 create_namespaces(&catalog, &vec![
885 &namespace_ident_a,
886 &namespace_ident_a_b,
887 &namespace_ident_a_b_c,
888 ])
889 .await;
890
891 assert_eq!(
892 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
893 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
894 );
895 }
896
897 #[tokio::test]
898 async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
899 let catalog = new_memory_catalog().await;
900 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
901
902 let non_existent_namespace_ident = NamespaceIdent::new("b".into());
903 assert_eq!(
904 catalog
905 .get_namespace(&non_existent_namespace_ident)
906 .await
907 .unwrap_err()
908 .to_string(),
909 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
910 )
911 }
912
913 #[tokio::test]
914 async fn test_update_namespace() {
915 let catalog = new_memory_catalog().await;
916 let namespace_ident = NamespaceIdent::new("abc".into());
917 create_namespace(&catalog, &namespace_ident).await;
918
919 let mut new_properties: HashMap<String, String> = HashMap::new();
920 new_properties.insert("k".into(), "v".into());
921
922 catalog
923 .update_namespace(&namespace_ident, new_properties.clone())
924 .await
925 .unwrap();
926
927 assert_eq!(
928 catalog.get_namespace(&namespace_ident).await.unwrap(),
929 Namespace::with_properties(namespace_ident, new_properties)
930 )
931 }
932
933 #[tokio::test]
934 async fn test_update_nested_namespace() {
935 let catalog = new_memory_catalog().await;
936 let namespace_ident_a = NamespaceIdent::new("a".into());
937 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
938 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
939
940 let mut new_properties = HashMap::new();
941 new_properties.insert("k".into(), "v".into());
942
943 catalog
944 .update_namespace(&namespace_ident_a_b, new_properties.clone())
945 .await
946 .unwrap();
947
948 assert_eq!(
949 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
950 Namespace::with_properties(namespace_ident_a_b, new_properties)
951 );
952 }
953
954 #[tokio::test]
955 async fn test_update_deeply_nested_namespace() {
956 let catalog = new_memory_catalog().await;
957 let namespace_ident_a = NamespaceIdent::new("a".into());
958 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
959 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
960 create_namespaces(&catalog, &vec![
961 &namespace_ident_a,
962 &namespace_ident_a_b,
963 &namespace_ident_a_b_c,
964 ])
965 .await;
966
967 let mut new_properties = HashMap::new();
968 new_properties.insert("k".into(), "v".into());
969
970 catalog
971 .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
972 .await
973 .unwrap();
974
975 assert_eq!(
976 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
977 Namespace::with_properties(namespace_ident_a_b_c, new_properties)
978 );
979 }
980
981 #[tokio::test]
982 async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
983 let catalog = new_memory_catalog().await;
984 create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
985
986 let non_existent_namespace_ident = NamespaceIdent::new("def".into());
987 assert_eq!(
988 catalog
989 .update_namespace(&non_existent_namespace_ident, HashMap::new())
990 .await
991 .unwrap_err()
992 .to_string(),
993 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
994 )
995 }
996
997 #[tokio::test]
998 async fn test_drop_namespace() {
999 let catalog = new_memory_catalog().await;
1000 let namespace_ident = NamespaceIdent::new("abc".into());
1001 create_namespace(&catalog, &namespace_ident).await;
1002
1003 catalog.drop_namespace(&namespace_ident).await.unwrap();
1004
1005 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1006 }
1007
1008 #[tokio::test]
1009 async fn test_drop_nested_namespace() {
1010 let catalog = new_memory_catalog().await;
1011 let namespace_ident_a = NamespaceIdent::new("a".into());
1012 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1013 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1014
1015 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1016
1017 assert!(
1018 !catalog
1019 .namespace_exists(&namespace_ident_a_b)
1020 .await
1021 .unwrap()
1022 );
1023
1024 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1025 }
1026
1027 #[tokio::test]
1028 async fn test_drop_deeply_nested_namespace() {
1029 let catalog = new_memory_catalog().await;
1030 let namespace_ident_a = NamespaceIdent::new("a".into());
1031 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1032 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1033 create_namespaces(&catalog, &vec![
1034 &namespace_ident_a,
1035 &namespace_ident_a_b,
1036 &namespace_ident_a_b_c,
1037 ])
1038 .await;
1039
1040 catalog
1041 .drop_namespace(&namespace_ident_a_b_c)
1042 .await
1043 .unwrap();
1044
1045 assert!(
1046 !catalog
1047 .namespace_exists(&namespace_ident_a_b_c)
1048 .await
1049 .unwrap()
1050 );
1051
1052 assert!(
1053 catalog
1054 .namespace_exists(&namespace_ident_a_b)
1055 .await
1056 .unwrap()
1057 );
1058
1059 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1060 }
1061
1062 #[tokio::test]
1063 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1064 let catalog = new_memory_catalog().await;
1065
1066 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1067 assert_eq!(
1068 catalog
1069 .drop_namespace(&non_existent_namespace_ident)
1070 .await
1071 .unwrap_err()
1072 .to_string(),
1073 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1074 )
1075 }
1076
1077 #[tokio::test]
1078 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1079 let catalog = new_memory_catalog().await;
1080 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1081
1082 let non_existent_namespace_ident =
1083 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1084 assert_eq!(
1085 catalog
1086 .drop_namespace(&non_existent_namespace_ident)
1087 .await
1088 .unwrap_err()
1089 .to_string(),
1090 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1091 )
1092 }
1093
1094 #[tokio::test]
1095 async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1096 let catalog = new_memory_catalog().await;
1097 let namespace_ident_a = NamespaceIdent::new("a".into());
1098 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1099 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1100
1101 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1102
1103 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1104
1105 assert!(
1106 !catalog
1107 .namespace_exists(&namespace_ident_a_b)
1108 .await
1109 .unwrap()
1110 );
1111 }
1112
1113 #[tokio::test]
1114 async fn test_create_table_with_location() {
1115 let tmp_dir = TempDir::new().unwrap();
1116 let catalog = new_memory_catalog().await;
1117 let namespace_ident = NamespaceIdent::new("a".into());
1118 create_namespace(&catalog, &namespace_ident).await;
1119
1120 let table_name = "abc";
1121 let location = tmp_dir.path().to_str().unwrap().to_string();
1122 let table_creation = TableCreation::builder()
1123 .name(table_name.into())
1124 .location(location.clone())
1125 .schema(simple_table_schema())
1126 .build();
1127
1128 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1129
1130 assert_table_eq(
1131 &catalog
1132 .create_table(&namespace_ident, table_creation)
1133 .await
1134 .unwrap(),
1135 &expected_table_ident,
1136 &simple_table_schema(),
1137 );
1138
1139 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1140
1141 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1142
1143 assert!(
1144 table
1145 .metadata_location()
1146 .unwrap()
1147 .to_string()
1148 .starts_with(&location)
1149 )
1150 }
1151
1152 #[tokio::test]
1153 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1154 let warehouse_location = temp_path();
1155 let catalog = MemoryCatalogBuilder::default()
1156 .load(
1157 "memory",
1158 HashMap::from([(
1159 MEMORY_CATALOG_WAREHOUSE.to_string(),
1160 warehouse_location.clone(),
1161 )]),
1162 )
1163 .await
1164 .unwrap();
1165
1166 let namespace_ident = NamespaceIdent::new("a".into());
1167 let mut namespace_properties = HashMap::new();
1168 let namespace_location = temp_path();
1169 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1170 catalog
1171 .create_namespace(&namespace_ident, namespace_properties)
1172 .await
1173 .unwrap();
1174
1175 let table_name = "tbl1";
1176 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1177 let expected_table_metadata_location_regex =
1178 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1179
1180 let table = catalog
1181 .create_table(
1182 &namespace_ident,
1183 TableCreation::builder()
1184 .name(table_name.into())
1185 .schema(simple_table_schema())
1186 .build(),
1188 )
1189 .await
1190 .unwrap();
1191 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1192 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1193
1194 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1195 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1196 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1197 }
1198
1199 #[tokio::test]
1200 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1201 {
1202 let warehouse_location = temp_path();
1203 let catalog = MemoryCatalogBuilder::default()
1204 .load(
1205 "memory",
1206 HashMap::from([(
1207 MEMORY_CATALOG_WAREHOUSE.to_string(),
1208 warehouse_location.clone(),
1209 )]),
1210 )
1211 .await
1212 .unwrap();
1213
1214 let namespace_ident = NamespaceIdent::new("a".into());
1215 let mut namespace_properties = HashMap::new();
1216 let namespace_location = temp_path();
1217 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1218 catalog
1219 .create_namespace(&namespace_ident, namespace_properties)
1220 .await
1221 .unwrap();
1222
1223 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1224 let mut nested_namespace_properties = HashMap::new();
1225 let nested_namespace_location = temp_path();
1226 nested_namespace_properties
1227 .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1228 catalog
1229 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1230 .await
1231 .unwrap();
1232
1233 let table_name = "tbl1";
1234 let expected_table_ident =
1235 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1236 let expected_table_metadata_location_regex = format!(
1237 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1238 );
1239
1240 let table = catalog
1241 .create_table(
1242 &nested_namespace_ident,
1243 TableCreation::builder()
1244 .name(table_name.into())
1245 .schema(simple_table_schema())
1246 .build(),
1248 )
1249 .await
1250 .unwrap();
1251 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1252 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1253
1254 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1255 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1256 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1257 }
1258
1259 #[tokio::test]
1260 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1261 {
1262 let warehouse_location = temp_path();
1263 let catalog = MemoryCatalogBuilder::default()
1264 .load(
1265 "memory",
1266 HashMap::from([(
1267 MEMORY_CATALOG_WAREHOUSE.to_string(),
1268 warehouse_location.clone(),
1269 )]),
1270 )
1271 .await
1272 .unwrap();
1273
1274 let namespace_ident = NamespaceIdent::new("a".into());
1275 let namespace_properties = HashMap::new();
1277 catalog
1278 .create_namespace(&namespace_ident, namespace_properties)
1279 .await
1280 .unwrap();
1281
1282 let table_name = "tbl1";
1283 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1284 let expected_table_metadata_location_regex =
1285 format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1286
1287 let table = catalog
1288 .create_table(
1289 &namespace_ident,
1290 TableCreation::builder()
1291 .name(table_name.into())
1292 .schema(simple_table_schema())
1293 .build(),
1295 )
1296 .await
1297 .unwrap();
1298 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1299 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1300
1301 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1302 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1303 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1304 }
1305
1306 #[tokio::test]
1307 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1308 {
1309 let warehouse_location = temp_path();
1310 let catalog = MemoryCatalogBuilder::default()
1311 .load(
1312 "memory",
1313 HashMap::from([(
1314 MEMORY_CATALOG_WAREHOUSE.to_string(),
1315 warehouse_location.clone(),
1316 )]),
1317 )
1318 .await
1319 .unwrap();
1320
1321 let namespace_ident = NamespaceIdent::new("a".into());
1322 catalog
1323 .create_namespace(&namespace_ident, HashMap::new())
1325 .await
1326 .unwrap();
1327
1328 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1329 catalog
1330 .create_namespace(&nested_namespace_ident, HashMap::new())
1332 .await
1333 .unwrap();
1334
1335 let table_name = "tbl1";
1336 let expected_table_ident =
1337 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1338 let expected_table_metadata_location_regex = format!(
1339 "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1340 );
1341
1342 let table = catalog
1343 .create_table(
1344 &nested_namespace_ident,
1345 TableCreation::builder()
1346 .name(table_name.into())
1347 .schema(simple_table_schema())
1348 .build(),
1350 )
1351 .await
1352 .unwrap();
1353 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1354 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1355
1356 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1357 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1358 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1359 }
1360
1361 #[tokio::test]
1362 async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1363 {
1364 let catalog = MemoryCatalogBuilder::default()
1365 .load("memory", HashMap::from([]))
1366 .await;
1367
1368 assert!(catalog.is_err());
1369 assert_eq!(
1370 catalog.unwrap_err().to_string(),
1371 "DataInvalid => Catalog warehouse is required"
1372 );
1373 }
1374
1375 #[tokio::test]
1376 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1377 let catalog = new_memory_catalog().await;
1378 let namespace_ident = NamespaceIdent::new("a".into());
1379 create_namespace(&catalog, &namespace_ident).await;
1380 let table_name = "tbl1";
1381 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1382 create_table(&catalog, &table_ident).await;
1383
1384 let tmp_dir = TempDir::new().unwrap();
1385 let location = tmp_dir.path().to_str().unwrap().to_string();
1386
1387 assert_eq!(
1388 catalog
1389 .create_table(
1390 &namespace_ident,
1391 TableCreation::builder()
1392 .name(table_name.into())
1393 .schema(simple_table_schema())
1394 .location(location)
1395 .build()
1396 )
1397 .await
1398 .unwrap_err()
1399 .to_string(),
1400 format!(
1401 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1402 &table_ident
1403 )
1404 );
1405 }
1406
1407 #[tokio::test]
1408 async fn test_list_tables_returns_empty_vector() {
1409 let catalog = new_memory_catalog().await;
1410 let namespace_ident = NamespaceIdent::new("a".into());
1411 create_namespace(&catalog, &namespace_ident).await;
1412
1413 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1414 }
1415
1416 #[tokio::test]
1417 async fn test_list_tables_returns_a_single_table() {
1418 let catalog = new_memory_catalog().await;
1419 let namespace_ident = NamespaceIdent::new("n1".into());
1420 create_namespace(&catalog, &namespace_ident).await;
1421
1422 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1423 create_table(&catalog, &table_ident).await;
1424
1425 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1426 table_ident
1427 ]);
1428 }
1429
1430 #[tokio::test]
1431 async fn test_list_tables_returns_multiple_tables() {
1432 let catalog = new_memory_catalog().await;
1433 let namespace_ident = NamespaceIdent::new("n1".into());
1434 create_namespace(&catalog, &namespace_ident).await;
1435
1436 let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1437 let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1438 let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1439
1440 assert_eq!(
1441 to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1442 to_set(vec![table_ident_1, table_ident_2])
1443 );
1444 }
1445
1446 #[tokio::test]
1447 async fn test_list_tables_returns_tables_from_correct_namespace() {
1448 let catalog = new_memory_catalog().await;
1449 let namespace_ident_1 = NamespaceIdent::new("n1".into());
1450 let namespace_ident_2 = NamespaceIdent::new("n2".into());
1451 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1452
1453 let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1454 let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1455 let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1456 let _ = create_tables(&catalog, vec![
1457 &table_ident_1,
1458 &table_ident_2,
1459 &table_ident_3,
1460 ])
1461 .await;
1462
1463 assert_eq!(
1464 to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1465 to_set(vec![table_ident_1, table_ident_2])
1466 );
1467
1468 assert_eq!(
1469 to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1470 to_set(vec![table_ident_3])
1471 );
1472 }
1473
1474 #[tokio::test]
1475 async fn test_list_tables_returns_table_under_nested_namespace() {
1476 let catalog = new_memory_catalog().await;
1477 let namespace_ident_a = NamespaceIdent::new("a".into());
1478 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1479 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1480
1481 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1482 create_table(&catalog, &table_ident).await;
1483
1484 assert_eq!(
1485 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1486 vec![table_ident]
1487 );
1488 }
1489
1490 #[tokio::test]
1491 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1492 let catalog = new_memory_catalog().await;
1493
1494 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1495
1496 assert_eq!(
1497 catalog
1498 .list_tables(&non_existent_namespace_ident)
1499 .await
1500 .unwrap_err()
1501 .to_string(),
1502 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1503 );
1504 }
1505
1506 #[tokio::test]
1507 async fn test_drop_table() {
1508 let catalog = new_memory_catalog().await;
1509 let namespace_ident = NamespaceIdent::new("n1".into());
1510 create_namespace(&catalog, &namespace_ident).await;
1511 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1512 create_table(&catalog, &table_ident).await;
1513
1514 catalog.drop_table(&table_ident).await.unwrap();
1515 }
1516
1517 #[tokio::test]
1518 async fn test_drop_table_drops_table_under_nested_namespace() {
1519 let catalog = new_memory_catalog().await;
1520 let namespace_ident_a = NamespaceIdent::new("a".into());
1521 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1522 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1523
1524 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1525 create_table(&catalog, &table_ident).await;
1526
1527 catalog.drop_table(&table_ident).await.unwrap();
1528
1529 assert_eq!(
1530 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1531 vec![]
1532 );
1533 }
1534
1535 #[tokio::test]
1536 async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1537 let catalog = new_memory_catalog().await;
1538
1539 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1540 let non_existent_table_ident =
1541 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1542
1543 assert_eq!(
1544 catalog
1545 .drop_table(&non_existent_table_ident)
1546 .await
1547 .unwrap_err()
1548 .to_string(),
1549 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1550 );
1551 }
1552
1553 #[tokio::test]
1554 async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1555 let catalog = new_memory_catalog().await;
1556 let namespace_ident = NamespaceIdent::new("n1".into());
1557 create_namespace(&catalog, &namespace_ident).await;
1558
1559 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1560
1561 assert_eq!(
1562 catalog
1563 .drop_table(&non_existent_table_ident)
1564 .await
1565 .unwrap_err()
1566 .to_string(),
1567 format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1568 );
1569 }
1570
1571 #[tokio::test]
1572 async fn test_table_exists_returns_true() {
1573 let catalog = new_memory_catalog().await;
1574 let namespace_ident = NamespaceIdent::new("n1".into());
1575 create_namespace(&catalog, &namespace_ident).await;
1576 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1577 create_table(&catalog, &table_ident).await;
1578
1579 assert!(catalog.table_exists(&table_ident).await.unwrap());
1580 }
1581
1582 #[tokio::test]
1583 async fn test_table_exists_returns_false() {
1584 let catalog = new_memory_catalog().await;
1585 let namespace_ident = NamespaceIdent::new("n1".into());
1586 create_namespace(&catalog, &namespace_ident).await;
1587 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1588
1589 assert!(
1590 !catalog
1591 .table_exists(&non_existent_table_ident)
1592 .await
1593 .unwrap()
1594 );
1595 }
1596
1597 #[tokio::test]
1598 async fn test_table_exists_under_nested_namespace() {
1599 let catalog = new_memory_catalog().await;
1600 let namespace_ident_a = NamespaceIdent::new("a".into());
1601 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1602 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1603
1604 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1605 create_table(&catalog, &table_ident).await;
1606
1607 assert!(catalog.table_exists(&table_ident).await.unwrap());
1608
1609 let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1610 assert!(
1611 !catalog
1612 .table_exists(&non_existent_table_ident)
1613 .await
1614 .unwrap()
1615 );
1616 }
1617
1618 #[tokio::test]
1619 async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1620 let catalog = new_memory_catalog().await;
1621
1622 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1623 let non_existent_table_ident =
1624 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1625
1626 assert_eq!(
1627 catalog
1628 .table_exists(&non_existent_table_ident)
1629 .await
1630 .unwrap_err()
1631 .to_string(),
1632 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1633 );
1634 }
1635
1636 #[tokio::test]
1637 async fn test_rename_table_in_same_namespace() {
1638 let catalog = new_memory_catalog().await;
1639 let namespace_ident = NamespaceIdent::new("n1".into());
1640 create_namespace(&catalog, &namespace_ident).await;
1641 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1642 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1643 create_table(&catalog, &src_table_ident).await;
1644
1645 catalog
1646 .rename_table(&src_table_ident, &dst_table_ident)
1647 .await
1648 .unwrap();
1649
1650 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1651 dst_table_ident
1652 ],);
1653 }
1654
1655 #[tokio::test]
1656 async fn test_rename_table_across_namespaces() {
1657 let catalog = new_memory_catalog().await;
1658 let src_namespace_ident = NamespaceIdent::new("a".into());
1659 let dst_namespace_ident = NamespaceIdent::new("b".into());
1660 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1661 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1662 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1663 create_table(&catalog, &src_table_ident).await;
1664
1665 catalog
1666 .rename_table(&src_table_ident, &dst_table_ident)
1667 .await
1668 .unwrap();
1669
1670 assert_eq!(
1671 catalog.list_tables(&src_namespace_ident).await.unwrap(),
1672 vec![],
1673 );
1674
1675 assert_eq!(
1676 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1677 vec![dst_table_ident],
1678 );
1679 }
1680
1681 #[tokio::test]
1682 async fn test_rename_table_src_table_is_same_as_dst_table() {
1683 let catalog = new_memory_catalog().await;
1684 let namespace_ident = NamespaceIdent::new("n1".into());
1685 create_namespace(&catalog, &namespace_ident).await;
1686 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1687 create_table(&catalog, &table_ident).await;
1688
1689 catalog
1690 .rename_table(&table_ident, &table_ident)
1691 .await
1692 .unwrap();
1693
1694 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1695 table_ident
1696 ],);
1697 }
1698
1699 #[tokio::test]
1700 async fn test_rename_table_across_nested_namespaces() {
1701 let catalog = new_memory_catalog().await;
1702 let namespace_ident_a = NamespaceIdent::new("a".into());
1703 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1704 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1705 create_namespaces(&catalog, &vec![
1706 &namespace_ident_a,
1707 &namespace_ident_a_b,
1708 &namespace_ident_a_b_c,
1709 ])
1710 .await;
1711
1712 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1713 create_tables(&catalog, vec![&src_table_ident]).await;
1714
1715 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1716 catalog
1717 .rename_table(&src_table_ident, &dst_table_ident)
1718 .await
1719 .unwrap();
1720
1721 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1722
1723 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1724 }
1725
1726 #[tokio::test]
1727 async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1728 let catalog = new_memory_catalog().await;
1729
1730 let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1731 let src_table_ident =
1732 TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1733
1734 let dst_namespace_ident = NamespaceIdent::new("n2".into());
1735 create_namespace(&catalog, &dst_namespace_ident).await;
1736 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1737
1738 assert_eq!(
1739 catalog
1740 .rename_table(&src_table_ident, &dst_table_ident)
1741 .await
1742 .unwrap_err()
1743 .to_string(),
1744 format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1745 );
1746 }
1747
1748 #[tokio::test]
1749 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1750 let catalog = new_memory_catalog().await;
1751 let src_namespace_ident = NamespaceIdent::new("n1".into());
1752 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1753 create_namespace(&catalog, &src_namespace_ident).await;
1754 create_table(&catalog, &src_table_ident).await;
1755
1756 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1757 let dst_table_ident =
1758 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1759 assert_eq!(
1760 catalog
1761 .rename_table(&src_table_ident, &dst_table_ident)
1762 .await
1763 .unwrap_err()
1764 .to_string(),
1765 format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1766 );
1767 }
1768
1769 #[tokio::test]
1770 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1771 let catalog = new_memory_catalog().await;
1772 let namespace_ident = NamespaceIdent::new("n1".into());
1773 create_namespace(&catalog, &namespace_ident).await;
1774 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1775 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1776
1777 assert_eq!(
1778 catalog
1779 .rename_table(&src_table_ident, &dst_table_ident)
1780 .await
1781 .unwrap_err()
1782 .to_string(),
1783 format!("TableNotFound => No such table: {src_table_ident:?}"),
1784 );
1785 }
1786
1787 #[tokio::test]
1788 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1789 let catalog = new_memory_catalog().await;
1790 let namespace_ident = NamespaceIdent::new("n1".into());
1791 create_namespace(&catalog, &namespace_ident).await;
1792 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1793 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1794 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1795
1796 assert_eq!(
1797 catalog
1798 .rename_table(&src_table_ident, &dst_table_ident)
1799 .await
1800 .unwrap_err()
1801 .to_string(),
1802 format!(
1803 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1804 &dst_table_ident
1805 ),
1806 );
1807 }
1808
1809 #[tokio::test]
1810 async fn test_register_table() {
1811 let catalog = new_memory_catalog().await;
1813 let namespace_ident = NamespaceIdent::new("test_namespace".into());
1814 create_namespace(&catalog, &namespace_ident).await;
1815
1816 let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1818 create_table(&catalog, &source_table_ident).await;
1819
1820 let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1822 let metadata_location = source_table.metadata_location().unwrap().to_string();
1823
1824 let register_table_ident =
1826 TableIdent::new(namespace_ident.clone(), "register_table".into());
1827 let registered_table = catalog
1828 .register_table(®ister_table_ident, metadata_location.clone())
1829 .await
1830 .unwrap();
1831
1832 assert_eq!(registered_table.identifier(), ®ister_table_ident);
1834
1835 assert_eq!(
1837 registered_table.metadata_location().unwrap().to_string(),
1838 metadata_location
1839 );
1840
1841 assert!(catalog.table_exists(®ister_table_ident).await.unwrap());
1843
1844 let loaded_table = catalog.load_table(®ister_table_ident).await.unwrap();
1846 assert_eq!(loaded_table.identifier(), ®ister_table_ident);
1847 assert_eq!(
1848 loaded_table.metadata_location().unwrap().to_string(),
1849 metadata_location
1850 );
1851 }
1852
1853 #[tokio::test]
1854 async fn test_update_table() {
1855 let catalog = new_memory_catalog().await;
1856
1857 let table = create_table_with_namespace(&catalog).await;
1858
1859 assert!(!table.metadata().properties().contains_key("key"));
1861
1862 let tx = Transaction::new(&table);
1864 let updated_table = tx
1865 .update_table_properties()
1866 .set("key".to_string(), "value".to_string())
1867 .apply(tx)
1868 .unwrap()
1869 .commit(&catalog)
1870 .await
1871 .unwrap();
1872
1873 assert_eq!(
1874 updated_table.metadata().properties().get("key").unwrap(),
1875 "value"
1876 );
1877
1878 assert_eq!(table.identifier(), updated_table.identifier());
1879 assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1880 assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1881 assert_ne!(table.metadata_location(), updated_table.metadata_location());
1882
1883 assert!(
1884 table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1885 );
1886 }
1887
1888 #[tokio::test]
1889 async fn test_update_table_fails_if_table_doesnt_exist() {
1890 let catalog = new_memory_catalog().await;
1891
1892 let namespace_ident = NamespaceIdent::new("a".into());
1893 create_namespace(&catalog, &namespace_ident).await;
1894
1895 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1897 let table = build_table(table_ident);
1898
1899 let tx = Transaction::new(&table);
1900 let err = tx
1901 .update_table_properties()
1902 .set("key".to_string(), "value".to_string())
1903 .apply(tx)
1904 .unwrap()
1905 .commit(&catalog)
1906 .await
1907 .unwrap_err();
1908 assert_eq!(err.kind(), ErrorKind::TableNotFound);
1909 }
1910
1911 fn build_table(ident: TableIdent) -> Table {
1912 let file_io = FileIO::new_with_fs();
1913
1914 let temp_dir = TempDir::new().unwrap();
1915 let location = temp_dir.path().to_str().unwrap().to_string();
1916
1917 let table_creation = TableCreation::builder()
1918 .name(ident.name().to_string())
1919 .schema(simple_table_schema())
1920 .location(location)
1921 .build();
1922 let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1923 .unwrap()
1924 .build()
1925 .unwrap()
1926 .metadata;
1927
1928 Table::builder()
1929 .identifier(ident)
1930 .metadata(metadata)
1931 .file_io(file_io)
1932 .build()
1933 .unwrap()
1934 }
1935}