1use std::collections::HashMap;
21
22use async_trait::async_trait;
23use futures::lock::{Mutex, MutexGuard};
24use itertools::Itertools;
25
26use super::namespace_state::NamespaceState;
27use crate::io::FileIO;
28use crate::spec::{TableMetadata, TableMetadataBuilder};
29use crate::table::Table;
30use crate::{
31 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
32 TableCommit, TableCreation, TableIdent,
33};
34
/// Property key under which the warehouse location is passed to the memory catalog builder.
pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";

/// Namespace property key consulted for a namespace-specific table location prefix.
const LOCATION: &str = "location";
40
/// Builder for [`MemoryCatalog`]; wraps the configuration that is filled in by `load`.
#[derive(Debug)]
pub struct MemoryCatalogBuilder(MemoryCatalogConfig);
44
45impl Default for MemoryCatalogBuilder {
46 fn default() -> Self {
47 Self(MemoryCatalogConfig {
48 name: None,
49 warehouse: "".to_string(),
50 props: HashMap::new(),
51 })
52 }
53}
54
55impl CatalogBuilder for MemoryCatalogBuilder {
56 type C = MemoryCatalog;
57
58 fn load(
59 mut self,
60 name: impl Into<String>,
61 props: HashMap<String, String>,
62 ) -> impl Future<Output = Result<Self::C>> + Send {
63 self.0.name = Some(name.into());
64
65 if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
66 self.0.warehouse = props
67 .get(MEMORY_CATALOG_WAREHOUSE)
68 .cloned()
69 .unwrap_or_default()
70 }
71
72 self.0.props = props
74 .into_iter()
75 .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
76 .collect();
77
78 let result = {
79 if self.0.name.is_none() {
80 Err(Error::new(
81 ErrorKind::DataInvalid,
82 "Catalog name is required",
83 ))
84 } else if self.0.warehouse.is_empty() {
85 Err(Error::new(
86 ErrorKind::DataInvalid,
87 "Catalog warehouse is required",
88 ))
89 } else {
90 MemoryCatalog::new(self.0)
91 }
92 };
93
94 std::future::ready(result)
95 }
96}
97
/// Settings collected by `MemoryCatalogBuilder` before the catalog is constructed.
#[derive(Clone, Debug)]
pub(crate) struct MemoryCatalogConfig {
    /// Catalog name; set by the builder's `load`.
    name: Option<String>,
    /// Warehouse location; also the path the catalog's `FileIO` is created from.
    warehouse: String,
    /// Remaining properties, forwarded to the `FileIO` builder.
    props: HashMap<String, String>,
}
104
/// Catalog whose namespace and table registry is held in a [`NamespaceState`] in memory.
#[derive(Debug)]
pub struct MemoryCatalog {
    /// Registry of namespaces and tables, guarded by an async mutex.
    root_namespace_state: Mutex<NamespaceState>,
    /// File IO used to read, write and delete table metadata files.
    file_io: FileIO,
    /// Base location from which default table locations are derived.
    warehouse_location: String,
}
112
113impl MemoryCatalog {
114 fn new(config: MemoryCatalogConfig) -> Result<Self> {
116 Ok(Self {
117 root_namespace_state: Mutex::new(NamespaceState::default()),
118 file_io: FileIO::from_path(&config.warehouse)?
119 .with_props(config.props)
120 .build()?,
121 warehouse_location: config.warehouse,
122 })
123 }
124
125 async fn load_table_from_locked_state(
127 &self,
128 table_ident: &TableIdent,
129 root_namespace_state: &MutexGuard<'_, NamespaceState>,
130 ) -> Result<Table> {
131 let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
132 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
133
134 Table::builder()
135 .identifier(table_ident.clone())
136 .metadata(metadata)
137 .metadata_location(metadata_location.to_string())
138 .file_io(self.file_io.clone())
139 .build()
140 }
141}
142
143#[async_trait]
144impl Catalog for MemoryCatalog {
145 async fn list_namespaces(
147 &self,
148 maybe_parent: Option<&NamespaceIdent>,
149 ) -> Result<Vec<NamespaceIdent>> {
150 let root_namespace_state = self.root_namespace_state.lock().await;
151
152 match maybe_parent {
153 None => {
154 let namespaces = root_namespace_state
155 .list_top_level_namespaces()
156 .into_iter()
157 .map(|str| NamespaceIdent::new(str.to_string()))
158 .collect_vec();
159
160 Ok(namespaces)
161 }
162 Some(parent_namespace_ident) => {
163 let namespaces = root_namespace_state
164 .list_namespaces_under(parent_namespace_ident)?
165 .into_iter()
166 .map(|name| NamespaceIdent::new(name.to_string()))
167 .collect_vec();
168
169 Ok(namespaces)
170 }
171 }
172 }
173
174 async fn create_namespace(
176 &self,
177 namespace_ident: &NamespaceIdent,
178 properties: HashMap<String, String>,
179 ) -> Result<Namespace> {
180 let mut root_namespace_state = self.root_namespace_state.lock().await;
181
182 root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
183 let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
184
185 Ok(namespace)
186 }
187
188 async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
190 let root_namespace_state = self.root_namespace_state.lock().await;
191
192 let namespace = Namespace::with_properties(
193 namespace_ident.clone(),
194 root_namespace_state
195 .get_properties(namespace_ident)?
196 .clone(),
197 );
198
199 Ok(namespace)
200 }
201
202 async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
204 let guarded_namespaces = self.root_namespace_state.lock().await;
205
206 Ok(guarded_namespaces.namespace_exists(namespace_ident))
207 }
208
209 async fn update_namespace(
215 &self,
216 namespace_ident: &NamespaceIdent,
217 properties: HashMap<String, String>,
218 ) -> Result<()> {
219 let mut root_namespace_state = self.root_namespace_state.lock().await;
220
221 root_namespace_state.replace_properties(namespace_ident, properties)
222 }
223
224 async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
226 let mut root_namespace_state = self.root_namespace_state.lock().await;
227
228 root_namespace_state.remove_existing_namespace(namespace_ident)
229 }
230
231 async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
233 let root_namespace_state = self.root_namespace_state.lock().await;
234
235 let table_names = root_namespace_state.list_tables(namespace_ident)?;
236 let table_idents = table_names
237 .into_iter()
238 .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
239 .collect_vec();
240
241 Ok(table_idents)
242 }
243
244 async fn create_table(
246 &self,
247 namespace_ident: &NamespaceIdent,
248 table_creation: TableCreation,
249 ) -> Result<Table> {
250 let mut root_namespace_state = self.root_namespace_state.lock().await;
251
252 let table_name = table_creation.name.clone();
253 let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
254
255 let (table_creation, location) = match table_creation.location.clone() {
256 Some(location) => (table_creation, location),
257 None => {
258 let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
259 let location_prefix = match namespace_properties.get(LOCATION) {
260 Some(namespace_location) => namespace_location.clone(),
261 None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
262 };
263
264 let location = format!("{}/{}", location_prefix, table_ident.name());
265
266 let new_table_creation = TableCreation {
267 location: Some(location.clone()),
268 ..table_creation
269 };
270
271 (new_table_creation, location)
272 }
273 };
274
275 let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
276 .build()?
277 .metadata;
278 let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
279
280 metadata.write_to(&self.file_io, &metadata_location).await?;
281
282 root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
283
284 Table::builder()
285 .file_io(self.file_io.clone())
286 .metadata_location(metadata_location)
287 .metadata(metadata)
288 .identifier(table_ident)
289 .build()
290 }
291
292 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
294 let root_namespace_state = self.root_namespace_state.lock().await;
295
296 self.load_table_from_locked_state(table_ident, &root_namespace_state)
297 .await
298 }
299
300 async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
302 let mut root_namespace_state = self.root_namespace_state.lock().await;
303
304 let metadata_location = root_namespace_state.remove_existing_table(table_ident)?;
305 self.file_io.delete(&metadata_location).await
306 }
307
308 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
310 let root_namespace_state = self.root_namespace_state.lock().await;
311
312 root_namespace_state.table_exists(table_ident)
313 }
314
315 async fn rename_table(
317 &self,
318 src_table_ident: &TableIdent,
319 dst_table_ident: &TableIdent,
320 ) -> Result<()> {
321 let mut root_namespace_state = self.root_namespace_state.lock().await;
322
323 let mut new_root_namespace_state = root_namespace_state.clone();
324 let metadata_location = new_root_namespace_state
325 .get_existing_table_location(src_table_ident)?
326 .clone();
327 new_root_namespace_state.remove_existing_table(src_table_ident)?;
328 new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
329 *root_namespace_state = new_root_namespace_state;
330
331 Ok(())
332 }
333
334 async fn register_table(
335 &self,
336 table_ident: &TableIdent,
337 metadata_location: String,
338 ) -> Result<Table> {
339 let mut root_namespace_state = self.root_namespace_state.lock().await;
340 root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
341
342 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
343
344 Table::builder()
345 .file_io(self.file_io.clone())
346 .metadata_location(metadata_location)
347 .metadata(metadata)
348 .identifier(table_ident.clone())
349 .build()
350 }
351
352 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
354 let mut root_namespace_state = self.root_namespace_state.lock().await;
355
356 let current_table = self
357 .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
358 .await?;
359
360 let staged_table = commit.apply(current_table)?;
362
363 staged_table
365 .metadata()
366 .write_to(
367 staged_table.file_io(),
368 staged_table.metadata_location_result()?,
369 )
370 .await?;
371
372 let updated_table = root_namespace_state.commit_table_update(staged_table)?;
374
375 Ok(updated_table)
376 }
377}
378
379#[cfg(test)]
380pub(crate) mod tests {
381 use std::collections::HashSet;
382 use std::hash::Hash;
383 use std::iter::FromIterator;
384 use std::vec;
385
386 use regex::Regex;
387 use tempfile::TempDir;
388
389 use super::*;
390 use crate::io::FileIOBuilder;
391 use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
392 use crate::transaction::{ApplyTransactionAction, Transaction};
393
394 fn temp_path() -> String {
395 let temp_dir = TempDir::new().unwrap();
396 temp_dir.path().to_str().unwrap().to_string()
397 }
398
399 pub(crate) async fn new_memory_catalog() -> impl Catalog {
400 let warehouse_location = temp_path();
401 MemoryCatalogBuilder::default()
402 .load(
403 "memory",
404 HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
405 )
406 .await
407 .unwrap()
408 }
409
410 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
411 let _ = catalog
412 .create_namespace(namespace_ident, HashMap::new())
413 .await
414 .unwrap();
415 }
416
417 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
418 for namespace_ident in namespace_idents {
419 let _ = create_namespace(catalog, namespace_ident).await;
420 }
421 }
422
/// Collects the elements of `vec` into a `HashSet`, discarding duplicates and order.
fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
    vec.into_iter().collect()
}
426
427 fn simple_table_schema() -> Schema {
428 Schema::builder()
429 .with_fields(vec![
430 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
431 ])
432 .build()
433 .unwrap()
434 }
435
436 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
437 catalog
438 .create_table(
439 &table_ident.namespace,
440 TableCreation::builder()
441 .name(table_ident.name().into())
442 .schema(simple_table_schema())
443 .build(),
444 )
445 .await
446 .unwrap()
447 }
448
449 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
450 for table_ident in table_idents {
451 create_table(catalog, table_ident).await;
452 }
453 }
454
455 async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
456 let namespace_ident = NamespaceIdent::new("abc".into());
457 create_namespace(catalog, &namespace_ident).await;
458
459 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
460 create_table(catalog, &table_ident).await
461 }
462
463 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
464 assert_eq!(table.identifier(), expected_table_ident);
465
466 let metadata = table.metadata();
467
468 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
469
470 let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
471 .with_spec_id(0)
472 .build()
473 .unwrap();
474
475 assert_eq!(
476 metadata
477 .partition_specs_iter()
478 .map(|p| p.as_ref())
479 .collect_vec(),
480 vec![&expected_partition_spec]
481 );
482
483 let expected_sorted_order = SortOrder::builder()
484 .with_order_id(0)
485 .with_fields(vec![])
486 .build(expected_schema)
487 .unwrap();
488
489 assert_eq!(
490 metadata
491 .sort_orders_iter()
492 .map(|s| s.as_ref())
493 .collect_vec(),
494 vec![&expected_sorted_order]
495 );
496
497 assert_eq!(metadata.properties(), &HashMap::new());
498
499 assert!(!table.readonly());
500 }
501
/// Regex fragment matching a lowercase hexadecimal UUID in 8-4-4-4-12 form.
const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
503
504 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
505 let actual = table.metadata_location().unwrap().to_string();
506 let regex = Regex::new(regex_str).unwrap();
507 assert!(
508 regex.is_match(&actual),
509 "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
510 )
511 }
512
513 #[tokio::test]
514 async fn test_list_namespaces_returns_empty_vector() {
515 let catalog = new_memory_catalog().await;
516
517 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
518 }
519
520 #[tokio::test]
521 async fn test_list_namespaces_returns_single_namespace() {
522 let catalog = new_memory_catalog().await;
523 let namespace_ident = NamespaceIdent::new("abc".into());
524 create_namespace(&catalog, &namespace_ident).await;
525
526 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
527 namespace_ident
528 ]);
529 }
530
531 #[tokio::test]
532 async fn test_list_namespaces_returns_multiple_namespaces() {
533 let catalog = new_memory_catalog().await;
534 let namespace_ident_1 = NamespaceIdent::new("a".into());
535 let namespace_ident_2 = NamespaceIdent::new("b".into());
536 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
537
538 assert_eq!(
539 to_set(catalog.list_namespaces(None).await.unwrap()),
540 to_set(vec![namespace_ident_1, namespace_ident_2])
541 );
542 }
543
544 #[tokio::test]
545 async fn test_list_namespaces_returns_only_top_level_namespaces() {
546 let catalog = new_memory_catalog().await;
547 let namespace_ident_1 = NamespaceIdent::new("a".into());
548 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
549 let namespace_ident_3 = NamespaceIdent::new("b".into());
550 create_namespaces(&catalog, &vec![
551 &namespace_ident_1,
552 &namespace_ident_2,
553 &namespace_ident_3,
554 ])
555 .await;
556
557 assert_eq!(
558 to_set(catalog.list_namespaces(None).await.unwrap()),
559 to_set(vec![namespace_ident_1, namespace_ident_3])
560 );
561 }
562
563 #[tokio::test]
564 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
565 let catalog = new_memory_catalog().await;
566 let namespace_ident_1 = NamespaceIdent::new("a".into());
567 let namespace_ident_2 = NamespaceIdent::new("b".into());
568 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
569
570 assert_eq!(
571 catalog
572 .list_namespaces(Some(&namespace_ident_1))
573 .await
574 .unwrap(),
575 vec![]
576 );
577 }
578
579 #[tokio::test]
580 async fn test_list_namespaces_returns_namespace_under_parent() {
581 let catalog = new_memory_catalog().await;
582 let namespace_ident_1 = NamespaceIdent::new("a".into());
583 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
584 let namespace_ident_3 = NamespaceIdent::new("c".into());
585 create_namespaces(&catalog, &vec![
586 &namespace_ident_1,
587 &namespace_ident_2,
588 &namespace_ident_3,
589 ])
590 .await;
591
592 assert_eq!(
593 to_set(catalog.list_namespaces(None).await.unwrap()),
594 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
595 );
596
597 assert_eq!(
598 catalog
599 .list_namespaces(Some(&namespace_ident_1))
600 .await
601 .unwrap(),
602 vec![NamespaceIdent::new("b".into())]
603 );
604 }
605
606 #[tokio::test]
607 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
608 let catalog = new_memory_catalog().await;
609 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
610 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
611 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
612 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
613 let namespace_ident_5 = NamespaceIdent::new("b".into());
614 create_namespaces(&catalog, &vec![
615 &namespace_ident_1,
616 &namespace_ident_2,
617 &namespace_ident_3,
618 &namespace_ident_4,
619 &namespace_ident_5,
620 ])
621 .await;
622
623 assert_eq!(
624 to_set(
625 catalog
626 .list_namespaces(Some(&namespace_ident_1))
627 .await
628 .unwrap()
629 ),
630 to_set(vec![
631 NamespaceIdent::new("a".into()),
632 NamespaceIdent::new("b".into()),
633 NamespaceIdent::new("c".into()),
634 ])
635 );
636 }
637
638 #[tokio::test]
639 async fn test_namespace_exists_returns_false() {
640 let catalog = new_memory_catalog().await;
641 let namespace_ident = NamespaceIdent::new("a".into());
642 create_namespace(&catalog, &namespace_ident).await;
643
644 assert!(
645 !catalog
646 .namespace_exists(&NamespaceIdent::new("b".into()))
647 .await
648 .unwrap()
649 );
650 }
651
652 #[tokio::test]
653 async fn test_namespace_exists_returns_true() {
654 let catalog = new_memory_catalog().await;
655 let namespace_ident = NamespaceIdent::new("a".into());
656 create_namespace(&catalog, &namespace_ident).await;
657
658 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
659 }
660
661 #[tokio::test]
662 async fn test_create_namespace_with_empty_properties() {
663 let catalog = new_memory_catalog().await;
664 let namespace_ident = NamespaceIdent::new("a".into());
665
666 assert_eq!(
667 catalog
668 .create_namespace(&namespace_ident, HashMap::new())
669 .await
670 .unwrap(),
671 Namespace::new(namespace_ident.clone())
672 );
673
674 assert_eq!(
675 catalog.get_namespace(&namespace_ident).await.unwrap(),
676 Namespace::with_properties(namespace_ident, HashMap::new())
677 );
678 }
679
680 #[tokio::test]
681 async fn test_create_namespace_with_properties() {
682 let catalog = new_memory_catalog().await;
683 let namespace_ident = NamespaceIdent::new("abc".into());
684
685 let mut properties: HashMap<String, String> = HashMap::new();
686 properties.insert("k".into(), "v".into());
687
688 assert_eq!(
689 catalog
690 .create_namespace(&namespace_ident, properties.clone())
691 .await
692 .unwrap(),
693 Namespace::with_properties(namespace_ident.clone(), properties.clone())
694 );
695
696 assert_eq!(
697 catalog.get_namespace(&namespace_ident).await.unwrap(),
698 Namespace::with_properties(namespace_ident, properties)
699 );
700 }
701
702 #[tokio::test]
703 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
704 let catalog = new_memory_catalog().await;
705 let namespace_ident = NamespaceIdent::new("a".into());
706 create_namespace(&catalog, &namespace_ident).await;
707
708 assert_eq!(
709 catalog
710 .create_namespace(&namespace_ident, HashMap::new())
711 .await
712 .unwrap_err()
713 .to_string(),
714 format!(
715 "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
716 &namespace_ident
717 )
718 );
719
720 assert_eq!(
721 catalog.get_namespace(&namespace_ident).await.unwrap(),
722 Namespace::with_properties(namespace_ident, HashMap::new())
723 );
724 }
725
726 #[tokio::test]
727 async fn test_create_nested_namespace() {
728 let catalog = new_memory_catalog().await;
729 let parent_namespace_ident = NamespaceIdent::new("a".into());
730 create_namespace(&catalog, &parent_namespace_ident).await;
731
732 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
733
734 assert_eq!(
735 catalog
736 .create_namespace(&child_namespace_ident, HashMap::new())
737 .await
738 .unwrap(),
739 Namespace::new(child_namespace_ident.clone())
740 );
741
742 assert_eq!(
743 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
744 Namespace::with_properties(child_namespace_ident, HashMap::new())
745 );
746 }
747
748 #[tokio::test]
749 async fn test_create_deeply_nested_namespace() {
750 let catalog = new_memory_catalog().await;
751 let namespace_ident_a = NamespaceIdent::new("a".into());
752 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
753 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
754
755 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
756
757 assert_eq!(
758 catalog
759 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
760 .await
761 .unwrap(),
762 Namespace::new(namespace_ident_a_b_c.clone())
763 );
764
765 assert_eq!(
766 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
767 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
768 );
769 }
770
771 #[tokio::test]
772 async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
773 let catalog = new_memory_catalog().await;
774
775 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
776
777 assert_eq!(
778 catalog
779 .create_namespace(&nested_namespace_ident, HashMap::new())
780 .await
781 .unwrap_err()
782 .to_string(),
783 format!(
784 "NamespaceNotFound => No such namespace: {:?}",
785 NamespaceIdent::new("a".into())
786 )
787 );
788
789 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
790 }
791
792 #[tokio::test]
793 async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
794 {
795 let catalog = new_memory_catalog().await;
796
797 let namespace_ident_a = NamespaceIdent::new("a".into());
798 create_namespace(&catalog, &namespace_ident_a).await;
799
800 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
801
802 assert_eq!(
803 catalog
804 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
805 .await
806 .unwrap_err()
807 .to_string(),
808 format!(
809 "NamespaceNotFound => No such namespace: {:?}",
810 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
811 )
812 );
813
814 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
815 namespace_ident_a.clone()
816 ]);
817
818 assert_eq!(
819 catalog
820 .list_namespaces(Some(&namespace_ident_a))
821 .await
822 .unwrap(),
823 vec![]
824 );
825 }
826
827 #[tokio::test]
828 async fn test_get_namespace() {
829 let catalog = new_memory_catalog().await;
830 let namespace_ident = NamespaceIdent::new("abc".into());
831
832 let mut properties: HashMap<String, String> = HashMap::new();
833 properties.insert("k".into(), "v".into());
834 let _ = catalog
835 .create_namespace(&namespace_ident, properties.clone())
836 .await
837 .unwrap();
838
839 assert_eq!(
840 catalog.get_namespace(&namespace_ident).await.unwrap(),
841 Namespace::with_properties(namespace_ident, properties)
842 )
843 }
844
845 #[tokio::test]
846 async fn test_get_nested_namespace() {
847 let catalog = new_memory_catalog().await;
848 let namespace_ident_a = NamespaceIdent::new("a".into());
849 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
850 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
851
852 assert_eq!(
853 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
854 Namespace::with_properties(namespace_ident_a_b, HashMap::new())
855 );
856 }
857
858 #[tokio::test]
859 async fn test_get_deeply_nested_namespace() {
860 let catalog = new_memory_catalog().await;
861 let namespace_ident_a = NamespaceIdent::new("a".into());
862 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
863 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
864 create_namespaces(&catalog, &vec![
865 &namespace_ident_a,
866 &namespace_ident_a_b,
867 &namespace_ident_a_b_c,
868 ])
869 .await;
870
871 assert_eq!(
872 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
873 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
874 );
875 }
876
877 #[tokio::test]
878 async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
879 let catalog = new_memory_catalog().await;
880 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
881
882 let non_existent_namespace_ident = NamespaceIdent::new("b".into());
883 assert_eq!(
884 catalog
885 .get_namespace(&non_existent_namespace_ident)
886 .await
887 .unwrap_err()
888 .to_string(),
889 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
890 )
891 }
892
893 #[tokio::test]
894 async fn test_update_namespace() {
895 let catalog = new_memory_catalog().await;
896 let namespace_ident = NamespaceIdent::new("abc".into());
897 create_namespace(&catalog, &namespace_ident).await;
898
899 let mut new_properties: HashMap<String, String> = HashMap::new();
900 new_properties.insert("k".into(), "v".into());
901
902 catalog
903 .update_namespace(&namespace_ident, new_properties.clone())
904 .await
905 .unwrap();
906
907 assert_eq!(
908 catalog.get_namespace(&namespace_ident).await.unwrap(),
909 Namespace::with_properties(namespace_ident, new_properties)
910 )
911 }
912
913 #[tokio::test]
914 async fn test_update_nested_namespace() {
915 let catalog = new_memory_catalog().await;
916 let namespace_ident_a = NamespaceIdent::new("a".into());
917 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
918 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
919
920 let mut new_properties = HashMap::new();
921 new_properties.insert("k".into(), "v".into());
922
923 catalog
924 .update_namespace(&namespace_ident_a_b, new_properties.clone())
925 .await
926 .unwrap();
927
928 assert_eq!(
929 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
930 Namespace::with_properties(namespace_ident_a_b, new_properties)
931 );
932 }
933
934 #[tokio::test]
935 async fn test_update_deeply_nested_namespace() {
936 let catalog = new_memory_catalog().await;
937 let namespace_ident_a = NamespaceIdent::new("a".into());
938 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
939 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
940 create_namespaces(&catalog, &vec![
941 &namespace_ident_a,
942 &namespace_ident_a_b,
943 &namespace_ident_a_b_c,
944 ])
945 .await;
946
947 let mut new_properties = HashMap::new();
948 new_properties.insert("k".into(), "v".into());
949
950 catalog
951 .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
952 .await
953 .unwrap();
954
955 assert_eq!(
956 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
957 Namespace::with_properties(namespace_ident_a_b_c, new_properties)
958 );
959 }
960
961 #[tokio::test]
962 async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
963 let catalog = new_memory_catalog().await;
964 create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
965
966 let non_existent_namespace_ident = NamespaceIdent::new("def".into());
967 assert_eq!(
968 catalog
969 .update_namespace(&non_existent_namespace_ident, HashMap::new())
970 .await
971 .unwrap_err()
972 .to_string(),
973 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
974 )
975 }
976
977 #[tokio::test]
978 async fn test_drop_namespace() {
979 let catalog = new_memory_catalog().await;
980 let namespace_ident = NamespaceIdent::new("abc".into());
981 create_namespace(&catalog, &namespace_ident).await;
982
983 catalog.drop_namespace(&namespace_ident).await.unwrap();
984
985 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
986 }
987
988 #[tokio::test]
989 async fn test_drop_nested_namespace() {
990 let catalog = new_memory_catalog().await;
991 let namespace_ident_a = NamespaceIdent::new("a".into());
992 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
993 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
994
995 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
996
997 assert!(
998 !catalog
999 .namespace_exists(&namespace_ident_a_b)
1000 .await
1001 .unwrap()
1002 );
1003
1004 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1005 }
1006
1007 #[tokio::test]
1008 async fn test_drop_deeply_nested_namespace() {
1009 let catalog = new_memory_catalog().await;
1010 let namespace_ident_a = NamespaceIdent::new("a".into());
1011 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1012 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1013 create_namespaces(&catalog, &vec![
1014 &namespace_ident_a,
1015 &namespace_ident_a_b,
1016 &namespace_ident_a_b_c,
1017 ])
1018 .await;
1019
1020 catalog
1021 .drop_namespace(&namespace_ident_a_b_c)
1022 .await
1023 .unwrap();
1024
1025 assert!(
1026 !catalog
1027 .namespace_exists(&namespace_ident_a_b_c)
1028 .await
1029 .unwrap()
1030 );
1031
1032 assert!(
1033 catalog
1034 .namespace_exists(&namespace_ident_a_b)
1035 .await
1036 .unwrap()
1037 );
1038
1039 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1040 }
1041
1042 #[tokio::test]
1043 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1044 let catalog = new_memory_catalog().await;
1045
1046 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1047 assert_eq!(
1048 catalog
1049 .drop_namespace(&non_existent_namespace_ident)
1050 .await
1051 .unwrap_err()
1052 .to_string(),
1053 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1054 )
1055 }
1056
1057 #[tokio::test]
1058 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1059 let catalog = new_memory_catalog().await;
1060 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1061
1062 let non_existent_namespace_ident =
1063 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1064 assert_eq!(
1065 catalog
1066 .drop_namespace(&non_existent_namespace_ident)
1067 .await
1068 .unwrap_err()
1069 .to_string(),
1070 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1071 )
1072 }
1073
1074 #[tokio::test]
1075 async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1076 let catalog = new_memory_catalog().await;
1077 let namespace_ident_a = NamespaceIdent::new("a".into());
1078 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1079 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1080
1081 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1082
1083 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1084
1085 assert!(
1086 !catalog
1087 .namespace_exists(&namespace_ident_a_b)
1088 .await
1089 .unwrap()
1090 );
1091 }
1092
1093 #[tokio::test]
1094 async fn test_create_table_with_location() {
1095 let tmp_dir = TempDir::new().unwrap();
1096 let catalog = new_memory_catalog().await;
1097 let namespace_ident = NamespaceIdent::new("a".into());
1098 create_namespace(&catalog, &namespace_ident).await;
1099
1100 let table_name = "abc";
1101 let location = tmp_dir.path().to_str().unwrap().to_string();
1102 let table_creation = TableCreation::builder()
1103 .name(table_name.into())
1104 .location(location.clone())
1105 .schema(simple_table_schema())
1106 .build();
1107
1108 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1109
1110 assert_table_eq(
1111 &catalog
1112 .create_table(&namespace_ident, table_creation)
1113 .await
1114 .unwrap(),
1115 &expected_table_ident,
1116 &simple_table_schema(),
1117 );
1118
1119 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1120
1121 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1122
1123 assert!(
1124 table
1125 .metadata_location()
1126 .unwrap()
1127 .to_string()
1128 .starts_with(&location)
1129 )
1130 }
1131
1132 #[tokio::test]
1133 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1134 let warehouse_location = temp_path();
1135 let catalog = MemoryCatalogBuilder::default()
1136 .load(
1137 "memory",
1138 HashMap::from([(
1139 MEMORY_CATALOG_WAREHOUSE.to_string(),
1140 warehouse_location.clone(),
1141 )]),
1142 )
1143 .await
1144 .unwrap();
1145
1146 let namespace_ident = NamespaceIdent::new("a".into());
1147 let mut namespace_properties = HashMap::new();
1148 let namespace_location = temp_path();
1149 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1150 catalog
1151 .create_namespace(&namespace_ident, namespace_properties)
1152 .await
1153 .unwrap();
1154
1155 let table_name = "tbl1";
1156 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1157 let expected_table_metadata_location_regex =
1158 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1159
1160 let table = catalog
1161 .create_table(
1162 &namespace_ident,
1163 TableCreation::builder()
1164 .name(table_name.into())
1165 .schema(simple_table_schema())
1166 .build(),
1168 )
1169 .await
1170 .unwrap();
1171 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1172 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1173
1174 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1175 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1176 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1177 }
1178
1179 #[tokio::test]
1180 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1181 {
1182 let warehouse_location = temp_path();
1183 let catalog = MemoryCatalogBuilder::default()
1184 .load(
1185 "memory",
1186 HashMap::from([(
1187 MEMORY_CATALOG_WAREHOUSE.to_string(),
1188 warehouse_location.clone(),
1189 )]),
1190 )
1191 .await
1192 .unwrap();
1193
1194 let namespace_ident = NamespaceIdent::new("a".into());
1195 let mut namespace_properties = HashMap::new();
1196 let namespace_location = temp_path();
1197 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1198 catalog
1199 .create_namespace(&namespace_ident, namespace_properties)
1200 .await
1201 .unwrap();
1202
1203 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1204 let mut nested_namespace_properties = HashMap::new();
1205 let nested_namespace_location = temp_path();
1206 nested_namespace_properties
1207 .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1208 catalog
1209 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1210 .await
1211 .unwrap();
1212
1213 let table_name = "tbl1";
1214 let expected_table_ident =
1215 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1216 let expected_table_metadata_location_regex = format!(
1217 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1218 );
1219
1220 let table = catalog
1221 .create_table(
1222 &nested_namespace_ident,
1223 TableCreation::builder()
1224 .name(table_name.into())
1225 .schema(simple_table_schema())
1226 .build(),
1228 )
1229 .await
1230 .unwrap();
1231 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1232 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1233
1234 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1235 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1236 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1237 }
1238
1239 #[tokio::test]
1240 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1241 {
1242 let warehouse_location = temp_path();
1243 let catalog = MemoryCatalogBuilder::default()
1244 .load(
1245 "memory",
1246 HashMap::from([(
1247 MEMORY_CATALOG_WAREHOUSE.to_string(),
1248 warehouse_location.clone(),
1249 )]),
1250 )
1251 .await
1252 .unwrap();
1253
1254 let namespace_ident = NamespaceIdent::new("a".into());
1255 let namespace_properties = HashMap::new();
1257 catalog
1258 .create_namespace(&namespace_ident, namespace_properties)
1259 .await
1260 .unwrap();
1261
1262 let table_name = "tbl1";
1263 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1264 let expected_table_metadata_location_regex =
1265 format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1266
1267 let table = catalog
1268 .create_table(
1269 &namespace_ident,
1270 TableCreation::builder()
1271 .name(table_name.into())
1272 .schema(simple_table_schema())
1273 .build(),
1275 )
1276 .await
1277 .unwrap();
1278 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1279 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1280
1281 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1282 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1283 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1284 }
1285
1286 #[tokio::test]
1287 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1288 {
1289 let warehouse_location = temp_path();
1290 let catalog = MemoryCatalogBuilder::default()
1291 .load(
1292 "memory",
1293 HashMap::from([(
1294 MEMORY_CATALOG_WAREHOUSE.to_string(),
1295 warehouse_location.clone(),
1296 )]),
1297 )
1298 .await
1299 .unwrap();
1300
1301 let namespace_ident = NamespaceIdent::new("a".into());
1302 catalog
1303 .create_namespace(&namespace_ident, HashMap::new())
1305 .await
1306 .unwrap();
1307
1308 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1309 catalog
1310 .create_namespace(&nested_namespace_ident, HashMap::new())
1312 .await
1313 .unwrap();
1314
1315 let table_name = "tbl1";
1316 let expected_table_ident =
1317 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1318 let expected_table_metadata_location_regex = format!(
1319 "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1320 );
1321
1322 let table = catalog
1323 .create_table(
1324 &nested_namespace_ident,
1325 TableCreation::builder()
1326 .name(table_name.into())
1327 .schema(simple_table_schema())
1328 .build(),
1330 )
1331 .await
1332 .unwrap();
1333 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1334 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1335
1336 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1337 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1338 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1339 }
1340
1341 #[tokio::test]
1342 async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1343 {
1344 let catalog = MemoryCatalogBuilder::default()
1345 .load("memory", HashMap::from([]))
1346 .await;
1347
1348 assert!(catalog.is_err());
1349 assert_eq!(
1350 catalog.unwrap_err().to_string(),
1351 "DataInvalid => Catalog warehouse is required"
1352 );
1353 }
1354
1355 #[tokio::test]
1356 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1357 let catalog = new_memory_catalog().await;
1358 let namespace_ident = NamespaceIdent::new("a".into());
1359 create_namespace(&catalog, &namespace_ident).await;
1360 let table_name = "tbl1";
1361 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1362 create_table(&catalog, &table_ident).await;
1363
1364 let tmp_dir = TempDir::new().unwrap();
1365 let location = tmp_dir.path().to_str().unwrap().to_string();
1366
1367 assert_eq!(
1368 catalog
1369 .create_table(
1370 &namespace_ident,
1371 TableCreation::builder()
1372 .name(table_name.into())
1373 .schema(simple_table_schema())
1374 .location(location)
1375 .build()
1376 )
1377 .await
1378 .unwrap_err()
1379 .to_string(),
1380 format!(
1381 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1382 &table_ident
1383 )
1384 );
1385 }
1386
1387 #[tokio::test]
1388 async fn test_list_tables_returns_empty_vector() {
1389 let catalog = new_memory_catalog().await;
1390 let namespace_ident = NamespaceIdent::new("a".into());
1391 create_namespace(&catalog, &namespace_ident).await;
1392
1393 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1394 }
1395
1396 #[tokio::test]
1397 async fn test_list_tables_returns_a_single_table() {
1398 let catalog = new_memory_catalog().await;
1399 let namespace_ident = NamespaceIdent::new("n1".into());
1400 create_namespace(&catalog, &namespace_ident).await;
1401
1402 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1403 create_table(&catalog, &table_ident).await;
1404
1405 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1406 table_ident
1407 ]);
1408 }
1409
1410 #[tokio::test]
1411 async fn test_list_tables_returns_multiple_tables() {
1412 let catalog = new_memory_catalog().await;
1413 let namespace_ident = NamespaceIdent::new("n1".into());
1414 create_namespace(&catalog, &namespace_ident).await;
1415
1416 let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1417 let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1418 let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1419
1420 assert_eq!(
1421 to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1422 to_set(vec![table_ident_1, table_ident_2])
1423 );
1424 }
1425
1426 #[tokio::test]
1427 async fn test_list_tables_returns_tables_from_correct_namespace() {
1428 let catalog = new_memory_catalog().await;
1429 let namespace_ident_1 = NamespaceIdent::new("n1".into());
1430 let namespace_ident_2 = NamespaceIdent::new("n2".into());
1431 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1432
1433 let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1434 let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1435 let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1436 let _ = create_tables(&catalog, vec![
1437 &table_ident_1,
1438 &table_ident_2,
1439 &table_ident_3,
1440 ])
1441 .await;
1442
1443 assert_eq!(
1444 to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1445 to_set(vec![table_ident_1, table_ident_2])
1446 );
1447
1448 assert_eq!(
1449 to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1450 to_set(vec![table_ident_3])
1451 );
1452 }
1453
1454 #[tokio::test]
1455 async fn test_list_tables_returns_table_under_nested_namespace() {
1456 let catalog = new_memory_catalog().await;
1457 let namespace_ident_a = NamespaceIdent::new("a".into());
1458 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1459 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1460
1461 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1462 create_table(&catalog, &table_ident).await;
1463
1464 assert_eq!(
1465 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1466 vec![table_ident]
1467 );
1468 }
1469
1470 #[tokio::test]
1471 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1472 let catalog = new_memory_catalog().await;
1473
1474 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1475
1476 assert_eq!(
1477 catalog
1478 .list_tables(&non_existent_namespace_ident)
1479 .await
1480 .unwrap_err()
1481 .to_string(),
1482 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1483 );
1484 }
1485
1486 #[tokio::test]
1487 async fn test_drop_table() {
1488 let catalog = new_memory_catalog().await;
1489 let namespace_ident = NamespaceIdent::new("n1".into());
1490 create_namespace(&catalog, &namespace_ident).await;
1491 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1492 create_table(&catalog, &table_ident).await;
1493
1494 catalog.drop_table(&table_ident).await.unwrap();
1495 }
1496
1497 #[tokio::test]
1498 async fn test_drop_table_drops_table_under_nested_namespace() {
1499 let catalog = new_memory_catalog().await;
1500 let namespace_ident_a = NamespaceIdent::new("a".into());
1501 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1502 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1503
1504 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1505 create_table(&catalog, &table_ident).await;
1506
1507 catalog.drop_table(&table_ident).await.unwrap();
1508
1509 assert_eq!(
1510 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1511 vec![]
1512 );
1513 }
1514
1515 #[tokio::test]
1516 async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1517 let catalog = new_memory_catalog().await;
1518
1519 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1520 let non_existent_table_ident =
1521 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1522
1523 assert_eq!(
1524 catalog
1525 .drop_table(&non_existent_table_ident)
1526 .await
1527 .unwrap_err()
1528 .to_string(),
1529 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1530 );
1531 }
1532
1533 #[tokio::test]
1534 async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1535 let catalog = new_memory_catalog().await;
1536 let namespace_ident = NamespaceIdent::new("n1".into());
1537 create_namespace(&catalog, &namespace_ident).await;
1538
1539 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1540
1541 assert_eq!(
1542 catalog
1543 .drop_table(&non_existent_table_ident)
1544 .await
1545 .unwrap_err()
1546 .to_string(),
1547 format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1548 );
1549 }
1550
1551 #[tokio::test]
1552 async fn test_table_exists_returns_true() {
1553 let catalog = new_memory_catalog().await;
1554 let namespace_ident = NamespaceIdent::new("n1".into());
1555 create_namespace(&catalog, &namespace_ident).await;
1556 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1557 create_table(&catalog, &table_ident).await;
1558
1559 assert!(catalog.table_exists(&table_ident).await.unwrap());
1560 }
1561
1562 #[tokio::test]
1563 async fn test_table_exists_returns_false() {
1564 let catalog = new_memory_catalog().await;
1565 let namespace_ident = NamespaceIdent::new("n1".into());
1566 create_namespace(&catalog, &namespace_ident).await;
1567 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1568
1569 assert!(
1570 !catalog
1571 .table_exists(&non_existent_table_ident)
1572 .await
1573 .unwrap()
1574 );
1575 }
1576
1577 #[tokio::test]
1578 async fn test_table_exists_under_nested_namespace() {
1579 let catalog = new_memory_catalog().await;
1580 let namespace_ident_a = NamespaceIdent::new("a".into());
1581 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1582 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1583
1584 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1585 create_table(&catalog, &table_ident).await;
1586
1587 assert!(catalog.table_exists(&table_ident).await.unwrap());
1588
1589 let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1590 assert!(
1591 !catalog
1592 .table_exists(&non_existent_table_ident)
1593 .await
1594 .unwrap()
1595 );
1596 }
1597
1598 #[tokio::test]
1599 async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1600 let catalog = new_memory_catalog().await;
1601
1602 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1603 let non_existent_table_ident =
1604 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1605
1606 assert_eq!(
1607 catalog
1608 .table_exists(&non_existent_table_ident)
1609 .await
1610 .unwrap_err()
1611 .to_string(),
1612 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1613 );
1614 }
1615
1616 #[tokio::test]
1617 async fn test_rename_table_in_same_namespace() {
1618 let catalog = new_memory_catalog().await;
1619 let namespace_ident = NamespaceIdent::new("n1".into());
1620 create_namespace(&catalog, &namespace_ident).await;
1621 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1622 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1623 create_table(&catalog, &src_table_ident).await;
1624
1625 catalog
1626 .rename_table(&src_table_ident, &dst_table_ident)
1627 .await
1628 .unwrap();
1629
1630 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1631 dst_table_ident
1632 ],);
1633 }
1634
1635 #[tokio::test]
1636 async fn test_rename_table_across_namespaces() {
1637 let catalog = new_memory_catalog().await;
1638 let src_namespace_ident = NamespaceIdent::new("a".into());
1639 let dst_namespace_ident = NamespaceIdent::new("b".into());
1640 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1641 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1642 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1643 create_table(&catalog, &src_table_ident).await;
1644
1645 catalog
1646 .rename_table(&src_table_ident, &dst_table_ident)
1647 .await
1648 .unwrap();
1649
1650 assert_eq!(
1651 catalog.list_tables(&src_namespace_ident).await.unwrap(),
1652 vec![],
1653 );
1654
1655 assert_eq!(
1656 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1657 vec![dst_table_ident],
1658 );
1659 }
1660
1661 #[tokio::test]
1662 async fn test_rename_table_src_table_is_same_as_dst_table() {
1663 let catalog = new_memory_catalog().await;
1664 let namespace_ident = NamespaceIdent::new("n1".into());
1665 create_namespace(&catalog, &namespace_ident).await;
1666 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1667 create_table(&catalog, &table_ident).await;
1668
1669 catalog
1670 .rename_table(&table_ident, &table_ident)
1671 .await
1672 .unwrap();
1673
1674 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1675 table_ident
1676 ],);
1677 }
1678
1679 #[tokio::test]
1680 async fn test_rename_table_across_nested_namespaces() {
1681 let catalog = new_memory_catalog().await;
1682 let namespace_ident_a = NamespaceIdent::new("a".into());
1683 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1684 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1685 create_namespaces(&catalog, &vec![
1686 &namespace_ident_a,
1687 &namespace_ident_a_b,
1688 &namespace_ident_a_b_c,
1689 ])
1690 .await;
1691
1692 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1693 create_tables(&catalog, vec![&src_table_ident]).await;
1694
1695 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1696 catalog
1697 .rename_table(&src_table_ident, &dst_table_ident)
1698 .await
1699 .unwrap();
1700
1701 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1702
1703 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1704 }
1705
1706 #[tokio::test]
1707 async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1708 let catalog = new_memory_catalog().await;
1709
1710 let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1711 let src_table_ident =
1712 TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1713
1714 let dst_namespace_ident = NamespaceIdent::new("n2".into());
1715 create_namespace(&catalog, &dst_namespace_ident).await;
1716 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1717
1718 assert_eq!(
1719 catalog
1720 .rename_table(&src_table_ident, &dst_table_ident)
1721 .await
1722 .unwrap_err()
1723 .to_string(),
1724 format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1725 );
1726 }
1727
1728 #[tokio::test]
1729 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1730 let catalog = new_memory_catalog().await;
1731 let src_namespace_ident = NamespaceIdent::new("n1".into());
1732 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1733 create_namespace(&catalog, &src_namespace_ident).await;
1734 create_table(&catalog, &src_table_ident).await;
1735
1736 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1737 let dst_table_ident =
1738 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1739 assert_eq!(
1740 catalog
1741 .rename_table(&src_table_ident, &dst_table_ident)
1742 .await
1743 .unwrap_err()
1744 .to_string(),
1745 format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1746 );
1747 }
1748
1749 #[tokio::test]
1750 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1751 let catalog = new_memory_catalog().await;
1752 let namespace_ident = NamespaceIdent::new("n1".into());
1753 create_namespace(&catalog, &namespace_ident).await;
1754 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1755 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1756
1757 assert_eq!(
1758 catalog
1759 .rename_table(&src_table_ident, &dst_table_ident)
1760 .await
1761 .unwrap_err()
1762 .to_string(),
1763 format!("TableNotFound => No such table: {src_table_ident:?}"),
1764 );
1765 }
1766
1767 #[tokio::test]
1768 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1769 let catalog = new_memory_catalog().await;
1770 let namespace_ident = NamespaceIdent::new("n1".into());
1771 create_namespace(&catalog, &namespace_ident).await;
1772 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1773 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1774 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1775
1776 assert_eq!(
1777 catalog
1778 .rename_table(&src_table_ident, &dst_table_ident)
1779 .await
1780 .unwrap_err()
1781 .to_string(),
1782 format!(
1783 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1784 &dst_table_ident
1785 ),
1786 );
1787 }
1788
1789 #[tokio::test]
1790 async fn test_register_table() {
1791 let catalog = new_memory_catalog().await;
1793 let namespace_ident = NamespaceIdent::new("test_namespace".into());
1794 create_namespace(&catalog, &namespace_ident).await;
1795
1796 let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1798 create_table(&catalog, &source_table_ident).await;
1799
1800 let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1802 let metadata_location = source_table.metadata_location().unwrap().to_string();
1803
1804 let register_table_ident =
1806 TableIdent::new(namespace_ident.clone(), "register_table".into());
1807 let registered_table = catalog
1808 .register_table(®ister_table_ident, metadata_location.clone())
1809 .await
1810 .unwrap();
1811
1812 assert_eq!(registered_table.identifier(), ®ister_table_ident);
1814
1815 assert_eq!(
1817 registered_table.metadata_location().unwrap().to_string(),
1818 metadata_location
1819 );
1820
1821 assert!(catalog.table_exists(®ister_table_ident).await.unwrap());
1823
1824 let loaded_table = catalog.load_table(®ister_table_ident).await.unwrap();
1826 assert_eq!(loaded_table.identifier(), ®ister_table_ident);
1827 assert_eq!(
1828 loaded_table.metadata_location().unwrap().to_string(),
1829 metadata_location
1830 );
1831 }
1832
1833 #[tokio::test]
1834 async fn test_update_table() {
1835 let catalog = new_memory_catalog().await;
1836
1837 let table = create_table_with_namespace(&catalog).await;
1838
1839 assert!(!table.metadata().properties().contains_key("key"));
1841
1842 let tx = Transaction::new(&table);
1844 let updated_table = tx
1845 .update_table_properties()
1846 .set("key".to_string(), "value".to_string())
1847 .apply(tx)
1848 .unwrap()
1849 .commit(&catalog)
1850 .await
1851 .unwrap();
1852
1853 assert_eq!(
1854 updated_table.metadata().properties().get("key").unwrap(),
1855 "value"
1856 );
1857
1858 assert_eq!(table.identifier(), updated_table.identifier());
1859 assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1860 assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1861 assert_ne!(table.metadata_location(), updated_table.metadata_location());
1862
1863 assert!(
1864 table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1865 );
1866 }
1867
1868 #[tokio::test]
1869 async fn test_update_table_fails_if_table_doesnt_exist() {
1870 let catalog = new_memory_catalog().await;
1871
1872 let namespace_ident = NamespaceIdent::new("a".into());
1873 create_namespace(&catalog, &namespace_ident).await;
1874
1875 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1877 let table = build_table(table_ident);
1878
1879 let tx = Transaction::new(&table);
1880 let err = tx
1881 .update_table_properties()
1882 .set("key".to_string(), "value".to_string())
1883 .apply(tx)
1884 .unwrap()
1885 .commit(&catalog)
1886 .await
1887 .unwrap_err();
1888 assert_eq!(err.kind(), ErrorKind::TableNotFound);
1889 }
1890
1891 fn build_table(ident: TableIdent) -> Table {
1892 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1893
1894 let temp_dir = TempDir::new().unwrap();
1895 let location = temp_dir.path().to_str().unwrap().to_string();
1896
1897 let table_creation = TableCreation::builder()
1898 .name(ident.name().to_string())
1899 .schema(simple_table_schema())
1900 .location(location)
1901 .build();
1902 let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1903 .unwrap()
1904 .build()
1905 .unwrap()
1906 .metadata;
1907
1908 Table::builder()
1909 .identifier(ident)
1910 .metadata(metadata)
1911 .file_io(file_io)
1912 .build()
1913 .unwrap()
1914 }
1915}