1use std::collections::HashMap;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use futures::lock::{Mutex, MutexGuard};
26use itertools::Itertools;
27
28use super::namespace_state::NamespaceState;
29use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
30use crate::spec::{TableMetadata, TableMetadataBuilder};
31use crate::table::Table;
32use crate::{
33 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
34 TableCommit, TableCreation, TableIdent,
35};
36
/// Property key under which [`MemoryCatalogBuilder`] looks up the warehouse location.
pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";

/// Namespace property key consulted for a location prefix when a table is created without an
/// explicit location.
const LOCATION: &str = "location";
42
/// Builder for [`MemoryCatalog`].
#[derive(Debug)]
pub struct MemoryCatalogBuilder {
    /// Name, warehouse and remaining properties collected for the catalog being built.
    config: MemoryCatalogConfig,
    /// Optional storage backend; when unset, `MemoryCatalog::new` uses `MemoryStorageFactory`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
49
50impl Default for MemoryCatalogBuilder {
51 fn default() -> Self {
52 Self {
53 config: MemoryCatalogConfig {
54 name: None,
55 warehouse: "".to_string(),
56 props: HashMap::new(),
57 },
58 storage_factory: None,
59 }
60 }
61}
62
63impl CatalogBuilder for MemoryCatalogBuilder {
64 type C = MemoryCatalog;
65
66 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
67 self.storage_factory = Some(storage_factory);
68 self
69 }
70
71 fn load(
72 mut self,
73 name: impl Into<String>,
74 props: HashMap<String, String>,
75 ) -> impl Future<Output = Result<Self::C>> + Send {
76 self.config.name = Some(name.into());
77
78 if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
79 self.config.warehouse = props
80 .get(MEMORY_CATALOG_WAREHOUSE)
81 .cloned()
82 .unwrap_or_default()
83 }
84
85 self.config.props = props
87 .into_iter()
88 .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
89 .collect();
90
91 let result = {
92 if self.config.name.is_none() {
93 Err(Error::new(
94 ErrorKind::DataInvalid,
95 "Catalog name is required",
96 ))
97 } else if self.config.warehouse.is_empty() {
98 Err(Error::new(
99 ErrorKind::DataInvalid,
100 "Catalog warehouse is required",
101 ))
102 } else {
103 MemoryCatalog::new(self.config, self.storage_factory)
104 }
105 };
106
107 std::future::ready(result)
108 }
109}
110
/// Settings gathered by [`MemoryCatalogBuilder`] and consumed by `MemoryCatalog::new`.
#[derive(Clone, Debug)]
pub(crate) struct MemoryCatalogConfig {
    /// Catalog name; filled in by `MemoryCatalogBuilder::load`.
    name: Option<String>,
    /// Warehouse location; `load` rejects the configuration while this is empty.
    warehouse: String,
    /// Remaining properties, passed to the `FileIOBuilder` when the catalog is created.
    props: HashMap<String, String>,
}
117
/// Catalog that keeps its namespace and table registry in memory.
#[derive(Debug)]
pub struct MemoryCatalog {
    /// Namespace and table registry, guarded by an async mutex.
    root_namespace_state: Mutex<NamespaceState>,
    /// I/O handle used to read and write table metadata files.
    file_io: FileIO,
    /// Prefix used to derive a table location when neither the table nor its namespace gives one.
    warehouse_location: String,
}
125
126impl MemoryCatalog {
127 fn new(
129 config: MemoryCatalogConfig,
130 storage_factory: Option<Arc<dyn StorageFactory>>,
131 ) -> Result<Self> {
132 let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory));
134
135 Ok(Self {
136 root_namespace_state: Mutex::new(NamespaceState::default()),
137 file_io: FileIOBuilder::new(factory).with_props(config.props).build(),
138 warehouse_location: config.warehouse,
139 })
140 }
141
142 async fn load_table_from_locked_state(
144 &self,
145 table_ident: &TableIdent,
146 root_namespace_state: &MutexGuard<'_, NamespaceState>,
147 ) -> Result<Table> {
148 let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
149 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
150
151 Table::builder()
152 .identifier(table_ident.clone())
153 .metadata(metadata)
154 .metadata_location(metadata_location.to_string())
155 .file_io(self.file_io.clone())
156 .build()
157 }
158}
159
160#[async_trait]
161impl Catalog for MemoryCatalog {
162 async fn list_namespaces(
164 &self,
165 maybe_parent: Option<&NamespaceIdent>,
166 ) -> Result<Vec<NamespaceIdent>> {
167 let root_namespace_state = self.root_namespace_state.lock().await;
168
169 match maybe_parent {
170 None => {
171 let namespaces = root_namespace_state
172 .list_top_level_namespaces()
173 .into_iter()
174 .map(|str| NamespaceIdent::new(str.to_string()))
175 .collect_vec();
176
177 Ok(namespaces)
178 }
179 Some(parent_namespace_ident) => {
180 let namespaces = root_namespace_state
181 .list_namespaces_under(parent_namespace_ident)?
182 .into_iter()
183 .map(|name| {
184 let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
185 names.push(name.to_string());
186 NamespaceIdent::from_vec(names)
187 })
188 .collect::<Result<Vec<_>>>()?;
189
190 Ok(namespaces)
191 }
192 }
193 }
194
195 async fn create_namespace(
197 &self,
198 namespace_ident: &NamespaceIdent,
199 properties: HashMap<String, String>,
200 ) -> Result<Namespace> {
201 let mut root_namespace_state = self.root_namespace_state.lock().await;
202
203 root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
204 let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
205
206 Ok(namespace)
207 }
208
209 async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
211 let root_namespace_state = self.root_namespace_state.lock().await;
212
213 let namespace = Namespace::with_properties(
214 namespace_ident.clone(),
215 root_namespace_state
216 .get_properties(namespace_ident)?
217 .clone(),
218 );
219
220 Ok(namespace)
221 }
222
223 async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
225 let guarded_namespaces = self.root_namespace_state.lock().await;
226
227 Ok(guarded_namespaces.namespace_exists(namespace_ident))
228 }
229
230 async fn update_namespace(
236 &self,
237 namespace_ident: &NamespaceIdent,
238 properties: HashMap<String, String>,
239 ) -> Result<()> {
240 let mut root_namespace_state = self.root_namespace_state.lock().await;
241
242 root_namespace_state.replace_properties(namespace_ident, properties)
243 }
244
245 async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
247 let mut root_namespace_state = self.root_namespace_state.lock().await;
248
249 root_namespace_state.remove_existing_namespace(namespace_ident)
250 }
251
252 async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
254 let root_namespace_state = self.root_namespace_state.lock().await;
255
256 let table_names = root_namespace_state.list_tables(namespace_ident)?;
257 let table_idents = table_names
258 .into_iter()
259 .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
260 .collect_vec();
261
262 Ok(table_idents)
263 }
264
265 async fn create_table(
267 &self,
268 namespace_ident: &NamespaceIdent,
269 table_creation: TableCreation,
270 ) -> Result<Table> {
271 let mut root_namespace_state = self.root_namespace_state.lock().await;
272
273 let table_name = table_creation.name.clone();
274 let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
275
276 let (table_creation, location) = match table_creation.location.clone() {
277 Some(location) => (table_creation, location),
278 None => {
279 let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
280 let location_prefix = match namespace_properties.get(LOCATION) {
281 Some(namespace_location) => namespace_location.clone(),
282 None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
283 };
284
285 let location = format!("{}/{}", location_prefix, table_ident.name());
286
287 let new_table_creation = TableCreation {
288 location: Some(location.clone()),
289 ..table_creation
290 };
291
292 (new_table_creation, location)
293 }
294 };
295
296 let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
297 .build()?
298 .metadata;
299 let metadata_location = MetadataLocation::new_with_metadata(location, &metadata);
300
301 metadata.write_to(&self.file_io, &metadata_location).await?;
302
303 root_namespace_state.insert_new_table(&table_ident, metadata_location.to_string())?;
304
305 Table::builder()
306 .file_io(self.file_io.clone())
307 .metadata_location(metadata_location.to_string())
308 .metadata(metadata)
309 .identifier(table_ident)
310 .build()
311 }
312
313 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
315 let root_namespace_state = self.root_namespace_state.lock().await;
316
317 self.load_table_from_locked_state(table_ident, &root_namespace_state)
318 .await
319 }
320
321 async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
323 let mut root_namespace_state = self.root_namespace_state.lock().await;
324
325 root_namespace_state.remove_existing_table(table_ident)?;
326 Ok(())
327 }
328
329 async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> {
330 let table_info = self.load_table(table_ident).await?;
331 self.drop_table(table_ident).await?;
332 crate::catalog::utils::drop_table_data(
333 table_info.file_io(),
334 table_info.metadata(),
335 table_info.metadata_location(),
336 )
337 .await
338 }
339
340 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
342 let root_namespace_state = self.root_namespace_state.lock().await;
343
344 root_namespace_state.table_exists(table_ident)
345 }
346
347 async fn rename_table(
349 &self,
350 src_table_ident: &TableIdent,
351 dst_table_ident: &TableIdent,
352 ) -> Result<()> {
353 let mut root_namespace_state = self.root_namespace_state.lock().await;
354
355 let mut new_root_namespace_state = root_namespace_state.clone();
356 let metadata_location = new_root_namespace_state
357 .get_existing_table_location(src_table_ident)?
358 .clone();
359 new_root_namespace_state.remove_existing_table(src_table_ident)?;
360 new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
361 *root_namespace_state = new_root_namespace_state;
362
363 Ok(())
364 }
365
366 async fn register_table(
367 &self,
368 table_ident: &TableIdent,
369 metadata_location: String,
370 ) -> Result<Table> {
371 let mut root_namespace_state = self.root_namespace_state.lock().await;
372 root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
373
374 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
375
376 Table::builder()
377 .file_io(self.file_io.clone())
378 .metadata_location(metadata_location)
379 .metadata(metadata)
380 .identifier(table_ident.clone())
381 .build()
382 }
383
384 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
386 let mut root_namespace_state = self.root_namespace_state.lock().await;
387
388 let current_table = self
389 .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
390 .await?;
391
392 let staged_table = commit.apply(current_table)?;
394
395 let metadata_location =
397 MetadataLocation::from_str(staged_table.metadata_location_result()?)?;
398 staged_table
399 .metadata()
400 .write_to(staged_table.file_io(), &metadata_location)
401 .await?;
402
403 let updated_table = root_namespace_state.commit_table_update(staged_table)?;
405
406 Ok(updated_table)
407 }
408}
409
410#[cfg(test)]
411pub(crate) mod tests {
412 use std::collections::HashSet;
413 use std::hash::Hash;
414 use std::iter::FromIterator;
415 use std::vec;
416
417 use regex::Regex;
418 use tempfile::TempDir;
419
420 use super::*;
421 use crate::io::FileIO;
422 use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
423 use crate::transaction::{ApplyTransactionAction, Transaction};
424
425 fn temp_path() -> String {
426 let temp_dir = TempDir::new().unwrap();
427 temp_dir.path().to_str().unwrap().to_string()
428 }
429
430 pub(crate) async fn new_memory_catalog() -> impl Catalog {
431 let warehouse_location = temp_path();
432 MemoryCatalogBuilder::default()
433 .load(
434 "memory",
435 HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
436 )
437 .await
438 .unwrap()
439 }
440
441 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
442 let _ = catalog
443 .create_namespace(namespace_ident, HashMap::new())
444 .await
445 .unwrap();
446 }
447
448 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
449 for namespace_ident in namespace_idents {
450 let _ = create_namespace(catalog, namespace_ident).await;
451 }
452 }
453
454 fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
455 HashSet::from_iter(vec)
456 }
457
458 fn simple_table_schema() -> Schema {
459 Schema::builder()
460 .with_fields(vec![
461 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
462 ])
463 .build()
464 .unwrap()
465 }
466
467 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
468 catalog
469 .create_table(
470 &table_ident.namespace,
471 TableCreation::builder()
472 .name(table_ident.name().into())
473 .schema(simple_table_schema())
474 .build(),
475 )
476 .await
477 .unwrap()
478 }
479
480 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
481 for table_ident in table_idents {
482 create_table(catalog, table_ident).await;
483 }
484 }
485
486 async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
487 let namespace_ident = NamespaceIdent::new("abc".into());
488 create_namespace(catalog, &namespace_ident).await;
489
490 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
491 create_table(catalog, &table_ident).await
492 }
493
494 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
495 assert_eq!(table.identifier(), expected_table_ident);
496
497 let metadata = table.metadata();
498
499 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
500
501 let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
502 .with_spec_id(0)
503 .build()
504 .unwrap();
505
506 assert_eq!(
507 metadata
508 .partition_specs_iter()
509 .map(|p| p.as_ref())
510 .collect_vec(),
511 vec![&expected_partition_spec]
512 );
513
514 let expected_sorted_order = SortOrder::builder()
515 .with_order_id(0)
516 .with_fields(vec![])
517 .build(expected_schema)
518 .unwrap();
519
520 assert_eq!(
521 metadata
522 .sort_orders_iter()
523 .map(|s| s.as_ref())
524 .collect_vec(),
525 vec![&expected_sorted_order]
526 );
527
528 assert_eq!(metadata.properties(), &HashMap::new());
529
530 assert!(!table.readonly());
531 }
532
    /// Regex fragment matching a lowercase hexadecimal UUID in 8-4-4-4-12 form.
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
534
535 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
536 let actual = table.metadata_location().unwrap().to_string();
537 let regex = Regex::new(regex_str).unwrap();
538 assert!(
539 regex.is_match(&actual),
540 "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
541 )
542 }
543
544 #[tokio::test]
545 async fn test_list_namespaces_returns_empty_vector() {
546 let catalog = new_memory_catalog().await;
547
548 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
549 }
550
551 #[tokio::test]
552 async fn test_list_namespaces_returns_single_namespace() {
553 let catalog = new_memory_catalog().await;
554 let namespace_ident = NamespaceIdent::new("abc".into());
555 create_namespace(&catalog, &namespace_ident).await;
556
557 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
558 namespace_ident
559 ]);
560 }
561
562 #[tokio::test]
563 async fn test_list_namespaces_returns_multiple_namespaces() {
564 let catalog = new_memory_catalog().await;
565 let namespace_ident_1 = NamespaceIdent::new("a".into());
566 let namespace_ident_2 = NamespaceIdent::new("b".into());
567 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
568
569 assert_eq!(
570 to_set(catalog.list_namespaces(None).await.unwrap()),
571 to_set(vec![namespace_ident_1, namespace_ident_2])
572 );
573 }
574
575 #[tokio::test]
576 async fn test_list_namespaces_returns_only_top_level_namespaces() {
577 let catalog = new_memory_catalog().await;
578 let namespace_ident_1 = NamespaceIdent::new("a".into());
579 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
580 let namespace_ident_3 = NamespaceIdent::new("b".into());
581 create_namespaces(&catalog, &vec![
582 &namespace_ident_1,
583 &namespace_ident_2,
584 &namespace_ident_3,
585 ])
586 .await;
587
588 assert_eq!(
589 to_set(catalog.list_namespaces(None).await.unwrap()),
590 to_set(vec![namespace_ident_1, namespace_ident_3])
591 );
592 }
593
594 #[tokio::test]
595 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
596 let catalog = new_memory_catalog().await;
597 let namespace_ident_1 = NamespaceIdent::new("a".into());
598 let namespace_ident_2 = NamespaceIdent::new("b".into());
599 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
600
601 assert_eq!(
602 catalog
603 .list_namespaces(Some(&namespace_ident_1))
604 .await
605 .unwrap(),
606 vec![]
607 );
608 }
609
610 #[tokio::test]
611 async fn test_list_namespaces_returns_namespace_under_parent() {
612 let catalog = new_memory_catalog().await;
613 let namespace_ident_1 = NamespaceIdent::new("a".into());
614 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
615 let namespace_ident_3 = NamespaceIdent::new("c".into());
616 create_namespaces(&catalog, &vec![
617 &namespace_ident_1,
618 &namespace_ident_2,
619 &namespace_ident_3,
620 ])
621 .await;
622
623 assert_eq!(
624 to_set(catalog.list_namespaces(None).await.unwrap()),
625 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
626 );
627
628 assert_eq!(
629 catalog
630 .list_namespaces(Some(&namespace_ident_1))
631 .await
632 .unwrap(),
633 vec![namespace_ident_2]
634 );
635 }
636
637 #[tokio::test]
638 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
639 let catalog = new_memory_catalog().await;
640 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
641 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
642 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
643 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
644 let namespace_ident_5 = NamespaceIdent::new("b".into());
645 create_namespaces(&catalog, &vec![
646 &namespace_ident_1,
647 &namespace_ident_2,
648 &namespace_ident_3,
649 &namespace_ident_4,
650 &namespace_ident_5,
651 ])
652 .await;
653
654 assert_eq!(
655 to_set(
656 catalog
657 .list_namespaces(Some(&namespace_ident_1))
658 .await
659 .unwrap()
660 ),
661 to_set(vec![
662 namespace_ident_2,
663 namespace_ident_3,
664 namespace_ident_4,
665 ])
666 );
667 }
668
669 #[tokio::test]
670 async fn test_namespace_exists_returns_false() {
671 let catalog = new_memory_catalog().await;
672 let namespace_ident = NamespaceIdent::new("a".into());
673 create_namespace(&catalog, &namespace_ident).await;
674
675 assert!(
676 !catalog
677 .namespace_exists(&NamespaceIdent::new("b".into()))
678 .await
679 .unwrap()
680 );
681 }
682
683 #[tokio::test]
684 async fn test_namespace_exists_returns_true() {
685 let catalog = new_memory_catalog().await;
686 let namespace_ident = NamespaceIdent::new("a".into());
687 create_namespace(&catalog, &namespace_ident).await;
688
689 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
690 }
691
692 #[tokio::test]
693 async fn test_create_namespace_with_empty_properties() {
694 let catalog = new_memory_catalog().await;
695 let namespace_ident = NamespaceIdent::new("a".into());
696
697 assert_eq!(
698 catalog
699 .create_namespace(&namespace_ident, HashMap::new())
700 .await
701 .unwrap(),
702 Namespace::new(namespace_ident.clone())
703 );
704
705 assert_eq!(
706 catalog.get_namespace(&namespace_ident).await.unwrap(),
707 Namespace::with_properties(namespace_ident, HashMap::new())
708 );
709 }
710
711 #[tokio::test]
712 async fn test_create_namespace_with_properties() {
713 let catalog = new_memory_catalog().await;
714 let namespace_ident = NamespaceIdent::new("abc".into());
715
716 let mut properties: HashMap<String, String> = HashMap::new();
717 properties.insert("k".into(), "v".into());
718
719 assert_eq!(
720 catalog
721 .create_namespace(&namespace_ident, properties.clone())
722 .await
723 .unwrap(),
724 Namespace::with_properties(namespace_ident.clone(), properties.clone())
725 );
726
727 assert_eq!(
728 catalog.get_namespace(&namespace_ident).await.unwrap(),
729 Namespace::with_properties(namespace_ident, properties)
730 );
731 }
732
733 #[tokio::test]
734 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
735 let catalog = new_memory_catalog().await;
736 let namespace_ident = NamespaceIdent::new("a".into());
737 create_namespace(&catalog, &namespace_ident).await;
738
739 assert_eq!(
740 catalog
741 .create_namespace(&namespace_ident, HashMap::new())
742 .await
743 .unwrap_err()
744 .to_string(),
745 format!(
746 "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
747 &namespace_ident
748 )
749 );
750
751 assert_eq!(
752 catalog.get_namespace(&namespace_ident).await.unwrap(),
753 Namespace::with_properties(namespace_ident, HashMap::new())
754 );
755 }
756
757 #[tokio::test]
758 async fn test_create_nested_namespace() {
759 let catalog = new_memory_catalog().await;
760 let parent_namespace_ident = NamespaceIdent::new("a".into());
761 create_namespace(&catalog, &parent_namespace_ident).await;
762
763 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
764
765 assert_eq!(
766 catalog
767 .create_namespace(&child_namespace_ident, HashMap::new())
768 .await
769 .unwrap(),
770 Namespace::new(child_namespace_ident.clone())
771 );
772
773 assert_eq!(
774 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
775 Namespace::with_properties(child_namespace_ident, HashMap::new())
776 );
777 }
778
779 #[tokio::test]
780 async fn test_create_deeply_nested_namespace() {
781 let catalog = new_memory_catalog().await;
782 let namespace_ident_a = NamespaceIdent::new("a".into());
783 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
784 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
785
786 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
787
788 assert_eq!(
789 catalog
790 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
791 .await
792 .unwrap(),
793 Namespace::new(namespace_ident_a_b_c.clone())
794 );
795
796 assert_eq!(
797 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
798 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
799 );
800 }
801
802 #[tokio::test]
803 async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
804 let catalog = new_memory_catalog().await;
805
806 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
807
808 assert_eq!(
809 catalog
810 .create_namespace(&nested_namespace_ident, HashMap::new())
811 .await
812 .unwrap_err()
813 .to_string(),
814 format!(
815 "NamespaceNotFound => No such namespace: {:?}",
816 NamespaceIdent::new("a".into())
817 )
818 );
819
820 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
821 }
822
823 #[tokio::test]
824 async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
825 {
826 let catalog = new_memory_catalog().await;
827
828 let namespace_ident_a = NamespaceIdent::new("a".into());
829 create_namespace(&catalog, &namespace_ident_a).await;
830
831 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
832
833 assert_eq!(
834 catalog
835 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
836 .await
837 .unwrap_err()
838 .to_string(),
839 format!(
840 "NamespaceNotFound => No such namespace: {:?}",
841 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
842 )
843 );
844
845 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
846 namespace_ident_a.clone()
847 ]);
848
849 assert_eq!(
850 catalog
851 .list_namespaces(Some(&namespace_ident_a))
852 .await
853 .unwrap(),
854 vec![]
855 );
856 }
857
858 #[tokio::test]
859 async fn test_get_namespace() {
860 let catalog = new_memory_catalog().await;
861 let namespace_ident = NamespaceIdent::new("abc".into());
862
863 let mut properties: HashMap<String, String> = HashMap::new();
864 properties.insert("k".into(), "v".into());
865 let _ = catalog
866 .create_namespace(&namespace_ident, properties.clone())
867 .await
868 .unwrap();
869
870 assert_eq!(
871 catalog.get_namespace(&namespace_ident).await.unwrap(),
872 Namespace::with_properties(namespace_ident, properties)
873 )
874 }
875
876 #[tokio::test]
877 async fn test_get_nested_namespace() {
878 let catalog = new_memory_catalog().await;
879 let namespace_ident_a = NamespaceIdent::new("a".into());
880 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
881 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
882
883 assert_eq!(
884 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
885 Namespace::with_properties(namespace_ident_a_b, HashMap::new())
886 );
887 }
888
889 #[tokio::test]
890 async fn test_get_deeply_nested_namespace() {
891 let catalog = new_memory_catalog().await;
892 let namespace_ident_a = NamespaceIdent::new("a".into());
893 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
894 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
895 create_namespaces(&catalog, &vec![
896 &namespace_ident_a,
897 &namespace_ident_a_b,
898 &namespace_ident_a_b_c,
899 ])
900 .await;
901
902 assert_eq!(
903 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
904 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
905 );
906 }
907
908 #[tokio::test]
909 async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
910 let catalog = new_memory_catalog().await;
911 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
912
913 let non_existent_namespace_ident = NamespaceIdent::new("b".into());
914 assert_eq!(
915 catalog
916 .get_namespace(&non_existent_namespace_ident)
917 .await
918 .unwrap_err()
919 .to_string(),
920 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
921 )
922 }
923
924 #[tokio::test]
925 async fn test_update_namespace() {
926 let catalog = new_memory_catalog().await;
927 let namespace_ident = NamespaceIdent::new("abc".into());
928 create_namespace(&catalog, &namespace_ident).await;
929
930 let mut new_properties: HashMap<String, String> = HashMap::new();
931 new_properties.insert("k".into(), "v".into());
932
933 catalog
934 .update_namespace(&namespace_ident, new_properties.clone())
935 .await
936 .unwrap();
937
938 assert_eq!(
939 catalog.get_namespace(&namespace_ident).await.unwrap(),
940 Namespace::with_properties(namespace_ident, new_properties)
941 )
942 }
943
944 #[tokio::test]
945 async fn test_update_nested_namespace() {
946 let catalog = new_memory_catalog().await;
947 let namespace_ident_a = NamespaceIdent::new("a".into());
948 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
949 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
950
951 let mut new_properties = HashMap::new();
952 new_properties.insert("k".into(), "v".into());
953
954 catalog
955 .update_namespace(&namespace_ident_a_b, new_properties.clone())
956 .await
957 .unwrap();
958
959 assert_eq!(
960 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
961 Namespace::with_properties(namespace_ident_a_b, new_properties)
962 );
963 }
964
965 #[tokio::test]
966 async fn test_update_deeply_nested_namespace() {
967 let catalog = new_memory_catalog().await;
968 let namespace_ident_a = NamespaceIdent::new("a".into());
969 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
970 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
971 create_namespaces(&catalog, &vec![
972 &namespace_ident_a,
973 &namespace_ident_a_b,
974 &namespace_ident_a_b_c,
975 ])
976 .await;
977
978 let mut new_properties = HashMap::new();
979 new_properties.insert("k".into(), "v".into());
980
981 catalog
982 .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
983 .await
984 .unwrap();
985
986 assert_eq!(
987 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
988 Namespace::with_properties(namespace_ident_a_b_c, new_properties)
989 );
990 }
991
992 #[tokio::test]
993 async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
994 let catalog = new_memory_catalog().await;
995 create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
996
997 let non_existent_namespace_ident = NamespaceIdent::new("def".into());
998 assert_eq!(
999 catalog
1000 .update_namespace(&non_existent_namespace_ident, HashMap::new())
1001 .await
1002 .unwrap_err()
1003 .to_string(),
1004 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1005 )
1006 }
1007
1008 #[tokio::test]
1009 async fn test_drop_namespace() {
1010 let catalog = new_memory_catalog().await;
1011 let namespace_ident = NamespaceIdent::new("abc".into());
1012 create_namespace(&catalog, &namespace_ident).await;
1013
1014 catalog.drop_namespace(&namespace_ident).await.unwrap();
1015
1016 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1017 }
1018
1019 #[tokio::test]
1020 async fn test_drop_nested_namespace() {
1021 let catalog = new_memory_catalog().await;
1022 let namespace_ident_a = NamespaceIdent::new("a".into());
1023 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1024 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1025
1026 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1027
1028 assert!(
1029 !catalog
1030 .namespace_exists(&namespace_ident_a_b)
1031 .await
1032 .unwrap()
1033 );
1034
1035 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1036 }
1037
1038 #[tokio::test]
1039 async fn test_drop_deeply_nested_namespace() {
1040 let catalog = new_memory_catalog().await;
1041 let namespace_ident_a = NamespaceIdent::new("a".into());
1042 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1043 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1044 create_namespaces(&catalog, &vec![
1045 &namespace_ident_a,
1046 &namespace_ident_a_b,
1047 &namespace_ident_a_b_c,
1048 ])
1049 .await;
1050
1051 catalog
1052 .drop_namespace(&namespace_ident_a_b_c)
1053 .await
1054 .unwrap();
1055
1056 assert!(
1057 !catalog
1058 .namespace_exists(&namespace_ident_a_b_c)
1059 .await
1060 .unwrap()
1061 );
1062
1063 assert!(
1064 catalog
1065 .namespace_exists(&namespace_ident_a_b)
1066 .await
1067 .unwrap()
1068 );
1069
1070 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1071 }
1072
1073 #[tokio::test]
1074 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1075 let catalog = new_memory_catalog().await;
1076
1077 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1078 assert_eq!(
1079 catalog
1080 .drop_namespace(&non_existent_namespace_ident)
1081 .await
1082 .unwrap_err()
1083 .to_string(),
1084 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1085 )
1086 }
1087
1088 #[tokio::test]
1089 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1090 let catalog = new_memory_catalog().await;
1091 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1092
1093 let non_existent_namespace_ident =
1094 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1095 assert_eq!(
1096 catalog
1097 .drop_namespace(&non_existent_namespace_ident)
1098 .await
1099 .unwrap_err()
1100 .to_string(),
1101 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1102 )
1103 }
1104
1105 #[tokio::test]
1106 async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1107 let catalog = new_memory_catalog().await;
1108 let namespace_ident_a = NamespaceIdent::new("a".into());
1109 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1110 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1111
1112 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1113
1114 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1115
1116 assert!(
1117 !catalog
1118 .namespace_exists(&namespace_ident_a_b)
1119 .await
1120 .unwrap()
1121 );
1122 }
1123
1124 #[tokio::test]
1125 async fn test_create_table_with_location() {
1126 let tmp_dir = TempDir::new().unwrap();
1127 let catalog = new_memory_catalog().await;
1128 let namespace_ident = NamespaceIdent::new("a".into());
1129 create_namespace(&catalog, &namespace_ident).await;
1130
1131 let table_name = "abc";
1132 let location = tmp_dir.path().to_str().unwrap().to_string();
1133 let table_creation = TableCreation::builder()
1134 .name(table_name.into())
1135 .location(location.clone())
1136 .schema(simple_table_schema())
1137 .build();
1138
1139 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1140
1141 assert_table_eq(
1142 &catalog
1143 .create_table(&namespace_ident, table_creation)
1144 .await
1145 .unwrap(),
1146 &expected_table_ident,
1147 &simple_table_schema(),
1148 );
1149
1150 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1151
1152 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1153
1154 assert!(
1155 table
1156 .metadata_location()
1157 .unwrap()
1158 .to_string()
1159 .starts_with(&location)
1160 )
1161 }
1162
1163 #[tokio::test]
1164 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1165 let warehouse_location = temp_path();
1166 let catalog = MemoryCatalogBuilder::default()
1167 .load(
1168 "memory",
1169 HashMap::from([(
1170 MEMORY_CATALOG_WAREHOUSE.to_string(),
1171 warehouse_location.clone(),
1172 )]),
1173 )
1174 .await
1175 .unwrap();
1176
1177 let namespace_ident = NamespaceIdent::new("a".into());
1178 let mut namespace_properties = HashMap::new();
1179 let namespace_location = temp_path();
1180 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1181 catalog
1182 .create_namespace(&namespace_ident, namespace_properties)
1183 .await
1184 .unwrap();
1185
1186 let table_name = "tbl1";
1187 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1188 let expected_table_metadata_location_regex =
1189 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1190
1191 let table = catalog
1192 .create_table(
1193 &namespace_ident,
1194 TableCreation::builder()
1195 .name(table_name.into())
1196 .schema(simple_table_schema())
1197 .build(),
1199 )
1200 .await
1201 .unwrap();
1202 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1203 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1204
1205 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1206 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1207 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1208 }
1209
1210 #[tokio::test]
1211 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1212 {
1213 let warehouse_location = temp_path();
1214 let catalog = MemoryCatalogBuilder::default()
1215 .load(
1216 "memory",
1217 HashMap::from([(
1218 MEMORY_CATALOG_WAREHOUSE.to_string(),
1219 warehouse_location.clone(),
1220 )]),
1221 )
1222 .await
1223 .unwrap();
1224
1225 let namespace_ident = NamespaceIdent::new("a".into());
1226 let mut namespace_properties = HashMap::new();
1227 let namespace_location = temp_path();
1228 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1229 catalog
1230 .create_namespace(&namespace_ident, namespace_properties)
1231 .await
1232 .unwrap();
1233
1234 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1235 let mut nested_namespace_properties = HashMap::new();
1236 let nested_namespace_location = temp_path();
1237 nested_namespace_properties
1238 .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1239 catalog
1240 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1241 .await
1242 .unwrap();
1243
1244 let table_name = "tbl1";
1245 let expected_table_ident =
1246 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1247 let expected_table_metadata_location_regex = format!(
1248 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1249 );
1250
1251 let table = catalog
1252 .create_table(
1253 &nested_namespace_ident,
1254 TableCreation::builder()
1255 .name(table_name.into())
1256 .schema(simple_table_schema())
1257 .build(),
1259 )
1260 .await
1261 .unwrap();
1262 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1263 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1264
1265 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1266 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1267 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1268 }
1269
1270 #[tokio::test]
1271 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1272 {
1273 let warehouse_location = temp_path();
1274 let catalog = MemoryCatalogBuilder::default()
1275 .load(
1276 "memory",
1277 HashMap::from([(
1278 MEMORY_CATALOG_WAREHOUSE.to_string(),
1279 warehouse_location.clone(),
1280 )]),
1281 )
1282 .await
1283 .unwrap();
1284
1285 let namespace_ident = NamespaceIdent::new("a".into());
1286 let namespace_properties = HashMap::new();
1288 catalog
1289 .create_namespace(&namespace_ident, namespace_properties)
1290 .await
1291 .unwrap();
1292
1293 let table_name = "tbl1";
1294 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1295 let expected_table_metadata_location_regex =
1296 format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1297
1298 let table = catalog
1299 .create_table(
1300 &namespace_ident,
1301 TableCreation::builder()
1302 .name(table_name.into())
1303 .schema(simple_table_schema())
1304 .build(),
1306 )
1307 .await
1308 .unwrap();
1309 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1310 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1311
1312 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1313 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1314 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1315 }
1316
1317 #[tokio::test]
1318 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1319 {
1320 let warehouse_location = temp_path();
1321 let catalog = MemoryCatalogBuilder::default()
1322 .load(
1323 "memory",
1324 HashMap::from([(
1325 MEMORY_CATALOG_WAREHOUSE.to_string(),
1326 warehouse_location.clone(),
1327 )]),
1328 )
1329 .await
1330 .unwrap();
1331
1332 let namespace_ident = NamespaceIdent::new("a".into());
1333 catalog
1334 .create_namespace(&namespace_ident, HashMap::new())
1336 .await
1337 .unwrap();
1338
1339 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1340 catalog
1341 .create_namespace(&nested_namespace_ident, HashMap::new())
1343 .await
1344 .unwrap();
1345
1346 let table_name = "tbl1";
1347 let expected_table_ident =
1348 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1349 let expected_table_metadata_location_regex = format!(
1350 "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1351 );
1352
1353 let table = catalog
1354 .create_table(
1355 &nested_namespace_ident,
1356 TableCreation::builder()
1357 .name(table_name.into())
1358 .schema(simple_table_schema())
1359 .build(),
1361 )
1362 .await
1363 .unwrap();
1364 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1365 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1366
1367 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1368 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1369 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1370 }
1371
1372 #[tokio::test]
1373 async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1374 {
1375 let catalog = MemoryCatalogBuilder::default()
1376 .load("memory", HashMap::from([]))
1377 .await;
1378
1379 assert!(catalog.is_err());
1380 assert_eq!(
1381 catalog.unwrap_err().to_string(),
1382 "DataInvalid => Catalog warehouse is required"
1383 );
1384 }
1385
1386 #[tokio::test]
1387 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1388 let catalog = new_memory_catalog().await;
1389 let namespace_ident = NamespaceIdent::new("a".into());
1390 create_namespace(&catalog, &namespace_ident).await;
1391 let table_name = "tbl1";
1392 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1393 create_table(&catalog, &table_ident).await;
1394
1395 let tmp_dir = TempDir::new().unwrap();
1396 let location = tmp_dir.path().to_str().unwrap().to_string();
1397
1398 assert_eq!(
1399 catalog
1400 .create_table(
1401 &namespace_ident,
1402 TableCreation::builder()
1403 .name(table_name.into())
1404 .schema(simple_table_schema())
1405 .location(location)
1406 .build()
1407 )
1408 .await
1409 .unwrap_err()
1410 .to_string(),
1411 format!(
1412 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1413 &table_ident
1414 )
1415 );
1416 }
1417
1418 #[tokio::test]
1419 async fn test_list_tables_returns_empty_vector() {
1420 let catalog = new_memory_catalog().await;
1421 let namespace_ident = NamespaceIdent::new("a".into());
1422 create_namespace(&catalog, &namespace_ident).await;
1423
1424 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1425 }
1426
1427 #[tokio::test]
1428 async fn test_list_tables_returns_a_single_table() {
1429 let catalog = new_memory_catalog().await;
1430 let namespace_ident = NamespaceIdent::new("n1".into());
1431 create_namespace(&catalog, &namespace_ident).await;
1432
1433 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1434 create_table(&catalog, &table_ident).await;
1435
1436 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1437 table_ident
1438 ]);
1439 }
1440
1441 #[tokio::test]
1442 async fn test_list_tables_returns_multiple_tables() {
1443 let catalog = new_memory_catalog().await;
1444 let namespace_ident = NamespaceIdent::new("n1".into());
1445 create_namespace(&catalog, &namespace_ident).await;
1446
1447 let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1448 let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1449 let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1450
1451 assert_eq!(
1452 to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1453 to_set(vec![table_ident_1, table_ident_2])
1454 );
1455 }
1456
1457 #[tokio::test]
1458 async fn test_list_tables_returns_tables_from_correct_namespace() {
1459 let catalog = new_memory_catalog().await;
1460 let namespace_ident_1 = NamespaceIdent::new("n1".into());
1461 let namespace_ident_2 = NamespaceIdent::new("n2".into());
1462 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1463
1464 let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1465 let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1466 let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1467 let _ = create_tables(&catalog, vec![
1468 &table_ident_1,
1469 &table_ident_2,
1470 &table_ident_3,
1471 ])
1472 .await;
1473
1474 assert_eq!(
1475 to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1476 to_set(vec![table_ident_1, table_ident_2])
1477 );
1478
1479 assert_eq!(
1480 to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1481 to_set(vec![table_ident_3])
1482 );
1483 }
1484
1485 #[tokio::test]
1486 async fn test_list_tables_returns_table_under_nested_namespace() {
1487 let catalog = new_memory_catalog().await;
1488 let namespace_ident_a = NamespaceIdent::new("a".into());
1489 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1490 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1491
1492 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1493 create_table(&catalog, &table_ident).await;
1494
1495 assert_eq!(
1496 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1497 vec![table_ident]
1498 );
1499 }
1500
1501 #[tokio::test]
1502 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1503 let catalog = new_memory_catalog().await;
1504
1505 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1506
1507 assert_eq!(
1508 catalog
1509 .list_tables(&non_existent_namespace_ident)
1510 .await
1511 .unwrap_err()
1512 .to_string(),
1513 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1514 );
1515 }
1516
1517 #[tokio::test]
1518 async fn test_drop_table() {
1519 let catalog = new_memory_catalog().await;
1520 let namespace_ident = NamespaceIdent::new("n1".into());
1521 create_namespace(&catalog, &namespace_ident).await;
1522 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1523 create_table(&catalog, &table_ident).await;
1524
1525 catalog.drop_table(&table_ident).await.unwrap();
1526 }
1527
1528 #[tokio::test]
1529 async fn test_drop_table_drops_table_under_nested_namespace() {
1530 let catalog = new_memory_catalog().await;
1531 let namespace_ident_a = NamespaceIdent::new("a".into());
1532 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1533 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1534
1535 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1536 create_table(&catalog, &table_ident).await;
1537
1538 catalog.drop_table(&table_ident).await.unwrap();
1539
1540 assert_eq!(
1541 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1542 vec![]
1543 );
1544 }
1545
1546 #[tokio::test]
1547 async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1548 let catalog = new_memory_catalog().await;
1549
1550 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1551 let non_existent_table_ident =
1552 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1553
1554 assert_eq!(
1555 catalog
1556 .drop_table(&non_existent_table_ident)
1557 .await
1558 .unwrap_err()
1559 .to_string(),
1560 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1561 );
1562 }
1563
1564 #[tokio::test]
1565 async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1566 let catalog = new_memory_catalog().await;
1567 let namespace_ident = NamespaceIdent::new("n1".into());
1568 create_namespace(&catalog, &namespace_ident).await;
1569
1570 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1571
1572 assert_eq!(
1573 catalog
1574 .drop_table(&non_existent_table_ident)
1575 .await
1576 .unwrap_err()
1577 .to_string(),
1578 format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1579 );
1580 }
1581
1582 #[tokio::test]
1583 async fn test_table_exists_returns_true() {
1584 let catalog = new_memory_catalog().await;
1585 let namespace_ident = NamespaceIdent::new("n1".into());
1586 create_namespace(&catalog, &namespace_ident).await;
1587 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1588 create_table(&catalog, &table_ident).await;
1589
1590 assert!(catalog.table_exists(&table_ident).await.unwrap());
1591 }
1592
1593 #[tokio::test]
1594 async fn test_table_exists_returns_false() {
1595 let catalog = new_memory_catalog().await;
1596 let namespace_ident = NamespaceIdent::new("n1".into());
1597 create_namespace(&catalog, &namespace_ident).await;
1598 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1599
1600 assert!(
1601 !catalog
1602 .table_exists(&non_existent_table_ident)
1603 .await
1604 .unwrap()
1605 );
1606 }
1607
1608 #[tokio::test]
1609 async fn test_table_exists_under_nested_namespace() {
1610 let catalog = new_memory_catalog().await;
1611 let namespace_ident_a = NamespaceIdent::new("a".into());
1612 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1613 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1614
1615 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1616 create_table(&catalog, &table_ident).await;
1617
1618 assert!(catalog.table_exists(&table_ident).await.unwrap());
1619
1620 let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1621 assert!(
1622 !catalog
1623 .table_exists(&non_existent_table_ident)
1624 .await
1625 .unwrap()
1626 );
1627 }
1628
1629 #[tokio::test]
1630 async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1631 let catalog = new_memory_catalog().await;
1632
1633 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1634 let non_existent_table_ident =
1635 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1636
1637 assert_eq!(
1638 catalog
1639 .table_exists(&non_existent_table_ident)
1640 .await
1641 .unwrap_err()
1642 .to_string(),
1643 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1644 );
1645 }
1646
1647 #[tokio::test]
1648 async fn test_rename_table_in_same_namespace() {
1649 let catalog = new_memory_catalog().await;
1650 let namespace_ident = NamespaceIdent::new("n1".into());
1651 create_namespace(&catalog, &namespace_ident).await;
1652 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1653 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1654 create_table(&catalog, &src_table_ident).await;
1655
1656 catalog
1657 .rename_table(&src_table_ident, &dst_table_ident)
1658 .await
1659 .unwrap();
1660
1661 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1662 dst_table_ident
1663 ],);
1664 }
1665
1666 #[tokio::test]
1667 async fn test_rename_table_across_namespaces() {
1668 let catalog = new_memory_catalog().await;
1669 let src_namespace_ident = NamespaceIdent::new("a".into());
1670 let dst_namespace_ident = NamespaceIdent::new("b".into());
1671 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1672 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1673 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1674 create_table(&catalog, &src_table_ident).await;
1675
1676 catalog
1677 .rename_table(&src_table_ident, &dst_table_ident)
1678 .await
1679 .unwrap();
1680
1681 assert_eq!(
1682 catalog.list_tables(&src_namespace_ident).await.unwrap(),
1683 vec![],
1684 );
1685
1686 assert_eq!(
1687 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1688 vec![dst_table_ident],
1689 );
1690 }
1691
1692 #[tokio::test]
1693 async fn test_rename_table_src_table_is_same_as_dst_table() {
1694 let catalog = new_memory_catalog().await;
1695 let namespace_ident = NamespaceIdent::new("n1".into());
1696 create_namespace(&catalog, &namespace_ident).await;
1697 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1698 create_table(&catalog, &table_ident).await;
1699
1700 catalog
1701 .rename_table(&table_ident, &table_ident)
1702 .await
1703 .unwrap();
1704
1705 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1706 table_ident
1707 ],);
1708 }
1709
1710 #[tokio::test]
1711 async fn test_rename_table_across_nested_namespaces() {
1712 let catalog = new_memory_catalog().await;
1713 let namespace_ident_a = NamespaceIdent::new("a".into());
1714 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1715 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1716 create_namespaces(&catalog, &vec![
1717 &namespace_ident_a,
1718 &namespace_ident_a_b,
1719 &namespace_ident_a_b_c,
1720 ])
1721 .await;
1722
1723 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1724 create_tables(&catalog, vec![&src_table_ident]).await;
1725
1726 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1727 catalog
1728 .rename_table(&src_table_ident, &dst_table_ident)
1729 .await
1730 .unwrap();
1731
1732 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1733
1734 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1735 }
1736
1737 #[tokio::test]
1738 async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1739 let catalog = new_memory_catalog().await;
1740
1741 let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1742 let src_table_ident =
1743 TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1744
1745 let dst_namespace_ident = NamespaceIdent::new("n2".into());
1746 create_namespace(&catalog, &dst_namespace_ident).await;
1747 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1748
1749 assert_eq!(
1750 catalog
1751 .rename_table(&src_table_ident, &dst_table_ident)
1752 .await
1753 .unwrap_err()
1754 .to_string(),
1755 format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1756 );
1757 }
1758
1759 #[tokio::test]
1760 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1761 let catalog = new_memory_catalog().await;
1762 let src_namespace_ident = NamespaceIdent::new("n1".into());
1763 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1764 create_namespace(&catalog, &src_namespace_ident).await;
1765 create_table(&catalog, &src_table_ident).await;
1766
1767 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1768 let dst_table_ident =
1769 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1770 assert_eq!(
1771 catalog
1772 .rename_table(&src_table_ident, &dst_table_ident)
1773 .await
1774 .unwrap_err()
1775 .to_string(),
1776 format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1777 );
1778 }
1779
1780 #[tokio::test]
1781 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1782 let catalog = new_memory_catalog().await;
1783 let namespace_ident = NamespaceIdent::new("n1".into());
1784 create_namespace(&catalog, &namespace_ident).await;
1785 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1786 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1787
1788 assert_eq!(
1789 catalog
1790 .rename_table(&src_table_ident, &dst_table_ident)
1791 .await
1792 .unwrap_err()
1793 .to_string(),
1794 format!("TableNotFound => No such table: {src_table_ident:?}"),
1795 );
1796 }
1797
1798 #[tokio::test]
1799 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1800 let catalog = new_memory_catalog().await;
1801 let namespace_ident = NamespaceIdent::new("n1".into());
1802 create_namespace(&catalog, &namespace_ident).await;
1803 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1804 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1805 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1806
1807 assert_eq!(
1808 catalog
1809 .rename_table(&src_table_ident, &dst_table_ident)
1810 .await
1811 .unwrap_err()
1812 .to_string(),
1813 format!(
1814 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1815 &dst_table_ident
1816 ),
1817 );
1818 }
1819
1820 #[tokio::test]
1821 async fn test_register_table() {
1822 let catalog = new_memory_catalog().await;
1824 let namespace_ident = NamespaceIdent::new("test_namespace".into());
1825 create_namespace(&catalog, &namespace_ident).await;
1826
1827 let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1829 create_table(&catalog, &source_table_ident).await;
1830
1831 let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1833 let metadata_location = source_table.metadata_location().unwrap().to_string();
1834
1835 let register_table_ident =
1837 TableIdent::new(namespace_ident.clone(), "register_table".into());
1838 let registered_table = catalog
1839 .register_table(®ister_table_ident, metadata_location.clone())
1840 .await
1841 .unwrap();
1842
1843 assert_eq!(registered_table.identifier(), ®ister_table_ident);
1845
1846 assert_eq!(
1848 registered_table.metadata_location().unwrap().to_string(),
1849 metadata_location
1850 );
1851
1852 assert!(catalog.table_exists(®ister_table_ident).await.unwrap());
1854
1855 let loaded_table = catalog.load_table(®ister_table_ident).await.unwrap();
1857 assert_eq!(loaded_table.identifier(), ®ister_table_ident);
1858 assert_eq!(
1859 loaded_table.metadata_location().unwrap().to_string(),
1860 metadata_location
1861 );
1862 }
1863
1864 #[tokio::test]
1865 async fn test_update_table() {
1866 let catalog = new_memory_catalog().await;
1867
1868 let table = create_table_with_namespace(&catalog).await;
1869
1870 assert!(!table.metadata().properties().contains_key("key"));
1872
1873 let tx = Transaction::new(&table);
1875 let updated_table = tx
1876 .update_table_properties()
1877 .set("key".to_string(), "value".to_string())
1878 .apply(tx)
1879 .unwrap()
1880 .commit(&catalog)
1881 .await
1882 .unwrap();
1883
1884 assert_eq!(
1885 updated_table.metadata().properties().get("key").unwrap(),
1886 "value"
1887 );
1888
1889 assert_eq!(table.identifier(), updated_table.identifier());
1890 assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1891 assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1892 assert_ne!(table.metadata_location(), updated_table.metadata_location());
1893
1894 assert!(
1895 table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1896 );
1897 }
1898
1899 #[tokio::test]
1900 async fn test_update_table_fails_if_table_doesnt_exist() {
1901 let catalog = new_memory_catalog().await;
1902
1903 let namespace_ident = NamespaceIdent::new("a".into());
1904 create_namespace(&catalog, &namespace_ident).await;
1905
1906 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1908 let table = build_table(table_ident);
1909
1910 let tx = Transaction::new(&table);
1911 let err = tx
1912 .update_table_properties()
1913 .set("key".to_string(), "value".to_string())
1914 .apply(tx)
1915 .unwrap()
1916 .commit(&catalog)
1917 .await
1918 .unwrap_err();
1919 assert_eq!(err.kind(), ErrorKind::TableNotFound);
1920 }
1921
1922 fn build_table(ident: TableIdent) -> Table {
1923 let file_io = FileIO::new_with_fs();
1924
1925 let temp_dir = TempDir::new().unwrap();
1926 let location = temp_dir.path().to_str().unwrap().to_string();
1927
1928 let table_creation = TableCreation::builder()
1929 .name(ident.name().to_string())
1930 .schema(simple_table_schema())
1931 .location(location)
1932 .build();
1933 let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1934 .unwrap()
1935 .build()
1936 .unwrap()
1937 .metadata;
1938
1939 Table::builder()
1940 .identifier(ident)
1941 .metadata(metadata)
1942 .file_io(file_io)
1943 .build()
1944 .unwrap()
1945 }
1946}