iceberg/catalog/memory/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains memory catalog implementation.
19
20use std::collections::HashMap;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use futures::lock::{Mutex, MutexGuard};
26use itertools::Itertools;
27
28use super::namespace_state::NamespaceState;
29use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
30use crate::spec::{TableMetadata, TableMetadataBuilder};
31use crate::table::Table;
32use crate::{
33    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
34    TableCommit, TableCreation, TableIdent,
35};
36
/// Property key under which the memory catalog's warehouse location is
/// supplied when the catalog is loaded.
pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";

/// Namespace property key (`location`). When a namespace carries this
/// property, its value is used as the location prefix for tables created in
/// that namespace without an explicit location.
const LOCATION: &str = "location";
42
/// Builder for [`MemoryCatalog`].
///
/// Holds the catalog configuration and an optional storage factory until
/// `load` turns them into a catalog.
#[derive(Debug)]
pub struct MemoryCatalogBuilder {
    /// Name, warehouse and remaining properties gathered for the catalog.
    config: MemoryCatalogConfig,
    /// Optional storage factory override; `None` means the catalog picks its
    /// own default when it is constructed.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
49
50impl Default for MemoryCatalogBuilder {
51    fn default() -> Self {
52        Self {
53            config: MemoryCatalogConfig {
54                name: None,
55                warehouse: "".to_string(),
56                props: HashMap::new(),
57            },
58            storage_factory: None,
59        }
60    }
61}
62
63impl CatalogBuilder for MemoryCatalogBuilder {
64    type C = MemoryCatalog;
65
66    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
67        self.storage_factory = Some(storage_factory);
68        self
69    }
70
71    fn load(
72        mut self,
73        name: impl Into<String>,
74        props: HashMap<String, String>,
75    ) -> impl Future<Output = Result<Self::C>> + Send {
76        self.config.name = Some(name.into());
77
78        if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
79            self.config.warehouse = props
80                .get(MEMORY_CATALOG_WAREHOUSE)
81                .cloned()
82                .unwrap_or_default()
83        }
84
85        // Collect other remaining properties
86        self.config.props = props
87            .into_iter()
88            .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
89            .collect();
90
91        let result = {
92            if self.config.name.is_none() {
93                Err(Error::new(
94                    ErrorKind::DataInvalid,
95                    "Catalog name is required",
96                ))
97            } else if self.config.warehouse.is_empty() {
98                Err(Error::new(
99                    ErrorKind::DataInvalid,
100                    "Catalog warehouse is required",
101                ))
102            } else {
103                MemoryCatalog::new(self.config, self.storage_factory)
104            }
105        };
106
107        std::future::ready(result)
108    }
109}
110
/// Configuration gathered by [`MemoryCatalogBuilder`] before a
/// [`MemoryCatalog`] is created.
#[derive(Clone, Debug)]
pub(crate) struct MemoryCatalogConfig {
    /// Catalog name; filled in when the builder is loaded.
    name: Option<String>,
    /// Warehouse location; loading fails when this is empty.
    warehouse: String,
    /// Remaining properties (everything except the warehouse entry).
    props: HashMap<String, String>,
}
117
/// Memory catalog implementation.
///
/// Namespace and table bookkeeping is kept in a `NamespaceState` behind an
/// async mutex; table metadata files are read and written through `file_io`.
#[derive(Debug)]
pub struct MemoryCatalog {
    /// Namespace/table state; catalog operations lock it before touching it.
    root_namespace_state: Mutex<NamespaceState>,
    /// File IO used to read and write table metadata.
    file_io: FileIO,
    /// Base location used to derive a table location when neither the table
    /// creation nor its namespace provides one.
    warehouse_location: String,
}
125
126impl MemoryCatalog {
127    /// Creates a memory catalog.
128    fn new(
129        config: MemoryCatalogConfig,
130        storage_factory: Option<Arc<dyn StorageFactory>>,
131    ) -> Result<Self> {
132        // Use provided factory or default to MemoryStorageFactory
133        let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory));
134
135        Ok(Self {
136            root_namespace_state: Mutex::new(NamespaceState::default()),
137            file_io: FileIOBuilder::new(factory).with_props(config.props).build(),
138            warehouse_location: config.warehouse,
139        })
140    }
141
142    /// Loads a table from the locked namespace state.
143    async fn load_table_from_locked_state(
144        &self,
145        table_ident: &TableIdent,
146        root_namespace_state: &MutexGuard<'_, NamespaceState>,
147    ) -> Result<Table> {
148        let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
149        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
150
151        Table::builder()
152            .identifier(table_ident.clone())
153            .metadata(metadata)
154            .metadata_location(metadata_location.to_string())
155            .file_io(self.file_io.clone())
156            .build()
157    }
158}
159
160#[async_trait]
161impl Catalog for MemoryCatalog {
162    /// List namespaces inside the catalog.
163    async fn list_namespaces(
164        &self,
165        maybe_parent: Option<&NamespaceIdent>,
166    ) -> Result<Vec<NamespaceIdent>> {
167        let root_namespace_state = self.root_namespace_state.lock().await;
168
169        match maybe_parent {
170            None => {
171                let namespaces = root_namespace_state
172                    .list_top_level_namespaces()
173                    .into_iter()
174                    .map(|str| NamespaceIdent::new(str.to_string()))
175                    .collect_vec();
176
177                Ok(namespaces)
178            }
179            Some(parent_namespace_ident) => {
180                let namespaces = root_namespace_state
181                    .list_namespaces_under(parent_namespace_ident)?
182                    .into_iter()
183                    .map(|name| {
184                        let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
185                        names.push(name.to_string());
186                        NamespaceIdent::from_vec(names)
187                    })
188                    .collect::<Result<Vec<_>>>()?;
189
190                Ok(namespaces)
191            }
192        }
193    }
194
195    /// Create a new namespace inside the catalog.
196    async fn create_namespace(
197        &self,
198        namespace_ident: &NamespaceIdent,
199        properties: HashMap<String, String>,
200    ) -> Result<Namespace> {
201        let mut root_namespace_state = self.root_namespace_state.lock().await;
202
203        root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
204        let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
205
206        Ok(namespace)
207    }
208
209    /// Get a namespace information from the catalog.
210    async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
211        let root_namespace_state = self.root_namespace_state.lock().await;
212
213        let namespace = Namespace::with_properties(
214            namespace_ident.clone(),
215            root_namespace_state
216                .get_properties(namespace_ident)?
217                .clone(),
218        );
219
220        Ok(namespace)
221    }
222
223    /// Check if namespace exists in catalog.
224    async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
225        let guarded_namespaces = self.root_namespace_state.lock().await;
226
227        Ok(guarded_namespaces.namespace_exists(namespace_ident))
228    }
229
230    /// Update a namespace inside the catalog.
231    ///
232    /// # Behavior
233    ///
234    /// The properties must be the full set of namespace.
235    async fn update_namespace(
236        &self,
237        namespace_ident: &NamespaceIdent,
238        properties: HashMap<String, String>,
239    ) -> Result<()> {
240        let mut root_namespace_state = self.root_namespace_state.lock().await;
241
242        root_namespace_state.replace_properties(namespace_ident, properties)
243    }
244
245    /// Drop a namespace from the catalog.
246    async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
247        let mut root_namespace_state = self.root_namespace_state.lock().await;
248
249        root_namespace_state.remove_existing_namespace(namespace_ident)
250    }
251
252    /// List tables from namespace.
253    async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
254        let root_namespace_state = self.root_namespace_state.lock().await;
255
256        let table_names = root_namespace_state.list_tables(namespace_ident)?;
257        let table_idents = table_names
258            .into_iter()
259            .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
260            .collect_vec();
261
262        Ok(table_idents)
263    }
264
265    /// Create a new table inside the namespace.
266    async fn create_table(
267        &self,
268        namespace_ident: &NamespaceIdent,
269        table_creation: TableCreation,
270    ) -> Result<Table> {
271        let mut root_namespace_state = self.root_namespace_state.lock().await;
272
273        let table_name = table_creation.name.clone();
274        let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
275
276        let (table_creation, location) = match table_creation.location.clone() {
277            Some(location) => (table_creation, location),
278            None => {
279                let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
280                let location_prefix = match namespace_properties.get(LOCATION) {
281                    Some(namespace_location) => namespace_location.clone(),
282                    None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
283                };
284
285                let location = format!("{}/{}", location_prefix, table_ident.name());
286
287                let new_table_creation = TableCreation {
288                    location: Some(location.clone()),
289                    ..table_creation
290                };
291
292                (new_table_creation, location)
293            }
294        };
295
296        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
297            .build()?
298            .metadata;
299        let metadata_location = MetadataLocation::new_with_metadata(location, &metadata);
300
301        metadata.write_to(&self.file_io, &metadata_location).await?;
302
303        root_namespace_state.insert_new_table(&table_ident, metadata_location.to_string())?;
304
305        Table::builder()
306            .file_io(self.file_io.clone())
307            .metadata_location(metadata_location.to_string())
308            .metadata(metadata)
309            .identifier(table_ident)
310            .build()
311    }
312
313    /// Load table from the catalog.
314    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
315        let root_namespace_state = self.root_namespace_state.lock().await;
316
317        self.load_table_from_locked_state(table_ident, &root_namespace_state)
318            .await
319    }
320
321    /// Drop a table from the catalog.
322    async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
323        let mut root_namespace_state = self.root_namespace_state.lock().await;
324
325        root_namespace_state.remove_existing_table(table_ident)?;
326        Ok(())
327    }
328
329    /// Check if a table exists in the catalog.
330    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
331        let root_namespace_state = self.root_namespace_state.lock().await;
332
333        root_namespace_state.table_exists(table_ident)
334    }
335
336    /// Rename a table in the catalog.
337    async fn rename_table(
338        &self,
339        src_table_ident: &TableIdent,
340        dst_table_ident: &TableIdent,
341    ) -> Result<()> {
342        let mut root_namespace_state = self.root_namespace_state.lock().await;
343
344        let mut new_root_namespace_state = root_namespace_state.clone();
345        let metadata_location = new_root_namespace_state
346            .get_existing_table_location(src_table_ident)?
347            .clone();
348        new_root_namespace_state.remove_existing_table(src_table_ident)?;
349        new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
350        *root_namespace_state = new_root_namespace_state;
351
352        Ok(())
353    }
354
355    async fn register_table(
356        &self,
357        table_ident: &TableIdent,
358        metadata_location: String,
359    ) -> Result<Table> {
360        let mut root_namespace_state = self.root_namespace_state.lock().await;
361        root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
362
363        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
364
365        Table::builder()
366            .file_io(self.file_io.clone())
367            .metadata_location(metadata_location)
368            .metadata(metadata)
369            .identifier(table_ident.clone())
370            .build()
371    }
372
373    /// Update a table in the catalog.
374    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
375        let mut root_namespace_state = self.root_namespace_state.lock().await;
376
377        let current_table = self
378            .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
379            .await?;
380
381        // Apply TableCommit to get staged table
382        let staged_table = commit.apply(current_table)?;
383
384        // Write table metadata to the new location
385        let metadata_location =
386            MetadataLocation::from_str(staged_table.metadata_location_result()?)?;
387        staged_table
388            .metadata()
389            .write_to(staged_table.file_io(), &metadata_location)
390            .await?;
391
392        // Flip the pointer to reference the new metadata file.
393        let updated_table = root_namespace_state.commit_table_update(staged_table)?;
394
395        Ok(updated_table)
396    }
397}
398
399#[cfg(test)]
400pub(crate) mod tests {
401    use std::collections::HashSet;
402    use std::hash::Hash;
403    use std::iter::FromIterator;
404    use std::vec;
405
406    use regex::Regex;
407    use tempfile::TempDir;
408
409    use super::*;
410    use crate::io::FileIO;
411    use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
412    use crate::transaction::{ApplyTransactionAction, Transaction};
413
414    fn temp_path() -> String {
415        let temp_dir = TempDir::new().unwrap();
416        temp_dir.path().to_str().unwrap().to_string()
417    }
418
419    pub(crate) async fn new_memory_catalog() -> impl Catalog {
420        let warehouse_location = temp_path();
421        MemoryCatalogBuilder::default()
422            .load(
423                "memory",
424                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
425            )
426            .await
427            .unwrap()
428    }
429
430    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
431        let _ = catalog
432            .create_namespace(namespace_ident, HashMap::new())
433            .await
434            .unwrap();
435    }
436
437    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
438        for namespace_ident in namespace_idents {
439            let _ = create_namespace(catalog, namespace_ident).await;
440        }
441    }
442
443    fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
444        HashSet::from_iter(vec)
445    }
446
447    fn simple_table_schema() -> Schema {
448        Schema::builder()
449            .with_fields(vec![
450                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
451            ])
452            .build()
453            .unwrap()
454    }
455
456    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
457        catalog
458            .create_table(
459                &table_ident.namespace,
460                TableCreation::builder()
461                    .name(table_ident.name().into())
462                    .schema(simple_table_schema())
463                    .build(),
464            )
465            .await
466            .unwrap()
467    }
468
469    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
470        for table_ident in table_idents {
471            create_table(catalog, table_ident).await;
472        }
473    }
474
475    async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
476        let namespace_ident = NamespaceIdent::new("abc".into());
477        create_namespace(catalog, &namespace_ident).await;
478
479        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
480        create_table(catalog, &table_ident).await
481    }
482
483    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
484        assert_eq!(table.identifier(), expected_table_ident);
485
486        let metadata = table.metadata();
487
488        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
489
490        let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
491            .with_spec_id(0)
492            .build()
493            .unwrap();
494
495        assert_eq!(
496            metadata
497                .partition_specs_iter()
498                .map(|p| p.as_ref())
499                .collect_vec(),
500            vec![&expected_partition_spec]
501        );
502
503        let expected_sorted_order = SortOrder::builder()
504            .with_order_id(0)
505            .with_fields(vec![])
506            .build(expected_schema)
507            .unwrap();
508
509        assert_eq!(
510            metadata
511                .sort_orders_iter()
512                .map(|s| s.as_ref())
513                .collect_vec(),
514            vec![&expected_sorted_order]
515        );
516
517        assert_eq!(metadata.properties(), &HashMap::new());
518
519        assert!(!table.readonly());
520    }
521
    /// Regex fragment matching a hyphenated UUID written with lowercase hex
    /// digits (8-4-4-4-12).
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
523
524    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
525        let actual = table.metadata_location().unwrap().to_string();
526        let regex = Regex::new(regex_str).unwrap();
527        assert!(
528            regex.is_match(&actual),
529            "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
530        )
531    }
532
533    #[tokio::test]
534    async fn test_list_namespaces_returns_empty_vector() {
535        let catalog = new_memory_catalog().await;
536
537        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
538    }
539
540    #[tokio::test]
541    async fn test_list_namespaces_returns_single_namespace() {
542        let catalog = new_memory_catalog().await;
543        let namespace_ident = NamespaceIdent::new("abc".into());
544        create_namespace(&catalog, &namespace_ident).await;
545
546        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
547            namespace_ident
548        ]);
549    }
550
551    #[tokio::test]
552    async fn test_list_namespaces_returns_multiple_namespaces() {
553        let catalog = new_memory_catalog().await;
554        let namespace_ident_1 = NamespaceIdent::new("a".into());
555        let namespace_ident_2 = NamespaceIdent::new("b".into());
556        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
557
558        assert_eq!(
559            to_set(catalog.list_namespaces(None).await.unwrap()),
560            to_set(vec![namespace_ident_1, namespace_ident_2])
561        );
562    }
563
564    #[tokio::test]
565    async fn test_list_namespaces_returns_only_top_level_namespaces() {
566        let catalog = new_memory_catalog().await;
567        let namespace_ident_1 = NamespaceIdent::new("a".into());
568        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
569        let namespace_ident_3 = NamespaceIdent::new("b".into());
570        create_namespaces(&catalog, &vec![
571            &namespace_ident_1,
572            &namespace_ident_2,
573            &namespace_ident_3,
574        ])
575        .await;
576
577        assert_eq!(
578            to_set(catalog.list_namespaces(None).await.unwrap()),
579            to_set(vec![namespace_ident_1, namespace_ident_3])
580        );
581    }
582
583    #[tokio::test]
584    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
585        let catalog = new_memory_catalog().await;
586        let namespace_ident_1 = NamespaceIdent::new("a".into());
587        let namespace_ident_2 = NamespaceIdent::new("b".into());
588        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
589
590        assert_eq!(
591            catalog
592                .list_namespaces(Some(&namespace_ident_1))
593                .await
594                .unwrap(),
595            vec![]
596        );
597    }
598
599    #[tokio::test]
600    async fn test_list_namespaces_returns_namespace_under_parent() {
601        let catalog = new_memory_catalog().await;
602        let namespace_ident_1 = NamespaceIdent::new("a".into());
603        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
604        let namespace_ident_3 = NamespaceIdent::new("c".into());
605        create_namespaces(&catalog, &vec![
606            &namespace_ident_1,
607            &namespace_ident_2,
608            &namespace_ident_3,
609        ])
610        .await;
611
612        assert_eq!(
613            to_set(catalog.list_namespaces(None).await.unwrap()),
614            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
615        );
616
617        assert_eq!(
618            catalog
619                .list_namespaces(Some(&namespace_ident_1))
620                .await
621                .unwrap(),
622            vec![namespace_ident_2]
623        );
624    }
625
626    #[tokio::test]
627    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
628        let catalog = new_memory_catalog().await;
629        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
630        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
631        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
632        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
633        let namespace_ident_5 = NamespaceIdent::new("b".into());
634        create_namespaces(&catalog, &vec![
635            &namespace_ident_1,
636            &namespace_ident_2,
637            &namespace_ident_3,
638            &namespace_ident_4,
639            &namespace_ident_5,
640        ])
641        .await;
642
643        assert_eq!(
644            to_set(
645                catalog
646                    .list_namespaces(Some(&namespace_ident_1))
647                    .await
648                    .unwrap()
649            ),
650            to_set(vec![
651                namespace_ident_2,
652                namespace_ident_3,
653                namespace_ident_4,
654            ])
655        );
656    }
657
658    #[tokio::test]
659    async fn test_namespace_exists_returns_false() {
660        let catalog = new_memory_catalog().await;
661        let namespace_ident = NamespaceIdent::new("a".into());
662        create_namespace(&catalog, &namespace_ident).await;
663
664        assert!(
665            !catalog
666                .namespace_exists(&NamespaceIdent::new("b".into()))
667                .await
668                .unwrap()
669        );
670    }
671
672    #[tokio::test]
673    async fn test_namespace_exists_returns_true() {
674        let catalog = new_memory_catalog().await;
675        let namespace_ident = NamespaceIdent::new("a".into());
676        create_namespace(&catalog, &namespace_ident).await;
677
678        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
679    }
680
681    #[tokio::test]
682    async fn test_create_namespace_with_empty_properties() {
683        let catalog = new_memory_catalog().await;
684        let namespace_ident = NamespaceIdent::new("a".into());
685
686        assert_eq!(
687            catalog
688                .create_namespace(&namespace_ident, HashMap::new())
689                .await
690                .unwrap(),
691            Namespace::new(namespace_ident.clone())
692        );
693
694        assert_eq!(
695            catalog.get_namespace(&namespace_ident).await.unwrap(),
696            Namespace::with_properties(namespace_ident, HashMap::new())
697        );
698    }
699
700    #[tokio::test]
701    async fn test_create_namespace_with_properties() {
702        let catalog = new_memory_catalog().await;
703        let namespace_ident = NamespaceIdent::new("abc".into());
704
705        let mut properties: HashMap<String, String> = HashMap::new();
706        properties.insert("k".into(), "v".into());
707
708        assert_eq!(
709            catalog
710                .create_namespace(&namespace_ident, properties.clone())
711                .await
712                .unwrap(),
713            Namespace::with_properties(namespace_ident.clone(), properties.clone())
714        );
715
716        assert_eq!(
717            catalog.get_namespace(&namespace_ident).await.unwrap(),
718            Namespace::with_properties(namespace_ident, properties)
719        );
720    }
721
722    #[tokio::test]
723    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
724        let catalog = new_memory_catalog().await;
725        let namespace_ident = NamespaceIdent::new("a".into());
726        create_namespace(&catalog, &namespace_ident).await;
727
728        assert_eq!(
729            catalog
730                .create_namespace(&namespace_ident, HashMap::new())
731                .await
732                .unwrap_err()
733                .to_string(),
734            format!(
735                "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
736                &namespace_ident
737            )
738        );
739
740        assert_eq!(
741            catalog.get_namespace(&namespace_ident).await.unwrap(),
742            Namespace::with_properties(namespace_ident, HashMap::new())
743        );
744    }
745
746    #[tokio::test]
747    async fn test_create_nested_namespace() {
748        let catalog = new_memory_catalog().await;
749        let parent_namespace_ident = NamespaceIdent::new("a".into());
750        create_namespace(&catalog, &parent_namespace_ident).await;
751
752        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
753
754        assert_eq!(
755            catalog
756                .create_namespace(&child_namespace_ident, HashMap::new())
757                .await
758                .unwrap(),
759            Namespace::new(child_namespace_ident.clone())
760        );
761
762        assert_eq!(
763            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
764            Namespace::with_properties(child_namespace_ident, HashMap::new())
765        );
766    }
767
768    #[tokio::test]
769    async fn test_create_deeply_nested_namespace() {
770        let catalog = new_memory_catalog().await;
771        let namespace_ident_a = NamespaceIdent::new("a".into());
772        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
773        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
774
775        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
776
777        assert_eq!(
778            catalog
779                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
780                .await
781                .unwrap(),
782            Namespace::new(namespace_ident_a_b_c.clone())
783        );
784
785        assert_eq!(
786            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
787            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
788        );
789    }
790
791    #[tokio::test]
792    async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
793        let catalog = new_memory_catalog().await;
794
795        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
796
797        assert_eq!(
798            catalog
799                .create_namespace(&nested_namespace_ident, HashMap::new())
800                .await
801                .unwrap_err()
802                .to_string(),
803            format!(
804                "NamespaceNotFound => No such namespace: {:?}",
805                NamespaceIdent::new("a".into())
806            )
807        );
808
809        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
810    }
811
812    #[tokio::test]
813    async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
814     {
815        let catalog = new_memory_catalog().await;
816
817        let namespace_ident_a = NamespaceIdent::new("a".into());
818        create_namespace(&catalog, &namespace_ident_a).await;
819
820        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
821
822        assert_eq!(
823            catalog
824                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
825                .await
826                .unwrap_err()
827                .to_string(),
828            format!(
829                "NamespaceNotFound => No such namespace: {:?}",
830                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
831            )
832        );
833
834        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
835            namespace_ident_a.clone()
836        ]);
837
838        assert_eq!(
839            catalog
840                .list_namespaces(Some(&namespace_ident_a))
841                .await
842                .unwrap(),
843            vec![]
844        );
845    }
846
847    #[tokio::test]
848    async fn test_get_namespace() {
849        let catalog = new_memory_catalog().await;
850        let namespace_ident = NamespaceIdent::new("abc".into());
851
852        let mut properties: HashMap<String, String> = HashMap::new();
853        properties.insert("k".into(), "v".into());
854        let _ = catalog
855            .create_namespace(&namespace_ident, properties.clone())
856            .await
857            .unwrap();
858
859        assert_eq!(
860            catalog.get_namespace(&namespace_ident).await.unwrap(),
861            Namespace::with_properties(namespace_ident, properties)
862        )
863    }
864
865    #[tokio::test]
866    async fn test_get_nested_namespace() {
867        let catalog = new_memory_catalog().await;
868        let namespace_ident_a = NamespaceIdent::new("a".into());
869        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
870        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
871
872        assert_eq!(
873            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
874            Namespace::with_properties(namespace_ident_a_b, HashMap::new())
875        );
876    }
877
878    #[tokio::test]
879    async fn test_get_deeply_nested_namespace() {
880        let catalog = new_memory_catalog().await;
881        let namespace_ident_a = NamespaceIdent::new("a".into());
882        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
883        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
884        create_namespaces(&catalog, &vec![
885            &namespace_ident_a,
886            &namespace_ident_a_b,
887            &namespace_ident_a_b_c,
888        ])
889        .await;
890
891        assert_eq!(
892            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
893            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
894        );
895    }
896
897    #[tokio::test]
898    async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
899        let catalog = new_memory_catalog().await;
900        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
901
902        let non_existent_namespace_ident = NamespaceIdent::new("b".into());
903        assert_eq!(
904            catalog
905                .get_namespace(&non_existent_namespace_ident)
906                .await
907                .unwrap_err()
908                .to_string(),
909            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
910        )
911    }
912
913    #[tokio::test]
914    async fn test_update_namespace() {
915        let catalog = new_memory_catalog().await;
916        let namespace_ident = NamespaceIdent::new("abc".into());
917        create_namespace(&catalog, &namespace_ident).await;
918
919        let mut new_properties: HashMap<String, String> = HashMap::new();
920        new_properties.insert("k".into(), "v".into());
921
922        catalog
923            .update_namespace(&namespace_ident, new_properties.clone())
924            .await
925            .unwrap();
926
927        assert_eq!(
928            catalog.get_namespace(&namespace_ident).await.unwrap(),
929            Namespace::with_properties(namespace_ident, new_properties)
930        )
931    }
932
933    #[tokio::test]
934    async fn test_update_nested_namespace() {
935        let catalog = new_memory_catalog().await;
936        let namespace_ident_a = NamespaceIdent::new("a".into());
937        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
938        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
939
940        let mut new_properties = HashMap::new();
941        new_properties.insert("k".into(), "v".into());
942
943        catalog
944            .update_namespace(&namespace_ident_a_b, new_properties.clone())
945            .await
946            .unwrap();
947
948        assert_eq!(
949            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
950            Namespace::with_properties(namespace_ident_a_b, new_properties)
951        );
952    }
953
954    #[tokio::test]
955    async fn test_update_deeply_nested_namespace() {
956        let catalog = new_memory_catalog().await;
957        let namespace_ident_a = NamespaceIdent::new("a".into());
958        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
959        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
960        create_namespaces(&catalog, &vec![
961            &namespace_ident_a,
962            &namespace_ident_a_b,
963            &namespace_ident_a_b_c,
964        ])
965        .await;
966
967        let mut new_properties = HashMap::new();
968        new_properties.insert("k".into(), "v".into());
969
970        catalog
971            .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
972            .await
973            .unwrap();
974
975        assert_eq!(
976            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
977            Namespace::with_properties(namespace_ident_a_b_c, new_properties)
978        );
979    }
980
981    #[tokio::test]
982    async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
983        let catalog = new_memory_catalog().await;
984        create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
985
986        let non_existent_namespace_ident = NamespaceIdent::new("def".into());
987        assert_eq!(
988            catalog
989                .update_namespace(&non_existent_namespace_ident, HashMap::new())
990                .await
991                .unwrap_err()
992                .to_string(),
993            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
994        )
995    }
996
997    #[tokio::test]
998    async fn test_drop_namespace() {
999        let catalog = new_memory_catalog().await;
1000        let namespace_ident = NamespaceIdent::new("abc".into());
1001        create_namespace(&catalog, &namespace_ident).await;
1002
1003        catalog.drop_namespace(&namespace_ident).await.unwrap();
1004
1005        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1006    }
1007
1008    #[tokio::test]
1009    async fn test_drop_nested_namespace() {
1010        let catalog = new_memory_catalog().await;
1011        let namespace_ident_a = NamespaceIdent::new("a".into());
1012        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1013        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1014
1015        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1016
1017        assert!(
1018            !catalog
1019                .namespace_exists(&namespace_ident_a_b)
1020                .await
1021                .unwrap()
1022        );
1023
1024        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1025    }
1026
1027    #[tokio::test]
1028    async fn test_drop_deeply_nested_namespace() {
1029        let catalog = new_memory_catalog().await;
1030        let namespace_ident_a = NamespaceIdent::new("a".into());
1031        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1032        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1033        create_namespaces(&catalog, &vec![
1034            &namespace_ident_a,
1035            &namespace_ident_a_b,
1036            &namespace_ident_a_b_c,
1037        ])
1038        .await;
1039
1040        catalog
1041            .drop_namespace(&namespace_ident_a_b_c)
1042            .await
1043            .unwrap();
1044
1045        assert!(
1046            !catalog
1047                .namespace_exists(&namespace_ident_a_b_c)
1048                .await
1049                .unwrap()
1050        );
1051
1052        assert!(
1053            catalog
1054                .namespace_exists(&namespace_ident_a_b)
1055                .await
1056                .unwrap()
1057        );
1058
1059        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1060    }
1061
1062    #[tokio::test]
1063    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1064        let catalog = new_memory_catalog().await;
1065
1066        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1067        assert_eq!(
1068            catalog
1069                .drop_namespace(&non_existent_namespace_ident)
1070                .await
1071                .unwrap_err()
1072                .to_string(),
1073            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1074        )
1075    }
1076
1077    #[tokio::test]
1078    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1079        let catalog = new_memory_catalog().await;
1080        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1081
1082        let non_existent_namespace_ident =
1083            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1084        assert_eq!(
1085            catalog
1086                .drop_namespace(&non_existent_namespace_ident)
1087                .await
1088                .unwrap_err()
1089                .to_string(),
1090            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1091        )
1092    }
1093
1094    #[tokio::test]
1095    async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1096        let catalog = new_memory_catalog().await;
1097        let namespace_ident_a = NamespaceIdent::new("a".into());
1098        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1099        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1100
1101        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1102
1103        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1104
1105        assert!(
1106            !catalog
1107                .namespace_exists(&namespace_ident_a_b)
1108                .await
1109                .unwrap()
1110        );
1111    }
1112
1113    #[tokio::test]
1114    async fn test_create_table_with_location() {
1115        let tmp_dir = TempDir::new().unwrap();
1116        let catalog = new_memory_catalog().await;
1117        let namespace_ident = NamespaceIdent::new("a".into());
1118        create_namespace(&catalog, &namespace_ident).await;
1119
1120        let table_name = "abc";
1121        let location = tmp_dir.path().to_str().unwrap().to_string();
1122        let table_creation = TableCreation::builder()
1123            .name(table_name.into())
1124            .location(location.clone())
1125            .schema(simple_table_schema())
1126            .build();
1127
1128        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1129
1130        assert_table_eq(
1131            &catalog
1132                .create_table(&namespace_ident, table_creation)
1133                .await
1134                .unwrap(),
1135            &expected_table_ident,
1136            &simple_table_schema(),
1137        );
1138
1139        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1140
1141        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1142
1143        assert!(
1144            table
1145                .metadata_location()
1146                .unwrap()
1147                .to_string()
1148                .starts_with(&location)
1149        )
1150    }
1151
1152    #[tokio::test]
1153    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1154        let warehouse_location = temp_path();
1155        let catalog = MemoryCatalogBuilder::default()
1156            .load(
1157                "memory",
1158                HashMap::from([(
1159                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1160                    warehouse_location.clone(),
1161                )]),
1162            )
1163            .await
1164            .unwrap();
1165
1166        let namespace_ident = NamespaceIdent::new("a".into());
1167        let mut namespace_properties = HashMap::new();
1168        let namespace_location = temp_path();
1169        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1170        catalog
1171            .create_namespace(&namespace_ident, namespace_properties)
1172            .await
1173            .unwrap();
1174
1175        let table_name = "tbl1";
1176        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1177        let expected_table_metadata_location_regex =
1178            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1179
1180        let table = catalog
1181            .create_table(
1182                &namespace_ident,
1183                TableCreation::builder()
1184                    .name(table_name.into())
1185                    .schema(simple_table_schema())
1186                    // no location specified for table
1187                    .build(),
1188            )
1189            .await
1190            .unwrap();
1191        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1192        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1193
1194        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1195        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1196        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1197    }
1198
1199    #[tokio::test]
1200    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1201     {
1202        let warehouse_location = temp_path();
1203        let catalog = MemoryCatalogBuilder::default()
1204            .load(
1205                "memory",
1206                HashMap::from([(
1207                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1208                    warehouse_location.clone(),
1209                )]),
1210            )
1211            .await
1212            .unwrap();
1213
1214        let namespace_ident = NamespaceIdent::new("a".into());
1215        let mut namespace_properties = HashMap::new();
1216        let namespace_location = temp_path();
1217        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1218        catalog
1219            .create_namespace(&namespace_ident, namespace_properties)
1220            .await
1221            .unwrap();
1222
1223        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1224        let mut nested_namespace_properties = HashMap::new();
1225        let nested_namespace_location = temp_path();
1226        nested_namespace_properties
1227            .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1228        catalog
1229            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1230            .await
1231            .unwrap();
1232
1233        let table_name = "tbl1";
1234        let expected_table_ident =
1235            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1236        let expected_table_metadata_location_regex = format!(
1237            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1238        );
1239
1240        let table = catalog
1241            .create_table(
1242                &nested_namespace_ident,
1243                TableCreation::builder()
1244                    .name(table_name.into())
1245                    .schema(simple_table_schema())
1246                    // no location specified for table
1247                    .build(),
1248            )
1249            .await
1250            .unwrap();
1251        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1252        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1253
1254        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1255        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1256        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1257    }
1258
1259    #[tokio::test]
1260    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1261     {
1262        let warehouse_location = temp_path();
1263        let catalog = MemoryCatalogBuilder::default()
1264            .load(
1265                "memory",
1266                HashMap::from([(
1267                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1268                    warehouse_location.clone(),
1269                )]),
1270            )
1271            .await
1272            .unwrap();
1273
1274        let namespace_ident = NamespaceIdent::new("a".into());
1275        // note: no location specified in namespace_properties
1276        let namespace_properties = HashMap::new();
1277        catalog
1278            .create_namespace(&namespace_ident, namespace_properties)
1279            .await
1280            .unwrap();
1281
1282        let table_name = "tbl1";
1283        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1284        let expected_table_metadata_location_regex =
1285            format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1286
1287        let table = catalog
1288            .create_table(
1289                &namespace_ident,
1290                TableCreation::builder()
1291                    .name(table_name.into())
1292                    .schema(simple_table_schema())
1293                    // no location specified for table
1294                    .build(),
1295            )
1296            .await
1297            .unwrap();
1298        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1299        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1300
1301        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1302        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1303        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1304    }
1305
1306    #[tokio::test]
1307    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1308     {
1309        let warehouse_location = temp_path();
1310        let catalog = MemoryCatalogBuilder::default()
1311            .load(
1312                "memory",
1313                HashMap::from([(
1314                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1315                    warehouse_location.clone(),
1316                )]),
1317            )
1318            .await
1319            .unwrap();
1320
1321        let namespace_ident = NamespaceIdent::new("a".into());
1322        catalog
1323            // note: no location specified in namespace_properties
1324            .create_namespace(&namespace_ident, HashMap::new())
1325            .await
1326            .unwrap();
1327
1328        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1329        catalog
1330            // note: no location specified in namespace_properties
1331            .create_namespace(&nested_namespace_ident, HashMap::new())
1332            .await
1333            .unwrap();
1334
1335        let table_name = "tbl1";
1336        let expected_table_ident =
1337            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1338        let expected_table_metadata_location_regex = format!(
1339            "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1340        );
1341
1342        let table = catalog
1343            .create_table(
1344                &nested_namespace_ident,
1345                TableCreation::builder()
1346                    .name(table_name.into())
1347                    .schema(simple_table_schema())
1348                    // no location specified for table
1349                    .build(),
1350            )
1351            .await
1352            .unwrap();
1353        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1354        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1355
1356        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1357        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1358        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1359    }
1360
1361    #[tokio::test]
1362    async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1363     {
1364        let catalog = MemoryCatalogBuilder::default()
1365            .load("memory", HashMap::from([]))
1366            .await;
1367
1368        assert!(catalog.is_err());
1369        assert_eq!(
1370            catalog.unwrap_err().to_string(),
1371            "DataInvalid => Catalog warehouse is required"
1372        );
1373    }
1374
1375    #[tokio::test]
1376    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1377        let catalog = new_memory_catalog().await;
1378        let namespace_ident = NamespaceIdent::new("a".into());
1379        create_namespace(&catalog, &namespace_ident).await;
1380        let table_name = "tbl1";
1381        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1382        create_table(&catalog, &table_ident).await;
1383
1384        let tmp_dir = TempDir::new().unwrap();
1385        let location = tmp_dir.path().to_str().unwrap().to_string();
1386
1387        assert_eq!(
1388            catalog
1389                .create_table(
1390                    &namespace_ident,
1391                    TableCreation::builder()
1392                        .name(table_name.into())
1393                        .schema(simple_table_schema())
1394                        .location(location)
1395                        .build()
1396                )
1397                .await
1398                .unwrap_err()
1399                .to_string(),
1400            format!(
1401                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1402                &table_ident
1403            )
1404        );
1405    }
1406
1407    #[tokio::test]
1408    async fn test_list_tables_returns_empty_vector() {
1409        let catalog = new_memory_catalog().await;
1410        let namespace_ident = NamespaceIdent::new("a".into());
1411        create_namespace(&catalog, &namespace_ident).await;
1412
1413        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1414    }
1415
1416    #[tokio::test]
1417    async fn test_list_tables_returns_a_single_table() {
1418        let catalog = new_memory_catalog().await;
1419        let namespace_ident = NamespaceIdent::new("n1".into());
1420        create_namespace(&catalog, &namespace_ident).await;
1421
1422        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1423        create_table(&catalog, &table_ident).await;
1424
1425        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1426            table_ident
1427        ]);
1428    }
1429
1430    #[tokio::test]
1431    async fn test_list_tables_returns_multiple_tables() {
1432        let catalog = new_memory_catalog().await;
1433        let namespace_ident = NamespaceIdent::new("n1".into());
1434        create_namespace(&catalog, &namespace_ident).await;
1435
1436        let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1437        let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1438        let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1439
1440        assert_eq!(
1441            to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1442            to_set(vec![table_ident_1, table_ident_2])
1443        );
1444    }
1445
1446    #[tokio::test]
1447    async fn test_list_tables_returns_tables_from_correct_namespace() {
1448        let catalog = new_memory_catalog().await;
1449        let namespace_ident_1 = NamespaceIdent::new("n1".into());
1450        let namespace_ident_2 = NamespaceIdent::new("n2".into());
1451        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1452
1453        let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1454        let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1455        let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1456        let _ = create_tables(&catalog, vec![
1457            &table_ident_1,
1458            &table_ident_2,
1459            &table_ident_3,
1460        ])
1461        .await;
1462
1463        assert_eq!(
1464            to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1465            to_set(vec![table_ident_1, table_ident_2])
1466        );
1467
1468        assert_eq!(
1469            to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1470            to_set(vec![table_ident_3])
1471        );
1472    }
1473
1474    #[tokio::test]
1475    async fn test_list_tables_returns_table_under_nested_namespace() {
1476        let catalog = new_memory_catalog().await;
1477        let namespace_ident_a = NamespaceIdent::new("a".into());
1478        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1479        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1480
1481        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1482        create_table(&catalog, &table_ident).await;
1483
1484        assert_eq!(
1485            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1486            vec![table_ident]
1487        );
1488    }
1489
1490    #[tokio::test]
1491    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1492        let catalog = new_memory_catalog().await;
1493
1494        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1495
1496        assert_eq!(
1497            catalog
1498                .list_tables(&non_existent_namespace_ident)
1499                .await
1500                .unwrap_err()
1501                .to_string(),
1502            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1503        );
1504    }
1505
1506    #[tokio::test]
1507    async fn test_drop_table() {
1508        let catalog = new_memory_catalog().await;
1509        let namespace_ident = NamespaceIdent::new("n1".into());
1510        create_namespace(&catalog, &namespace_ident).await;
1511        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1512        create_table(&catalog, &table_ident).await;
1513
1514        catalog.drop_table(&table_ident).await.unwrap();
1515    }
1516
1517    #[tokio::test]
1518    async fn test_drop_table_drops_table_under_nested_namespace() {
1519        let catalog = new_memory_catalog().await;
1520        let namespace_ident_a = NamespaceIdent::new("a".into());
1521        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1522        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1523
1524        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1525        create_table(&catalog, &table_ident).await;
1526
1527        catalog.drop_table(&table_ident).await.unwrap();
1528
1529        assert_eq!(
1530            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1531            vec![]
1532        );
1533    }
1534
1535    #[tokio::test]
1536    async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1537        let catalog = new_memory_catalog().await;
1538
1539        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1540        let non_existent_table_ident =
1541            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1542
1543        assert_eq!(
1544            catalog
1545                .drop_table(&non_existent_table_ident)
1546                .await
1547                .unwrap_err()
1548                .to_string(),
1549            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1550        );
1551    }
1552
1553    #[tokio::test]
1554    async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1555        let catalog = new_memory_catalog().await;
1556        let namespace_ident = NamespaceIdent::new("n1".into());
1557        create_namespace(&catalog, &namespace_ident).await;
1558
1559        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1560
1561        assert_eq!(
1562            catalog
1563                .drop_table(&non_existent_table_ident)
1564                .await
1565                .unwrap_err()
1566                .to_string(),
1567            format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1568        );
1569    }
1570
1571    #[tokio::test]
1572    async fn test_table_exists_returns_true() {
1573        let catalog = new_memory_catalog().await;
1574        let namespace_ident = NamespaceIdent::new("n1".into());
1575        create_namespace(&catalog, &namespace_ident).await;
1576        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1577        create_table(&catalog, &table_ident).await;
1578
1579        assert!(catalog.table_exists(&table_ident).await.unwrap());
1580    }
1581
1582    #[tokio::test]
1583    async fn test_table_exists_returns_false() {
1584        let catalog = new_memory_catalog().await;
1585        let namespace_ident = NamespaceIdent::new("n1".into());
1586        create_namespace(&catalog, &namespace_ident).await;
1587        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1588
1589        assert!(
1590            !catalog
1591                .table_exists(&non_existent_table_ident)
1592                .await
1593                .unwrap()
1594        );
1595    }
1596
1597    #[tokio::test]
1598    async fn test_table_exists_under_nested_namespace() {
1599        let catalog = new_memory_catalog().await;
1600        let namespace_ident_a = NamespaceIdent::new("a".into());
1601        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1602        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1603
1604        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1605        create_table(&catalog, &table_ident).await;
1606
1607        assert!(catalog.table_exists(&table_ident).await.unwrap());
1608
1609        let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1610        assert!(
1611            !catalog
1612                .table_exists(&non_existent_table_ident)
1613                .await
1614                .unwrap()
1615        );
1616    }
1617
1618    #[tokio::test]
1619    async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1620        let catalog = new_memory_catalog().await;
1621
1622        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1623        let non_existent_table_ident =
1624            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1625
1626        assert_eq!(
1627            catalog
1628                .table_exists(&non_existent_table_ident)
1629                .await
1630                .unwrap_err()
1631                .to_string(),
1632            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1633        );
1634    }
1635
1636    #[tokio::test]
        // Verifies renaming "tbl1" to "tbl2" within the single namespace "n1".
1637    async fn test_rename_table_in_same_namespace() {
1638        let catalog = new_memory_catalog().await;
1639        let namespace_ident = NamespaceIdent::new("n1".into());
1640        create_namespace(&catalog, &namespace_ident).await;
1641        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1642        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
            // Only the source table is created; the destination name must be free.
1643        create_table(&catalog, &src_table_ident).await;
1644
1645        catalog
1646            .rename_table(&src_table_ident, &dst_table_ident)
1647            .await
1648            .unwrap();
1649
            // After the rename the namespace must list exactly one table: the
            // destination identifier (so the source name is gone).
1650        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1651            dst_table_ident
1652        ],);
1653    }
1654
1655    #[tokio::test]
        // Verifies that a rename can move a table from namespace "a" to a different
        // top-level namespace "b" while also changing its name ("tbl1" -> "tbl2").
1656    async fn test_rename_table_across_namespaces() {
1657        let catalog = new_memory_catalog().await;
1658        let src_namespace_ident = NamespaceIdent::new("a".into());
1659        let dst_namespace_ident = NamespaceIdent::new("b".into());
1660        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1661        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1662        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1663        create_table(&catalog, &src_table_ident).await;
1664
1665        catalog
1666            .rename_table(&src_table_ident, &dst_table_ident)
1667            .await
1668            .unwrap();
1669
            // The source namespace must be left with no tables at all.
1670        assert_eq!(
1671            catalog.list_tables(&src_namespace_ident).await.unwrap(),
1672            vec![],
1673        );
1674
            // The destination namespace must contain exactly the renamed table.
1675        assert_eq!(
1676            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1677            vec![dst_table_ident],
1678        );
1679    }
1680
1681    #[tokio::test]
        // Verifies that renaming a table to its own identifier succeeds and leaves
        // the namespace listing unchanged.
1682    async fn test_rename_table_src_table_is_same_as_dst_table() {
1683        let catalog = new_memory_catalog().await;
1684        let namespace_ident = NamespaceIdent::new("n1".into());
1685        create_namespace(&catalog, &namespace_ident).await;
1686        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1687        create_table(&catalog, &table_ident).await;
1688
            // Source and destination are the same identifier; `unwrap` asserts that
            // this is not reported as an error.
1689        catalog
1690            .rename_table(&table_ident, &table_ident)
1691            .await
1692            .unwrap();
1693
            // The table must still be the only entry in the namespace.
1694        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1695            table_ident
1696        ],);
1697    }
1698
1699    #[tokio::test]
        // Verifies a rename that moves "tbl1" from the three-level namespace
        // ["a", "b", "c"] to the two-level namespace ["a", "b"], keeping the name.
1700    async fn test_rename_table_across_nested_namespaces() {
1701        let catalog = new_memory_catalog().await;
1702        let namespace_ident_a = NamespaceIdent::new("a".into());
1703        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1704        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
            // Every level of the hierarchy is passed to the helper, shortest first.
1705        create_namespaces(&catalog, &vec![
1706            &namespace_ident_a,
1707            &namespace_ident_a_b,
1708            &namespace_ident_a_b_c,
1709        ])
1710        .await;
1711
1712        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1713        create_tables(&catalog, vec![&src_table_ident]).await;
1714
1715        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1716        catalog
1717            .rename_table(&src_table_ident, &dst_table_ident)
1718            .await
1719            .unwrap();
1720
            // The old identifier must no longer resolve to a table ...
1721        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1722
            // ... and the new identifier must.
1723        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1724    }
1725
1726    #[tokio::test]
        // Verifies that `rename_table` fails with a namespace-not-found error naming
        // the source namespace when that namespace is missing.
1727    async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1728        let catalog = new_memory_catalog().await;
1729
            // Source namespace "n1" is never created in the catalog.
1730        let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1731        let src_table_ident =
1732            TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1733
            // The destination namespace "n2" is created, so the only missing piece is
            // the source side.
1734        let dst_namespace_ident = NamespaceIdent::new("n2".into());
1735        create_namespace(&catalog, &dst_namespace_ident).await;
1736        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1737
            // Compares the full rendered error text; the identifier is embedded using
            // its `Debug` (`{:?}`) rendering.
1738        assert_eq!(
1739            catalog
1740                .rename_table(&src_table_ident, &dst_table_ident)
1741                .await
1742                .unwrap_err()
1743                .to_string(),
1744            format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1745        );
1746    }
1747
1748    #[tokio::test]
        // Verifies that `rename_table` fails with a namespace-not-found error naming
        // the destination namespace when that namespace is missing.
1749    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1750        let catalog = new_memory_catalog().await;
1751        let src_namespace_ident = NamespaceIdent::new("n1".into());
1752        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
            // The source side is fully set up: namespace "n1" and table "tbl1" exist.
1753        create_namespace(&catalog, &src_namespace_ident).await;
1754        create_table(&catalog, &src_table_ident).await;
1755
            // Destination namespace "n2" is never created in the catalog.
1756        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1757        let dst_table_ident =
1758            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
            // Compares the full rendered error text; the identifier is embedded using
            // its `Debug` (`{:?}`) rendering.
1759        assert_eq!(
1760            catalog
1761                .rename_table(&src_table_ident, &dst_table_ident)
1762                .await
1763                .unwrap_err()
1764                .to_string(),
1765            format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1766        );
1767    }
1768
1769    #[tokio::test]
        // Verifies that `rename_table` fails with a table-not-found error naming the
        // source table when the namespace exists but the source table does not.
1770    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1771        let catalog = new_memory_catalog().await;
1772        let namespace_ident = NamespaceIdent::new("n1".into());
1773        create_namespace(&catalog, &namespace_ident).await;
            // Both identifiers live in the existing namespace, but no table is created.
1774        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1775        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1776
            // Compares the full rendered error text; the source identifier is embedded
            // using its `Debug` (`{:?}`) rendering.
1777        assert_eq!(
1778            catalog
1779                .rename_table(&src_table_ident, &dst_table_ident)
1780                .await
1781                .unwrap_err()
1782                .to_string(),
1783            format!("TableNotFound => No such table: {src_table_ident:?}"),
1784        );
1785    }
1786
1787    #[tokio::test]
        // Verifies that `rename_table` refuses to rename onto an identifier that is
        // already taken by another table.
1788    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1789        let catalog = new_memory_catalog().await;
1790        let namespace_ident = NamespaceIdent::new("n1".into());
1791        create_namespace(&catalog, &namespace_ident).await;
1792        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1793        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
            // Both source and destination tables are created up front to force the
            // name collision.
1794        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1795
            // Compares the full rendered error text; the message names the destination
            // identifier using its `Debug` (`{:?}`) rendering.
1796        assert_eq!(
1797            catalog
1798                .rename_table(&src_table_ident, &dst_table_ident)
1799                .await
1800                .unwrap_err()
1801                .to_string(),
1802            format!(
1803                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1804                &dst_table_ident
1805            ),
1806        );
1807    }
1808
1809    #[tokio::test]
        // Verifies `register_table`: a second identifier is registered against the
        // metadata location of an existing table, and both the value returned by
        // `register_table` and a subsequent `load_table` must report the new
        // identifier together with that same metadata location.
1810    async fn test_register_table() {
1811        // Create a catalog and namespace
1812        let catalog = new_memory_catalog().await;
1813        let namespace_ident = NamespaceIdent::new("test_namespace".into());
1814        create_namespace(&catalog, &namespace_ident).await;
1815
1816        // Create a table to get a valid metadata file
1817        let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1818        create_table(&catalog, &source_table_ident).await;
1819
1820        // Get the metadata location from the source table
1821        let source_table = catalog.load_table(&source_table_ident).await.unwrap();
            // `unwrap` here also asserts that the loaded table carries a metadata
            // location at all.
1822        let metadata_location = source_table.metadata_location().unwrap().to_string();
1823
1824        // Register a new table using the same metadata location
1825        let register_table_ident =
1826            TableIdent::new(namespace_ident.clone(), "register_table".into());
1827        let registered_table = catalog
1828            .register_table(&register_table_ident, metadata_location.clone())
1829            .await
1830            .unwrap();
1831
1832        // Verify the registered table has the correct identifier
1833        assert_eq!(registered_table.identifier(), &register_table_ident);
1834
1835        // Verify the registered table has the correct metadata location
1836        assert_eq!(
1837            registered_table.metadata_location().unwrap().to_string(),
1838            metadata_location
1839        );
1840
1841        // Verify the table exists in the catalog
1842        assert!(catalog.table_exists(&register_table_ident).await.unwrap());
1843
1844        // Verify we can load the registered table
1845        let loaded_table = catalog.load_table(&register_table_ident).await.unwrap();
1846        assert_eq!(loaded_table.identifier(), &register_table_ident);
1847        assert_eq!(
1848            loaded_table.metadata_location().unwrap().to_string(),
1849            metadata_location
1850        );
1851    }
1852
1853    #[tokio::test]
        // Verifies a successful table update: a transaction that sets the table
        // property "key" = "value" is committed through the catalog, and the returned
        // table is checked against the pre-update table.
1854    async fn test_update_table() {
1855        let catalog = new_memory_catalog().await;
1856
1857        let table = create_table_with_namespace(&catalog).await;
1858
1859        // Assert the table doesn't contain the update yet
1860        assert!(!table.metadata().properties().contains_key("key"));
1861
1862        // Update table metadata
1863        let tx = Transaction::new(&table);
1864        let updated_table = tx
1865            .update_table_properties()
1866            .set("key".to_string(), "value".to_string())
1867            .apply(tx)
1868            .unwrap()
1869            .commit(&catalog)
1870            .await
1871            .unwrap();
1872
            // The committed table must expose the new property value.
1873        assert_eq!(
1874            updated_table.metadata().properties().get("key").unwrap(),
1875            "value"
1876        );
1877
            // Identity is preserved: same identifier and same table UUID.
1878        assert_eq!(table.identifier(), updated_table.identifier());
1879        assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
            // NOTE(review): the strict `<` requires the two timestamps to differ;
            // confirm they cannot be equal when creation and commit happen within the
            // same clock tick.
1880        assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
            // The commit must point the table at a different metadata location.
1881        assert_ne!(table.metadata_location(), updated_table.metadata_location());
1882
            // The metadata log must have grown as a result of the commit.
1883        assert!(
1884            table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1885        );
1886    }
1887
1888    #[tokio::test]
        // Verifies that committing a transaction for a table the catalog has never
        // seen fails with `ErrorKind::TableNotFound`.
1889    async fn test_update_table_fails_if_table_doesnt_exist() {
1890        let catalog = new_memory_catalog().await;
1891
            // The namespace exists, so the failure below is about the table itself.
1892        let namespace_ident = NamespaceIdent::new("a".into());
1893        create_namespace(&catalog, &namespace_ident).await;
1894
1895        // This table is not known to the catalog.
1896        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
            // `build_table` assembles the `Table` value locally; no catalog call is
            // made for it in this test.
1897        let table = build_table(table_ident);
1898
1899        let tx = Transaction::new(&table);
            // `apply` is expected to succeed (`unwrap`); only the `commit` against
            // the catalog is expected to fail (`unwrap_err`).
1900        let err = tx
1901            .update_table_properties()
1902            .set("key".to_string(), "value".to_string())
1903            .apply(tx)
1904            .unwrap()
1905            .commit(&catalog)
1906            .await
1907            .unwrap_err();
            // Checks the error kind rather than the rendered message.
1908        assert_eq!(err.kind(), ErrorKind::TableNotFound);
1909    }
1910
1911    fn build_table(ident: TableIdent) -> Table {
            // Test helper: builds a `Table` value for `ident` directly from freshly
            // generated metadata, without calling any catalog method.
            //
            // Panics (via `unwrap`) if the temp dir, the metadata, or the table
            // cannot be built.
1912        let file_io = FileIO::new_with_fs();
1913
            // NOTE(review): `temp_dir` is a local that goes out of scope when this
            // function returns; presumably the directory is removed on drop, so
            // `location` may not exist on disk afterwards - confirm that callers
            // never need to read or write under it.
1914        let temp_dir = TempDir::new().unwrap();
1915        let location = temp_dir.path().to_str().unwrap().to_string();
1916
            // The table name comes from `ident`; the schema comes from the
            // `simple_table_schema` helper (defined outside this view).
1917        let table_creation = TableCreation::builder()
1918            .name(ident.name().to_string())
1919            .schema(simple_table_schema())
1920            .location(location)
1921            .build();
            // Only the `metadata` field of the build result is kept.
1922        let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1923            .unwrap()
1924            .build()
1925            .unwrap()
1926            .metadata;
1927
            // Only identifier, metadata and file IO are supplied to the builder; no
            // metadata location is set here.
1928        Table::builder()
1929            .identifier(ident)
1930            .metadata(metadata)
1931            .file_io(file_io)
1932            .build()
1933            .unwrap()
1934    }
1935}