iceberg/catalog/memory/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains memory catalog implementation.
19
20use std::collections::HashMap;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use futures::lock::{Mutex, MutexGuard};
26use itertools::Itertools;
27
28use super::namespace_state::NamespaceState;
29use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
30use crate::spec::{TableMetadata, TableMetadataBuilder};
31use crate::table::Table;
32use crate::{
33    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
34    TableCommit, TableCreation, TableIdent,
35};
36
37/// Memory catalog warehouse location
38pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";
39
40/// namespace `location` property
41const LOCATION: &str = "location";
42
43/// Builder for [`MemoryCatalog`].
44#[derive(Debug)]
45pub struct MemoryCatalogBuilder {
46    config: MemoryCatalogConfig,
47    storage_factory: Option<Arc<dyn StorageFactory>>,
48}
49
50impl Default for MemoryCatalogBuilder {
51    fn default() -> Self {
52        Self {
53            config: MemoryCatalogConfig {
54                name: None,
55                warehouse: "".to_string(),
56                props: HashMap::new(),
57            },
58            storage_factory: None,
59        }
60    }
61}
62
63impl CatalogBuilder for MemoryCatalogBuilder {
64    type C = MemoryCatalog;
65
66    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
67        self.storage_factory = Some(storage_factory);
68        self
69    }
70
71    fn load(
72        mut self,
73        name: impl Into<String>,
74        props: HashMap<String, String>,
75    ) -> impl Future<Output = Result<Self::C>> + Send {
76        self.config.name = Some(name.into());
77
78        if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
79            self.config.warehouse = props
80                .get(MEMORY_CATALOG_WAREHOUSE)
81                .cloned()
82                .unwrap_or_default()
83        }
84
85        // Collect other remaining properties
86        self.config.props = props
87            .into_iter()
88            .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
89            .collect();
90
91        let result = {
92            if self.config.name.is_none() {
93                Err(Error::new(
94                    ErrorKind::DataInvalid,
95                    "Catalog name is required",
96                ))
97            } else if self.config.warehouse.is_empty() {
98                Err(Error::new(
99                    ErrorKind::DataInvalid,
100                    "Catalog warehouse is required",
101                ))
102            } else {
103                MemoryCatalog::new(self.config, self.storage_factory)
104            }
105        };
106
107        std::future::ready(result)
108    }
109}
110
111#[derive(Clone, Debug)]
112pub(crate) struct MemoryCatalogConfig {
113    name: Option<String>,
114    warehouse: String,
115    props: HashMap<String, String>,
116}
117
118/// Memory catalog implementation.
119#[derive(Debug)]
120pub struct MemoryCatalog {
121    root_namespace_state: Mutex<NamespaceState>,
122    file_io: FileIO,
123    warehouse_location: String,
124}
125
126impl MemoryCatalog {
127    /// Creates a memory catalog.
128    fn new(
129        config: MemoryCatalogConfig,
130        storage_factory: Option<Arc<dyn StorageFactory>>,
131    ) -> Result<Self> {
132        // Use provided factory or default to MemoryStorageFactory
133        let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory));
134
135        Ok(Self {
136            root_namespace_state: Mutex::new(NamespaceState::default()),
137            file_io: FileIOBuilder::new(factory).with_props(config.props).build(),
138            warehouse_location: config.warehouse,
139        })
140    }
141
142    /// Loads a table from the locked namespace state.
143    async fn load_table_from_locked_state(
144        &self,
145        table_ident: &TableIdent,
146        root_namespace_state: &MutexGuard<'_, NamespaceState>,
147    ) -> Result<Table> {
148        let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
149        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
150
151        Table::builder()
152            .identifier(table_ident.clone())
153            .metadata(metadata)
154            .metadata_location(metadata_location.to_string())
155            .file_io(self.file_io.clone())
156            .build()
157    }
158}
159
160#[async_trait]
161impl Catalog for MemoryCatalog {
162    /// List namespaces inside the catalog.
163    async fn list_namespaces(
164        &self,
165        maybe_parent: Option<&NamespaceIdent>,
166    ) -> Result<Vec<NamespaceIdent>> {
167        let root_namespace_state = self.root_namespace_state.lock().await;
168
169        match maybe_parent {
170            None => {
171                let namespaces = root_namespace_state
172                    .list_top_level_namespaces()
173                    .into_iter()
174                    .map(|str| NamespaceIdent::new(str.to_string()))
175                    .collect_vec();
176
177                Ok(namespaces)
178            }
179            Some(parent_namespace_ident) => {
180                let namespaces = root_namespace_state
181                    .list_namespaces_under(parent_namespace_ident)?
182                    .into_iter()
183                    .map(|name| {
184                        let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
185                        names.push(name.to_string());
186                        NamespaceIdent::from_vec(names)
187                    })
188                    .collect::<Result<Vec<_>>>()?;
189
190                Ok(namespaces)
191            }
192        }
193    }
194
195    /// Create a new namespace inside the catalog.
196    async fn create_namespace(
197        &self,
198        namespace_ident: &NamespaceIdent,
199        properties: HashMap<String, String>,
200    ) -> Result<Namespace> {
201        let mut root_namespace_state = self.root_namespace_state.lock().await;
202
203        root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
204        let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
205
206        Ok(namespace)
207    }
208
209    /// Get a namespace information from the catalog.
210    async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
211        let root_namespace_state = self.root_namespace_state.lock().await;
212
213        let namespace = Namespace::with_properties(
214            namespace_ident.clone(),
215            root_namespace_state
216                .get_properties(namespace_ident)?
217                .clone(),
218        );
219
220        Ok(namespace)
221    }
222
223    /// Check if namespace exists in catalog.
224    async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
225        let guarded_namespaces = self.root_namespace_state.lock().await;
226
227        Ok(guarded_namespaces.namespace_exists(namespace_ident))
228    }
229
230    /// Update a namespace inside the catalog.
231    ///
232    /// # Behavior
233    ///
234    /// The properties must be the full set of namespace.
235    async fn update_namespace(
236        &self,
237        namespace_ident: &NamespaceIdent,
238        properties: HashMap<String, String>,
239    ) -> Result<()> {
240        let mut root_namespace_state = self.root_namespace_state.lock().await;
241
242        root_namespace_state.replace_properties(namespace_ident, properties)
243    }
244
245    /// Drop a namespace from the catalog.
246    async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
247        let mut root_namespace_state = self.root_namespace_state.lock().await;
248
249        root_namespace_state.remove_existing_namespace(namespace_ident)
250    }
251
252    /// List tables from namespace.
253    async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
254        let root_namespace_state = self.root_namespace_state.lock().await;
255
256        let table_names = root_namespace_state.list_tables(namespace_ident)?;
257        let table_idents = table_names
258            .into_iter()
259            .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
260            .collect_vec();
261
262        Ok(table_idents)
263    }
264
265    /// Create a new table inside the namespace.
266    async fn create_table(
267        &self,
268        namespace_ident: &NamespaceIdent,
269        table_creation: TableCreation,
270    ) -> Result<Table> {
271        let mut root_namespace_state = self.root_namespace_state.lock().await;
272
273        let table_name = table_creation.name.clone();
274        let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
275
276        let (table_creation, location) = match table_creation.location.clone() {
277            Some(location) => (table_creation, location),
278            None => {
279                let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
280                let location_prefix = match namespace_properties.get(LOCATION) {
281                    Some(namespace_location) => namespace_location.clone(),
282                    None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
283                };
284
285                let location = format!("{}/{}", location_prefix, table_ident.name());
286
287                let new_table_creation = TableCreation {
288                    location: Some(location.clone()),
289                    ..table_creation
290                };
291
292                (new_table_creation, location)
293            }
294        };
295
296        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
297            .build()?
298            .metadata;
299        let metadata_location = MetadataLocation::new_with_metadata(location, &metadata);
300
301        metadata.write_to(&self.file_io, &metadata_location).await?;
302
303        root_namespace_state.insert_new_table(&table_ident, metadata_location.to_string())?;
304
305        Table::builder()
306            .file_io(self.file_io.clone())
307            .metadata_location(metadata_location.to_string())
308            .metadata(metadata)
309            .identifier(table_ident)
310            .build()
311    }
312
313    /// Load table from the catalog.
314    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
315        let root_namespace_state = self.root_namespace_state.lock().await;
316
317        self.load_table_from_locked_state(table_ident, &root_namespace_state)
318            .await
319    }
320
321    /// Drop a table from the catalog.
322    async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
323        let mut root_namespace_state = self.root_namespace_state.lock().await;
324
325        root_namespace_state.remove_existing_table(table_ident)?;
326        Ok(())
327    }
328
329    async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> {
330        let table_info = self.load_table(table_ident).await?;
331        self.drop_table(table_ident).await?;
332        crate::catalog::utils::drop_table_data(
333            table_info.file_io(),
334            table_info.metadata(),
335            table_info.metadata_location(),
336        )
337        .await
338    }
339
340    /// Check if a table exists in the catalog.
341    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
342        let root_namespace_state = self.root_namespace_state.lock().await;
343
344        root_namespace_state.table_exists(table_ident)
345    }
346
347    /// Rename a table in the catalog.
348    async fn rename_table(
349        &self,
350        src_table_ident: &TableIdent,
351        dst_table_ident: &TableIdent,
352    ) -> Result<()> {
353        let mut root_namespace_state = self.root_namespace_state.lock().await;
354
355        let mut new_root_namespace_state = root_namespace_state.clone();
356        let metadata_location = new_root_namespace_state
357            .get_existing_table_location(src_table_ident)?
358            .clone();
359        new_root_namespace_state.remove_existing_table(src_table_ident)?;
360        new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
361        *root_namespace_state = new_root_namespace_state;
362
363        Ok(())
364    }
365
366    async fn register_table(
367        &self,
368        table_ident: &TableIdent,
369        metadata_location: String,
370    ) -> Result<Table> {
371        let mut root_namespace_state = self.root_namespace_state.lock().await;
372        root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
373
374        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
375
376        Table::builder()
377            .file_io(self.file_io.clone())
378            .metadata_location(metadata_location)
379            .metadata(metadata)
380            .identifier(table_ident.clone())
381            .build()
382    }
383
384    /// Update a table in the catalog.
385    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
386        let mut root_namespace_state = self.root_namespace_state.lock().await;
387
388        let current_table = self
389            .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
390            .await?;
391
392        // Apply TableCommit to get staged table
393        let staged_table = commit.apply(current_table)?;
394
395        // Write table metadata to the new location
396        let metadata_location =
397            MetadataLocation::from_str(staged_table.metadata_location_result()?)?;
398        staged_table
399            .metadata()
400            .write_to(staged_table.file_io(), &metadata_location)
401            .await?;
402
403        // Flip the pointer to reference the new metadata file.
404        let updated_table = root_namespace_state.commit_table_update(staged_table)?;
405
406        Ok(updated_table)
407    }
408}
409
410#[cfg(test)]
411pub(crate) mod tests {
412    use std::collections::HashSet;
413    use std::hash::Hash;
414    use std::iter::FromIterator;
415    use std::vec;
416
417    use regex::Regex;
418    use tempfile::TempDir;
419
420    use super::*;
421    use crate::io::FileIO;
422    use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
423    use crate::transaction::{ApplyTransactionAction, Transaction};
424
425    fn temp_path() -> String {
426        let temp_dir = TempDir::new().unwrap();
427        temp_dir.path().to_str().unwrap().to_string()
428    }
429
430    pub(crate) async fn new_memory_catalog() -> impl Catalog {
431        let warehouse_location = temp_path();
432        MemoryCatalogBuilder::default()
433            .load(
434                "memory",
435                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
436            )
437            .await
438            .unwrap()
439    }
440
441    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
442        let _ = catalog
443            .create_namespace(namespace_ident, HashMap::new())
444            .await
445            .unwrap();
446    }
447
448    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
449        for namespace_ident in namespace_idents {
450            let _ = create_namespace(catalog, namespace_ident).await;
451        }
452    }
453
454    fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
455        HashSet::from_iter(vec)
456    }
457
458    fn simple_table_schema() -> Schema {
459        Schema::builder()
460            .with_fields(vec![
461                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
462            ])
463            .build()
464            .unwrap()
465    }
466
467    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
468        catalog
469            .create_table(
470                &table_ident.namespace,
471                TableCreation::builder()
472                    .name(table_ident.name().into())
473                    .schema(simple_table_schema())
474                    .build(),
475            )
476            .await
477            .unwrap()
478    }
479
480    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
481        for table_ident in table_idents {
482            create_table(catalog, table_ident).await;
483        }
484    }
485
486    async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
487        let namespace_ident = NamespaceIdent::new("abc".into());
488        create_namespace(catalog, &namespace_ident).await;
489
490        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
491        create_table(catalog, &table_ident).await
492    }
493
494    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
495        assert_eq!(table.identifier(), expected_table_ident);
496
497        let metadata = table.metadata();
498
499        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
500
501        let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
502            .with_spec_id(0)
503            .build()
504            .unwrap();
505
506        assert_eq!(
507            metadata
508                .partition_specs_iter()
509                .map(|p| p.as_ref())
510                .collect_vec(),
511            vec![&expected_partition_spec]
512        );
513
514        let expected_sorted_order = SortOrder::builder()
515            .with_order_id(0)
516            .with_fields(vec![])
517            .build(expected_schema)
518            .unwrap();
519
520        assert_eq!(
521            metadata
522                .sort_orders_iter()
523                .map(|s| s.as_ref())
524                .collect_vec(),
525            vec![&expected_sorted_order]
526        );
527
528        assert_eq!(metadata.properties(), &HashMap::new());
529
530        assert!(!table.readonly());
531    }
532
533    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
534
535    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
536        let actual = table.metadata_location().unwrap().to_string();
537        let regex = Regex::new(regex_str).unwrap();
538        assert!(
539            regex.is_match(&actual),
540            "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
541        )
542    }
543
544    #[tokio::test]
545    async fn test_list_namespaces_returns_empty_vector() {
546        let catalog = new_memory_catalog().await;
547
548        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
549    }
550
551    #[tokio::test]
552    async fn test_list_namespaces_returns_single_namespace() {
553        let catalog = new_memory_catalog().await;
554        let namespace_ident = NamespaceIdent::new("abc".into());
555        create_namespace(&catalog, &namespace_ident).await;
556
557        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
558            namespace_ident
559        ]);
560    }
561
562    #[tokio::test]
563    async fn test_list_namespaces_returns_multiple_namespaces() {
564        let catalog = new_memory_catalog().await;
565        let namespace_ident_1 = NamespaceIdent::new("a".into());
566        let namespace_ident_2 = NamespaceIdent::new("b".into());
567        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
568
569        assert_eq!(
570            to_set(catalog.list_namespaces(None).await.unwrap()),
571            to_set(vec![namespace_ident_1, namespace_ident_2])
572        );
573    }
574
575    #[tokio::test]
576    async fn test_list_namespaces_returns_only_top_level_namespaces() {
577        let catalog = new_memory_catalog().await;
578        let namespace_ident_1 = NamespaceIdent::new("a".into());
579        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
580        let namespace_ident_3 = NamespaceIdent::new("b".into());
581        create_namespaces(&catalog, &vec![
582            &namespace_ident_1,
583            &namespace_ident_2,
584            &namespace_ident_3,
585        ])
586        .await;
587
588        assert_eq!(
589            to_set(catalog.list_namespaces(None).await.unwrap()),
590            to_set(vec![namespace_ident_1, namespace_ident_3])
591        );
592    }
593
594    #[tokio::test]
595    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
596        let catalog = new_memory_catalog().await;
597        let namespace_ident_1 = NamespaceIdent::new("a".into());
598        let namespace_ident_2 = NamespaceIdent::new("b".into());
599        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
600
601        assert_eq!(
602            catalog
603                .list_namespaces(Some(&namespace_ident_1))
604                .await
605                .unwrap(),
606            vec![]
607        );
608    }
609
610    #[tokio::test]
611    async fn test_list_namespaces_returns_namespace_under_parent() {
612        let catalog = new_memory_catalog().await;
613        let namespace_ident_1 = NamespaceIdent::new("a".into());
614        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
615        let namespace_ident_3 = NamespaceIdent::new("c".into());
616        create_namespaces(&catalog, &vec![
617            &namespace_ident_1,
618            &namespace_ident_2,
619            &namespace_ident_3,
620        ])
621        .await;
622
623        assert_eq!(
624            to_set(catalog.list_namespaces(None).await.unwrap()),
625            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
626        );
627
628        assert_eq!(
629            catalog
630                .list_namespaces(Some(&namespace_ident_1))
631                .await
632                .unwrap(),
633            vec![namespace_ident_2]
634        );
635    }
636
637    #[tokio::test]
638    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
639        let catalog = new_memory_catalog().await;
640        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
641        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
642        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
643        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
644        let namespace_ident_5 = NamespaceIdent::new("b".into());
645        create_namespaces(&catalog, &vec![
646            &namespace_ident_1,
647            &namespace_ident_2,
648            &namespace_ident_3,
649            &namespace_ident_4,
650            &namespace_ident_5,
651        ])
652        .await;
653
654        assert_eq!(
655            to_set(
656                catalog
657                    .list_namespaces(Some(&namespace_ident_1))
658                    .await
659                    .unwrap()
660            ),
661            to_set(vec![
662                namespace_ident_2,
663                namespace_ident_3,
664                namespace_ident_4,
665            ])
666        );
667    }
668
669    #[tokio::test]
670    async fn test_namespace_exists_returns_false() {
671        let catalog = new_memory_catalog().await;
672        let namespace_ident = NamespaceIdent::new("a".into());
673        create_namespace(&catalog, &namespace_ident).await;
674
675        assert!(
676            !catalog
677                .namespace_exists(&NamespaceIdent::new("b".into()))
678                .await
679                .unwrap()
680        );
681    }
682
683    #[tokio::test]
684    async fn test_namespace_exists_returns_true() {
685        let catalog = new_memory_catalog().await;
686        let namespace_ident = NamespaceIdent::new("a".into());
687        create_namespace(&catalog, &namespace_ident).await;
688
689        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
690    }
691
692    #[tokio::test]
693    async fn test_create_namespace_with_empty_properties() {
694        let catalog = new_memory_catalog().await;
695        let namespace_ident = NamespaceIdent::new("a".into());
696
697        assert_eq!(
698            catalog
699                .create_namespace(&namespace_ident, HashMap::new())
700                .await
701                .unwrap(),
702            Namespace::new(namespace_ident.clone())
703        );
704
705        assert_eq!(
706            catalog.get_namespace(&namespace_ident).await.unwrap(),
707            Namespace::with_properties(namespace_ident, HashMap::new())
708        );
709    }
710
711    #[tokio::test]
712    async fn test_create_namespace_with_properties() {
713        let catalog = new_memory_catalog().await;
714        let namespace_ident = NamespaceIdent::new("abc".into());
715
716        let mut properties: HashMap<String, String> = HashMap::new();
717        properties.insert("k".into(), "v".into());
718
719        assert_eq!(
720            catalog
721                .create_namespace(&namespace_ident, properties.clone())
722                .await
723                .unwrap(),
724            Namespace::with_properties(namespace_ident.clone(), properties.clone())
725        );
726
727        assert_eq!(
728            catalog.get_namespace(&namespace_ident).await.unwrap(),
729            Namespace::with_properties(namespace_ident, properties)
730        );
731    }
732
733    #[tokio::test]
734    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
735        let catalog = new_memory_catalog().await;
736        let namespace_ident = NamespaceIdent::new("a".into());
737        create_namespace(&catalog, &namespace_ident).await;
738
739        assert_eq!(
740            catalog
741                .create_namespace(&namespace_ident, HashMap::new())
742                .await
743                .unwrap_err()
744                .to_string(),
745            format!(
746                "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
747                &namespace_ident
748            )
749        );
750
751        assert_eq!(
752            catalog.get_namespace(&namespace_ident).await.unwrap(),
753            Namespace::with_properties(namespace_ident, HashMap::new())
754        );
755    }
756
757    #[tokio::test]
758    async fn test_create_nested_namespace() {
759        let catalog = new_memory_catalog().await;
760        let parent_namespace_ident = NamespaceIdent::new("a".into());
761        create_namespace(&catalog, &parent_namespace_ident).await;
762
763        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
764
765        assert_eq!(
766            catalog
767                .create_namespace(&child_namespace_ident, HashMap::new())
768                .await
769                .unwrap(),
770            Namespace::new(child_namespace_ident.clone())
771        );
772
773        assert_eq!(
774            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
775            Namespace::with_properties(child_namespace_ident, HashMap::new())
776        );
777    }
778
779    #[tokio::test]
780    async fn test_create_deeply_nested_namespace() {
781        let catalog = new_memory_catalog().await;
782        let namespace_ident_a = NamespaceIdent::new("a".into());
783        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
784        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
785
786        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
787
788        assert_eq!(
789            catalog
790                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
791                .await
792                .unwrap(),
793            Namespace::new(namespace_ident_a_b_c.clone())
794        );
795
796        assert_eq!(
797            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
798            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
799        );
800    }
801
802    #[tokio::test]
803    async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
804        let catalog = new_memory_catalog().await;
805
806        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
807
808        assert_eq!(
809            catalog
810                .create_namespace(&nested_namespace_ident, HashMap::new())
811                .await
812                .unwrap_err()
813                .to_string(),
814            format!(
815                "NamespaceNotFound => No such namespace: {:?}",
816                NamespaceIdent::new("a".into())
817            )
818        );
819
820        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
821    }
822
823    #[tokio::test]
824    async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
825     {
826        let catalog = new_memory_catalog().await;
827
828        let namespace_ident_a = NamespaceIdent::new("a".into());
829        create_namespace(&catalog, &namespace_ident_a).await;
830
831        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
832
833        assert_eq!(
834            catalog
835                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
836                .await
837                .unwrap_err()
838                .to_string(),
839            format!(
840                "NamespaceNotFound => No such namespace: {:?}",
841                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
842            )
843        );
844
845        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
846            namespace_ident_a.clone()
847        ]);
848
849        assert_eq!(
850            catalog
851                .list_namespaces(Some(&namespace_ident_a))
852                .await
853                .unwrap(),
854            vec![]
855        );
856    }
857
858    #[tokio::test]
859    async fn test_get_namespace() {
860        let catalog = new_memory_catalog().await;
861        let namespace_ident = NamespaceIdent::new("abc".into());
862
863        let mut properties: HashMap<String, String> = HashMap::new();
864        properties.insert("k".into(), "v".into());
865        let _ = catalog
866            .create_namespace(&namespace_ident, properties.clone())
867            .await
868            .unwrap();
869
870        assert_eq!(
871            catalog.get_namespace(&namespace_ident).await.unwrap(),
872            Namespace::with_properties(namespace_ident, properties)
873        )
874    }
875
876    #[tokio::test]
877    async fn test_get_nested_namespace() {
878        let catalog = new_memory_catalog().await;
879        let namespace_ident_a = NamespaceIdent::new("a".into());
880        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
881        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
882
883        assert_eq!(
884            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
885            Namespace::with_properties(namespace_ident_a_b, HashMap::new())
886        );
887    }
888
889    #[tokio::test]
890    async fn test_get_deeply_nested_namespace() {
891        let catalog = new_memory_catalog().await;
892        let namespace_ident_a = NamespaceIdent::new("a".into());
893        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
894        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
895        create_namespaces(&catalog, &vec![
896            &namespace_ident_a,
897            &namespace_ident_a_b,
898            &namespace_ident_a_b_c,
899        ])
900        .await;
901
902        assert_eq!(
903            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
904            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
905        );
906    }
907
908    #[tokio::test]
909    async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
910        let catalog = new_memory_catalog().await;
911        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
912
913        let non_existent_namespace_ident = NamespaceIdent::new("b".into());
914        assert_eq!(
915            catalog
916                .get_namespace(&non_existent_namespace_ident)
917                .await
918                .unwrap_err()
919                .to_string(),
920            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
921        )
922    }
923
924    #[tokio::test]
925    async fn test_update_namespace() {
926        let catalog = new_memory_catalog().await;
927        let namespace_ident = NamespaceIdent::new("abc".into());
928        create_namespace(&catalog, &namespace_ident).await;
929
930        let mut new_properties: HashMap<String, String> = HashMap::new();
931        new_properties.insert("k".into(), "v".into());
932
933        catalog
934            .update_namespace(&namespace_ident, new_properties.clone())
935            .await
936            .unwrap();
937
938        assert_eq!(
939            catalog.get_namespace(&namespace_ident).await.unwrap(),
940            Namespace::with_properties(namespace_ident, new_properties)
941        )
942    }
943
944    #[tokio::test]
945    async fn test_update_nested_namespace() {
946        let catalog = new_memory_catalog().await;
947        let namespace_ident_a = NamespaceIdent::new("a".into());
948        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
949        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
950
951        let mut new_properties = HashMap::new();
952        new_properties.insert("k".into(), "v".into());
953
954        catalog
955            .update_namespace(&namespace_ident_a_b, new_properties.clone())
956            .await
957            .unwrap();
958
959        assert_eq!(
960            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
961            Namespace::with_properties(namespace_ident_a_b, new_properties)
962        );
963    }
964
965    #[tokio::test]
966    async fn test_update_deeply_nested_namespace() {
967        let catalog = new_memory_catalog().await;
968        let namespace_ident_a = NamespaceIdent::new("a".into());
969        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
970        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
971        create_namespaces(&catalog, &vec![
972            &namespace_ident_a,
973            &namespace_ident_a_b,
974            &namespace_ident_a_b_c,
975        ])
976        .await;
977
978        let mut new_properties = HashMap::new();
979        new_properties.insert("k".into(), "v".into());
980
981        catalog
982            .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
983            .await
984            .unwrap();
985
986        assert_eq!(
987            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
988            Namespace::with_properties(namespace_ident_a_b_c, new_properties)
989        );
990    }
991
992    #[tokio::test]
993    async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
994        let catalog = new_memory_catalog().await;
995        create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
996
997        let non_existent_namespace_ident = NamespaceIdent::new("def".into());
998        assert_eq!(
999            catalog
1000                .update_namespace(&non_existent_namespace_ident, HashMap::new())
1001                .await
1002                .unwrap_err()
1003                .to_string(),
1004            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1005        )
1006    }
1007
1008    #[tokio::test]
1009    async fn test_drop_namespace() {
1010        let catalog = new_memory_catalog().await;
1011        let namespace_ident = NamespaceIdent::new("abc".into());
1012        create_namespace(&catalog, &namespace_ident).await;
1013
1014        catalog.drop_namespace(&namespace_ident).await.unwrap();
1015
1016        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1017    }
1018
1019    #[tokio::test]
1020    async fn test_drop_nested_namespace() {
1021        let catalog = new_memory_catalog().await;
1022        let namespace_ident_a = NamespaceIdent::new("a".into());
1023        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1024        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1025
1026        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1027
1028        assert!(
1029            !catalog
1030                .namespace_exists(&namespace_ident_a_b)
1031                .await
1032                .unwrap()
1033        );
1034
1035        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1036    }
1037
1038    #[tokio::test]
1039    async fn test_drop_deeply_nested_namespace() {
1040        let catalog = new_memory_catalog().await;
1041        let namespace_ident_a = NamespaceIdent::new("a".into());
1042        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1043        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1044        create_namespaces(&catalog, &vec![
1045            &namespace_ident_a,
1046            &namespace_ident_a_b,
1047            &namespace_ident_a_b_c,
1048        ])
1049        .await;
1050
1051        catalog
1052            .drop_namespace(&namespace_ident_a_b_c)
1053            .await
1054            .unwrap();
1055
1056        assert!(
1057            !catalog
1058                .namespace_exists(&namespace_ident_a_b_c)
1059                .await
1060                .unwrap()
1061        );
1062
1063        assert!(
1064            catalog
1065                .namespace_exists(&namespace_ident_a_b)
1066                .await
1067                .unwrap()
1068        );
1069
1070        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1071    }
1072
1073    #[tokio::test]
1074    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1075        let catalog = new_memory_catalog().await;
1076
1077        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1078        assert_eq!(
1079            catalog
1080                .drop_namespace(&non_existent_namespace_ident)
1081                .await
1082                .unwrap_err()
1083                .to_string(),
1084            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1085        )
1086    }
1087
1088    #[tokio::test]
1089    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1090        let catalog = new_memory_catalog().await;
1091        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1092
1093        let non_existent_namespace_ident =
1094            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1095        assert_eq!(
1096            catalog
1097                .drop_namespace(&non_existent_namespace_ident)
1098                .await
1099                .unwrap_err()
1100                .to_string(),
1101            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1102        )
1103    }
1104
1105    #[tokio::test]
1106    async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1107        let catalog = new_memory_catalog().await;
1108        let namespace_ident_a = NamespaceIdent::new("a".into());
1109        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1110        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1111
1112        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1113
1114        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1115
1116        assert!(
1117            !catalog
1118                .namespace_exists(&namespace_ident_a_b)
1119                .await
1120                .unwrap()
1121        );
1122    }
1123
1124    #[tokio::test]
1125    async fn test_create_table_with_location() {
1126        let tmp_dir = TempDir::new().unwrap();
1127        let catalog = new_memory_catalog().await;
1128        let namespace_ident = NamespaceIdent::new("a".into());
1129        create_namespace(&catalog, &namespace_ident).await;
1130
1131        let table_name = "abc";
1132        let location = tmp_dir.path().to_str().unwrap().to_string();
1133        let table_creation = TableCreation::builder()
1134            .name(table_name.into())
1135            .location(location.clone())
1136            .schema(simple_table_schema())
1137            .build();
1138
1139        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1140
1141        assert_table_eq(
1142            &catalog
1143                .create_table(&namespace_ident, table_creation)
1144                .await
1145                .unwrap(),
1146            &expected_table_ident,
1147            &simple_table_schema(),
1148        );
1149
1150        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1151
1152        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1153
1154        assert!(
1155            table
1156                .metadata_location()
1157                .unwrap()
1158                .to_string()
1159                .starts_with(&location)
1160        )
1161    }
1162
1163    #[tokio::test]
1164    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1165        let warehouse_location = temp_path();
1166        let catalog = MemoryCatalogBuilder::default()
1167            .load(
1168                "memory",
1169                HashMap::from([(
1170                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1171                    warehouse_location.clone(),
1172                )]),
1173            )
1174            .await
1175            .unwrap();
1176
1177        let namespace_ident = NamespaceIdent::new("a".into());
1178        let mut namespace_properties = HashMap::new();
1179        let namespace_location = temp_path();
1180        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1181        catalog
1182            .create_namespace(&namespace_ident, namespace_properties)
1183            .await
1184            .unwrap();
1185
1186        let table_name = "tbl1";
1187        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1188        let expected_table_metadata_location_regex =
1189            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1190
1191        let table = catalog
1192            .create_table(
1193                &namespace_ident,
1194                TableCreation::builder()
1195                    .name(table_name.into())
1196                    .schema(simple_table_schema())
1197                    // no location specified for table
1198                    .build(),
1199            )
1200            .await
1201            .unwrap();
1202        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1203        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1204
1205        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1206        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1207        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1208    }
1209
1210    #[tokio::test]
1211    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1212     {
1213        let warehouse_location = temp_path();
1214        let catalog = MemoryCatalogBuilder::default()
1215            .load(
1216                "memory",
1217                HashMap::from([(
1218                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1219                    warehouse_location.clone(),
1220                )]),
1221            )
1222            .await
1223            .unwrap();
1224
1225        let namespace_ident = NamespaceIdent::new("a".into());
1226        let mut namespace_properties = HashMap::new();
1227        let namespace_location = temp_path();
1228        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1229        catalog
1230            .create_namespace(&namespace_ident, namespace_properties)
1231            .await
1232            .unwrap();
1233
1234        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1235        let mut nested_namespace_properties = HashMap::new();
1236        let nested_namespace_location = temp_path();
1237        nested_namespace_properties
1238            .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1239        catalog
1240            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1241            .await
1242            .unwrap();
1243
1244        let table_name = "tbl1";
1245        let expected_table_ident =
1246            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1247        let expected_table_metadata_location_regex = format!(
1248            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1249        );
1250
1251        let table = catalog
1252            .create_table(
1253                &nested_namespace_ident,
1254                TableCreation::builder()
1255                    .name(table_name.into())
1256                    .schema(simple_table_schema())
1257                    // no location specified for table
1258                    .build(),
1259            )
1260            .await
1261            .unwrap();
1262        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1263        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1264
1265        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1266        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1267        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1268    }
1269
1270    #[tokio::test]
1271    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1272     {
1273        let warehouse_location = temp_path();
1274        let catalog = MemoryCatalogBuilder::default()
1275            .load(
1276                "memory",
1277                HashMap::from([(
1278                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1279                    warehouse_location.clone(),
1280                )]),
1281            )
1282            .await
1283            .unwrap();
1284
1285        let namespace_ident = NamespaceIdent::new("a".into());
1286        // note: no location specified in namespace_properties
1287        let namespace_properties = HashMap::new();
1288        catalog
1289            .create_namespace(&namespace_ident, namespace_properties)
1290            .await
1291            .unwrap();
1292
1293        let table_name = "tbl1";
1294        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1295        let expected_table_metadata_location_regex =
1296            format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1297
1298        let table = catalog
1299            .create_table(
1300                &namespace_ident,
1301                TableCreation::builder()
1302                    .name(table_name.into())
1303                    .schema(simple_table_schema())
1304                    // no location specified for table
1305                    .build(),
1306            )
1307            .await
1308            .unwrap();
1309        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1310        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1311
1312        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1313        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1314        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1315    }
1316
1317    #[tokio::test]
1318    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1319     {
1320        let warehouse_location = temp_path();
1321        let catalog = MemoryCatalogBuilder::default()
1322            .load(
1323                "memory",
1324                HashMap::from([(
1325                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1326                    warehouse_location.clone(),
1327                )]),
1328            )
1329            .await
1330            .unwrap();
1331
1332        let namespace_ident = NamespaceIdent::new("a".into());
1333        catalog
1334            // note: no location specified in namespace_properties
1335            .create_namespace(&namespace_ident, HashMap::new())
1336            .await
1337            .unwrap();
1338
1339        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1340        catalog
1341            // note: no location specified in namespace_properties
1342            .create_namespace(&nested_namespace_ident, HashMap::new())
1343            .await
1344            .unwrap();
1345
1346        let table_name = "tbl1";
1347        let expected_table_ident =
1348            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1349        let expected_table_metadata_location_regex = format!(
1350            "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1351        );
1352
1353        let table = catalog
1354            .create_table(
1355                &nested_namespace_ident,
1356                TableCreation::builder()
1357                    .name(table_name.into())
1358                    .schema(simple_table_schema())
1359                    // no location specified for table
1360                    .build(),
1361            )
1362            .await
1363            .unwrap();
1364        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1365        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1366
1367        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1368        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1369        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1370    }
1371
1372    #[tokio::test]
1373    async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1374     {
1375        let catalog = MemoryCatalogBuilder::default()
1376            .load("memory", HashMap::from([]))
1377            .await;
1378
1379        assert!(catalog.is_err());
1380        assert_eq!(
1381            catalog.unwrap_err().to_string(),
1382            "DataInvalid => Catalog warehouse is required"
1383        );
1384    }
1385
1386    #[tokio::test]
1387    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1388        let catalog = new_memory_catalog().await;
1389        let namespace_ident = NamespaceIdent::new("a".into());
1390        create_namespace(&catalog, &namespace_ident).await;
1391        let table_name = "tbl1";
1392        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1393        create_table(&catalog, &table_ident).await;
1394
1395        let tmp_dir = TempDir::new().unwrap();
1396        let location = tmp_dir.path().to_str().unwrap().to_string();
1397
1398        assert_eq!(
1399            catalog
1400                .create_table(
1401                    &namespace_ident,
1402                    TableCreation::builder()
1403                        .name(table_name.into())
1404                        .schema(simple_table_schema())
1405                        .location(location)
1406                        .build()
1407                )
1408                .await
1409                .unwrap_err()
1410                .to_string(),
1411            format!(
1412                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1413                &table_ident
1414            )
1415        );
1416    }
1417
1418    #[tokio::test]
1419    async fn test_list_tables_returns_empty_vector() {
1420        let catalog = new_memory_catalog().await;
1421        let namespace_ident = NamespaceIdent::new("a".into());
1422        create_namespace(&catalog, &namespace_ident).await;
1423
1424        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1425    }
1426
1427    #[tokio::test]
1428    async fn test_list_tables_returns_a_single_table() {
1429        let catalog = new_memory_catalog().await;
1430        let namespace_ident = NamespaceIdent::new("n1".into());
1431        create_namespace(&catalog, &namespace_ident).await;
1432
1433        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1434        create_table(&catalog, &table_ident).await;
1435
1436        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1437            table_ident
1438        ]);
1439    }
1440
1441    #[tokio::test]
1442    async fn test_list_tables_returns_multiple_tables() {
1443        let catalog = new_memory_catalog().await;
1444        let namespace_ident = NamespaceIdent::new("n1".into());
1445        create_namespace(&catalog, &namespace_ident).await;
1446
1447        let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1448        let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1449        let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1450
1451        assert_eq!(
1452            to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1453            to_set(vec![table_ident_1, table_ident_2])
1454        );
1455    }
1456
1457    #[tokio::test]
1458    async fn test_list_tables_returns_tables_from_correct_namespace() {
1459        let catalog = new_memory_catalog().await;
1460        let namespace_ident_1 = NamespaceIdent::new("n1".into());
1461        let namespace_ident_2 = NamespaceIdent::new("n2".into());
1462        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1463
1464        let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1465        let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1466        let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1467        let _ = create_tables(&catalog, vec![
1468            &table_ident_1,
1469            &table_ident_2,
1470            &table_ident_3,
1471        ])
1472        .await;
1473
1474        assert_eq!(
1475            to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1476            to_set(vec![table_ident_1, table_ident_2])
1477        );
1478
1479        assert_eq!(
1480            to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1481            to_set(vec![table_ident_3])
1482        );
1483    }
1484
1485    #[tokio::test]
1486    async fn test_list_tables_returns_table_under_nested_namespace() {
1487        let catalog = new_memory_catalog().await;
1488        let namespace_ident_a = NamespaceIdent::new("a".into());
1489        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1490        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1491
1492        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1493        create_table(&catalog, &table_ident).await;
1494
1495        assert_eq!(
1496            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1497            vec![table_ident]
1498        );
1499    }
1500
1501    #[tokio::test]
1502    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1503        let catalog = new_memory_catalog().await;
1504
1505        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1506
1507        assert_eq!(
1508            catalog
1509                .list_tables(&non_existent_namespace_ident)
1510                .await
1511                .unwrap_err()
1512                .to_string(),
1513            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1514        );
1515    }
1516
1517    #[tokio::test]
1518    async fn test_drop_table() {
1519        let catalog = new_memory_catalog().await;
1520        let namespace_ident = NamespaceIdent::new("n1".into());
1521        create_namespace(&catalog, &namespace_ident).await;
1522        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1523        create_table(&catalog, &table_ident).await;
1524
1525        catalog.drop_table(&table_ident).await.unwrap();
1526    }
1527
1528    #[tokio::test]
1529    async fn test_drop_table_drops_table_under_nested_namespace() {
1530        let catalog = new_memory_catalog().await;
1531        let namespace_ident_a = NamespaceIdent::new("a".into());
1532        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1533        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1534
1535        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1536        create_table(&catalog, &table_ident).await;
1537
1538        catalog.drop_table(&table_ident).await.unwrap();
1539
1540        assert_eq!(
1541            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1542            vec![]
1543        );
1544    }
1545
1546    #[tokio::test]
1547    async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1548        let catalog = new_memory_catalog().await;
1549
1550        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1551        let non_existent_table_ident =
1552            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1553
1554        assert_eq!(
1555            catalog
1556                .drop_table(&non_existent_table_ident)
1557                .await
1558                .unwrap_err()
1559                .to_string(),
1560            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1561        );
1562    }
1563
1564    #[tokio::test]
1565    async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1566        let catalog = new_memory_catalog().await;
1567        let namespace_ident = NamespaceIdent::new("n1".into());
1568        create_namespace(&catalog, &namespace_ident).await;
1569
1570        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1571
1572        assert_eq!(
1573            catalog
1574                .drop_table(&non_existent_table_ident)
1575                .await
1576                .unwrap_err()
1577                .to_string(),
1578            format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1579        );
1580    }
1581
1582    #[tokio::test]
1583    async fn test_table_exists_returns_true() {
1584        let catalog = new_memory_catalog().await;
1585        let namespace_ident = NamespaceIdent::new("n1".into());
1586        create_namespace(&catalog, &namespace_ident).await;
1587        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1588        create_table(&catalog, &table_ident).await;
1589
1590        assert!(catalog.table_exists(&table_ident).await.unwrap());
1591    }
1592
1593    #[tokio::test]
1594    async fn test_table_exists_returns_false() {
1595        let catalog = new_memory_catalog().await;
1596        let namespace_ident = NamespaceIdent::new("n1".into());
1597        create_namespace(&catalog, &namespace_ident).await;
1598        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1599
1600        assert!(
1601            !catalog
1602                .table_exists(&non_existent_table_ident)
1603                .await
1604                .unwrap()
1605        );
1606    }
1607
1608    #[tokio::test]
1609    async fn test_table_exists_under_nested_namespace() {
1610        let catalog = new_memory_catalog().await;
1611        let namespace_ident_a = NamespaceIdent::new("a".into());
1612        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1613        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1614
1615        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1616        create_table(&catalog, &table_ident).await;
1617
1618        assert!(catalog.table_exists(&table_ident).await.unwrap());
1619
1620        let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1621        assert!(
1622            !catalog
1623                .table_exists(&non_existent_table_ident)
1624                .await
1625                .unwrap()
1626        );
1627    }
1628
1629    #[tokio::test]
1630    async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1631        let catalog = new_memory_catalog().await;
1632
1633        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1634        let non_existent_table_ident =
1635            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1636
1637        assert_eq!(
1638            catalog
1639                .table_exists(&non_existent_table_ident)
1640                .await
1641                .unwrap_err()
1642                .to_string(),
1643            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1644        );
1645    }
1646
1647    #[tokio::test]
1648    async fn test_rename_table_in_same_namespace() {
1649        let catalog = new_memory_catalog().await;
1650        let namespace_ident = NamespaceIdent::new("n1".into());
1651        create_namespace(&catalog, &namespace_ident).await;
1652        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1653        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1654        create_table(&catalog, &src_table_ident).await;
1655
1656        catalog
1657            .rename_table(&src_table_ident, &dst_table_ident)
1658            .await
1659            .unwrap();
1660
1661        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1662            dst_table_ident
1663        ],);
1664    }
1665
1666    #[tokio::test]
1667    async fn test_rename_table_across_namespaces() {
1668        let catalog = new_memory_catalog().await;
1669        let src_namespace_ident = NamespaceIdent::new("a".into());
1670        let dst_namespace_ident = NamespaceIdent::new("b".into());
1671        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1672        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1673        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1674        create_table(&catalog, &src_table_ident).await;
1675
1676        catalog
1677            .rename_table(&src_table_ident, &dst_table_ident)
1678            .await
1679            .unwrap();
1680
1681        assert_eq!(
1682            catalog.list_tables(&src_namespace_ident).await.unwrap(),
1683            vec![],
1684        );
1685
1686        assert_eq!(
1687            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1688            vec![dst_table_ident],
1689        );
1690    }
1691
1692    #[tokio::test]
1693    async fn test_rename_table_src_table_is_same_as_dst_table() {
1694        let catalog = new_memory_catalog().await;
1695        let namespace_ident = NamespaceIdent::new("n1".into());
1696        create_namespace(&catalog, &namespace_ident).await;
1697        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1698        create_table(&catalog, &table_ident).await;
1699
1700        catalog
1701            .rename_table(&table_ident, &table_ident)
1702            .await
1703            .unwrap();
1704
1705        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1706            table_ident
1707        ],);
1708    }
1709
1710    #[tokio::test]
1711    async fn test_rename_table_across_nested_namespaces() {
1712        let catalog = new_memory_catalog().await;
1713        let namespace_ident_a = NamespaceIdent::new("a".into());
1714        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1715        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1716        create_namespaces(&catalog, &vec![
1717            &namespace_ident_a,
1718            &namespace_ident_a_b,
1719            &namespace_ident_a_b_c,
1720        ])
1721        .await;
1722
1723        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1724        create_tables(&catalog, vec![&src_table_ident]).await;
1725
1726        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1727        catalog
1728            .rename_table(&src_table_ident, &dst_table_ident)
1729            .await
1730            .unwrap();
1731
1732        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1733
1734        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1735    }
1736
1737    #[tokio::test]
1738    async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1739        let catalog = new_memory_catalog().await;
1740
1741        let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1742        let src_table_ident =
1743            TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1744
1745        let dst_namespace_ident = NamespaceIdent::new("n2".into());
1746        create_namespace(&catalog, &dst_namespace_ident).await;
1747        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1748
1749        assert_eq!(
1750            catalog
1751                .rename_table(&src_table_ident, &dst_table_ident)
1752                .await
1753                .unwrap_err()
1754                .to_string(),
1755            format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1756        );
1757    }
1758
1759    #[tokio::test]
1760    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1761        let catalog = new_memory_catalog().await;
1762        let src_namespace_ident = NamespaceIdent::new("n1".into());
1763        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1764        create_namespace(&catalog, &src_namespace_ident).await;
1765        create_table(&catalog, &src_table_ident).await;
1766
1767        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1768        let dst_table_ident =
1769            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1770        assert_eq!(
1771            catalog
1772                .rename_table(&src_table_ident, &dst_table_ident)
1773                .await
1774                .unwrap_err()
1775                .to_string(),
1776            format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1777        );
1778    }
1779
1780    #[tokio::test]
1781    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1782        let catalog = new_memory_catalog().await;
1783        let namespace_ident = NamespaceIdent::new("n1".into());
1784        create_namespace(&catalog, &namespace_ident).await;
1785        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1786        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1787
1788        assert_eq!(
1789            catalog
1790                .rename_table(&src_table_ident, &dst_table_ident)
1791                .await
1792                .unwrap_err()
1793                .to_string(),
1794            format!("TableNotFound => No such table: {src_table_ident:?}"),
1795        );
1796    }
1797
1798    #[tokio::test]
1799    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1800        let catalog = new_memory_catalog().await;
1801        let namespace_ident = NamespaceIdent::new("n1".into());
1802        create_namespace(&catalog, &namespace_ident).await;
1803        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1804        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1805        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1806
1807        assert_eq!(
1808            catalog
1809                .rename_table(&src_table_ident, &dst_table_ident)
1810                .await
1811                .unwrap_err()
1812                .to_string(),
1813            format!(
1814                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1815                &dst_table_ident
1816            ),
1817        );
1818    }
1819
1820    #[tokio::test]
1821    async fn test_register_table() {
1822        // Create a catalog and namespace
1823        let catalog = new_memory_catalog().await;
1824        let namespace_ident = NamespaceIdent::new("test_namespace".into());
1825        create_namespace(&catalog, &namespace_ident).await;
1826
1827        // Create a table to get a valid metadata file
1828        let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1829        create_table(&catalog, &source_table_ident).await;
1830
1831        // Get the metadata location from the source table
1832        let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1833        let metadata_location = source_table.metadata_location().unwrap().to_string();
1834
1835        // Register a new table using the same metadata location
1836        let register_table_ident =
1837            TableIdent::new(namespace_ident.clone(), "register_table".into());
1838        let registered_table = catalog
1839            .register_table(&register_table_ident, metadata_location.clone())
1840            .await
1841            .unwrap();
1842
1843        // Verify the registered table has the correct identifier
1844        assert_eq!(registered_table.identifier(), &register_table_ident);
1845
1846        // Verify the registered table has the correct metadata location
1847        assert_eq!(
1848            registered_table.metadata_location().unwrap().to_string(),
1849            metadata_location
1850        );
1851
1852        // Verify the table exists in the catalog
1853        assert!(catalog.table_exists(&register_table_ident).await.unwrap());
1854
1855        // Verify we can load the registered table
1856        let loaded_table = catalog.load_table(&register_table_ident).await.unwrap();
1857        assert_eq!(loaded_table.identifier(), &register_table_ident);
1858        assert_eq!(
1859            loaded_table.metadata_location().unwrap().to_string(),
1860            metadata_location
1861        );
1862    }
1863
1864    #[tokio::test]
1865    async fn test_update_table() {
1866        let catalog = new_memory_catalog().await;
1867
1868        let table = create_table_with_namespace(&catalog).await;
1869
1870        // Assert the table doesn't contain the update yet
1871        assert!(!table.metadata().properties().contains_key("key"));
1872
1873        // Update table metadata
1874        let tx = Transaction::new(&table);
1875        let updated_table = tx
1876            .update_table_properties()
1877            .set("key".to_string(), "value".to_string())
1878            .apply(tx)
1879            .unwrap()
1880            .commit(&catalog)
1881            .await
1882            .unwrap();
1883
1884        assert_eq!(
1885            updated_table.metadata().properties().get("key").unwrap(),
1886            "value"
1887        );
1888
1889        assert_eq!(table.identifier(), updated_table.identifier());
1890        assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1891        assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1892        assert_ne!(table.metadata_location(), updated_table.metadata_location());
1893
1894        assert!(
1895            table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1896        );
1897    }
1898
1899    #[tokio::test]
1900    async fn test_update_table_fails_if_table_doesnt_exist() {
1901        let catalog = new_memory_catalog().await;
1902
1903        let namespace_ident = NamespaceIdent::new("a".into());
1904        create_namespace(&catalog, &namespace_ident).await;
1905
1906        // This table is not known to the catalog.
1907        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1908        let table = build_table(table_ident);
1909
1910        let tx = Transaction::new(&table);
1911        let err = tx
1912            .update_table_properties()
1913            .set("key".to_string(), "value".to_string())
1914            .apply(tx)
1915            .unwrap()
1916            .commit(&catalog)
1917            .await
1918            .unwrap_err();
1919        assert_eq!(err.kind(), ErrorKind::TableNotFound);
1920    }
1921
1922    fn build_table(ident: TableIdent) -> Table {
1923        let file_io = FileIO::new_with_fs();
1924
1925        let temp_dir = TempDir::new().unwrap();
1926        let location = temp_dir.path().to_str().unwrap().to_string();
1927
1928        let table_creation = TableCreation::builder()
1929            .name(ident.name().to_string())
1930            .schema(simple_table_schema())
1931            .location(location)
1932            .build();
1933        let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1934            .unwrap()
1935            .build()
1936            .unwrap()
1937            .metadata;
1938
1939        Table::builder()
1940            .identifier(ident)
1941            .metadata(metadata)
1942            .file_io(file_io)
1943            .build()
1944            .unwrap()
1945    }
1946}