1use std::collections::HashMap;
21
22use async_trait::async_trait;
23use futures::lock::{Mutex, MutexGuard};
24use itertools::Itertools;
25
26use super::namespace_state::NamespaceState;
27use crate::io::FileIO;
28use crate::spec::{TableMetadata, TableMetadataBuilder};
29use crate::table::Table;
30use crate::{
31 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
32 TableCommit, TableCreation, TableIdent,
33};
34
35pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";
37
38const LOCATION: &str = "location";
40
41#[derive(Debug)]
43pub struct MemoryCatalogBuilder(MemoryCatalogConfig);
44
45impl Default for MemoryCatalogBuilder {
46 fn default() -> Self {
47 Self(MemoryCatalogConfig {
48 name: None,
49 warehouse: "".to_string(),
50 props: HashMap::new(),
51 })
52 }
53}
54
55impl CatalogBuilder for MemoryCatalogBuilder {
56 type C = MemoryCatalog;
57
58 fn load(
59 mut self,
60 name: impl Into<String>,
61 props: HashMap<String, String>,
62 ) -> impl Future<Output = Result<Self::C>> + Send {
63 self.0.name = Some(name.into());
64
65 if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
66 self.0.warehouse = props
67 .get(MEMORY_CATALOG_WAREHOUSE)
68 .cloned()
69 .unwrap_or_default()
70 }
71
72 self.0.props = props
74 .into_iter()
75 .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
76 .collect();
77
78 let result = {
79 if self.0.name.is_none() {
80 Err(Error::new(
81 ErrorKind::DataInvalid,
82 "Catalog name is required",
83 ))
84 } else if self.0.warehouse.is_empty() {
85 Err(Error::new(
86 ErrorKind::DataInvalid,
87 "Catalog warehouse is required",
88 ))
89 } else {
90 MemoryCatalog::new(self.0)
91 }
92 };
93
94 std::future::ready(result)
95 }
96}
97
98#[derive(Clone, Debug)]
99pub(crate) struct MemoryCatalogConfig {
100 name: Option<String>,
101 warehouse: String,
102 props: HashMap<String, String>,
103}
104
105#[derive(Debug)]
107pub struct MemoryCatalog {
108 root_namespace_state: Mutex<NamespaceState>,
109 file_io: FileIO,
110 warehouse_location: String,
111}
112
113impl MemoryCatalog {
114 fn new(config: MemoryCatalogConfig) -> Result<Self> {
116 Ok(Self {
117 root_namespace_state: Mutex::new(NamespaceState::default()),
118 file_io: FileIO::from_path(&config.warehouse)?
119 .with_props(config.props)
120 .build()?,
121 warehouse_location: config.warehouse,
122 })
123 }
124
125 async fn load_table_from_locked_state(
127 &self,
128 table_ident: &TableIdent,
129 root_namespace_state: &MutexGuard<'_, NamespaceState>,
130 ) -> Result<Table> {
131 let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
132 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
133
134 Table::builder()
135 .identifier(table_ident.clone())
136 .metadata(metadata)
137 .metadata_location(metadata_location.to_string())
138 .file_io(self.file_io.clone())
139 .build()
140 }
141}
142
143#[async_trait]
144impl Catalog for MemoryCatalog {
145 async fn list_namespaces(
147 &self,
148 maybe_parent: Option<&NamespaceIdent>,
149 ) -> Result<Vec<NamespaceIdent>> {
150 let root_namespace_state = self.root_namespace_state.lock().await;
151
152 match maybe_parent {
153 None => {
154 let namespaces = root_namespace_state
155 .list_top_level_namespaces()
156 .into_iter()
157 .map(|str| NamespaceIdent::new(str.to_string()))
158 .collect_vec();
159
160 Ok(namespaces)
161 }
162 Some(parent_namespace_ident) => {
163 let namespaces = root_namespace_state
164 .list_namespaces_under(parent_namespace_ident)?
165 .into_iter()
166 .map(|name| {
167 let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
168 names.push(name.to_string());
169 NamespaceIdent::from_vec(names)
170 })
171 .collect::<Result<Vec<_>>>()?;
172
173 Ok(namespaces)
174 }
175 }
176 }
177
178 async fn create_namespace(
180 &self,
181 namespace_ident: &NamespaceIdent,
182 properties: HashMap<String, String>,
183 ) -> Result<Namespace> {
184 let mut root_namespace_state = self.root_namespace_state.lock().await;
185
186 root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
187 let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
188
189 Ok(namespace)
190 }
191
192 async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
194 let root_namespace_state = self.root_namespace_state.lock().await;
195
196 let namespace = Namespace::with_properties(
197 namespace_ident.clone(),
198 root_namespace_state
199 .get_properties(namespace_ident)?
200 .clone(),
201 );
202
203 Ok(namespace)
204 }
205
206 async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
208 let guarded_namespaces = self.root_namespace_state.lock().await;
209
210 Ok(guarded_namespaces.namespace_exists(namespace_ident))
211 }
212
213 async fn update_namespace(
219 &self,
220 namespace_ident: &NamespaceIdent,
221 properties: HashMap<String, String>,
222 ) -> Result<()> {
223 let mut root_namespace_state = self.root_namespace_state.lock().await;
224
225 root_namespace_state.replace_properties(namespace_ident, properties)
226 }
227
228 async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
230 let mut root_namespace_state = self.root_namespace_state.lock().await;
231
232 root_namespace_state.remove_existing_namespace(namespace_ident)
233 }
234
235 async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
237 let root_namespace_state = self.root_namespace_state.lock().await;
238
239 let table_names = root_namespace_state.list_tables(namespace_ident)?;
240 let table_idents = table_names
241 .into_iter()
242 .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
243 .collect_vec();
244
245 Ok(table_idents)
246 }
247
248 async fn create_table(
250 &self,
251 namespace_ident: &NamespaceIdent,
252 table_creation: TableCreation,
253 ) -> Result<Table> {
254 let mut root_namespace_state = self.root_namespace_state.lock().await;
255
256 let table_name = table_creation.name.clone();
257 let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
258
259 let (table_creation, location) = match table_creation.location.clone() {
260 Some(location) => (table_creation, location),
261 None => {
262 let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
263 let location_prefix = match namespace_properties.get(LOCATION) {
264 Some(namespace_location) => namespace_location.clone(),
265 None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
266 };
267
268 let location = format!("{}/{}", location_prefix, table_ident.name());
269
270 let new_table_creation = TableCreation {
271 location: Some(location.clone()),
272 ..table_creation
273 };
274
275 (new_table_creation, location)
276 }
277 };
278
279 let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
280 .build()?
281 .metadata;
282 let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
283
284 metadata.write_to(&self.file_io, &metadata_location).await?;
285
286 root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
287
288 Table::builder()
289 .file_io(self.file_io.clone())
290 .metadata_location(metadata_location)
291 .metadata(metadata)
292 .identifier(table_ident)
293 .build()
294 }
295
296 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
298 let root_namespace_state = self.root_namespace_state.lock().await;
299
300 self.load_table_from_locked_state(table_ident, &root_namespace_state)
301 .await
302 }
303
304 async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
306 let mut root_namespace_state = self.root_namespace_state.lock().await;
307
308 let metadata_location = root_namespace_state.remove_existing_table(table_ident)?;
309 self.file_io.delete(&metadata_location).await
310 }
311
312 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
314 let root_namespace_state = self.root_namespace_state.lock().await;
315
316 root_namespace_state.table_exists(table_ident)
317 }
318
319 async fn rename_table(
321 &self,
322 src_table_ident: &TableIdent,
323 dst_table_ident: &TableIdent,
324 ) -> Result<()> {
325 let mut root_namespace_state = self.root_namespace_state.lock().await;
326
327 let mut new_root_namespace_state = root_namespace_state.clone();
328 let metadata_location = new_root_namespace_state
329 .get_existing_table_location(src_table_ident)?
330 .clone();
331 new_root_namespace_state.remove_existing_table(src_table_ident)?;
332 new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
333 *root_namespace_state = new_root_namespace_state;
334
335 Ok(())
336 }
337
338 async fn register_table(
339 &self,
340 table_ident: &TableIdent,
341 metadata_location: String,
342 ) -> Result<Table> {
343 let mut root_namespace_state = self.root_namespace_state.lock().await;
344 root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
345
346 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
347
348 Table::builder()
349 .file_io(self.file_io.clone())
350 .metadata_location(metadata_location)
351 .metadata(metadata)
352 .identifier(table_ident.clone())
353 .build()
354 }
355
356 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
358 let mut root_namespace_state = self.root_namespace_state.lock().await;
359
360 let current_table = self
361 .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
362 .await?;
363
364 let staged_table = commit.apply(current_table)?;
366
367 staged_table
369 .metadata()
370 .write_to(
371 staged_table.file_io(),
372 staged_table.metadata_location_result()?,
373 )
374 .await?;
375
376 let updated_table = root_namespace_state.commit_table_update(staged_table)?;
378
379 Ok(updated_table)
380 }
381}
382
383#[cfg(test)]
384pub(crate) mod tests {
385 use std::collections::HashSet;
386 use std::hash::Hash;
387 use std::iter::FromIterator;
388 use std::vec;
389
390 use regex::Regex;
391 use tempfile::TempDir;
392
393 use super::*;
394 use crate::io::FileIOBuilder;
395 use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
396 use crate::transaction::{ApplyTransactionAction, Transaction};
397
398 fn temp_path() -> String {
399 let temp_dir = TempDir::new().unwrap();
400 temp_dir.path().to_str().unwrap().to_string()
401 }
402
403 pub(crate) async fn new_memory_catalog() -> impl Catalog {
404 let warehouse_location = temp_path();
405 MemoryCatalogBuilder::default()
406 .load(
407 "memory",
408 HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
409 )
410 .await
411 .unwrap()
412 }
413
414 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
415 let _ = catalog
416 .create_namespace(namespace_ident, HashMap::new())
417 .await
418 .unwrap();
419 }
420
421 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
422 for namespace_ident in namespace_idents {
423 let _ = create_namespace(catalog, namespace_ident).await;
424 }
425 }
426
427 fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
428 HashSet::from_iter(vec)
429 }
430
431 fn simple_table_schema() -> Schema {
432 Schema::builder()
433 .with_fields(vec![
434 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
435 ])
436 .build()
437 .unwrap()
438 }
439
440 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
441 catalog
442 .create_table(
443 &table_ident.namespace,
444 TableCreation::builder()
445 .name(table_ident.name().into())
446 .schema(simple_table_schema())
447 .build(),
448 )
449 .await
450 .unwrap()
451 }
452
453 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
454 for table_ident in table_idents {
455 create_table(catalog, table_ident).await;
456 }
457 }
458
459 async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
460 let namespace_ident = NamespaceIdent::new("abc".into());
461 create_namespace(catalog, &namespace_ident).await;
462
463 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
464 create_table(catalog, &table_ident).await
465 }
466
467 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
468 assert_eq!(table.identifier(), expected_table_ident);
469
470 let metadata = table.metadata();
471
472 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
473
474 let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
475 .with_spec_id(0)
476 .build()
477 .unwrap();
478
479 assert_eq!(
480 metadata
481 .partition_specs_iter()
482 .map(|p| p.as_ref())
483 .collect_vec(),
484 vec![&expected_partition_spec]
485 );
486
487 let expected_sorted_order = SortOrder::builder()
488 .with_order_id(0)
489 .with_fields(vec![])
490 .build(expected_schema)
491 .unwrap();
492
493 assert_eq!(
494 metadata
495 .sort_orders_iter()
496 .map(|s| s.as_ref())
497 .collect_vec(),
498 vec![&expected_sorted_order]
499 );
500
501 assert_eq!(metadata.properties(), &HashMap::new());
502
503 assert!(!table.readonly());
504 }
505
506 const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
507
508 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
509 let actual = table.metadata_location().unwrap().to_string();
510 let regex = Regex::new(regex_str).unwrap();
511 assert!(
512 regex.is_match(&actual),
513 "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
514 )
515 }
516
517 #[tokio::test]
518 async fn test_list_namespaces_returns_empty_vector() {
519 let catalog = new_memory_catalog().await;
520
521 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
522 }
523
524 #[tokio::test]
525 async fn test_list_namespaces_returns_single_namespace() {
526 let catalog = new_memory_catalog().await;
527 let namespace_ident = NamespaceIdent::new("abc".into());
528 create_namespace(&catalog, &namespace_ident).await;
529
530 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
531 namespace_ident
532 ]);
533 }
534
535 #[tokio::test]
536 async fn test_list_namespaces_returns_multiple_namespaces() {
537 let catalog = new_memory_catalog().await;
538 let namespace_ident_1 = NamespaceIdent::new("a".into());
539 let namespace_ident_2 = NamespaceIdent::new("b".into());
540 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
541
542 assert_eq!(
543 to_set(catalog.list_namespaces(None).await.unwrap()),
544 to_set(vec![namespace_ident_1, namespace_ident_2])
545 );
546 }
547
548 #[tokio::test]
549 async fn test_list_namespaces_returns_only_top_level_namespaces() {
550 let catalog = new_memory_catalog().await;
551 let namespace_ident_1 = NamespaceIdent::new("a".into());
552 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
553 let namespace_ident_3 = NamespaceIdent::new("b".into());
554 create_namespaces(&catalog, &vec![
555 &namespace_ident_1,
556 &namespace_ident_2,
557 &namespace_ident_3,
558 ])
559 .await;
560
561 assert_eq!(
562 to_set(catalog.list_namespaces(None).await.unwrap()),
563 to_set(vec![namespace_ident_1, namespace_ident_3])
564 );
565 }
566
567 #[tokio::test]
568 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
569 let catalog = new_memory_catalog().await;
570 let namespace_ident_1 = NamespaceIdent::new("a".into());
571 let namespace_ident_2 = NamespaceIdent::new("b".into());
572 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
573
574 assert_eq!(
575 catalog
576 .list_namespaces(Some(&namespace_ident_1))
577 .await
578 .unwrap(),
579 vec![]
580 );
581 }
582
583 #[tokio::test]
584 async fn test_list_namespaces_returns_namespace_under_parent() {
585 let catalog = new_memory_catalog().await;
586 let namespace_ident_1 = NamespaceIdent::new("a".into());
587 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
588 let namespace_ident_3 = NamespaceIdent::new("c".into());
589 create_namespaces(&catalog, &vec![
590 &namespace_ident_1,
591 &namespace_ident_2,
592 &namespace_ident_3,
593 ])
594 .await;
595
596 assert_eq!(
597 to_set(catalog.list_namespaces(None).await.unwrap()),
598 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
599 );
600
601 assert_eq!(
602 catalog
603 .list_namespaces(Some(&namespace_ident_1))
604 .await
605 .unwrap(),
606 vec![namespace_ident_2]
607 );
608 }
609
610 #[tokio::test]
611 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
612 let catalog = new_memory_catalog().await;
613 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
614 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
615 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
616 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
617 let namespace_ident_5 = NamespaceIdent::new("b".into());
618 create_namespaces(&catalog, &vec![
619 &namespace_ident_1,
620 &namespace_ident_2,
621 &namespace_ident_3,
622 &namespace_ident_4,
623 &namespace_ident_5,
624 ])
625 .await;
626
627 assert_eq!(
628 to_set(
629 catalog
630 .list_namespaces(Some(&namespace_ident_1))
631 .await
632 .unwrap()
633 ),
634 to_set(vec![
635 namespace_ident_2,
636 namespace_ident_3,
637 namespace_ident_4,
638 ])
639 );
640 }
641
642 #[tokio::test]
643 async fn test_namespace_exists_returns_false() {
644 let catalog = new_memory_catalog().await;
645 let namespace_ident = NamespaceIdent::new("a".into());
646 create_namespace(&catalog, &namespace_ident).await;
647
648 assert!(
649 !catalog
650 .namespace_exists(&NamespaceIdent::new("b".into()))
651 .await
652 .unwrap()
653 );
654 }
655
656 #[tokio::test]
657 async fn test_namespace_exists_returns_true() {
658 let catalog = new_memory_catalog().await;
659 let namespace_ident = NamespaceIdent::new("a".into());
660 create_namespace(&catalog, &namespace_ident).await;
661
662 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
663 }
664
665 #[tokio::test]
666 async fn test_create_namespace_with_empty_properties() {
667 let catalog = new_memory_catalog().await;
668 let namespace_ident = NamespaceIdent::new("a".into());
669
670 assert_eq!(
671 catalog
672 .create_namespace(&namespace_ident, HashMap::new())
673 .await
674 .unwrap(),
675 Namespace::new(namespace_ident.clone())
676 );
677
678 assert_eq!(
679 catalog.get_namespace(&namespace_ident).await.unwrap(),
680 Namespace::with_properties(namespace_ident, HashMap::new())
681 );
682 }
683
684 #[tokio::test]
685 async fn test_create_namespace_with_properties() {
686 let catalog = new_memory_catalog().await;
687 let namespace_ident = NamespaceIdent::new("abc".into());
688
689 let mut properties: HashMap<String, String> = HashMap::new();
690 properties.insert("k".into(), "v".into());
691
692 assert_eq!(
693 catalog
694 .create_namespace(&namespace_ident, properties.clone())
695 .await
696 .unwrap(),
697 Namespace::with_properties(namespace_ident.clone(), properties.clone())
698 );
699
700 assert_eq!(
701 catalog.get_namespace(&namespace_ident).await.unwrap(),
702 Namespace::with_properties(namespace_ident, properties)
703 );
704 }
705
706 #[tokio::test]
707 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
708 let catalog = new_memory_catalog().await;
709 let namespace_ident = NamespaceIdent::new("a".into());
710 create_namespace(&catalog, &namespace_ident).await;
711
712 assert_eq!(
713 catalog
714 .create_namespace(&namespace_ident, HashMap::new())
715 .await
716 .unwrap_err()
717 .to_string(),
718 format!(
719 "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
720 &namespace_ident
721 )
722 );
723
724 assert_eq!(
725 catalog.get_namespace(&namespace_ident).await.unwrap(),
726 Namespace::with_properties(namespace_ident, HashMap::new())
727 );
728 }
729
730 #[tokio::test]
731 async fn test_create_nested_namespace() {
732 let catalog = new_memory_catalog().await;
733 let parent_namespace_ident = NamespaceIdent::new("a".into());
734 create_namespace(&catalog, &parent_namespace_ident).await;
735
736 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
737
738 assert_eq!(
739 catalog
740 .create_namespace(&child_namespace_ident, HashMap::new())
741 .await
742 .unwrap(),
743 Namespace::new(child_namespace_ident.clone())
744 );
745
746 assert_eq!(
747 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
748 Namespace::with_properties(child_namespace_ident, HashMap::new())
749 );
750 }
751
752 #[tokio::test]
753 async fn test_create_deeply_nested_namespace() {
754 let catalog = new_memory_catalog().await;
755 let namespace_ident_a = NamespaceIdent::new("a".into());
756 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
757 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
758
759 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
760
761 assert_eq!(
762 catalog
763 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
764 .await
765 .unwrap(),
766 Namespace::new(namespace_ident_a_b_c.clone())
767 );
768
769 assert_eq!(
770 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
771 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
772 );
773 }
774
775 #[tokio::test]
776 async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
777 let catalog = new_memory_catalog().await;
778
779 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
780
781 assert_eq!(
782 catalog
783 .create_namespace(&nested_namespace_ident, HashMap::new())
784 .await
785 .unwrap_err()
786 .to_string(),
787 format!(
788 "NamespaceNotFound => No such namespace: {:?}",
789 NamespaceIdent::new("a".into())
790 )
791 );
792
793 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
794 }
795
796 #[tokio::test]
797 async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
798 {
799 let catalog = new_memory_catalog().await;
800
801 let namespace_ident_a = NamespaceIdent::new("a".into());
802 create_namespace(&catalog, &namespace_ident_a).await;
803
804 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
805
806 assert_eq!(
807 catalog
808 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
809 .await
810 .unwrap_err()
811 .to_string(),
812 format!(
813 "NamespaceNotFound => No such namespace: {:?}",
814 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
815 )
816 );
817
818 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
819 namespace_ident_a.clone()
820 ]);
821
822 assert_eq!(
823 catalog
824 .list_namespaces(Some(&namespace_ident_a))
825 .await
826 .unwrap(),
827 vec![]
828 );
829 }
830
831 #[tokio::test]
832 async fn test_get_namespace() {
833 let catalog = new_memory_catalog().await;
834 let namespace_ident = NamespaceIdent::new("abc".into());
835
836 let mut properties: HashMap<String, String> = HashMap::new();
837 properties.insert("k".into(), "v".into());
838 let _ = catalog
839 .create_namespace(&namespace_ident, properties.clone())
840 .await
841 .unwrap();
842
843 assert_eq!(
844 catalog.get_namespace(&namespace_ident).await.unwrap(),
845 Namespace::with_properties(namespace_ident, properties)
846 )
847 }
848
849 #[tokio::test]
850 async fn test_get_nested_namespace() {
851 let catalog = new_memory_catalog().await;
852 let namespace_ident_a = NamespaceIdent::new("a".into());
853 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
854 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
855
856 assert_eq!(
857 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
858 Namespace::with_properties(namespace_ident_a_b, HashMap::new())
859 );
860 }
861
862 #[tokio::test]
863 async fn test_get_deeply_nested_namespace() {
864 let catalog = new_memory_catalog().await;
865 let namespace_ident_a = NamespaceIdent::new("a".into());
866 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
867 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
868 create_namespaces(&catalog, &vec![
869 &namespace_ident_a,
870 &namespace_ident_a_b,
871 &namespace_ident_a_b_c,
872 ])
873 .await;
874
875 assert_eq!(
876 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
877 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
878 );
879 }
880
881 #[tokio::test]
882 async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
883 let catalog = new_memory_catalog().await;
884 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
885
886 let non_existent_namespace_ident = NamespaceIdent::new("b".into());
887 assert_eq!(
888 catalog
889 .get_namespace(&non_existent_namespace_ident)
890 .await
891 .unwrap_err()
892 .to_string(),
893 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
894 )
895 }
896
897 #[tokio::test]
898 async fn test_update_namespace() {
899 let catalog = new_memory_catalog().await;
900 let namespace_ident = NamespaceIdent::new("abc".into());
901 create_namespace(&catalog, &namespace_ident).await;
902
903 let mut new_properties: HashMap<String, String> = HashMap::new();
904 new_properties.insert("k".into(), "v".into());
905
906 catalog
907 .update_namespace(&namespace_ident, new_properties.clone())
908 .await
909 .unwrap();
910
911 assert_eq!(
912 catalog.get_namespace(&namespace_ident).await.unwrap(),
913 Namespace::with_properties(namespace_ident, new_properties)
914 )
915 }
916
917 #[tokio::test]
918 async fn test_update_nested_namespace() {
919 let catalog = new_memory_catalog().await;
920 let namespace_ident_a = NamespaceIdent::new("a".into());
921 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
922 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
923
924 let mut new_properties = HashMap::new();
925 new_properties.insert("k".into(), "v".into());
926
927 catalog
928 .update_namespace(&namespace_ident_a_b, new_properties.clone())
929 .await
930 .unwrap();
931
932 assert_eq!(
933 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
934 Namespace::with_properties(namespace_ident_a_b, new_properties)
935 );
936 }
937
938 #[tokio::test]
939 async fn test_update_deeply_nested_namespace() {
940 let catalog = new_memory_catalog().await;
941 let namespace_ident_a = NamespaceIdent::new("a".into());
942 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
943 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
944 create_namespaces(&catalog, &vec![
945 &namespace_ident_a,
946 &namespace_ident_a_b,
947 &namespace_ident_a_b_c,
948 ])
949 .await;
950
951 let mut new_properties = HashMap::new();
952 new_properties.insert("k".into(), "v".into());
953
954 catalog
955 .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
956 .await
957 .unwrap();
958
959 assert_eq!(
960 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
961 Namespace::with_properties(namespace_ident_a_b_c, new_properties)
962 );
963 }
964
965 #[tokio::test]
966 async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
967 let catalog = new_memory_catalog().await;
968 create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
969
970 let non_existent_namespace_ident = NamespaceIdent::new("def".into());
971 assert_eq!(
972 catalog
973 .update_namespace(&non_existent_namespace_ident, HashMap::new())
974 .await
975 .unwrap_err()
976 .to_string(),
977 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
978 )
979 }
980
981 #[tokio::test]
982 async fn test_drop_namespace() {
983 let catalog = new_memory_catalog().await;
984 let namespace_ident = NamespaceIdent::new("abc".into());
985 create_namespace(&catalog, &namespace_ident).await;
986
987 catalog.drop_namespace(&namespace_ident).await.unwrap();
988
989 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
990 }
991
992 #[tokio::test]
993 async fn test_drop_nested_namespace() {
994 let catalog = new_memory_catalog().await;
995 let namespace_ident_a = NamespaceIdent::new("a".into());
996 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
997 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
998
999 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1000
1001 assert!(
1002 !catalog
1003 .namespace_exists(&namespace_ident_a_b)
1004 .await
1005 .unwrap()
1006 );
1007
1008 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1009 }
1010
1011 #[tokio::test]
1012 async fn test_drop_deeply_nested_namespace() {
1013 let catalog = new_memory_catalog().await;
1014 let namespace_ident_a = NamespaceIdent::new("a".into());
1015 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1016 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1017 create_namespaces(&catalog, &vec![
1018 &namespace_ident_a,
1019 &namespace_ident_a_b,
1020 &namespace_ident_a_b_c,
1021 ])
1022 .await;
1023
1024 catalog
1025 .drop_namespace(&namespace_ident_a_b_c)
1026 .await
1027 .unwrap();
1028
1029 assert!(
1030 !catalog
1031 .namespace_exists(&namespace_ident_a_b_c)
1032 .await
1033 .unwrap()
1034 );
1035
1036 assert!(
1037 catalog
1038 .namespace_exists(&namespace_ident_a_b)
1039 .await
1040 .unwrap()
1041 );
1042
1043 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1044 }
1045
1046 #[tokio::test]
1047 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1048 let catalog = new_memory_catalog().await;
1049
1050 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1051 assert_eq!(
1052 catalog
1053 .drop_namespace(&non_existent_namespace_ident)
1054 .await
1055 .unwrap_err()
1056 .to_string(),
1057 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1058 )
1059 }
1060
1061 #[tokio::test]
1062 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1063 let catalog = new_memory_catalog().await;
1064 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1065
1066 let non_existent_namespace_ident =
1067 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1068 assert_eq!(
1069 catalog
1070 .drop_namespace(&non_existent_namespace_ident)
1071 .await
1072 .unwrap_err()
1073 .to_string(),
1074 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1075 )
1076 }
1077
1078 #[tokio::test]
1079 async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1080 let catalog = new_memory_catalog().await;
1081 let namespace_ident_a = NamespaceIdent::new("a".into());
1082 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1083 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1084
1085 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1086
1087 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1088
1089 assert!(
1090 !catalog
1091 .namespace_exists(&namespace_ident_a_b)
1092 .await
1093 .unwrap()
1094 );
1095 }
1096
1097 #[tokio::test]
1098 async fn test_create_table_with_location() {
1099 let tmp_dir = TempDir::new().unwrap();
1100 let catalog = new_memory_catalog().await;
1101 let namespace_ident = NamespaceIdent::new("a".into());
1102 create_namespace(&catalog, &namespace_ident).await;
1103
1104 let table_name = "abc";
1105 let location = tmp_dir.path().to_str().unwrap().to_string();
1106 let table_creation = TableCreation::builder()
1107 .name(table_name.into())
1108 .location(location.clone())
1109 .schema(simple_table_schema())
1110 .build();
1111
1112 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1113
1114 assert_table_eq(
1115 &catalog
1116 .create_table(&namespace_ident, table_creation)
1117 .await
1118 .unwrap(),
1119 &expected_table_ident,
1120 &simple_table_schema(),
1121 );
1122
1123 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1124
1125 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1126
1127 assert!(
1128 table
1129 .metadata_location()
1130 .unwrap()
1131 .to_string()
1132 .starts_with(&location)
1133 )
1134 }
1135
1136 #[tokio::test]
1137 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1138 let warehouse_location = temp_path();
1139 let catalog = MemoryCatalogBuilder::default()
1140 .load(
1141 "memory",
1142 HashMap::from([(
1143 MEMORY_CATALOG_WAREHOUSE.to_string(),
1144 warehouse_location.clone(),
1145 )]),
1146 )
1147 .await
1148 .unwrap();
1149
1150 let namespace_ident = NamespaceIdent::new("a".into());
1151 let mut namespace_properties = HashMap::new();
1152 let namespace_location = temp_path();
1153 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1154 catalog
1155 .create_namespace(&namespace_ident, namespace_properties)
1156 .await
1157 .unwrap();
1158
1159 let table_name = "tbl1";
1160 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1161 let expected_table_metadata_location_regex =
1162 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1163
1164 let table = catalog
1165 .create_table(
1166 &namespace_ident,
1167 TableCreation::builder()
1168 .name(table_name.into())
1169 .schema(simple_table_schema())
1170 .build(),
1172 )
1173 .await
1174 .unwrap();
1175 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1176 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1177
1178 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1179 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1180 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1181 }
1182
1183 #[tokio::test]
1184 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1185 {
1186 let warehouse_location = temp_path();
1187 let catalog = MemoryCatalogBuilder::default()
1188 .load(
1189 "memory",
1190 HashMap::from([(
1191 MEMORY_CATALOG_WAREHOUSE.to_string(),
1192 warehouse_location.clone(),
1193 )]),
1194 )
1195 .await
1196 .unwrap();
1197
1198 let namespace_ident = NamespaceIdent::new("a".into());
1199 let mut namespace_properties = HashMap::new();
1200 let namespace_location = temp_path();
1201 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1202 catalog
1203 .create_namespace(&namespace_ident, namespace_properties)
1204 .await
1205 .unwrap();
1206
1207 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1208 let mut nested_namespace_properties = HashMap::new();
1209 let nested_namespace_location = temp_path();
1210 nested_namespace_properties
1211 .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1212 catalog
1213 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1214 .await
1215 .unwrap();
1216
1217 let table_name = "tbl1";
1218 let expected_table_ident =
1219 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1220 let expected_table_metadata_location_regex = format!(
1221 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1222 );
1223
1224 let table = catalog
1225 .create_table(
1226 &nested_namespace_ident,
1227 TableCreation::builder()
1228 .name(table_name.into())
1229 .schema(simple_table_schema())
1230 .build(),
1232 )
1233 .await
1234 .unwrap();
1235 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1236 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1237
1238 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1239 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1240 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1241 }
1242
1243 #[tokio::test]
1244 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1245 {
1246 let warehouse_location = temp_path();
1247 let catalog = MemoryCatalogBuilder::default()
1248 .load(
1249 "memory",
1250 HashMap::from([(
1251 MEMORY_CATALOG_WAREHOUSE.to_string(),
1252 warehouse_location.clone(),
1253 )]),
1254 )
1255 .await
1256 .unwrap();
1257
1258 let namespace_ident = NamespaceIdent::new("a".into());
1259 let namespace_properties = HashMap::new();
1261 catalog
1262 .create_namespace(&namespace_ident, namespace_properties)
1263 .await
1264 .unwrap();
1265
1266 let table_name = "tbl1";
1267 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1268 let expected_table_metadata_location_regex =
1269 format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1270
1271 let table = catalog
1272 .create_table(
1273 &namespace_ident,
1274 TableCreation::builder()
1275 .name(table_name.into())
1276 .schema(simple_table_schema())
1277 .build(),
1279 )
1280 .await
1281 .unwrap();
1282 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1283 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1284
1285 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1286 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1287 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1288 }
1289
1290 #[tokio::test]
1291 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1292 {
1293 let warehouse_location = temp_path();
1294 let catalog = MemoryCatalogBuilder::default()
1295 .load(
1296 "memory",
1297 HashMap::from([(
1298 MEMORY_CATALOG_WAREHOUSE.to_string(),
1299 warehouse_location.clone(),
1300 )]),
1301 )
1302 .await
1303 .unwrap();
1304
1305 let namespace_ident = NamespaceIdent::new("a".into());
1306 catalog
1307 .create_namespace(&namespace_ident, HashMap::new())
1309 .await
1310 .unwrap();
1311
1312 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1313 catalog
1314 .create_namespace(&nested_namespace_ident, HashMap::new())
1316 .await
1317 .unwrap();
1318
1319 let table_name = "tbl1";
1320 let expected_table_ident =
1321 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1322 let expected_table_metadata_location_regex = format!(
1323 "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1324 );
1325
1326 let table = catalog
1327 .create_table(
1328 &nested_namespace_ident,
1329 TableCreation::builder()
1330 .name(table_name.into())
1331 .schema(simple_table_schema())
1332 .build(),
1334 )
1335 .await
1336 .unwrap();
1337 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1338 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1339
1340 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1341 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1342 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1343 }
1344
1345 #[tokio::test]
1346 async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1347 {
1348 let catalog = MemoryCatalogBuilder::default()
1349 .load("memory", HashMap::from([]))
1350 .await;
1351
1352 assert!(catalog.is_err());
1353 assert_eq!(
1354 catalog.unwrap_err().to_string(),
1355 "DataInvalid => Catalog warehouse is required"
1356 );
1357 }
1358
1359 #[tokio::test]
1360 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1361 let catalog = new_memory_catalog().await;
1362 let namespace_ident = NamespaceIdent::new("a".into());
1363 create_namespace(&catalog, &namespace_ident).await;
1364 let table_name = "tbl1";
1365 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1366 create_table(&catalog, &table_ident).await;
1367
1368 let tmp_dir = TempDir::new().unwrap();
1369 let location = tmp_dir.path().to_str().unwrap().to_string();
1370
1371 assert_eq!(
1372 catalog
1373 .create_table(
1374 &namespace_ident,
1375 TableCreation::builder()
1376 .name(table_name.into())
1377 .schema(simple_table_schema())
1378 .location(location)
1379 .build()
1380 )
1381 .await
1382 .unwrap_err()
1383 .to_string(),
1384 format!(
1385 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1386 &table_ident
1387 )
1388 );
1389 }
1390
1391 #[tokio::test]
1392 async fn test_list_tables_returns_empty_vector() {
1393 let catalog = new_memory_catalog().await;
1394 let namespace_ident = NamespaceIdent::new("a".into());
1395 create_namespace(&catalog, &namespace_ident).await;
1396
1397 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1398 }
1399
1400 #[tokio::test]
1401 async fn test_list_tables_returns_a_single_table() {
1402 let catalog = new_memory_catalog().await;
1403 let namespace_ident = NamespaceIdent::new("n1".into());
1404 create_namespace(&catalog, &namespace_ident).await;
1405
1406 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1407 create_table(&catalog, &table_ident).await;
1408
1409 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1410 table_ident
1411 ]);
1412 }
1413
1414 #[tokio::test]
1415 async fn test_list_tables_returns_multiple_tables() {
1416 let catalog = new_memory_catalog().await;
1417 let namespace_ident = NamespaceIdent::new("n1".into());
1418 create_namespace(&catalog, &namespace_ident).await;
1419
1420 let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1421 let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1422 let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1423
1424 assert_eq!(
1425 to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1426 to_set(vec![table_ident_1, table_ident_2])
1427 );
1428 }
1429
1430 #[tokio::test]
1431 async fn test_list_tables_returns_tables_from_correct_namespace() {
1432 let catalog = new_memory_catalog().await;
1433 let namespace_ident_1 = NamespaceIdent::new("n1".into());
1434 let namespace_ident_2 = NamespaceIdent::new("n2".into());
1435 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1436
1437 let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1438 let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1439 let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1440 let _ = create_tables(&catalog, vec![
1441 &table_ident_1,
1442 &table_ident_2,
1443 &table_ident_3,
1444 ])
1445 .await;
1446
1447 assert_eq!(
1448 to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1449 to_set(vec![table_ident_1, table_ident_2])
1450 );
1451
1452 assert_eq!(
1453 to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1454 to_set(vec![table_ident_3])
1455 );
1456 }
1457
1458 #[tokio::test]
1459 async fn test_list_tables_returns_table_under_nested_namespace() {
1460 let catalog = new_memory_catalog().await;
1461 let namespace_ident_a = NamespaceIdent::new("a".into());
1462 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1463 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1464
1465 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1466 create_table(&catalog, &table_ident).await;
1467
1468 assert_eq!(
1469 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1470 vec![table_ident]
1471 );
1472 }
1473
1474 #[tokio::test]
1475 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1476 let catalog = new_memory_catalog().await;
1477
1478 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1479
1480 assert_eq!(
1481 catalog
1482 .list_tables(&non_existent_namespace_ident)
1483 .await
1484 .unwrap_err()
1485 .to_string(),
1486 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1487 );
1488 }
1489
1490 #[tokio::test]
1491 async fn test_drop_table() {
1492 let catalog = new_memory_catalog().await;
1493 let namespace_ident = NamespaceIdent::new("n1".into());
1494 create_namespace(&catalog, &namespace_ident).await;
1495 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1496 create_table(&catalog, &table_ident).await;
1497
1498 catalog.drop_table(&table_ident).await.unwrap();
1499 }
1500
1501 #[tokio::test]
1502 async fn test_drop_table_drops_table_under_nested_namespace() {
1503 let catalog = new_memory_catalog().await;
1504 let namespace_ident_a = NamespaceIdent::new("a".into());
1505 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1506 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1507
1508 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1509 create_table(&catalog, &table_ident).await;
1510
1511 catalog.drop_table(&table_ident).await.unwrap();
1512
1513 assert_eq!(
1514 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1515 vec![]
1516 );
1517 }
1518
1519 #[tokio::test]
1520 async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1521 let catalog = new_memory_catalog().await;
1522
1523 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1524 let non_existent_table_ident =
1525 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1526
1527 assert_eq!(
1528 catalog
1529 .drop_table(&non_existent_table_ident)
1530 .await
1531 .unwrap_err()
1532 .to_string(),
1533 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1534 );
1535 }
1536
1537 #[tokio::test]
1538 async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1539 let catalog = new_memory_catalog().await;
1540 let namespace_ident = NamespaceIdent::new("n1".into());
1541 create_namespace(&catalog, &namespace_ident).await;
1542
1543 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1544
1545 assert_eq!(
1546 catalog
1547 .drop_table(&non_existent_table_ident)
1548 .await
1549 .unwrap_err()
1550 .to_string(),
1551 format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1552 );
1553 }
1554
1555 #[tokio::test]
1556 async fn test_table_exists_returns_true() {
1557 let catalog = new_memory_catalog().await;
1558 let namespace_ident = NamespaceIdent::new("n1".into());
1559 create_namespace(&catalog, &namespace_ident).await;
1560 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1561 create_table(&catalog, &table_ident).await;
1562
1563 assert!(catalog.table_exists(&table_ident).await.unwrap());
1564 }
1565
1566 #[tokio::test]
1567 async fn test_table_exists_returns_false() {
1568 let catalog = new_memory_catalog().await;
1569 let namespace_ident = NamespaceIdent::new("n1".into());
1570 create_namespace(&catalog, &namespace_ident).await;
1571 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1572
1573 assert!(
1574 !catalog
1575 .table_exists(&non_existent_table_ident)
1576 .await
1577 .unwrap()
1578 );
1579 }
1580
1581 #[tokio::test]
1582 async fn test_table_exists_under_nested_namespace() {
1583 let catalog = new_memory_catalog().await;
1584 let namespace_ident_a = NamespaceIdent::new("a".into());
1585 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1586 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1587
1588 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1589 create_table(&catalog, &table_ident).await;
1590
1591 assert!(catalog.table_exists(&table_ident).await.unwrap());
1592
1593 let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1594 assert!(
1595 !catalog
1596 .table_exists(&non_existent_table_ident)
1597 .await
1598 .unwrap()
1599 );
1600 }
1601
1602 #[tokio::test]
1603 async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1604 let catalog = new_memory_catalog().await;
1605
1606 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1607 let non_existent_table_ident =
1608 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1609
1610 assert_eq!(
1611 catalog
1612 .table_exists(&non_existent_table_ident)
1613 .await
1614 .unwrap_err()
1615 .to_string(),
1616 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1617 );
1618 }
1619
1620 #[tokio::test]
1621 async fn test_rename_table_in_same_namespace() {
1622 let catalog = new_memory_catalog().await;
1623 let namespace_ident = NamespaceIdent::new("n1".into());
1624 create_namespace(&catalog, &namespace_ident).await;
1625 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1626 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1627 create_table(&catalog, &src_table_ident).await;
1628
1629 catalog
1630 .rename_table(&src_table_ident, &dst_table_ident)
1631 .await
1632 .unwrap();
1633
1634 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1635 dst_table_ident
1636 ],);
1637 }
1638
1639 #[tokio::test]
1640 async fn test_rename_table_across_namespaces() {
1641 let catalog = new_memory_catalog().await;
1642 let src_namespace_ident = NamespaceIdent::new("a".into());
1643 let dst_namespace_ident = NamespaceIdent::new("b".into());
1644 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1645 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1646 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1647 create_table(&catalog, &src_table_ident).await;
1648
1649 catalog
1650 .rename_table(&src_table_ident, &dst_table_ident)
1651 .await
1652 .unwrap();
1653
1654 assert_eq!(
1655 catalog.list_tables(&src_namespace_ident).await.unwrap(),
1656 vec![],
1657 );
1658
1659 assert_eq!(
1660 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1661 vec![dst_table_ident],
1662 );
1663 }
1664
1665 #[tokio::test]
1666 async fn test_rename_table_src_table_is_same_as_dst_table() {
1667 let catalog = new_memory_catalog().await;
1668 let namespace_ident = NamespaceIdent::new("n1".into());
1669 create_namespace(&catalog, &namespace_ident).await;
1670 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1671 create_table(&catalog, &table_ident).await;
1672
1673 catalog
1674 .rename_table(&table_ident, &table_ident)
1675 .await
1676 .unwrap();
1677
1678 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1679 table_ident
1680 ],);
1681 }
1682
1683 #[tokio::test]
1684 async fn test_rename_table_across_nested_namespaces() {
1685 let catalog = new_memory_catalog().await;
1686 let namespace_ident_a = NamespaceIdent::new("a".into());
1687 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1688 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1689 create_namespaces(&catalog, &vec![
1690 &namespace_ident_a,
1691 &namespace_ident_a_b,
1692 &namespace_ident_a_b_c,
1693 ])
1694 .await;
1695
1696 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1697 create_tables(&catalog, vec![&src_table_ident]).await;
1698
1699 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1700 catalog
1701 .rename_table(&src_table_ident, &dst_table_ident)
1702 .await
1703 .unwrap();
1704
1705 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1706
1707 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1708 }
1709
1710 #[tokio::test]
1711 async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1712 let catalog = new_memory_catalog().await;
1713
1714 let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1715 let src_table_ident =
1716 TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1717
1718 let dst_namespace_ident = NamespaceIdent::new("n2".into());
1719 create_namespace(&catalog, &dst_namespace_ident).await;
1720 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1721
1722 assert_eq!(
1723 catalog
1724 .rename_table(&src_table_ident, &dst_table_ident)
1725 .await
1726 .unwrap_err()
1727 .to_string(),
1728 format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1729 );
1730 }
1731
1732 #[tokio::test]
1733 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1734 let catalog = new_memory_catalog().await;
1735 let src_namespace_ident = NamespaceIdent::new("n1".into());
1736 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1737 create_namespace(&catalog, &src_namespace_ident).await;
1738 create_table(&catalog, &src_table_ident).await;
1739
1740 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1741 let dst_table_ident =
1742 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1743 assert_eq!(
1744 catalog
1745 .rename_table(&src_table_ident, &dst_table_ident)
1746 .await
1747 .unwrap_err()
1748 .to_string(),
1749 format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1750 );
1751 }
1752
1753 #[tokio::test]
1754 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1755 let catalog = new_memory_catalog().await;
1756 let namespace_ident = NamespaceIdent::new("n1".into());
1757 create_namespace(&catalog, &namespace_ident).await;
1758 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1759 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1760
1761 assert_eq!(
1762 catalog
1763 .rename_table(&src_table_ident, &dst_table_ident)
1764 .await
1765 .unwrap_err()
1766 .to_string(),
1767 format!("TableNotFound => No such table: {src_table_ident:?}"),
1768 );
1769 }
1770
1771 #[tokio::test]
1772 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1773 let catalog = new_memory_catalog().await;
1774 let namespace_ident = NamespaceIdent::new("n1".into());
1775 create_namespace(&catalog, &namespace_ident).await;
1776 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1777 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1778 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1779
1780 assert_eq!(
1781 catalog
1782 .rename_table(&src_table_ident, &dst_table_ident)
1783 .await
1784 .unwrap_err()
1785 .to_string(),
1786 format!(
1787 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1788 &dst_table_ident
1789 ),
1790 );
1791 }
1792
1793 #[tokio::test]
1794 async fn test_register_table() {
1795 let catalog = new_memory_catalog().await;
1797 let namespace_ident = NamespaceIdent::new("test_namespace".into());
1798 create_namespace(&catalog, &namespace_ident).await;
1799
1800 let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1802 create_table(&catalog, &source_table_ident).await;
1803
1804 let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1806 let metadata_location = source_table.metadata_location().unwrap().to_string();
1807
1808 let register_table_ident =
1810 TableIdent::new(namespace_ident.clone(), "register_table".into());
1811 let registered_table = catalog
1812 .register_table(®ister_table_ident, metadata_location.clone())
1813 .await
1814 .unwrap();
1815
1816 assert_eq!(registered_table.identifier(), ®ister_table_ident);
1818
1819 assert_eq!(
1821 registered_table.metadata_location().unwrap().to_string(),
1822 metadata_location
1823 );
1824
1825 assert!(catalog.table_exists(®ister_table_ident).await.unwrap());
1827
1828 let loaded_table = catalog.load_table(®ister_table_ident).await.unwrap();
1830 assert_eq!(loaded_table.identifier(), ®ister_table_ident);
1831 assert_eq!(
1832 loaded_table.metadata_location().unwrap().to_string(),
1833 metadata_location
1834 );
1835 }
1836
1837 #[tokio::test]
1838 async fn test_update_table() {
1839 let catalog = new_memory_catalog().await;
1840
1841 let table = create_table_with_namespace(&catalog).await;
1842
1843 assert!(!table.metadata().properties().contains_key("key"));
1845
1846 let tx = Transaction::new(&table);
1848 let updated_table = tx
1849 .update_table_properties()
1850 .set("key".to_string(), "value".to_string())
1851 .apply(tx)
1852 .unwrap()
1853 .commit(&catalog)
1854 .await
1855 .unwrap();
1856
1857 assert_eq!(
1858 updated_table.metadata().properties().get("key").unwrap(),
1859 "value"
1860 );
1861
1862 assert_eq!(table.identifier(), updated_table.identifier());
1863 assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1864 assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1865 assert_ne!(table.metadata_location(), updated_table.metadata_location());
1866
1867 assert!(
1868 table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1869 );
1870 }
1871
1872 #[tokio::test]
1873 async fn test_update_table_fails_if_table_doesnt_exist() {
1874 let catalog = new_memory_catalog().await;
1875
1876 let namespace_ident = NamespaceIdent::new("a".into());
1877 create_namespace(&catalog, &namespace_ident).await;
1878
1879 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1881 let table = build_table(table_ident);
1882
1883 let tx = Transaction::new(&table);
1884 let err = tx
1885 .update_table_properties()
1886 .set("key".to_string(), "value".to_string())
1887 .apply(tx)
1888 .unwrap()
1889 .commit(&catalog)
1890 .await
1891 .unwrap_err();
1892 assert_eq!(err.kind(), ErrorKind::TableNotFound);
1893 }
1894
1895 fn build_table(ident: TableIdent) -> Table {
1896 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1897
1898 let temp_dir = TempDir::new().unwrap();
1899 let location = temp_dir.path().to_str().unwrap().to_string();
1900
1901 let table_creation = TableCreation::builder()
1902 .name(ident.name().to_string())
1903 .schema(simple_table_schema())
1904 .location(location)
1905 .build();
1906 let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1907 .unwrap()
1908 .build()
1909 .unwrap()
1910 .metadata;
1911
1912 Table::builder()
1913 .identifier(ident)
1914 .metadata(metadata)
1915 .file_io(file_io)
1916 .build()
1917 .unwrap()
1918 }
1919}