1use std::collections::HashMap;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use futures::lock::{Mutex, MutexGuard};
26use itertools::Itertools;
27
28use super::namespace_state::NamespaceState;
29use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
30use crate::runtime::Runtime;
31use crate::spec::{TableMetadata, TableMetadataBuilder};
32use crate::table::Table;
33use crate::{
34 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35 TableCommit, TableCreation, TableIdent,
36};
37
/// Property key under which `MemoryCatalogBuilder::load` looks up the warehouse location.
pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";

/// Namespace property key that `create_table` consults for a table location prefix.
const LOCATION: &str = "location";
43
#[derive(Debug)]
/// Builder that gathers everything needed to construct a [`MemoryCatalog`].
pub struct MemoryCatalogBuilder {
    /// Catalog name, warehouse location and remaining properties filled in by `load`.
    config: MemoryCatalogConfig,
    /// Optional storage factory; `MemoryCatalog::new` falls back to
    /// `MemoryStorageFactory` when this is `None`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Optional runtime; `load` falls back to `Runtime::current` when this is `None`.
    runtime: Option<Runtime>,
}
51
52impl Default for MemoryCatalogBuilder {
53 fn default() -> Self {
54 Self {
55 config: MemoryCatalogConfig {
56 name: None,
57 warehouse: "".to_string(),
58 props: HashMap::new(),
59 },
60 storage_factory: None,
61 runtime: None,
62 }
63 }
64}
65
66impl CatalogBuilder for MemoryCatalogBuilder {
67 type C = MemoryCatalog;
68
69 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
70 self.storage_factory = Some(storage_factory);
71 self
72 }
73
74 fn with_runtime(mut self, runtime: Runtime) -> Self {
75 self.runtime = Some(runtime);
76 self
77 }
78
79 fn load(
80 mut self,
81 name: impl Into<String>,
82 props: HashMap<String, String>,
83 ) -> impl Future<Output = Result<Self::C>> + Send {
84 self.config.name = Some(name.into());
85
86 if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
87 self.config.warehouse = props
88 .get(MEMORY_CATALOG_WAREHOUSE)
89 .cloned()
90 .unwrap_or_default()
91 }
92
93 self.config.props = props
95 .into_iter()
96 .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
97 .collect();
98
99 let result = {
100 if self.config.name.is_none() {
101 Err(Error::new(
102 ErrorKind::DataInvalid,
103 "Catalog name is required",
104 ))
105 } else if self.config.warehouse.is_empty() {
106 Err(Error::new(
107 ErrorKind::DataInvalid,
108 "Catalog warehouse is required",
109 ))
110 } else {
111 let runtime = self.runtime.unwrap_or_else(Runtime::current);
112 MemoryCatalog::new(self.config, self.storage_factory, runtime)
113 }
114 };
115
116 std::future::ready(result)
117 }
118}
119
/// Configuration collected by [`MemoryCatalogBuilder`] and consumed by `MemoryCatalog::new`.
#[derive(Clone, Debug)]
pub(crate) struct MemoryCatalogConfig {
    /// Catalog name; set by `MemoryCatalogBuilder::load`.
    name: Option<String>,
    /// Warehouse location; `load` rejects the configuration when this is empty.
    warehouse: String,
    /// Remaining properties, passed to the `FileIOBuilder` in `MemoryCatalog::new`.
    props: HashMap<String, String>,
}
126
#[derive(Debug)]
/// Catalog whose namespace and table bookkeeping lives in a mutex-guarded `NamespaceState`.
pub struct MemoryCatalog {
    /// Namespace/table bookkeeping; every catalog operation locks this first.
    root_namespace_state: Mutex<NamespaceState>,
    /// File access used to read and write table metadata.
    file_io: FileIO,
    /// Base location used by `create_table` when neither the table nor its
    /// namespace supplies a location.
    warehouse_location: String,
    /// Runtime handed to every `Table` this catalog builds.
    runtime: Runtime,
}
135
136impl MemoryCatalog {
137 fn new(
139 config: MemoryCatalogConfig,
140 storage_factory: Option<Arc<dyn StorageFactory>>,
141 runtime: Runtime,
142 ) -> Result<Self> {
143 let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory));
145
146 Ok(Self {
147 root_namespace_state: Mutex::new(NamespaceState::default()),
148 file_io: FileIOBuilder::new(factory).with_props(config.props).build(),
149 warehouse_location: config.warehouse,
150 runtime,
151 })
152 }
153
154 async fn load_table_from_locked_state(
156 &self,
157 table_ident: &TableIdent,
158 root_namespace_state: &MutexGuard<'_, NamespaceState>,
159 ) -> Result<Table> {
160 let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
161 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
162
163 Table::builder()
164 .identifier(table_ident.clone())
165 .metadata(metadata)
166 .metadata_location(metadata_location.to_string())
167 .file_io(self.file_io.clone())
168 .runtime(self.runtime.clone())
169 .build()
170 }
171}
172
173#[async_trait]
174impl Catalog for MemoryCatalog {
175 async fn list_namespaces(
177 &self,
178 maybe_parent: Option<&NamespaceIdent>,
179 ) -> Result<Vec<NamespaceIdent>> {
180 let root_namespace_state = self.root_namespace_state.lock().await;
181
182 match maybe_parent {
183 None => {
184 let namespaces = root_namespace_state
185 .list_top_level_namespaces()
186 .into_iter()
187 .map(|str| NamespaceIdent::new(str.to_string()))
188 .collect_vec();
189
190 Ok(namespaces)
191 }
192 Some(parent_namespace_ident) => {
193 let namespaces = root_namespace_state
194 .list_namespaces_under(parent_namespace_ident)?
195 .into_iter()
196 .map(|name| {
197 let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
198 names.push(name.to_string());
199 NamespaceIdent::from_vec(names)
200 })
201 .collect::<Result<Vec<_>>>()?;
202
203 Ok(namespaces)
204 }
205 }
206 }
207
208 async fn create_namespace(
210 &self,
211 namespace_ident: &NamespaceIdent,
212 properties: HashMap<String, String>,
213 ) -> Result<Namespace> {
214 let mut root_namespace_state = self.root_namespace_state.lock().await;
215
216 root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
217 let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
218
219 Ok(namespace)
220 }
221
222 async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
224 let root_namespace_state = self.root_namespace_state.lock().await;
225
226 let namespace = Namespace::with_properties(
227 namespace_ident.clone(),
228 root_namespace_state
229 .get_properties(namespace_ident)?
230 .clone(),
231 );
232
233 Ok(namespace)
234 }
235
236 async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
238 let guarded_namespaces = self.root_namespace_state.lock().await;
239
240 Ok(guarded_namespaces.namespace_exists(namespace_ident))
241 }
242
243 async fn update_namespace(
249 &self,
250 namespace_ident: &NamespaceIdent,
251 properties: HashMap<String, String>,
252 ) -> Result<()> {
253 let mut root_namespace_state = self.root_namespace_state.lock().await;
254
255 root_namespace_state.replace_properties(namespace_ident, properties)
256 }
257
258 async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
260 let mut root_namespace_state = self.root_namespace_state.lock().await;
261
262 root_namespace_state.remove_existing_namespace(namespace_ident)
263 }
264
265 async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
267 let root_namespace_state = self.root_namespace_state.lock().await;
268
269 let table_names = root_namespace_state.list_tables(namespace_ident)?;
270 let table_idents = table_names
271 .into_iter()
272 .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
273 .collect_vec();
274
275 Ok(table_idents)
276 }
277
278 async fn create_table(
280 &self,
281 namespace_ident: &NamespaceIdent,
282 table_creation: TableCreation,
283 ) -> Result<Table> {
284 let mut root_namespace_state = self.root_namespace_state.lock().await;
285
286 let table_name = table_creation.name.clone();
287 let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
288
289 let (table_creation, location) = match table_creation.location.clone() {
290 Some(location) => (table_creation, location),
291 None => {
292 let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
293 let location_prefix = match namespace_properties.get(LOCATION) {
294 Some(namespace_location) => namespace_location.clone(),
295 None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
296 };
297
298 let location = format!("{}/{}", location_prefix, table_ident.name());
299
300 let new_table_creation = TableCreation {
301 location: Some(location.clone()),
302 ..table_creation
303 };
304
305 (new_table_creation, location)
306 }
307 };
308
309 let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
310 .build()?
311 .metadata;
312 let metadata_location = MetadataLocation::new_with_metadata(location, &metadata);
313
314 metadata.write_to(&self.file_io, &metadata_location).await?;
315
316 root_namespace_state.insert_new_table(&table_ident, metadata_location.to_string())?;
317
318 Table::builder()
319 .file_io(self.file_io.clone())
320 .metadata_location(metadata_location.to_string())
321 .metadata(metadata)
322 .identifier(table_ident)
323 .runtime(self.runtime.clone())
324 .build()
325 }
326
327 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
329 let root_namespace_state = self.root_namespace_state.lock().await;
330
331 self.load_table_from_locked_state(table_ident, &root_namespace_state)
332 .await
333 }
334
335 async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
337 let mut root_namespace_state = self.root_namespace_state.lock().await;
338
339 root_namespace_state.remove_existing_table(table_ident)?;
340 Ok(())
341 }
342
343 async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> {
344 let table_info = self.load_table(table_ident).await?;
345 self.drop_table(table_ident).await?;
346 crate::catalog::utils::drop_table_data(&table_info).await
347 }
348
349 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
351 let root_namespace_state = self.root_namespace_state.lock().await;
352
353 root_namespace_state.table_exists(table_ident)
354 }
355
356 async fn rename_table(
358 &self,
359 src_table_ident: &TableIdent,
360 dst_table_ident: &TableIdent,
361 ) -> Result<()> {
362 let mut root_namespace_state = self.root_namespace_state.lock().await;
363
364 let mut new_root_namespace_state = root_namespace_state.clone();
365 let metadata_location = new_root_namespace_state
366 .get_existing_table_location(src_table_ident)?
367 .clone();
368 new_root_namespace_state.remove_existing_table(src_table_ident)?;
369 new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
370 *root_namespace_state = new_root_namespace_state;
371
372 Ok(())
373 }
374
375 async fn register_table(
376 &self,
377 table_ident: &TableIdent,
378 metadata_location: String,
379 ) -> Result<Table> {
380 let mut root_namespace_state = self.root_namespace_state.lock().await;
381 root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
382
383 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
384
385 Table::builder()
386 .file_io(self.file_io.clone())
387 .metadata_location(metadata_location)
388 .metadata(metadata)
389 .identifier(table_ident.clone())
390 .runtime(self.runtime.clone())
391 .build()
392 }
393
394 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
396 let mut root_namespace_state = self.root_namespace_state.lock().await;
397
398 let current_table = self
399 .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
400 .await?;
401
402 let staged_table = commit.apply(current_table)?;
404
405 let metadata_location =
407 MetadataLocation::from_str(staged_table.metadata_location_result()?)?;
408 staged_table
409 .metadata()
410 .write_to(staged_table.file_io(), &metadata_location)
411 .await?;
412
413 let updated_table = root_namespace_state.commit_table_update(staged_table)?;
415
416 Ok(updated_table)
417 }
418}
419
420#[cfg(test)]
421pub(crate) mod tests {
422 use std::collections::HashSet;
423 use std::hash::Hash;
424 use std::iter::FromIterator;
425 use std::vec;
426
427 use regex::Regex;
428 use tempfile::TempDir;
429
430 use super::*;
431 use crate::io::FileIO;
432 use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
433 use crate::test_utils::test_runtime;
434 use crate::transaction::{ApplyTransactionAction, Transaction};
435
436 fn temp_path() -> String {
437 let temp_dir = TempDir::new().unwrap();
438 temp_dir.path().to_str().unwrap().to_string()
439 }
440
441 pub(crate) async fn new_memory_catalog() -> impl Catalog {
442 let warehouse_location = temp_path();
443 MemoryCatalogBuilder::default()
444 .load(
445 "memory",
446 HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
447 )
448 .await
449 .unwrap()
450 }
451
452 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
453 let _ = catalog
454 .create_namespace(namespace_ident, HashMap::new())
455 .await
456 .unwrap();
457 }
458
459 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
460 for namespace_ident in namespace_idents {
461 let _ = create_namespace(catalog, namespace_ident).await;
462 }
463 }
464
465 fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
466 HashSet::from_iter(vec)
467 }
468
469 fn simple_table_schema() -> Schema {
470 Schema::builder()
471 .with_fields(vec![
472 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
473 ])
474 .build()
475 .unwrap()
476 }
477
478 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
479 catalog
480 .create_table(
481 &table_ident.namespace,
482 TableCreation::builder()
483 .name(table_ident.name().into())
484 .schema(simple_table_schema())
485 .build(),
486 )
487 .await
488 .unwrap()
489 }
490
491 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
492 for table_ident in table_idents {
493 create_table(catalog, table_ident).await;
494 }
495 }
496
497 async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
498 let namespace_ident = NamespaceIdent::new("abc".into());
499 create_namespace(catalog, &namespace_ident).await;
500
501 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
502 create_table(catalog, &table_ident).await
503 }
504
505 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
506 assert_eq!(table.identifier(), expected_table_ident);
507
508 let metadata = table.metadata();
509
510 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
511
512 let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
513 .with_spec_id(0)
514 .build()
515 .unwrap();
516
517 assert_eq!(
518 metadata
519 .partition_specs_iter()
520 .map(|p| p.as_ref())
521 .collect_vec(),
522 vec![&expected_partition_spec]
523 );
524
525 let expected_sorted_order = SortOrder::builder()
526 .with_order_id(0)
527 .with_fields(vec![])
528 .build(expected_schema)
529 .unwrap();
530
531 assert_eq!(
532 metadata
533 .sort_orders_iter()
534 .map(|s| s.as_ref())
535 .collect_vec(),
536 vec![&expected_sorted_order]
537 );
538
539 assert_eq!(metadata.properties(), &HashMap::new());
540
541 assert!(!table.readonly());
542 }
543
    /// Regex fragment matching a lowercase, hyphenated 8-4-4-4-12 hex-digit UUID.
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
545
546 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
547 let actual = table.metadata_location().unwrap().to_string();
548 let regex = Regex::new(regex_str).unwrap();
549 assert!(
550 regex.is_match(&actual),
551 "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
552 )
553 }
554
555 #[tokio::test]
556 async fn test_list_namespaces_returns_empty_vector() {
557 let catalog = new_memory_catalog().await;
558
559 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
560 }
561
562 #[tokio::test]
563 async fn test_list_namespaces_returns_single_namespace() {
564 let catalog = new_memory_catalog().await;
565 let namespace_ident = NamespaceIdent::new("abc".into());
566 create_namespace(&catalog, &namespace_ident).await;
567
568 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
569 namespace_ident
570 ]);
571 }
572
573 #[tokio::test]
574 async fn test_list_namespaces_returns_multiple_namespaces() {
575 let catalog = new_memory_catalog().await;
576 let namespace_ident_1 = NamespaceIdent::new("a".into());
577 let namespace_ident_2 = NamespaceIdent::new("b".into());
578 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
579
580 assert_eq!(
581 to_set(catalog.list_namespaces(None).await.unwrap()),
582 to_set(vec![namespace_ident_1, namespace_ident_2])
583 );
584 }
585
586 #[tokio::test]
587 async fn test_list_namespaces_returns_only_top_level_namespaces() {
588 let catalog = new_memory_catalog().await;
589 let namespace_ident_1 = NamespaceIdent::new("a".into());
590 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
591 let namespace_ident_3 = NamespaceIdent::new("b".into());
592 create_namespaces(&catalog, &vec![
593 &namespace_ident_1,
594 &namespace_ident_2,
595 &namespace_ident_3,
596 ])
597 .await;
598
599 assert_eq!(
600 to_set(catalog.list_namespaces(None).await.unwrap()),
601 to_set(vec![namespace_ident_1, namespace_ident_3])
602 );
603 }
604
605 #[tokio::test]
606 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
607 let catalog = new_memory_catalog().await;
608 let namespace_ident_1 = NamespaceIdent::new("a".into());
609 let namespace_ident_2 = NamespaceIdent::new("b".into());
610 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
611
612 assert_eq!(
613 catalog
614 .list_namespaces(Some(&namespace_ident_1))
615 .await
616 .unwrap(),
617 vec![]
618 );
619 }
620
621 #[tokio::test]
622 async fn test_list_namespaces_returns_namespace_under_parent() {
623 let catalog = new_memory_catalog().await;
624 let namespace_ident_1 = NamespaceIdent::new("a".into());
625 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
626 let namespace_ident_3 = NamespaceIdent::new("c".into());
627 create_namespaces(&catalog, &vec![
628 &namespace_ident_1,
629 &namespace_ident_2,
630 &namespace_ident_3,
631 ])
632 .await;
633
634 assert_eq!(
635 to_set(catalog.list_namespaces(None).await.unwrap()),
636 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
637 );
638
639 assert_eq!(
640 catalog
641 .list_namespaces(Some(&namespace_ident_1))
642 .await
643 .unwrap(),
644 vec![namespace_ident_2]
645 );
646 }
647
648 #[tokio::test]
649 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
650 let catalog = new_memory_catalog().await;
651 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
652 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
653 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
654 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
655 let namespace_ident_5 = NamespaceIdent::new("b".into());
656 create_namespaces(&catalog, &vec![
657 &namespace_ident_1,
658 &namespace_ident_2,
659 &namespace_ident_3,
660 &namespace_ident_4,
661 &namespace_ident_5,
662 ])
663 .await;
664
665 assert_eq!(
666 to_set(
667 catalog
668 .list_namespaces(Some(&namespace_ident_1))
669 .await
670 .unwrap()
671 ),
672 to_set(vec![
673 namespace_ident_2,
674 namespace_ident_3,
675 namespace_ident_4,
676 ])
677 );
678 }
679
680 #[tokio::test]
681 async fn test_namespace_exists_returns_false() {
682 let catalog = new_memory_catalog().await;
683 let namespace_ident = NamespaceIdent::new("a".into());
684 create_namespace(&catalog, &namespace_ident).await;
685
686 assert!(
687 !catalog
688 .namespace_exists(&NamespaceIdent::new("b".into()))
689 .await
690 .unwrap()
691 );
692 }
693
694 #[tokio::test]
695 async fn test_namespace_exists_returns_true() {
696 let catalog = new_memory_catalog().await;
697 let namespace_ident = NamespaceIdent::new("a".into());
698 create_namespace(&catalog, &namespace_ident).await;
699
700 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
701 }
702
703 #[tokio::test]
704 async fn test_create_namespace_with_empty_properties() {
705 let catalog = new_memory_catalog().await;
706 let namespace_ident = NamespaceIdent::new("a".into());
707
708 assert_eq!(
709 catalog
710 .create_namespace(&namespace_ident, HashMap::new())
711 .await
712 .unwrap(),
713 Namespace::new(namespace_ident.clone())
714 );
715
716 assert_eq!(
717 catalog.get_namespace(&namespace_ident).await.unwrap(),
718 Namespace::with_properties(namespace_ident, HashMap::new())
719 );
720 }
721
722 #[tokio::test]
723 async fn test_create_namespace_with_properties() {
724 let catalog = new_memory_catalog().await;
725 let namespace_ident = NamespaceIdent::new("abc".into());
726
727 let mut properties: HashMap<String, String> = HashMap::new();
728 properties.insert("k".into(), "v".into());
729
730 assert_eq!(
731 catalog
732 .create_namespace(&namespace_ident, properties.clone())
733 .await
734 .unwrap(),
735 Namespace::with_properties(namespace_ident.clone(), properties.clone())
736 );
737
738 assert_eq!(
739 catalog.get_namespace(&namespace_ident).await.unwrap(),
740 Namespace::with_properties(namespace_ident, properties)
741 );
742 }
743
744 #[tokio::test]
745 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
746 let catalog = new_memory_catalog().await;
747 let namespace_ident = NamespaceIdent::new("a".into());
748 create_namespace(&catalog, &namespace_ident).await;
749
750 assert_eq!(
751 catalog
752 .create_namespace(&namespace_ident, HashMap::new())
753 .await
754 .unwrap_err()
755 .to_string(),
756 format!(
757 "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
758 &namespace_ident
759 )
760 );
761
762 assert_eq!(
763 catalog.get_namespace(&namespace_ident).await.unwrap(),
764 Namespace::with_properties(namespace_ident, HashMap::new())
765 );
766 }
767
768 #[tokio::test]
769 async fn test_create_nested_namespace() {
770 let catalog = new_memory_catalog().await;
771 let parent_namespace_ident = NamespaceIdent::new("a".into());
772 create_namespace(&catalog, &parent_namespace_ident).await;
773
774 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
775
776 assert_eq!(
777 catalog
778 .create_namespace(&child_namespace_ident, HashMap::new())
779 .await
780 .unwrap(),
781 Namespace::new(child_namespace_ident.clone())
782 );
783
784 assert_eq!(
785 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
786 Namespace::with_properties(child_namespace_ident, HashMap::new())
787 );
788 }
789
790 #[tokio::test]
791 async fn test_create_deeply_nested_namespace() {
792 let catalog = new_memory_catalog().await;
793 let namespace_ident_a = NamespaceIdent::new("a".into());
794 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
795 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
796
797 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
798
799 assert_eq!(
800 catalog
801 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
802 .await
803 .unwrap(),
804 Namespace::new(namespace_ident_a_b_c.clone())
805 );
806
807 assert_eq!(
808 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
809 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
810 );
811 }
812
813 #[tokio::test]
814 async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
815 let catalog = new_memory_catalog().await;
816
817 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
818
819 assert_eq!(
820 catalog
821 .create_namespace(&nested_namespace_ident, HashMap::new())
822 .await
823 .unwrap_err()
824 .to_string(),
825 format!(
826 "NamespaceNotFound => No such namespace: {:?}",
827 NamespaceIdent::new("a".into())
828 )
829 );
830
831 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
832 }
833
834 #[tokio::test]
835 async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
836 {
837 let catalog = new_memory_catalog().await;
838
839 let namespace_ident_a = NamespaceIdent::new("a".into());
840 create_namespace(&catalog, &namespace_ident_a).await;
841
842 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
843
844 assert_eq!(
845 catalog
846 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
847 .await
848 .unwrap_err()
849 .to_string(),
850 format!(
851 "NamespaceNotFound => No such namespace: {:?}",
852 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
853 )
854 );
855
856 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
857 namespace_ident_a.clone()
858 ]);
859
860 assert_eq!(
861 catalog
862 .list_namespaces(Some(&namespace_ident_a))
863 .await
864 .unwrap(),
865 vec![]
866 );
867 }
868
869 #[tokio::test]
870 async fn test_get_namespace() {
871 let catalog = new_memory_catalog().await;
872 let namespace_ident = NamespaceIdent::new("abc".into());
873
874 let mut properties: HashMap<String, String> = HashMap::new();
875 properties.insert("k".into(), "v".into());
876 let _ = catalog
877 .create_namespace(&namespace_ident, properties.clone())
878 .await
879 .unwrap();
880
881 assert_eq!(
882 catalog.get_namespace(&namespace_ident).await.unwrap(),
883 Namespace::with_properties(namespace_ident, properties)
884 )
885 }
886
887 #[tokio::test]
888 async fn test_get_nested_namespace() {
889 let catalog = new_memory_catalog().await;
890 let namespace_ident_a = NamespaceIdent::new("a".into());
891 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
892 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
893
894 assert_eq!(
895 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
896 Namespace::with_properties(namespace_ident_a_b, HashMap::new())
897 );
898 }
899
900 #[tokio::test]
901 async fn test_get_deeply_nested_namespace() {
902 let catalog = new_memory_catalog().await;
903 let namespace_ident_a = NamespaceIdent::new("a".into());
904 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
905 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
906 create_namespaces(&catalog, &vec![
907 &namespace_ident_a,
908 &namespace_ident_a_b,
909 &namespace_ident_a_b_c,
910 ])
911 .await;
912
913 assert_eq!(
914 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
915 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
916 );
917 }
918
919 #[tokio::test]
920 async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
921 let catalog = new_memory_catalog().await;
922 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
923
924 let non_existent_namespace_ident = NamespaceIdent::new("b".into());
925 assert_eq!(
926 catalog
927 .get_namespace(&non_existent_namespace_ident)
928 .await
929 .unwrap_err()
930 .to_string(),
931 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
932 )
933 }
934
935 #[tokio::test]
936 async fn test_update_namespace() {
937 let catalog = new_memory_catalog().await;
938 let namespace_ident = NamespaceIdent::new("abc".into());
939 create_namespace(&catalog, &namespace_ident).await;
940
941 let mut new_properties: HashMap<String, String> = HashMap::new();
942 new_properties.insert("k".into(), "v".into());
943
944 catalog
945 .update_namespace(&namespace_ident, new_properties.clone())
946 .await
947 .unwrap();
948
949 assert_eq!(
950 catalog.get_namespace(&namespace_ident).await.unwrap(),
951 Namespace::with_properties(namespace_ident, new_properties)
952 )
953 }
954
955 #[tokio::test]
956 async fn test_update_nested_namespace() {
957 let catalog = new_memory_catalog().await;
958 let namespace_ident_a = NamespaceIdent::new("a".into());
959 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
960 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
961
962 let mut new_properties = HashMap::new();
963 new_properties.insert("k".into(), "v".into());
964
965 catalog
966 .update_namespace(&namespace_ident_a_b, new_properties.clone())
967 .await
968 .unwrap();
969
970 assert_eq!(
971 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
972 Namespace::with_properties(namespace_ident_a_b, new_properties)
973 );
974 }
975
976 #[tokio::test]
977 async fn test_update_deeply_nested_namespace() {
978 let catalog = new_memory_catalog().await;
979 let namespace_ident_a = NamespaceIdent::new("a".into());
980 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
981 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
982 create_namespaces(&catalog, &vec![
983 &namespace_ident_a,
984 &namespace_ident_a_b,
985 &namespace_ident_a_b_c,
986 ])
987 .await;
988
989 let mut new_properties = HashMap::new();
990 new_properties.insert("k".into(), "v".into());
991
992 catalog
993 .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
994 .await
995 .unwrap();
996
997 assert_eq!(
998 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
999 Namespace::with_properties(namespace_ident_a_b_c, new_properties)
1000 );
1001 }
1002
1003 #[tokio::test]
1004 async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
1005 let catalog = new_memory_catalog().await;
1006 create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
1007
1008 let non_existent_namespace_ident = NamespaceIdent::new("def".into());
1009 assert_eq!(
1010 catalog
1011 .update_namespace(&non_existent_namespace_ident, HashMap::new())
1012 .await
1013 .unwrap_err()
1014 .to_string(),
1015 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1016 )
1017 }
1018
1019 #[tokio::test]
1020 async fn test_drop_namespace() {
1021 let catalog = new_memory_catalog().await;
1022 let namespace_ident = NamespaceIdent::new("abc".into());
1023 create_namespace(&catalog, &namespace_ident).await;
1024
1025 catalog.drop_namespace(&namespace_ident).await.unwrap();
1026
1027 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1028 }
1029
1030 #[tokio::test]
1031 async fn test_drop_nested_namespace() {
1032 let catalog = new_memory_catalog().await;
1033 let namespace_ident_a = NamespaceIdent::new("a".into());
1034 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1035 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1036
1037 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1038
1039 assert!(
1040 !catalog
1041 .namespace_exists(&namespace_ident_a_b)
1042 .await
1043 .unwrap()
1044 );
1045
1046 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1047 }
1048
1049 #[tokio::test]
1050 async fn test_drop_deeply_nested_namespace() {
1051 let catalog = new_memory_catalog().await;
1052 let namespace_ident_a = NamespaceIdent::new("a".into());
1053 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1054 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1055 create_namespaces(&catalog, &vec![
1056 &namespace_ident_a,
1057 &namespace_ident_a_b,
1058 &namespace_ident_a_b_c,
1059 ])
1060 .await;
1061
1062 catalog
1063 .drop_namespace(&namespace_ident_a_b_c)
1064 .await
1065 .unwrap();
1066
1067 assert!(
1068 !catalog
1069 .namespace_exists(&namespace_ident_a_b_c)
1070 .await
1071 .unwrap()
1072 );
1073
1074 assert!(
1075 catalog
1076 .namespace_exists(&namespace_ident_a_b)
1077 .await
1078 .unwrap()
1079 );
1080
1081 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1082 }
1083
1084 #[tokio::test]
1085 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1086 let catalog = new_memory_catalog().await;
1087
1088 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1089 assert_eq!(
1090 catalog
1091 .drop_namespace(&non_existent_namespace_ident)
1092 .await
1093 .unwrap_err()
1094 .to_string(),
1095 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1096 )
1097 }
1098
1099 #[tokio::test]
1100 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1101 let catalog = new_memory_catalog().await;
1102 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1103
1104 let non_existent_namespace_ident =
1105 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1106 assert_eq!(
1107 catalog
1108 .drop_namespace(&non_existent_namespace_ident)
1109 .await
1110 .unwrap_err()
1111 .to_string(),
1112 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1113 )
1114 }
1115
1116 #[tokio::test]
1117 async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1118 let catalog = new_memory_catalog().await;
1119 let namespace_ident_a = NamespaceIdent::new("a".into());
1120 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1121 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1122
1123 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1124
1125 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1126
1127 assert!(
1128 !catalog
1129 .namespace_exists(&namespace_ident_a_b)
1130 .await
1131 .unwrap()
1132 );
1133 }
1134
1135 #[tokio::test]
1136 async fn test_create_table_with_location() {
1137 let tmp_dir = TempDir::new().unwrap();
1138 let catalog = new_memory_catalog().await;
1139 let namespace_ident = NamespaceIdent::new("a".into());
1140 create_namespace(&catalog, &namespace_ident).await;
1141
1142 let table_name = "abc";
1143 let location = tmp_dir.path().to_str().unwrap().to_string();
1144 let table_creation = TableCreation::builder()
1145 .name(table_name.into())
1146 .location(location.clone())
1147 .schema(simple_table_schema())
1148 .build();
1149
1150 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1151
1152 assert_table_eq(
1153 &catalog
1154 .create_table(&namespace_ident, table_creation)
1155 .await
1156 .unwrap(),
1157 &expected_table_ident,
1158 &simple_table_schema(),
1159 );
1160
1161 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1162
1163 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1164
1165 assert!(
1166 table
1167 .metadata_location()
1168 .unwrap()
1169 .to_string()
1170 .starts_with(&location)
1171 )
1172 }
1173
1174 #[tokio::test]
1175 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1176 let warehouse_location = temp_path();
1177 let catalog = MemoryCatalogBuilder::default()
1178 .load(
1179 "memory",
1180 HashMap::from([(
1181 MEMORY_CATALOG_WAREHOUSE.to_string(),
1182 warehouse_location.clone(),
1183 )]),
1184 )
1185 .await
1186 .unwrap();
1187
1188 let namespace_ident = NamespaceIdent::new("a".into());
1189 let mut namespace_properties = HashMap::new();
1190 let namespace_location = temp_path();
1191 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1192 catalog
1193 .create_namespace(&namespace_ident, namespace_properties)
1194 .await
1195 .unwrap();
1196
1197 let table_name = "tbl1";
1198 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1199 let expected_table_metadata_location_regex =
1200 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1201
1202 let table = catalog
1203 .create_table(
1204 &namespace_ident,
1205 TableCreation::builder()
1206 .name(table_name.into())
1207 .schema(simple_table_schema())
1208 .build(),
1210 )
1211 .await
1212 .unwrap();
1213 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1214 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1215
1216 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1217 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1218 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1219 }
1220
1221 #[tokio::test]
1222 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1223 {
1224 let warehouse_location = temp_path();
1225 let catalog = MemoryCatalogBuilder::default()
1226 .load(
1227 "memory",
1228 HashMap::from([(
1229 MEMORY_CATALOG_WAREHOUSE.to_string(),
1230 warehouse_location.clone(),
1231 )]),
1232 )
1233 .await
1234 .unwrap();
1235
1236 let namespace_ident = NamespaceIdent::new("a".into());
1237 let mut namespace_properties = HashMap::new();
1238 let namespace_location = temp_path();
1239 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1240 catalog
1241 .create_namespace(&namespace_ident, namespace_properties)
1242 .await
1243 .unwrap();
1244
1245 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1246 let mut nested_namespace_properties = HashMap::new();
1247 let nested_namespace_location = temp_path();
1248 nested_namespace_properties
1249 .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1250 catalog
1251 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1252 .await
1253 .unwrap();
1254
1255 let table_name = "tbl1";
1256 let expected_table_ident =
1257 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1258 let expected_table_metadata_location_regex = format!(
1259 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1260 );
1261
1262 let table = catalog
1263 .create_table(
1264 &nested_namespace_ident,
1265 TableCreation::builder()
1266 .name(table_name.into())
1267 .schema(simple_table_schema())
1268 .build(),
1270 )
1271 .await
1272 .unwrap();
1273 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1274 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1275
1276 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1277 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1278 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1279 }
1280
1281 #[tokio::test]
1282 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1283 {
1284 let warehouse_location = temp_path();
1285 let catalog = MemoryCatalogBuilder::default()
1286 .load(
1287 "memory",
1288 HashMap::from([(
1289 MEMORY_CATALOG_WAREHOUSE.to_string(),
1290 warehouse_location.clone(),
1291 )]),
1292 )
1293 .await
1294 .unwrap();
1295
1296 let namespace_ident = NamespaceIdent::new("a".into());
1297 let namespace_properties = HashMap::new();
1299 catalog
1300 .create_namespace(&namespace_ident, namespace_properties)
1301 .await
1302 .unwrap();
1303
1304 let table_name = "tbl1";
1305 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1306 let expected_table_metadata_location_regex =
1307 format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1308
1309 let table = catalog
1310 .create_table(
1311 &namespace_ident,
1312 TableCreation::builder()
1313 .name(table_name.into())
1314 .schema(simple_table_schema())
1315 .build(),
1317 )
1318 .await
1319 .unwrap();
1320 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1321 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1322
1323 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1324 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1325 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1326 }
1327
1328 #[tokio::test]
1329 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1330 {
1331 let warehouse_location = temp_path();
1332 let catalog = MemoryCatalogBuilder::default()
1333 .load(
1334 "memory",
1335 HashMap::from([(
1336 MEMORY_CATALOG_WAREHOUSE.to_string(),
1337 warehouse_location.clone(),
1338 )]),
1339 )
1340 .await
1341 .unwrap();
1342
1343 let namespace_ident = NamespaceIdent::new("a".into());
1344 catalog
1345 .create_namespace(&namespace_ident, HashMap::new())
1347 .await
1348 .unwrap();
1349
1350 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1351 catalog
1352 .create_namespace(&nested_namespace_ident, HashMap::new())
1354 .await
1355 .unwrap();
1356
1357 let table_name = "tbl1";
1358 let expected_table_ident =
1359 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1360 let expected_table_metadata_location_regex = format!(
1361 "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1362 );
1363
1364 let table = catalog
1365 .create_table(
1366 &nested_namespace_ident,
1367 TableCreation::builder()
1368 .name(table_name.into())
1369 .schema(simple_table_schema())
1370 .build(),
1372 )
1373 .await
1374 .unwrap();
1375 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1376 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1377
1378 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1379 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1380 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1381 }
1382
1383 #[tokio::test]
1384 async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1385 {
1386 let catalog = MemoryCatalogBuilder::default()
1387 .load("memory", HashMap::from([]))
1388 .await;
1389
1390 assert!(catalog.is_err());
1391 assert_eq!(
1392 catalog.unwrap_err().to_string(),
1393 "DataInvalid => Catalog warehouse is required"
1394 );
1395 }
1396
1397 #[tokio::test]
1398 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1399 let catalog = new_memory_catalog().await;
1400 let namespace_ident = NamespaceIdent::new("a".into());
1401 create_namespace(&catalog, &namespace_ident).await;
1402 let table_name = "tbl1";
1403 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1404 create_table(&catalog, &table_ident).await;
1405
1406 let tmp_dir = TempDir::new().unwrap();
1407 let location = tmp_dir.path().to_str().unwrap().to_string();
1408
1409 assert_eq!(
1410 catalog
1411 .create_table(
1412 &namespace_ident,
1413 TableCreation::builder()
1414 .name(table_name.into())
1415 .schema(simple_table_schema())
1416 .location(location)
1417 .build()
1418 )
1419 .await
1420 .unwrap_err()
1421 .to_string(),
1422 format!(
1423 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1424 &table_ident
1425 )
1426 );
1427 }
1428
1429 #[tokio::test]
1430 async fn test_list_tables_returns_empty_vector() {
1431 let catalog = new_memory_catalog().await;
1432 let namespace_ident = NamespaceIdent::new("a".into());
1433 create_namespace(&catalog, &namespace_ident).await;
1434
1435 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1436 }
1437
1438 #[tokio::test]
1439 async fn test_list_tables_returns_a_single_table() {
1440 let catalog = new_memory_catalog().await;
1441 let namespace_ident = NamespaceIdent::new("n1".into());
1442 create_namespace(&catalog, &namespace_ident).await;
1443
1444 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1445 create_table(&catalog, &table_ident).await;
1446
1447 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1448 table_ident
1449 ]);
1450 }
1451
1452 #[tokio::test]
1453 async fn test_list_tables_returns_multiple_tables() {
1454 let catalog = new_memory_catalog().await;
1455 let namespace_ident = NamespaceIdent::new("n1".into());
1456 create_namespace(&catalog, &namespace_ident).await;
1457
1458 let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1459 let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1460 let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1461
1462 assert_eq!(
1463 to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1464 to_set(vec![table_ident_1, table_ident_2])
1465 );
1466 }
1467
1468 #[tokio::test]
1469 async fn test_list_tables_returns_tables_from_correct_namespace() {
1470 let catalog = new_memory_catalog().await;
1471 let namespace_ident_1 = NamespaceIdent::new("n1".into());
1472 let namespace_ident_2 = NamespaceIdent::new("n2".into());
1473 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1474
1475 let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1476 let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1477 let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1478 let _ = create_tables(&catalog, vec![
1479 &table_ident_1,
1480 &table_ident_2,
1481 &table_ident_3,
1482 ])
1483 .await;
1484
1485 assert_eq!(
1486 to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1487 to_set(vec![table_ident_1, table_ident_2])
1488 );
1489
1490 assert_eq!(
1491 to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1492 to_set(vec![table_ident_3])
1493 );
1494 }
1495
1496 #[tokio::test]
1497 async fn test_list_tables_returns_table_under_nested_namespace() {
1498 let catalog = new_memory_catalog().await;
1499 let namespace_ident_a = NamespaceIdent::new("a".into());
1500 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1501 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1502
1503 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1504 create_table(&catalog, &table_ident).await;
1505
1506 assert_eq!(
1507 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1508 vec![table_ident]
1509 );
1510 }
1511
1512 #[tokio::test]
1513 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1514 let catalog = new_memory_catalog().await;
1515
1516 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1517
1518 assert_eq!(
1519 catalog
1520 .list_tables(&non_existent_namespace_ident)
1521 .await
1522 .unwrap_err()
1523 .to_string(),
1524 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1525 );
1526 }
1527
1528 #[tokio::test]
1529 async fn test_drop_table() {
1530 let catalog = new_memory_catalog().await;
1531 let namespace_ident = NamespaceIdent::new("n1".into());
1532 create_namespace(&catalog, &namespace_ident).await;
1533 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1534 create_table(&catalog, &table_ident).await;
1535
1536 catalog.drop_table(&table_ident).await.unwrap();
1537 }
1538
1539 #[tokio::test]
1540 async fn test_drop_table_drops_table_under_nested_namespace() {
1541 let catalog = new_memory_catalog().await;
1542 let namespace_ident_a = NamespaceIdent::new("a".into());
1543 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1544 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1545
1546 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1547 create_table(&catalog, &table_ident).await;
1548
1549 catalog.drop_table(&table_ident).await.unwrap();
1550
1551 assert_eq!(
1552 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1553 vec![]
1554 );
1555 }
1556
1557 #[tokio::test]
1558 async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1559 let catalog = new_memory_catalog().await;
1560
1561 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1562 let non_existent_table_ident =
1563 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1564
1565 assert_eq!(
1566 catalog
1567 .drop_table(&non_existent_table_ident)
1568 .await
1569 .unwrap_err()
1570 .to_string(),
1571 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1572 );
1573 }
1574
1575 #[tokio::test]
1576 async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1577 let catalog = new_memory_catalog().await;
1578 let namespace_ident = NamespaceIdent::new("n1".into());
1579 create_namespace(&catalog, &namespace_ident).await;
1580
1581 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1582
1583 assert_eq!(
1584 catalog
1585 .drop_table(&non_existent_table_ident)
1586 .await
1587 .unwrap_err()
1588 .to_string(),
1589 format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1590 );
1591 }
1592
1593 #[tokio::test]
1594 async fn test_table_exists_returns_true() {
1595 let catalog = new_memory_catalog().await;
1596 let namespace_ident = NamespaceIdent::new("n1".into());
1597 create_namespace(&catalog, &namespace_ident).await;
1598 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1599 create_table(&catalog, &table_ident).await;
1600
1601 assert!(catalog.table_exists(&table_ident).await.unwrap());
1602 }
1603
1604 #[tokio::test]
1605 async fn test_table_exists_returns_false() {
1606 let catalog = new_memory_catalog().await;
1607 let namespace_ident = NamespaceIdent::new("n1".into());
1608 create_namespace(&catalog, &namespace_ident).await;
1609 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1610
1611 assert!(
1612 !catalog
1613 .table_exists(&non_existent_table_ident)
1614 .await
1615 .unwrap()
1616 );
1617 }
1618
1619 #[tokio::test]
1620 async fn test_table_exists_under_nested_namespace() {
1621 let catalog = new_memory_catalog().await;
1622 let namespace_ident_a = NamespaceIdent::new("a".into());
1623 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1624 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1625
1626 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1627 create_table(&catalog, &table_ident).await;
1628
1629 assert!(catalog.table_exists(&table_ident).await.unwrap());
1630
1631 let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1632 assert!(
1633 !catalog
1634 .table_exists(&non_existent_table_ident)
1635 .await
1636 .unwrap()
1637 );
1638 }
1639
1640 #[tokio::test]
1641 async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1642 let catalog = new_memory_catalog().await;
1643
1644 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1645 let non_existent_table_ident =
1646 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1647
1648 assert_eq!(
1649 catalog
1650 .table_exists(&non_existent_table_ident)
1651 .await
1652 .unwrap_err()
1653 .to_string(),
1654 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1655 );
1656 }
1657
1658 #[tokio::test]
1659 async fn test_rename_table_in_same_namespace() {
1660 let catalog = new_memory_catalog().await;
1661 let namespace_ident = NamespaceIdent::new("n1".into());
1662 create_namespace(&catalog, &namespace_ident).await;
1663 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1664 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1665 create_table(&catalog, &src_table_ident).await;
1666
1667 catalog
1668 .rename_table(&src_table_ident, &dst_table_ident)
1669 .await
1670 .unwrap();
1671
1672 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1673 dst_table_ident
1674 ],);
1675 }
1676
1677 #[tokio::test]
1678 async fn test_rename_table_across_namespaces() {
1679 let catalog = new_memory_catalog().await;
1680 let src_namespace_ident = NamespaceIdent::new("a".into());
1681 let dst_namespace_ident = NamespaceIdent::new("b".into());
1682 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1683 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1684 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1685 create_table(&catalog, &src_table_ident).await;
1686
1687 catalog
1688 .rename_table(&src_table_ident, &dst_table_ident)
1689 .await
1690 .unwrap();
1691
1692 assert_eq!(
1693 catalog.list_tables(&src_namespace_ident).await.unwrap(),
1694 vec![],
1695 );
1696
1697 assert_eq!(
1698 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1699 vec![dst_table_ident],
1700 );
1701 }
1702
1703 #[tokio::test]
1704 async fn test_rename_table_src_table_is_same_as_dst_table() {
1705 let catalog = new_memory_catalog().await;
1706 let namespace_ident = NamespaceIdent::new("n1".into());
1707 create_namespace(&catalog, &namespace_ident).await;
1708 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1709 create_table(&catalog, &table_ident).await;
1710
1711 catalog
1712 .rename_table(&table_ident, &table_ident)
1713 .await
1714 .unwrap();
1715
1716 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1717 table_ident
1718 ],);
1719 }
1720
1721 #[tokio::test]
1722 async fn test_rename_table_across_nested_namespaces() {
1723 let catalog = new_memory_catalog().await;
1724 let namespace_ident_a = NamespaceIdent::new("a".into());
1725 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1726 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1727 create_namespaces(&catalog, &vec![
1728 &namespace_ident_a,
1729 &namespace_ident_a_b,
1730 &namespace_ident_a_b_c,
1731 ])
1732 .await;
1733
1734 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1735 create_tables(&catalog, vec![&src_table_ident]).await;
1736
1737 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1738 catalog
1739 .rename_table(&src_table_ident, &dst_table_ident)
1740 .await
1741 .unwrap();
1742
1743 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1744
1745 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1746 }
1747
1748 #[tokio::test]
1749 async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1750 let catalog = new_memory_catalog().await;
1751
1752 let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1753 let src_table_ident =
1754 TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1755
1756 let dst_namespace_ident = NamespaceIdent::new("n2".into());
1757 create_namespace(&catalog, &dst_namespace_ident).await;
1758 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1759
1760 assert_eq!(
1761 catalog
1762 .rename_table(&src_table_ident, &dst_table_ident)
1763 .await
1764 .unwrap_err()
1765 .to_string(),
1766 format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1767 );
1768 }
1769
1770 #[tokio::test]
1771 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1772 let catalog = new_memory_catalog().await;
1773 let src_namespace_ident = NamespaceIdent::new("n1".into());
1774 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1775 create_namespace(&catalog, &src_namespace_ident).await;
1776 create_table(&catalog, &src_table_ident).await;
1777
1778 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1779 let dst_table_ident =
1780 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1781 assert_eq!(
1782 catalog
1783 .rename_table(&src_table_ident, &dst_table_ident)
1784 .await
1785 .unwrap_err()
1786 .to_string(),
1787 format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1788 );
1789 }
1790
1791 #[tokio::test]
1792 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1793 let catalog = new_memory_catalog().await;
1794 let namespace_ident = NamespaceIdent::new("n1".into());
1795 create_namespace(&catalog, &namespace_ident).await;
1796 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1797 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1798
1799 assert_eq!(
1800 catalog
1801 .rename_table(&src_table_ident, &dst_table_ident)
1802 .await
1803 .unwrap_err()
1804 .to_string(),
1805 format!("TableNotFound => No such table: {src_table_ident:?}"),
1806 );
1807 }
1808
1809 #[tokio::test]
1810 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1811 let catalog = new_memory_catalog().await;
1812 let namespace_ident = NamespaceIdent::new("n1".into());
1813 create_namespace(&catalog, &namespace_ident).await;
1814 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1815 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1816 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1817
1818 assert_eq!(
1819 catalog
1820 .rename_table(&src_table_ident, &dst_table_ident)
1821 .await
1822 .unwrap_err()
1823 .to_string(),
1824 format!(
1825 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1826 &dst_table_ident
1827 ),
1828 );
1829 }
1830
1831 #[tokio::test]
1832 async fn test_register_table() {
1833 let catalog = new_memory_catalog().await;
1835 let namespace_ident = NamespaceIdent::new("test_namespace".into());
1836 create_namespace(&catalog, &namespace_ident).await;
1837
1838 let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1840 create_table(&catalog, &source_table_ident).await;
1841
1842 let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1844 let metadata_location = source_table.metadata_location().unwrap().to_string();
1845
1846 let register_table_ident =
1848 TableIdent::new(namespace_ident.clone(), "register_table".into());
1849 let registered_table = catalog
1850 .register_table(®ister_table_ident, metadata_location.clone())
1851 .await
1852 .unwrap();
1853
1854 assert_eq!(registered_table.identifier(), ®ister_table_ident);
1856
1857 assert_eq!(
1859 registered_table.metadata_location().unwrap().to_string(),
1860 metadata_location
1861 );
1862
1863 assert!(catalog.table_exists(®ister_table_ident).await.unwrap());
1865
1866 let loaded_table = catalog.load_table(®ister_table_ident).await.unwrap();
1868 assert_eq!(loaded_table.identifier(), ®ister_table_ident);
1869 assert_eq!(
1870 loaded_table.metadata_location().unwrap().to_string(),
1871 metadata_location
1872 );
1873 }
1874
1875 #[tokio::test]
1876 async fn test_update_table() {
1877 let catalog = new_memory_catalog().await;
1878
1879 let table = create_table_with_namespace(&catalog).await;
1880
1881 assert!(!table.metadata().properties().contains_key("key"));
1883
1884 tokio::time::sleep(std::time::Duration::from_millis(2)).await;
1888
1889 let tx = Transaction::new(&table);
1891 let updated_table = tx
1892 .update_table_properties()
1893 .set("key".to_string(), "value".to_string())
1894 .apply(tx)
1895 .unwrap()
1896 .commit(&catalog)
1897 .await
1898 .unwrap();
1899
1900 assert_eq!(
1901 updated_table.metadata().properties().get("key").unwrap(),
1902 "value"
1903 );
1904
1905 assert_eq!(table.identifier(), updated_table.identifier());
1906 assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1907 assert_ne!(table.metadata_location(), updated_table.metadata_location());
1908
1909 assert!(
1910 table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1911 );
1912 }
1913
1914 #[tokio::test]
1915 async fn test_update_table_fails_if_table_doesnt_exist() {
1916 let catalog = new_memory_catalog().await;
1917
1918 let namespace_ident = NamespaceIdent::new("a".into());
1919 create_namespace(&catalog, &namespace_ident).await;
1920
1921 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1923 let table = build_table(table_ident);
1924
1925 let tx = Transaction::new(&table);
1926 let err = tx
1927 .update_table_properties()
1928 .set("key".to_string(), "value".to_string())
1929 .apply(tx)
1930 .unwrap()
1931 .commit(&catalog)
1932 .await
1933 .unwrap_err();
1934 assert_eq!(err.kind(), ErrorKind::TableNotFound);
1935 }
1936
1937 fn build_table(ident: TableIdent) -> Table {
1938 let file_io = FileIO::new_with_fs();
1939
1940 let temp_dir = TempDir::new().unwrap();
1941 let location = temp_dir.path().to_str().unwrap().to_string();
1942
1943 let table_creation = TableCreation::builder()
1944 .name(ident.name().to_string())
1945 .schema(simple_table_schema())
1946 .location(location)
1947 .build();
1948 let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1949 .unwrap()
1950 .build()
1951 .unwrap()
1952 .metadata;
1953
1954 Table::builder()
1955 .identifier(ident)
1956 .metadata(metadata)
1957 .file_io(file_io)
1958 .runtime(test_runtime())
1959 .build()
1960 .unwrap()
1961 }
1962}