iceberg/catalog/memory/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains memory catalog implementation.
19
20use std::collections::HashMap;
21
22use async_trait::async_trait;
23use futures::lock::{Mutex, MutexGuard};
24use itertools::Itertools;
25
26use super::namespace_state::NamespaceState;
27use crate::io::FileIO;
28use crate::spec::{TableMetadata, TableMetadataBuilder};
29use crate::table::Table;
30use crate::{
31    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
32    TableCommit, TableCreation, TableIdent,
33};
34
35/// Memory catalog warehouse location
36pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";
37
38/// namespace `location` property
39const LOCATION: &str = "location";
40
41/// Builder for [`MemoryCatalog`].
42#[derive(Debug)]
43pub struct MemoryCatalogBuilder(MemoryCatalogConfig);
44
45impl Default for MemoryCatalogBuilder {
46    fn default() -> Self {
47        Self(MemoryCatalogConfig {
48            name: None,
49            warehouse: "".to_string(),
50            props: HashMap::new(),
51        })
52    }
53}
54
55impl CatalogBuilder for MemoryCatalogBuilder {
56    type C = MemoryCatalog;
57
58    fn load(
59        mut self,
60        name: impl Into<String>,
61        props: HashMap<String, String>,
62    ) -> impl Future<Output = Result<Self::C>> + Send {
63        self.0.name = Some(name.into());
64
65        if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
66            self.0.warehouse = props
67                .get(MEMORY_CATALOG_WAREHOUSE)
68                .cloned()
69                .unwrap_or_default()
70        }
71
72        // Collect other remaining properties
73        self.0.props = props
74            .into_iter()
75            .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
76            .collect();
77
78        let result = {
79            if self.0.name.is_none() {
80                Err(Error::new(
81                    ErrorKind::DataInvalid,
82                    "Catalog name is required",
83                ))
84            } else if self.0.warehouse.is_empty() {
85                Err(Error::new(
86                    ErrorKind::DataInvalid,
87                    "Catalog warehouse is required",
88                ))
89            } else {
90                MemoryCatalog::new(self.0)
91            }
92        };
93
94        std::future::ready(result)
95    }
96}
97
98#[derive(Clone, Debug)]
99pub(crate) struct MemoryCatalogConfig {
100    name: Option<String>,
101    warehouse: String,
102    props: HashMap<String, String>,
103}
104
105/// Memory catalog implementation.
106#[derive(Debug)]
107pub struct MemoryCatalog {
108    root_namespace_state: Mutex<NamespaceState>,
109    file_io: FileIO,
110    warehouse_location: String,
111}
112
113impl MemoryCatalog {
114    /// Creates a memory catalog.
115    fn new(config: MemoryCatalogConfig) -> Result<Self> {
116        Ok(Self {
117            root_namespace_state: Mutex::new(NamespaceState::default()),
118            file_io: FileIO::from_path(&config.warehouse)?
119                .with_props(config.props)
120                .build()?,
121            warehouse_location: config.warehouse,
122        })
123    }
124
125    /// Loads a table from the locked namespace state.
126    async fn load_table_from_locked_state(
127        &self,
128        table_ident: &TableIdent,
129        root_namespace_state: &MutexGuard<'_, NamespaceState>,
130    ) -> Result<Table> {
131        let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
132        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
133
134        Table::builder()
135            .identifier(table_ident.clone())
136            .metadata(metadata)
137            .metadata_location(metadata_location.to_string())
138            .file_io(self.file_io.clone())
139            .build()
140    }
141}
142
143#[async_trait]
144impl Catalog for MemoryCatalog {
145    /// List namespaces inside the catalog.
146    async fn list_namespaces(
147        &self,
148        maybe_parent: Option<&NamespaceIdent>,
149    ) -> Result<Vec<NamespaceIdent>> {
150        let root_namespace_state = self.root_namespace_state.lock().await;
151
152        match maybe_parent {
153            None => {
154                let namespaces = root_namespace_state
155                    .list_top_level_namespaces()
156                    .into_iter()
157                    .map(|str| NamespaceIdent::new(str.to_string()))
158                    .collect_vec();
159
160                Ok(namespaces)
161            }
162            Some(parent_namespace_ident) => {
163                let namespaces = root_namespace_state
164                    .list_namespaces_under(parent_namespace_ident)?
165                    .into_iter()
166                    .map(|name| NamespaceIdent::new(name.to_string()))
167                    .collect_vec();
168
169                Ok(namespaces)
170            }
171        }
172    }
173
174    /// Create a new namespace inside the catalog.
175    async fn create_namespace(
176        &self,
177        namespace_ident: &NamespaceIdent,
178        properties: HashMap<String, String>,
179    ) -> Result<Namespace> {
180        let mut root_namespace_state = self.root_namespace_state.lock().await;
181
182        root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
183        let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
184
185        Ok(namespace)
186    }
187
188    /// Get a namespace information from the catalog.
189    async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
190        let root_namespace_state = self.root_namespace_state.lock().await;
191
192        let namespace = Namespace::with_properties(
193            namespace_ident.clone(),
194            root_namespace_state
195                .get_properties(namespace_ident)?
196                .clone(),
197        );
198
199        Ok(namespace)
200    }
201
202    /// Check if namespace exists in catalog.
203    async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
204        let guarded_namespaces = self.root_namespace_state.lock().await;
205
206        Ok(guarded_namespaces.namespace_exists(namespace_ident))
207    }
208
209    /// Update a namespace inside the catalog.
210    ///
211    /// # Behavior
212    ///
213    /// The properties must be the full set of namespace.
214    async fn update_namespace(
215        &self,
216        namespace_ident: &NamespaceIdent,
217        properties: HashMap<String, String>,
218    ) -> Result<()> {
219        let mut root_namespace_state = self.root_namespace_state.lock().await;
220
221        root_namespace_state.replace_properties(namespace_ident, properties)
222    }
223
224    /// Drop a namespace from the catalog.
225    async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
226        let mut root_namespace_state = self.root_namespace_state.lock().await;
227
228        root_namespace_state.remove_existing_namespace(namespace_ident)
229    }
230
231    /// List tables from namespace.
232    async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
233        let root_namespace_state = self.root_namespace_state.lock().await;
234
235        let table_names = root_namespace_state.list_tables(namespace_ident)?;
236        let table_idents = table_names
237            .into_iter()
238            .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
239            .collect_vec();
240
241        Ok(table_idents)
242    }
243
244    /// Create a new table inside the namespace.
245    async fn create_table(
246        &self,
247        namespace_ident: &NamespaceIdent,
248        table_creation: TableCreation,
249    ) -> Result<Table> {
250        let mut root_namespace_state = self.root_namespace_state.lock().await;
251
252        let table_name = table_creation.name.clone();
253        let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
254
255        let (table_creation, location) = match table_creation.location.clone() {
256            Some(location) => (table_creation, location),
257            None => {
258                let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
259                let location_prefix = match namespace_properties.get(LOCATION) {
260                    Some(namespace_location) => namespace_location.clone(),
261                    None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
262                };
263
264                let location = format!("{}/{}", location_prefix, table_ident.name());
265
266                let new_table_creation = TableCreation {
267                    location: Some(location.clone()),
268                    ..table_creation
269                };
270
271                (new_table_creation, location)
272            }
273        };
274
275        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
276            .build()?
277            .metadata;
278        let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
279
280        metadata.write_to(&self.file_io, &metadata_location).await?;
281
282        root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
283
284        Table::builder()
285            .file_io(self.file_io.clone())
286            .metadata_location(metadata_location)
287            .metadata(metadata)
288            .identifier(table_ident)
289            .build()
290    }
291
292    /// Load table from the catalog.
293    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
294        let root_namespace_state = self.root_namespace_state.lock().await;
295
296        self.load_table_from_locked_state(table_ident, &root_namespace_state)
297            .await
298    }
299
300    /// Drop a table from the catalog.
301    async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
302        let mut root_namespace_state = self.root_namespace_state.lock().await;
303
304        let metadata_location = root_namespace_state.remove_existing_table(table_ident)?;
305        self.file_io.delete(&metadata_location).await
306    }
307
308    /// Check if a table exists in the catalog.
309    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
310        let root_namespace_state = self.root_namespace_state.lock().await;
311
312        root_namespace_state.table_exists(table_ident)
313    }
314
315    /// Rename a table in the catalog.
316    async fn rename_table(
317        &self,
318        src_table_ident: &TableIdent,
319        dst_table_ident: &TableIdent,
320    ) -> Result<()> {
321        let mut root_namespace_state = self.root_namespace_state.lock().await;
322
323        let mut new_root_namespace_state = root_namespace_state.clone();
324        let metadata_location = new_root_namespace_state
325            .get_existing_table_location(src_table_ident)?
326            .clone();
327        new_root_namespace_state.remove_existing_table(src_table_ident)?;
328        new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
329        *root_namespace_state = new_root_namespace_state;
330
331        Ok(())
332    }
333
334    async fn register_table(
335        &self,
336        table_ident: &TableIdent,
337        metadata_location: String,
338    ) -> Result<Table> {
339        let mut root_namespace_state = self.root_namespace_state.lock().await;
340        root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
341
342        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
343
344        Table::builder()
345            .file_io(self.file_io.clone())
346            .metadata_location(metadata_location)
347            .metadata(metadata)
348            .identifier(table_ident.clone())
349            .build()
350    }
351
352    /// Update a table in the catalog.
353    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
354        let mut root_namespace_state = self.root_namespace_state.lock().await;
355
356        let current_table = self
357            .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
358            .await?;
359
360        // Apply TableCommit to get staged table
361        let staged_table = commit.apply(current_table)?;
362
363        // Write table metadata to the new location
364        staged_table
365            .metadata()
366            .write_to(
367                staged_table.file_io(),
368                staged_table.metadata_location_result()?,
369            )
370            .await?;
371
372        // Flip the pointer to reference the new metadata file.
373        let updated_table = root_namespace_state.commit_table_update(staged_table)?;
374
375        Ok(updated_table)
376    }
377}
378
379#[cfg(test)]
380pub(crate) mod tests {
381    use std::collections::HashSet;
382    use std::hash::Hash;
383    use std::iter::FromIterator;
384    use std::vec;
385
386    use regex::Regex;
387    use tempfile::TempDir;
388
389    use super::*;
390    use crate::io::FileIOBuilder;
391    use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
392    use crate::transaction::{ApplyTransactionAction, Transaction};
393
394    fn temp_path() -> String {
395        let temp_dir = TempDir::new().unwrap();
396        temp_dir.path().to_str().unwrap().to_string()
397    }
398
399    pub(crate) async fn new_memory_catalog() -> impl Catalog {
400        let warehouse_location = temp_path();
401        MemoryCatalogBuilder::default()
402            .load(
403                "memory",
404                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
405            )
406            .await
407            .unwrap()
408    }
409
410    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
411        let _ = catalog
412            .create_namespace(namespace_ident, HashMap::new())
413            .await
414            .unwrap();
415    }
416
417    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
418        for namespace_ident in namespace_idents {
419            let _ = create_namespace(catalog, namespace_ident).await;
420        }
421    }
422
423    fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
424        HashSet::from_iter(vec)
425    }
426
427    fn simple_table_schema() -> Schema {
428        Schema::builder()
429            .with_fields(vec![
430                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
431            ])
432            .build()
433            .unwrap()
434    }
435
436    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
437        catalog
438            .create_table(
439                &table_ident.namespace,
440                TableCreation::builder()
441                    .name(table_ident.name().into())
442                    .schema(simple_table_schema())
443                    .build(),
444            )
445            .await
446            .unwrap()
447    }
448
449    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
450        for table_ident in table_idents {
451            create_table(catalog, table_ident).await;
452        }
453    }
454
455    async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
456        let namespace_ident = NamespaceIdent::new("abc".into());
457        create_namespace(catalog, &namespace_ident).await;
458
459        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
460        create_table(catalog, &table_ident).await
461    }
462
463    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
464        assert_eq!(table.identifier(), expected_table_ident);
465
466        let metadata = table.metadata();
467
468        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
469
470        let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
471            .with_spec_id(0)
472            .build()
473            .unwrap();
474
475        assert_eq!(
476            metadata
477                .partition_specs_iter()
478                .map(|p| p.as_ref())
479                .collect_vec(),
480            vec![&expected_partition_spec]
481        );
482
483        let expected_sorted_order = SortOrder::builder()
484            .with_order_id(0)
485            .with_fields(vec![])
486            .build(expected_schema)
487            .unwrap();
488
489        assert_eq!(
490            metadata
491                .sort_orders_iter()
492                .map(|s| s.as_ref())
493                .collect_vec(),
494            vec![&expected_sorted_order]
495        );
496
497        assert_eq!(metadata.properties(), &HashMap::new());
498
499        assert!(!table.readonly());
500    }
501
502    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
503
504    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
505        let actual = table.metadata_location().unwrap().to_string();
506        let regex = Regex::new(regex_str).unwrap();
507        assert!(
508            regex.is_match(&actual),
509            "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
510        )
511    }
512
513    #[tokio::test]
514    async fn test_list_namespaces_returns_empty_vector() {
515        let catalog = new_memory_catalog().await;
516
517        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
518    }
519
520    #[tokio::test]
521    async fn test_list_namespaces_returns_single_namespace() {
522        let catalog = new_memory_catalog().await;
523        let namespace_ident = NamespaceIdent::new("abc".into());
524        create_namespace(&catalog, &namespace_ident).await;
525
526        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
527            namespace_ident
528        ]);
529    }
530
531    #[tokio::test]
532    async fn test_list_namespaces_returns_multiple_namespaces() {
533        let catalog = new_memory_catalog().await;
534        let namespace_ident_1 = NamespaceIdent::new("a".into());
535        let namespace_ident_2 = NamespaceIdent::new("b".into());
536        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
537
538        assert_eq!(
539            to_set(catalog.list_namespaces(None).await.unwrap()),
540            to_set(vec![namespace_ident_1, namespace_ident_2])
541        );
542    }
543
544    #[tokio::test]
545    async fn test_list_namespaces_returns_only_top_level_namespaces() {
546        let catalog = new_memory_catalog().await;
547        let namespace_ident_1 = NamespaceIdent::new("a".into());
548        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
549        let namespace_ident_3 = NamespaceIdent::new("b".into());
550        create_namespaces(&catalog, &vec![
551            &namespace_ident_1,
552            &namespace_ident_2,
553            &namespace_ident_3,
554        ])
555        .await;
556
557        assert_eq!(
558            to_set(catalog.list_namespaces(None).await.unwrap()),
559            to_set(vec![namespace_ident_1, namespace_ident_3])
560        );
561    }
562
563    #[tokio::test]
564    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
565        let catalog = new_memory_catalog().await;
566        let namespace_ident_1 = NamespaceIdent::new("a".into());
567        let namespace_ident_2 = NamespaceIdent::new("b".into());
568        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
569
570        assert_eq!(
571            catalog
572                .list_namespaces(Some(&namespace_ident_1))
573                .await
574                .unwrap(),
575            vec![]
576        );
577    }
578
579    #[tokio::test]
580    async fn test_list_namespaces_returns_namespace_under_parent() {
581        let catalog = new_memory_catalog().await;
582        let namespace_ident_1 = NamespaceIdent::new("a".into());
583        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
584        let namespace_ident_3 = NamespaceIdent::new("c".into());
585        create_namespaces(&catalog, &vec![
586            &namespace_ident_1,
587            &namespace_ident_2,
588            &namespace_ident_3,
589        ])
590        .await;
591
592        assert_eq!(
593            to_set(catalog.list_namespaces(None).await.unwrap()),
594            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
595        );
596
597        assert_eq!(
598            catalog
599                .list_namespaces(Some(&namespace_ident_1))
600                .await
601                .unwrap(),
602            vec![NamespaceIdent::new("b".into())]
603        );
604    }
605
606    #[tokio::test]
607    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
608        let catalog = new_memory_catalog().await;
609        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
610        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
611        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
612        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
613        let namespace_ident_5 = NamespaceIdent::new("b".into());
614        create_namespaces(&catalog, &vec![
615            &namespace_ident_1,
616            &namespace_ident_2,
617            &namespace_ident_3,
618            &namespace_ident_4,
619            &namespace_ident_5,
620        ])
621        .await;
622
623        assert_eq!(
624            to_set(
625                catalog
626                    .list_namespaces(Some(&namespace_ident_1))
627                    .await
628                    .unwrap()
629            ),
630            to_set(vec![
631                NamespaceIdent::new("a".into()),
632                NamespaceIdent::new("b".into()),
633                NamespaceIdent::new("c".into()),
634            ])
635        );
636    }
637
638    #[tokio::test]
639    async fn test_namespace_exists_returns_false() {
640        let catalog = new_memory_catalog().await;
641        let namespace_ident = NamespaceIdent::new("a".into());
642        create_namespace(&catalog, &namespace_ident).await;
643
644        assert!(
645            !catalog
646                .namespace_exists(&NamespaceIdent::new("b".into()))
647                .await
648                .unwrap()
649        );
650    }
651
652    #[tokio::test]
653    async fn test_namespace_exists_returns_true() {
654        let catalog = new_memory_catalog().await;
655        let namespace_ident = NamespaceIdent::new("a".into());
656        create_namespace(&catalog, &namespace_ident).await;
657
658        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
659    }
660
661    #[tokio::test]
662    async fn test_create_namespace_with_empty_properties() {
663        let catalog = new_memory_catalog().await;
664        let namespace_ident = NamespaceIdent::new("a".into());
665
666        assert_eq!(
667            catalog
668                .create_namespace(&namespace_ident, HashMap::new())
669                .await
670                .unwrap(),
671            Namespace::new(namespace_ident.clone())
672        );
673
674        assert_eq!(
675            catalog.get_namespace(&namespace_ident).await.unwrap(),
676            Namespace::with_properties(namespace_ident, HashMap::new())
677        );
678    }
679
680    #[tokio::test]
681    async fn test_create_namespace_with_properties() {
682        let catalog = new_memory_catalog().await;
683        let namespace_ident = NamespaceIdent::new("abc".into());
684
685        let mut properties: HashMap<String, String> = HashMap::new();
686        properties.insert("k".into(), "v".into());
687
688        assert_eq!(
689            catalog
690                .create_namespace(&namespace_ident, properties.clone())
691                .await
692                .unwrap(),
693            Namespace::with_properties(namespace_ident.clone(), properties.clone())
694        );
695
696        assert_eq!(
697            catalog.get_namespace(&namespace_ident).await.unwrap(),
698            Namespace::with_properties(namespace_ident, properties)
699        );
700    }
701
702    #[tokio::test]
703    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
704        let catalog = new_memory_catalog().await;
705        let namespace_ident = NamespaceIdent::new("a".into());
706        create_namespace(&catalog, &namespace_ident).await;
707
708        assert_eq!(
709            catalog
710                .create_namespace(&namespace_ident, HashMap::new())
711                .await
712                .unwrap_err()
713                .to_string(),
714            format!(
715                "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
716                &namespace_ident
717            )
718        );
719
720        assert_eq!(
721            catalog.get_namespace(&namespace_ident).await.unwrap(),
722            Namespace::with_properties(namespace_ident, HashMap::new())
723        );
724    }
725
726    #[tokio::test]
727    async fn test_create_nested_namespace() {
728        let catalog = new_memory_catalog().await;
729        let parent_namespace_ident = NamespaceIdent::new("a".into());
730        create_namespace(&catalog, &parent_namespace_ident).await;
731
732        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
733
734        assert_eq!(
735            catalog
736                .create_namespace(&child_namespace_ident, HashMap::new())
737                .await
738                .unwrap(),
739            Namespace::new(child_namespace_ident.clone())
740        );
741
742        assert_eq!(
743            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
744            Namespace::with_properties(child_namespace_ident, HashMap::new())
745        );
746    }
747
748    #[tokio::test]
749    async fn test_create_deeply_nested_namespace() {
750        let catalog = new_memory_catalog().await;
751        let namespace_ident_a = NamespaceIdent::new("a".into());
752        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
753        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
754
755        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
756
757        assert_eq!(
758            catalog
759                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
760                .await
761                .unwrap(),
762            Namespace::new(namespace_ident_a_b_c.clone())
763        );
764
765        assert_eq!(
766            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
767            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
768        );
769    }
770
771    #[tokio::test]
772    async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
773        let catalog = new_memory_catalog().await;
774
775        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
776
777        assert_eq!(
778            catalog
779                .create_namespace(&nested_namespace_ident, HashMap::new())
780                .await
781                .unwrap_err()
782                .to_string(),
783            format!(
784                "NamespaceNotFound => No such namespace: {:?}",
785                NamespaceIdent::new("a".into())
786            )
787        );
788
789        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
790    }
791
792    #[tokio::test]
793    async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
794     {
795        let catalog = new_memory_catalog().await;
796
797        let namespace_ident_a = NamespaceIdent::new("a".into());
798        create_namespace(&catalog, &namespace_ident_a).await;
799
800        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
801
802        assert_eq!(
803            catalog
804                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
805                .await
806                .unwrap_err()
807                .to_string(),
808            format!(
809                "NamespaceNotFound => No such namespace: {:?}",
810                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
811            )
812        );
813
814        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
815            namespace_ident_a.clone()
816        ]);
817
818        assert_eq!(
819            catalog
820                .list_namespaces(Some(&namespace_ident_a))
821                .await
822                .unwrap(),
823            vec![]
824        );
825    }
826
827    #[tokio::test]
828    async fn test_get_namespace() {
829        let catalog = new_memory_catalog().await;
830        let namespace_ident = NamespaceIdent::new("abc".into());
831
832        let mut properties: HashMap<String, String> = HashMap::new();
833        properties.insert("k".into(), "v".into());
834        let _ = catalog
835            .create_namespace(&namespace_ident, properties.clone())
836            .await
837            .unwrap();
838
839        assert_eq!(
840            catalog.get_namespace(&namespace_ident).await.unwrap(),
841            Namespace::with_properties(namespace_ident, properties)
842        )
843    }
844
845    #[tokio::test]
846    async fn test_get_nested_namespace() {
847        let catalog = new_memory_catalog().await;
848        let namespace_ident_a = NamespaceIdent::new("a".into());
849        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
850        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
851
852        assert_eq!(
853            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
854            Namespace::with_properties(namespace_ident_a_b, HashMap::new())
855        );
856    }
857
858    #[tokio::test]
859    async fn test_get_deeply_nested_namespace() {
860        let catalog = new_memory_catalog().await;
861        let namespace_ident_a = NamespaceIdent::new("a".into());
862        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
863        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
864        create_namespaces(&catalog, &vec![
865            &namespace_ident_a,
866            &namespace_ident_a_b,
867            &namespace_ident_a_b_c,
868        ])
869        .await;
870
871        assert_eq!(
872            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
873            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
874        );
875    }
876
877    #[tokio::test]
878    async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
879        let catalog = new_memory_catalog().await;
880        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
881
882        let non_existent_namespace_ident = NamespaceIdent::new("b".into());
883        assert_eq!(
884            catalog
885                .get_namespace(&non_existent_namespace_ident)
886                .await
887                .unwrap_err()
888                .to_string(),
889            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
890        )
891    }
892
893    #[tokio::test]
894    async fn test_update_namespace() {
895        let catalog = new_memory_catalog().await;
896        let namespace_ident = NamespaceIdent::new("abc".into());
897        create_namespace(&catalog, &namespace_ident).await;
898
899        let mut new_properties: HashMap<String, String> = HashMap::new();
900        new_properties.insert("k".into(), "v".into());
901
902        catalog
903            .update_namespace(&namespace_ident, new_properties.clone())
904            .await
905            .unwrap();
906
907        assert_eq!(
908            catalog.get_namespace(&namespace_ident).await.unwrap(),
909            Namespace::with_properties(namespace_ident, new_properties)
910        )
911    }
912
913    #[tokio::test]
914    async fn test_update_nested_namespace() {
915        let catalog = new_memory_catalog().await;
916        let namespace_ident_a = NamespaceIdent::new("a".into());
917        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
918        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
919
920        let mut new_properties = HashMap::new();
921        new_properties.insert("k".into(), "v".into());
922
923        catalog
924            .update_namespace(&namespace_ident_a_b, new_properties.clone())
925            .await
926            .unwrap();
927
928        assert_eq!(
929            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
930            Namespace::with_properties(namespace_ident_a_b, new_properties)
931        );
932    }
933
934    #[tokio::test]
935    async fn test_update_deeply_nested_namespace() {
936        let catalog = new_memory_catalog().await;
937        let namespace_ident_a = NamespaceIdent::new("a".into());
938        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
939        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
940        create_namespaces(&catalog, &vec![
941            &namespace_ident_a,
942            &namespace_ident_a_b,
943            &namespace_ident_a_b_c,
944        ])
945        .await;
946
947        let mut new_properties = HashMap::new();
948        new_properties.insert("k".into(), "v".into());
949
950        catalog
951            .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
952            .await
953            .unwrap();
954
955        assert_eq!(
956            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
957            Namespace::with_properties(namespace_ident_a_b_c, new_properties)
958        );
959    }
960
961    #[tokio::test]
962    async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
963        let catalog = new_memory_catalog().await;
964        create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
965
966        let non_existent_namespace_ident = NamespaceIdent::new("def".into());
967        assert_eq!(
968            catalog
969                .update_namespace(&non_existent_namespace_ident, HashMap::new())
970                .await
971                .unwrap_err()
972                .to_string(),
973            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
974        )
975    }
976
977    #[tokio::test]
978    async fn test_drop_namespace() {
979        let catalog = new_memory_catalog().await;
980        let namespace_ident = NamespaceIdent::new("abc".into());
981        create_namespace(&catalog, &namespace_ident).await;
982
983        catalog.drop_namespace(&namespace_ident).await.unwrap();
984
985        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
986    }
987
988    #[tokio::test]
989    async fn test_drop_nested_namespace() {
990        let catalog = new_memory_catalog().await;
991        let namespace_ident_a = NamespaceIdent::new("a".into());
992        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
993        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
994
995        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
996
997        assert!(
998            !catalog
999                .namespace_exists(&namespace_ident_a_b)
1000                .await
1001                .unwrap()
1002        );
1003
1004        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1005    }
1006
1007    #[tokio::test]
1008    async fn test_drop_deeply_nested_namespace() {
1009        let catalog = new_memory_catalog().await;
1010        let namespace_ident_a = NamespaceIdent::new("a".into());
1011        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1012        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1013        create_namespaces(&catalog, &vec![
1014            &namespace_ident_a,
1015            &namespace_ident_a_b,
1016            &namespace_ident_a_b_c,
1017        ])
1018        .await;
1019
1020        catalog
1021            .drop_namespace(&namespace_ident_a_b_c)
1022            .await
1023            .unwrap();
1024
1025        assert!(
1026            !catalog
1027                .namespace_exists(&namespace_ident_a_b_c)
1028                .await
1029                .unwrap()
1030        );
1031
1032        assert!(
1033            catalog
1034                .namespace_exists(&namespace_ident_a_b)
1035                .await
1036                .unwrap()
1037        );
1038
1039        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1040    }
1041
1042    #[tokio::test]
1043    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1044        let catalog = new_memory_catalog().await;
1045
1046        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1047        assert_eq!(
1048            catalog
1049                .drop_namespace(&non_existent_namespace_ident)
1050                .await
1051                .unwrap_err()
1052                .to_string(),
1053            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1054        )
1055    }
1056
1057    #[tokio::test]
1058    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1059        let catalog = new_memory_catalog().await;
1060        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1061
1062        let non_existent_namespace_ident =
1063            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1064        assert_eq!(
1065            catalog
1066                .drop_namespace(&non_existent_namespace_ident)
1067                .await
1068                .unwrap_err()
1069                .to_string(),
1070            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1071        )
1072    }
1073
1074    #[tokio::test]
1075    async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1076        let catalog = new_memory_catalog().await;
1077        let namespace_ident_a = NamespaceIdent::new("a".into());
1078        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1079        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1080
1081        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1082
1083        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1084
1085        assert!(
1086            !catalog
1087                .namespace_exists(&namespace_ident_a_b)
1088                .await
1089                .unwrap()
1090        );
1091    }
1092
1093    #[tokio::test]
1094    async fn test_create_table_with_location() {
1095        let tmp_dir = TempDir::new().unwrap();
1096        let catalog = new_memory_catalog().await;
1097        let namespace_ident = NamespaceIdent::new("a".into());
1098        create_namespace(&catalog, &namespace_ident).await;
1099
1100        let table_name = "abc";
1101        let location = tmp_dir.path().to_str().unwrap().to_string();
1102        let table_creation = TableCreation::builder()
1103            .name(table_name.into())
1104            .location(location.clone())
1105            .schema(simple_table_schema())
1106            .build();
1107
1108        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1109
1110        assert_table_eq(
1111            &catalog
1112                .create_table(&namespace_ident, table_creation)
1113                .await
1114                .unwrap(),
1115            &expected_table_ident,
1116            &simple_table_schema(),
1117        );
1118
1119        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1120
1121        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1122
1123        assert!(
1124            table
1125                .metadata_location()
1126                .unwrap()
1127                .to_string()
1128                .starts_with(&location)
1129        )
1130    }
1131
1132    #[tokio::test]
1133    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1134        let warehouse_location = temp_path();
1135        let catalog = MemoryCatalogBuilder::default()
1136            .load(
1137                "memory",
1138                HashMap::from([(
1139                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1140                    warehouse_location.clone(),
1141                )]),
1142            )
1143            .await
1144            .unwrap();
1145
1146        let namespace_ident = NamespaceIdent::new("a".into());
1147        let mut namespace_properties = HashMap::new();
1148        let namespace_location = temp_path();
1149        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1150        catalog
1151            .create_namespace(&namespace_ident, namespace_properties)
1152            .await
1153            .unwrap();
1154
1155        let table_name = "tbl1";
1156        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1157        let expected_table_metadata_location_regex =
1158            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1159
1160        let table = catalog
1161            .create_table(
1162                &namespace_ident,
1163                TableCreation::builder()
1164                    .name(table_name.into())
1165                    .schema(simple_table_schema())
1166                    // no location specified for table
1167                    .build(),
1168            )
1169            .await
1170            .unwrap();
1171        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1172        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1173
1174        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1175        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1176        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1177    }
1178
1179    #[tokio::test]
1180    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1181     {
1182        let warehouse_location = temp_path();
1183        let catalog = MemoryCatalogBuilder::default()
1184            .load(
1185                "memory",
1186                HashMap::from([(
1187                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1188                    warehouse_location.clone(),
1189                )]),
1190            )
1191            .await
1192            .unwrap();
1193
1194        let namespace_ident = NamespaceIdent::new("a".into());
1195        let mut namespace_properties = HashMap::new();
1196        let namespace_location = temp_path();
1197        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1198        catalog
1199            .create_namespace(&namespace_ident, namespace_properties)
1200            .await
1201            .unwrap();
1202
1203        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1204        let mut nested_namespace_properties = HashMap::new();
1205        let nested_namespace_location = temp_path();
1206        nested_namespace_properties
1207            .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1208        catalog
1209            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1210            .await
1211            .unwrap();
1212
1213        let table_name = "tbl1";
1214        let expected_table_ident =
1215            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1216        let expected_table_metadata_location_regex = format!(
1217            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1218        );
1219
1220        let table = catalog
1221            .create_table(
1222                &nested_namespace_ident,
1223                TableCreation::builder()
1224                    .name(table_name.into())
1225                    .schema(simple_table_schema())
1226                    // no location specified for table
1227                    .build(),
1228            )
1229            .await
1230            .unwrap();
1231        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1232        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1233
1234        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1235        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1236        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1237    }
1238
1239    #[tokio::test]
1240    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1241     {
1242        let warehouse_location = temp_path();
1243        let catalog = MemoryCatalogBuilder::default()
1244            .load(
1245                "memory",
1246                HashMap::from([(
1247                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1248                    warehouse_location.clone(),
1249                )]),
1250            )
1251            .await
1252            .unwrap();
1253
1254        let namespace_ident = NamespaceIdent::new("a".into());
1255        // note: no location specified in namespace_properties
1256        let namespace_properties = HashMap::new();
1257        catalog
1258            .create_namespace(&namespace_ident, namespace_properties)
1259            .await
1260            .unwrap();
1261
1262        let table_name = "tbl1";
1263        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1264        let expected_table_metadata_location_regex =
1265            format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1266
1267        let table = catalog
1268            .create_table(
1269                &namespace_ident,
1270                TableCreation::builder()
1271                    .name(table_name.into())
1272                    .schema(simple_table_schema())
1273                    // no location specified for table
1274                    .build(),
1275            )
1276            .await
1277            .unwrap();
1278        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1279        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1280
1281        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1282        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1283        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1284    }
1285
1286    #[tokio::test]
1287    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1288     {
1289        let warehouse_location = temp_path();
1290        let catalog = MemoryCatalogBuilder::default()
1291            .load(
1292                "memory",
1293                HashMap::from([(
1294                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1295                    warehouse_location.clone(),
1296                )]),
1297            )
1298            .await
1299            .unwrap();
1300
1301        let namespace_ident = NamespaceIdent::new("a".into());
1302        catalog
1303            // note: no location specified in namespace_properties
1304            .create_namespace(&namespace_ident, HashMap::new())
1305            .await
1306            .unwrap();
1307
1308        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1309        catalog
1310            // note: no location specified in namespace_properties
1311            .create_namespace(&nested_namespace_ident, HashMap::new())
1312            .await
1313            .unwrap();
1314
1315        let table_name = "tbl1";
1316        let expected_table_ident =
1317            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1318        let expected_table_metadata_location_regex = format!(
1319            "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1320        );
1321
1322        let table = catalog
1323            .create_table(
1324                &nested_namespace_ident,
1325                TableCreation::builder()
1326                    .name(table_name.into())
1327                    .schema(simple_table_schema())
1328                    // no location specified for table
1329                    .build(),
1330            )
1331            .await
1332            .unwrap();
1333        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1334        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1335
1336        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1337        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1338        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1339    }
1340
1341    #[tokio::test]
1342    async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1343     {
1344        let catalog = MemoryCatalogBuilder::default()
1345            .load("memory", HashMap::from([]))
1346            .await;
1347
1348        assert!(catalog.is_err());
1349        assert_eq!(
1350            catalog.unwrap_err().to_string(),
1351            "DataInvalid => Catalog warehouse is required"
1352        );
1353    }
1354
1355    #[tokio::test]
1356    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1357        let catalog = new_memory_catalog().await;
1358        let namespace_ident = NamespaceIdent::new("a".into());
1359        create_namespace(&catalog, &namespace_ident).await;
1360        let table_name = "tbl1";
1361        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1362        create_table(&catalog, &table_ident).await;
1363
1364        let tmp_dir = TempDir::new().unwrap();
1365        let location = tmp_dir.path().to_str().unwrap().to_string();
1366
1367        assert_eq!(
1368            catalog
1369                .create_table(
1370                    &namespace_ident,
1371                    TableCreation::builder()
1372                        .name(table_name.into())
1373                        .schema(simple_table_schema())
1374                        .location(location)
1375                        .build()
1376                )
1377                .await
1378                .unwrap_err()
1379                .to_string(),
1380            format!(
1381                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1382                &table_ident
1383            )
1384        );
1385    }
1386
1387    #[tokio::test]
1388    async fn test_list_tables_returns_empty_vector() {
1389        let catalog = new_memory_catalog().await;
1390        let namespace_ident = NamespaceIdent::new("a".into());
1391        create_namespace(&catalog, &namespace_ident).await;
1392
1393        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1394    }
1395
1396    #[tokio::test]
1397    async fn test_list_tables_returns_a_single_table() {
1398        let catalog = new_memory_catalog().await;
1399        let namespace_ident = NamespaceIdent::new("n1".into());
1400        create_namespace(&catalog, &namespace_ident).await;
1401
1402        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1403        create_table(&catalog, &table_ident).await;
1404
1405        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1406            table_ident
1407        ]);
1408    }
1409
1410    #[tokio::test]
1411    async fn test_list_tables_returns_multiple_tables() {
1412        let catalog = new_memory_catalog().await;
1413        let namespace_ident = NamespaceIdent::new("n1".into());
1414        create_namespace(&catalog, &namespace_ident).await;
1415
1416        let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1417        let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1418        let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1419
1420        assert_eq!(
1421            to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1422            to_set(vec![table_ident_1, table_ident_2])
1423        );
1424    }
1425
1426    #[tokio::test]
1427    async fn test_list_tables_returns_tables_from_correct_namespace() {
1428        let catalog = new_memory_catalog().await;
1429        let namespace_ident_1 = NamespaceIdent::new("n1".into());
1430        let namespace_ident_2 = NamespaceIdent::new("n2".into());
1431        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1432
1433        let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1434        let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1435        let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1436        let _ = create_tables(&catalog, vec![
1437            &table_ident_1,
1438            &table_ident_2,
1439            &table_ident_3,
1440        ])
1441        .await;
1442
1443        assert_eq!(
1444            to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1445            to_set(vec![table_ident_1, table_ident_2])
1446        );
1447
1448        assert_eq!(
1449            to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1450            to_set(vec![table_ident_3])
1451        );
1452    }
1453
1454    #[tokio::test]
1455    async fn test_list_tables_returns_table_under_nested_namespace() {
1456        let catalog = new_memory_catalog().await;
1457        let namespace_ident_a = NamespaceIdent::new("a".into());
1458        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1459        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1460
1461        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1462        create_table(&catalog, &table_ident).await;
1463
1464        assert_eq!(
1465            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1466            vec![table_ident]
1467        );
1468    }
1469
1470    #[tokio::test]
1471    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1472        let catalog = new_memory_catalog().await;
1473
1474        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1475
1476        assert_eq!(
1477            catalog
1478                .list_tables(&non_existent_namespace_ident)
1479                .await
1480                .unwrap_err()
1481                .to_string(),
1482            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1483        );
1484    }
1485
1486    #[tokio::test]
1487    async fn test_drop_table() {
1488        let catalog = new_memory_catalog().await;
1489        let namespace_ident = NamespaceIdent::new("n1".into());
1490        create_namespace(&catalog, &namespace_ident).await;
1491        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1492        create_table(&catalog, &table_ident).await;
1493
1494        catalog.drop_table(&table_ident).await.unwrap();
1495    }
1496
1497    #[tokio::test]
1498    async fn test_drop_table_drops_table_under_nested_namespace() {
1499        let catalog = new_memory_catalog().await;
1500        let namespace_ident_a = NamespaceIdent::new("a".into());
1501        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1502        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1503
1504        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1505        create_table(&catalog, &table_ident).await;
1506
1507        catalog.drop_table(&table_ident).await.unwrap();
1508
1509        assert_eq!(
1510            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1511            vec![]
1512        );
1513    }
1514
1515    #[tokio::test]
1516    async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1517        let catalog = new_memory_catalog().await;
1518
1519        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1520        let non_existent_table_ident =
1521            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1522
1523        assert_eq!(
1524            catalog
1525                .drop_table(&non_existent_table_ident)
1526                .await
1527                .unwrap_err()
1528                .to_string(),
1529            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1530        );
1531    }
1532
1533    #[tokio::test]
1534    async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1535        let catalog = new_memory_catalog().await;
1536        let namespace_ident = NamespaceIdent::new("n1".into());
1537        create_namespace(&catalog, &namespace_ident).await;
1538
1539        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1540
1541        assert_eq!(
1542            catalog
1543                .drop_table(&non_existent_table_ident)
1544                .await
1545                .unwrap_err()
1546                .to_string(),
1547            format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1548        );
1549    }
1550
1551    #[tokio::test]
1552    async fn test_table_exists_returns_true() {
1553        let catalog = new_memory_catalog().await;
1554        let namespace_ident = NamespaceIdent::new("n1".into());
1555        create_namespace(&catalog, &namespace_ident).await;
1556        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1557        create_table(&catalog, &table_ident).await;
1558
1559        assert!(catalog.table_exists(&table_ident).await.unwrap());
1560    }
1561
1562    #[tokio::test]
1563    async fn test_table_exists_returns_false() {
1564        let catalog = new_memory_catalog().await;
1565        let namespace_ident = NamespaceIdent::new("n1".into());
1566        create_namespace(&catalog, &namespace_ident).await;
1567        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1568
1569        assert!(
1570            !catalog
1571                .table_exists(&non_existent_table_ident)
1572                .await
1573                .unwrap()
1574        );
1575    }
1576
1577    #[tokio::test]
1578    async fn test_table_exists_under_nested_namespace() {
1579        let catalog = new_memory_catalog().await;
1580        let namespace_ident_a = NamespaceIdent::new("a".into());
1581        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1582        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1583
1584        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1585        create_table(&catalog, &table_ident).await;
1586
1587        assert!(catalog.table_exists(&table_ident).await.unwrap());
1588
1589        let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1590        assert!(
1591            !catalog
1592                .table_exists(&non_existent_table_ident)
1593                .await
1594                .unwrap()
1595        );
1596    }
1597
1598    #[tokio::test]
1599    async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1600        let catalog = new_memory_catalog().await;
1601
1602        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1603        let non_existent_table_ident =
1604            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1605
1606        assert_eq!(
1607            catalog
1608                .table_exists(&non_existent_table_ident)
1609                .await
1610                .unwrap_err()
1611                .to_string(),
1612            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1613        );
1614    }
1615
1616    #[tokio::test]
1617    async fn test_rename_table_in_same_namespace() {
1618        let catalog = new_memory_catalog().await;
1619        let namespace_ident = NamespaceIdent::new("n1".into());
1620        create_namespace(&catalog, &namespace_ident).await;
1621        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1622        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1623        create_table(&catalog, &src_table_ident).await;
1624
1625        catalog
1626            .rename_table(&src_table_ident, &dst_table_ident)
1627            .await
1628            .unwrap();
1629
1630        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1631            dst_table_ident
1632        ],);
1633    }
1634
1635    #[tokio::test]
1636    async fn test_rename_table_across_namespaces() {
1637        let catalog = new_memory_catalog().await;
1638        let src_namespace_ident = NamespaceIdent::new("a".into());
1639        let dst_namespace_ident = NamespaceIdent::new("b".into());
1640        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1641        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1642        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1643        create_table(&catalog, &src_table_ident).await;
1644
1645        catalog
1646            .rename_table(&src_table_ident, &dst_table_ident)
1647            .await
1648            .unwrap();
1649
1650        assert_eq!(
1651            catalog.list_tables(&src_namespace_ident).await.unwrap(),
1652            vec![],
1653        );
1654
1655        assert_eq!(
1656            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1657            vec![dst_table_ident],
1658        );
1659    }
1660
1661    #[tokio::test]
1662    async fn test_rename_table_src_table_is_same_as_dst_table() {
1663        let catalog = new_memory_catalog().await;
1664        let namespace_ident = NamespaceIdent::new("n1".into());
1665        create_namespace(&catalog, &namespace_ident).await;
1666        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1667        create_table(&catalog, &table_ident).await;
1668
1669        catalog
1670            .rename_table(&table_ident, &table_ident)
1671            .await
1672            .unwrap();
1673
1674        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1675            table_ident
1676        ],);
1677    }
1678
1679    #[tokio::test]
1680    async fn test_rename_table_across_nested_namespaces() {
1681        let catalog = new_memory_catalog().await;
1682        let namespace_ident_a = NamespaceIdent::new("a".into());
1683        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1684        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1685        create_namespaces(&catalog, &vec![
1686            &namespace_ident_a,
1687            &namespace_ident_a_b,
1688            &namespace_ident_a_b_c,
1689        ])
1690        .await;
1691
1692        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1693        create_tables(&catalog, vec![&src_table_ident]).await;
1694
1695        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1696        catalog
1697            .rename_table(&src_table_ident, &dst_table_ident)
1698            .await
1699            .unwrap();
1700
1701        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1702
1703        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1704    }
1705
1706    #[tokio::test]
1707    async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1708        let catalog = new_memory_catalog().await;
1709
1710        let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1711        let src_table_ident =
1712            TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1713
1714        let dst_namespace_ident = NamespaceIdent::new("n2".into());
1715        create_namespace(&catalog, &dst_namespace_ident).await;
1716        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1717
1718        assert_eq!(
1719            catalog
1720                .rename_table(&src_table_ident, &dst_table_ident)
1721                .await
1722                .unwrap_err()
1723                .to_string(),
1724            format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1725        );
1726    }
1727
1728    #[tokio::test]
1729    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1730        let catalog = new_memory_catalog().await;
1731        let src_namespace_ident = NamespaceIdent::new("n1".into());
1732        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1733        create_namespace(&catalog, &src_namespace_ident).await;
1734        create_table(&catalog, &src_table_ident).await;
1735
1736        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1737        let dst_table_ident =
1738            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1739        assert_eq!(
1740            catalog
1741                .rename_table(&src_table_ident, &dst_table_ident)
1742                .await
1743                .unwrap_err()
1744                .to_string(),
1745            format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1746        );
1747    }
1748
1749    #[tokio::test]
1750    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1751        let catalog = new_memory_catalog().await;
1752        let namespace_ident = NamespaceIdent::new("n1".into());
1753        create_namespace(&catalog, &namespace_ident).await;
1754        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1755        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1756
1757        assert_eq!(
1758            catalog
1759                .rename_table(&src_table_ident, &dst_table_ident)
1760                .await
1761                .unwrap_err()
1762                .to_string(),
1763            format!("TableNotFound => No such table: {src_table_ident:?}"),
1764        );
1765    }
1766
1767    #[tokio::test]
1768    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1769        let catalog = new_memory_catalog().await;
1770        let namespace_ident = NamespaceIdent::new("n1".into());
1771        create_namespace(&catalog, &namespace_ident).await;
1772        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1773        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1774        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1775
1776        assert_eq!(
1777            catalog
1778                .rename_table(&src_table_ident, &dst_table_ident)
1779                .await
1780                .unwrap_err()
1781                .to_string(),
1782            format!(
1783                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1784                &dst_table_ident
1785            ),
1786        );
1787    }
1788
1789    #[tokio::test]
1790    async fn test_register_table() {
1791        // Create a catalog and namespace
1792        let catalog = new_memory_catalog().await;
1793        let namespace_ident = NamespaceIdent::new("test_namespace".into());
1794        create_namespace(&catalog, &namespace_ident).await;
1795
1796        // Create a table to get a valid metadata file
1797        let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1798        create_table(&catalog, &source_table_ident).await;
1799
1800        // Get the metadata location from the source table
1801        let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1802        let metadata_location = source_table.metadata_location().unwrap().to_string();
1803
1804        // Register a new table using the same metadata location
1805        let register_table_ident =
1806            TableIdent::new(namespace_ident.clone(), "register_table".into());
1807        let registered_table = catalog
1808            .register_table(&register_table_ident, metadata_location.clone())
1809            .await
1810            .unwrap();
1811
1812        // Verify the registered table has the correct identifier
1813        assert_eq!(registered_table.identifier(), &register_table_ident);
1814
1815        // Verify the registered table has the correct metadata location
1816        assert_eq!(
1817            registered_table.metadata_location().unwrap().to_string(),
1818            metadata_location
1819        );
1820
1821        // Verify the table exists in the catalog
1822        assert!(catalog.table_exists(&register_table_ident).await.unwrap());
1823
1824        // Verify we can load the registered table
1825        let loaded_table = catalog.load_table(&register_table_ident).await.unwrap();
1826        assert_eq!(loaded_table.identifier(), &register_table_ident);
1827        assert_eq!(
1828            loaded_table.metadata_location().unwrap().to_string(),
1829            metadata_location
1830        );
1831    }
1832
1833    #[tokio::test]
1834    async fn test_update_table() {
1835        let catalog = new_memory_catalog().await;
1836
1837        let table = create_table_with_namespace(&catalog).await;
1838
1839        // Assert the table doesn't contain the update yet
1840        assert!(!table.metadata().properties().contains_key("key"));
1841
1842        // Update table metadata
1843        let tx = Transaction::new(&table);
1844        let updated_table = tx
1845            .update_table_properties()
1846            .set("key".to_string(), "value".to_string())
1847            .apply(tx)
1848            .unwrap()
1849            .commit(&catalog)
1850            .await
1851            .unwrap();
1852
1853        assert_eq!(
1854            updated_table.metadata().properties().get("key").unwrap(),
1855            "value"
1856        );
1857
1858        assert_eq!(table.identifier(), updated_table.identifier());
1859        assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1860        assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1861        assert_ne!(table.metadata_location(), updated_table.metadata_location());
1862
1863        assert!(
1864            table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1865        );
1866    }
1867
1868    #[tokio::test]
1869    async fn test_update_table_fails_if_table_doesnt_exist() {
1870        let catalog = new_memory_catalog().await;
1871
1872        let namespace_ident = NamespaceIdent::new("a".into());
1873        create_namespace(&catalog, &namespace_ident).await;
1874
1875        // This table is not known to the catalog.
1876        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1877        let table = build_table(table_ident);
1878
1879        let tx = Transaction::new(&table);
1880        let err = tx
1881            .update_table_properties()
1882            .set("key".to_string(), "value".to_string())
1883            .apply(tx)
1884            .unwrap()
1885            .commit(&catalog)
1886            .await
1887            .unwrap_err();
1888        assert_eq!(err.kind(), ErrorKind::TableNotFound);
1889    }
1890
1891    fn build_table(ident: TableIdent) -> Table {
1892        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1893
1894        let temp_dir = TempDir::new().unwrap();
1895        let location = temp_dir.path().to_str().unwrap().to_string();
1896
1897        let table_creation = TableCreation::builder()
1898            .name(ident.name().to_string())
1899            .schema(simple_table_schema())
1900            .location(location)
1901            .build();
1902        let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1903            .unwrap()
1904            .build()
1905            .unwrap()
1906            .metadata;
1907
1908        Table::builder()
1909            .identifier(ident)
1910            .metadata(metadata)
1911            .file_io(file_io)
1912            .build()
1913            .unwrap()
1914    }
1915}