iceberg/catalog/memory/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains memory catalog implementation.
19
20use std::collections::HashMap;
21
22use async_trait::async_trait;
23use futures::lock::{Mutex, MutexGuard};
24use itertools::Itertools;
25
26use super::namespace_state::NamespaceState;
27use crate::io::FileIO;
28use crate::spec::{TableMetadata, TableMetadataBuilder};
29use crate::table::Table;
30use crate::{
31    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
32    TableCommit, TableCreation, TableIdent,
33};
34
/// Property key under which the warehouse location is passed to the memory catalog.
///
/// [`MemoryCatalogBuilder`] reads this key from the properties handed to `load` and
/// rejects the configuration when the resulting warehouse location is empty.
pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";
37
/// Namespace property key whose value, when present, is used as the location prefix
/// for tables created in that namespace without an explicit location.
const LOCATION: &str = "location";
40
/// Builder for [`MemoryCatalog`].
///
/// Wraps the catalog configuration (name, warehouse location and remaining properties),
/// which is filled in and validated when the builder is loaded.
#[derive(Debug)]
pub struct MemoryCatalogBuilder(MemoryCatalogConfig);
44
45impl Default for MemoryCatalogBuilder {
46    fn default() -> Self {
47        Self(MemoryCatalogConfig {
48            name: None,
49            warehouse: "".to_string(),
50            props: HashMap::new(),
51        })
52    }
53}
54
55impl CatalogBuilder for MemoryCatalogBuilder {
56    type C = MemoryCatalog;
57
58    fn load(
59        mut self,
60        name: impl Into<String>,
61        props: HashMap<String, String>,
62    ) -> impl Future<Output = Result<Self::C>> + Send {
63        self.0.name = Some(name.into());
64
65        if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
66            self.0.warehouse = props
67                .get(MEMORY_CATALOG_WAREHOUSE)
68                .cloned()
69                .unwrap_or_default()
70        }
71
72        // Collect other remaining properties
73        self.0.props = props
74            .into_iter()
75            .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
76            .collect();
77
78        let result = {
79            if self.0.name.is_none() {
80                Err(Error::new(
81                    ErrorKind::DataInvalid,
82                    "Catalog name is required",
83                ))
84            } else if self.0.warehouse.is_empty() {
85                Err(Error::new(
86                    ErrorKind::DataInvalid,
87                    "Catalog warehouse is required",
88                ))
89            } else {
90                MemoryCatalog::new(self.0)
91            }
92        };
93
94        std::future::ready(result)
95    }
96}
97
/// Configuration collected by `MemoryCatalogBuilder` and consumed by `MemoryCatalog::new`.
#[derive(Clone, Debug)]
pub(crate) struct MemoryCatalogConfig {
    /// Catalog name; set by the builder's `load` before the catalog is constructed.
    name: Option<String>,
    /// Warehouse location; must be non-empty for `load` to succeed.
    warehouse: String,
    /// Remaining properties, passed to the `FileIO` builder.
    props: HashMap<String, String>,
}
104
/// Memory catalog implementation.
///
/// Namespace and table bookkeeping lives in an in-process `NamespaceState` guarded by an
/// async mutex; table metadata files are read and written through `file_io`.
#[derive(Debug)]
pub struct MemoryCatalog {
    /// Namespace/table state; every catalog operation locks it first.
    root_namespace_state: Mutex<NamespaceState>,
    /// File IO used to read, write and delete table metadata files.
    file_io: FileIO,
    /// Base location used to derive table locations when neither the table creation nor
    /// the namespace properties provide one.
    warehouse_location: String,
}
112
113impl MemoryCatalog {
114    /// Creates a memory catalog.
115    fn new(config: MemoryCatalogConfig) -> Result<Self> {
116        Ok(Self {
117            root_namespace_state: Mutex::new(NamespaceState::default()),
118            file_io: FileIO::from_path(&config.warehouse)?
119                .with_props(config.props)
120                .build()?,
121            warehouse_location: config.warehouse,
122        })
123    }
124
125    /// Loads a table from the locked namespace state.
126    async fn load_table_from_locked_state(
127        &self,
128        table_ident: &TableIdent,
129        root_namespace_state: &MutexGuard<'_, NamespaceState>,
130    ) -> Result<Table> {
131        let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
132        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
133
134        Table::builder()
135            .identifier(table_ident.clone())
136            .metadata(metadata)
137            .metadata_location(metadata_location.to_string())
138            .file_io(self.file_io.clone())
139            .build()
140    }
141}
142
143#[async_trait]
144impl Catalog for MemoryCatalog {
145    /// List namespaces inside the catalog.
146    async fn list_namespaces(
147        &self,
148        maybe_parent: Option<&NamespaceIdent>,
149    ) -> Result<Vec<NamespaceIdent>> {
150        let root_namespace_state = self.root_namespace_state.lock().await;
151
152        match maybe_parent {
153            None => {
154                let namespaces = root_namespace_state
155                    .list_top_level_namespaces()
156                    .into_iter()
157                    .map(|str| NamespaceIdent::new(str.to_string()))
158                    .collect_vec();
159
160                Ok(namespaces)
161            }
162            Some(parent_namespace_ident) => {
163                let namespaces = root_namespace_state
164                    .list_namespaces_under(parent_namespace_ident)?
165                    .into_iter()
166                    .map(|name| {
167                        let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
168                        names.push(name.to_string());
169                        NamespaceIdent::from_vec(names)
170                    })
171                    .collect::<Result<Vec<_>>>()?;
172
173                Ok(namespaces)
174            }
175        }
176    }
177
178    /// Create a new namespace inside the catalog.
179    async fn create_namespace(
180        &self,
181        namespace_ident: &NamespaceIdent,
182        properties: HashMap<String, String>,
183    ) -> Result<Namespace> {
184        let mut root_namespace_state = self.root_namespace_state.lock().await;
185
186        root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
187        let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
188
189        Ok(namespace)
190    }
191
192    /// Get a namespace information from the catalog.
193    async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
194        let root_namespace_state = self.root_namespace_state.lock().await;
195
196        let namespace = Namespace::with_properties(
197            namespace_ident.clone(),
198            root_namespace_state
199                .get_properties(namespace_ident)?
200                .clone(),
201        );
202
203        Ok(namespace)
204    }
205
206    /// Check if namespace exists in catalog.
207    async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
208        let guarded_namespaces = self.root_namespace_state.lock().await;
209
210        Ok(guarded_namespaces.namespace_exists(namespace_ident))
211    }
212
213    /// Update a namespace inside the catalog.
214    ///
215    /// # Behavior
216    ///
217    /// The properties must be the full set of namespace.
218    async fn update_namespace(
219        &self,
220        namespace_ident: &NamespaceIdent,
221        properties: HashMap<String, String>,
222    ) -> Result<()> {
223        let mut root_namespace_state = self.root_namespace_state.lock().await;
224
225        root_namespace_state.replace_properties(namespace_ident, properties)
226    }
227
228    /// Drop a namespace from the catalog.
229    async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
230        let mut root_namespace_state = self.root_namespace_state.lock().await;
231
232        root_namespace_state.remove_existing_namespace(namespace_ident)
233    }
234
235    /// List tables from namespace.
236    async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
237        let root_namespace_state = self.root_namespace_state.lock().await;
238
239        let table_names = root_namespace_state.list_tables(namespace_ident)?;
240        let table_idents = table_names
241            .into_iter()
242            .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
243            .collect_vec();
244
245        Ok(table_idents)
246    }
247
248    /// Create a new table inside the namespace.
249    async fn create_table(
250        &self,
251        namespace_ident: &NamespaceIdent,
252        table_creation: TableCreation,
253    ) -> Result<Table> {
254        let mut root_namespace_state = self.root_namespace_state.lock().await;
255
256        let table_name = table_creation.name.clone();
257        let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
258
259        let (table_creation, location) = match table_creation.location.clone() {
260            Some(location) => (table_creation, location),
261            None => {
262                let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
263                let location_prefix = match namespace_properties.get(LOCATION) {
264                    Some(namespace_location) => namespace_location.clone(),
265                    None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
266                };
267
268                let location = format!("{}/{}", location_prefix, table_ident.name());
269
270                let new_table_creation = TableCreation {
271                    location: Some(location.clone()),
272                    ..table_creation
273                };
274
275                (new_table_creation, location)
276            }
277        };
278
279        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
280            .build()?
281            .metadata;
282        let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
283
284        metadata.write_to(&self.file_io, &metadata_location).await?;
285
286        root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
287
288        Table::builder()
289            .file_io(self.file_io.clone())
290            .metadata_location(metadata_location)
291            .metadata(metadata)
292            .identifier(table_ident)
293            .build()
294    }
295
296    /// Load table from the catalog.
297    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
298        let root_namespace_state = self.root_namespace_state.lock().await;
299
300        self.load_table_from_locked_state(table_ident, &root_namespace_state)
301            .await
302    }
303
304    /// Drop a table from the catalog.
305    async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
306        let mut root_namespace_state = self.root_namespace_state.lock().await;
307
308        let metadata_location = root_namespace_state.remove_existing_table(table_ident)?;
309        self.file_io.delete(&metadata_location).await
310    }
311
312    /// Check if a table exists in the catalog.
313    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
314        let root_namespace_state = self.root_namespace_state.lock().await;
315
316        root_namespace_state.table_exists(table_ident)
317    }
318
319    /// Rename a table in the catalog.
320    async fn rename_table(
321        &self,
322        src_table_ident: &TableIdent,
323        dst_table_ident: &TableIdent,
324    ) -> Result<()> {
325        let mut root_namespace_state = self.root_namespace_state.lock().await;
326
327        let mut new_root_namespace_state = root_namespace_state.clone();
328        let metadata_location = new_root_namespace_state
329            .get_existing_table_location(src_table_ident)?
330            .clone();
331        new_root_namespace_state.remove_existing_table(src_table_ident)?;
332        new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
333        *root_namespace_state = new_root_namespace_state;
334
335        Ok(())
336    }
337
338    async fn register_table(
339        &self,
340        table_ident: &TableIdent,
341        metadata_location: String,
342    ) -> Result<Table> {
343        let mut root_namespace_state = self.root_namespace_state.lock().await;
344        root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
345
346        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
347
348        Table::builder()
349            .file_io(self.file_io.clone())
350            .metadata_location(metadata_location)
351            .metadata(metadata)
352            .identifier(table_ident.clone())
353            .build()
354    }
355
356    /// Update a table in the catalog.
357    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
358        let mut root_namespace_state = self.root_namespace_state.lock().await;
359
360        let current_table = self
361            .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
362            .await?;
363
364        // Apply TableCommit to get staged table
365        let staged_table = commit.apply(current_table)?;
366
367        // Write table metadata to the new location
368        staged_table
369            .metadata()
370            .write_to(
371                staged_table.file_io(),
372                staged_table.metadata_location_result()?,
373            )
374            .await?;
375
376        // Flip the pointer to reference the new metadata file.
377        let updated_table = root_namespace_state.commit_table_update(staged_table)?;
378
379        Ok(updated_table)
380    }
381}
382
383#[cfg(test)]
384pub(crate) mod tests {
385    use std::collections::HashSet;
386    use std::hash::Hash;
387    use std::iter::FromIterator;
388    use std::vec;
389
390    use regex::Regex;
391    use tempfile::TempDir;
392
393    use super::*;
394    use crate::io::FileIOBuilder;
395    use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
396    use crate::transaction::{ApplyTransactionAction, Transaction};
397
398    fn temp_path() -> String {
399        let temp_dir = TempDir::new().unwrap();
400        temp_dir.path().to_str().unwrap().to_string()
401    }
402
403    pub(crate) async fn new_memory_catalog() -> impl Catalog {
404        let warehouse_location = temp_path();
405        MemoryCatalogBuilder::default()
406            .load(
407                "memory",
408                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
409            )
410            .await
411            .unwrap()
412    }
413
414    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
415        let _ = catalog
416            .create_namespace(namespace_ident, HashMap::new())
417            .await
418            .unwrap();
419    }
420
421    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
422        for namespace_ident in namespace_idents {
423            let _ = create_namespace(catalog, namespace_ident).await;
424        }
425    }
426
427    fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
428        HashSet::from_iter(vec)
429    }
430
431    fn simple_table_schema() -> Schema {
432        Schema::builder()
433            .with_fields(vec![
434                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
435            ])
436            .build()
437            .unwrap()
438    }
439
440    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
441        catalog
442            .create_table(
443                &table_ident.namespace,
444                TableCreation::builder()
445                    .name(table_ident.name().into())
446                    .schema(simple_table_schema())
447                    .build(),
448            )
449            .await
450            .unwrap()
451    }
452
453    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
454        for table_ident in table_idents {
455            create_table(catalog, table_ident).await;
456        }
457    }
458
459    async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
460        let namespace_ident = NamespaceIdent::new("abc".into());
461        create_namespace(catalog, &namespace_ident).await;
462
463        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
464        create_table(catalog, &table_ident).await
465    }
466
467    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
468        assert_eq!(table.identifier(), expected_table_ident);
469
470        let metadata = table.metadata();
471
472        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
473
474        let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
475            .with_spec_id(0)
476            .build()
477            .unwrap();
478
479        assert_eq!(
480            metadata
481                .partition_specs_iter()
482                .map(|p| p.as_ref())
483                .collect_vec(),
484            vec![&expected_partition_spec]
485        );
486
487        let expected_sorted_order = SortOrder::builder()
488            .with_order_id(0)
489            .with_fields(vec![])
490            .build(expected_schema)
491            .unwrap();
492
493        assert_eq!(
494            metadata
495                .sort_orders_iter()
496                .map(|s| s.as_ref())
497                .collect_vec(),
498            vec![&expected_sorted_order]
499        );
500
501        assert_eq!(metadata.properties(), &HashMap::new());
502
503        assert!(!table.readonly());
504    }
505
    /// Regex fragment matching a lowercase hexadecimal UUID in 8-4-4-4-12 hyphenated form.
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
507
508    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
509        let actual = table.metadata_location().unwrap().to_string();
510        let regex = Regex::new(regex_str).unwrap();
511        assert!(
512            regex.is_match(&actual),
513            "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
514        )
515    }
516
517    #[tokio::test]
518    async fn test_list_namespaces_returns_empty_vector() {
519        let catalog = new_memory_catalog().await;
520
521        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
522    }
523
524    #[tokio::test]
525    async fn test_list_namespaces_returns_single_namespace() {
526        let catalog = new_memory_catalog().await;
527        let namespace_ident = NamespaceIdent::new("abc".into());
528        create_namespace(&catalog, &namespace_ident).await;
529
530        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
531            namespace_ident
532        ]);
533    }
534
535    #[tokio::test]
536    async fn test_list_namespaces_returns_multiple_namespaces() {
537        let catalog = new_memory_catalog().await;
538        let namespace_ident_1 = NamespaceIdent::new("a".into());
539        let namespace_ident_2 = NamespaceIdent::new("b".into());
540        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
541
542        assert_eq!(
543            to_set(catalog.list_namespaces(None).await.unwrap()),
544            to_set(vec![namespace_ident_1, namespace_ident_2])
545        );
546    }
547
548    #[tokio::test]
549    async fn test_list_namespaces_returns_only_top_level_namespaces() {
550        let catalog = new_memory_catalog().await;
551        let namespace_ident_1 = NamespaceIdent::new("a".into());
552        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
553        let namespace_ident_3 = NamespaceIdent::new("b".into());
554        create_namespaces(&catalog, &vec![
555            &namespace_ident_1,
556            &namespace_ident_2,
557            &namespace_ident_3,
558        ])
559        .await;
560
561        assert_eq!(
562            to_set(catalog.list_namespaces(None).await.unwrap()),
563            to_set(vec![namespace_ident_1, namespace_ident_3])
564        );
565    }
566
567    #[tokio::test]
568    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
569        let catalog = new_memory_catalog().await;
570        let namespace_ident_1 = NamespaceIdent::new("a".into());
571        let namespace_ident_2 = NamespaceIdent::new("b".into());
572        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
573
574        assert_eq!(
575            catalog
576                .list_namespaces(Some(&namespace_ident_1))
577                .await
578                .unwrap(),
579            vec![]
580        );
581    }
582
583    #[tokio::test]
584    async fn test_list_namespaces_returns_namespace_under_parent() {
585        let catalog = new_memory_catalog().await;
586        let namespace_ident_1 = NamespaceIdent::new("a".into());
587        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
588        let namespace_ident_3 = NamespaceIdent::new("c".into());
589        create_namespaces(&catalog, &vec![
590            &namespace_ident_1,
591            &namespace_ident_2,
592            &namespace_ident_3,
593        ])
594        .await;
595
596        assert_eq!(
597            to_set(catalog.list_namespaces(None).await.unwrap()),
598            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
599        );
600
601        assert_eq!(
602            catalog
603                .list_namespaces(Some(&namespace_ident_1))
604                .await
605                .unwrap(),
606            vec![namespace_ident_2]
607        );
608    }
609
610    #[tokio::test]
611    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
612        let catalog = new_memory_catalog().await;
613        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
614        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
615        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
616        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
617        let namespace_ident_5 = NamespaceIdent::new("b".into());
618        create_namespaces(&catalog, &vec![
619            &namespace_ident_1,
620            &namespace_ident_2,
621            &namespace_ident_3,
622            &namespace_ident_4,
623            &namespace_ident_5,
624        ])
625        .await;
626
627        assert_eq!(
628            to_set(
629                catalog
630                    .list_namespaces(Some(&namespace_ident_1))
631                    .await
632                    .unwrap()
633            ),
634            to_set(vec![
635                namespace_ident_2,
636                namespace_ident_3,
637                namespace_ident_4,
638            ])
639        );
640    }
641
642    #[tokio::test]
643    async fn test_namespace_exists_returns_false() {
644        let catalog = new_memory_catalog().await;
645        let namespace_ident = NamespaceIdent::new("a".into());
646        create_namespace(&catalog, &namespace_ident).await;
647
648        assert!(
649            !catalog
650                .namespace_exists(&NamespaceIdent::new("b".into()))
651                .await
652                .unwrap()
653        );
654    }
655
656    #[tokio::test]
657    async fn test_namespace_exists_returns_true() {
658        let catalog = new_memory_catalog().await;
659        let namespace_ident = NamespaceIdent::new("a".into());
660        create_namespace(&catalog, &namespace_ident).await;
661
662        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
663    }
664
665    #[tokio::test]
666    async fn test_create_namespace_with_empty_properties() {
667        let catalog = new_memory_catalog().await;
668        let namespace_ident = NamespaceIdent::new("a".into());
669
670        assert_eq!(
671            catalog
672                .create_namespace(&namespace_ident, HashMap::new())
673                .await
674                .unwrap(),
675            Namespace::new(namespace_ident.clone())
676        );
677
678        assert_eq!(
679            catalog.get_namespace(&namespace_ident).await.unwrap(),
680            Namespace::with_properties(namespace_ident, HashMap::new())
681        );
682    }
683
684    #[tokio::test]
685    async fn test_create_namespace_with_properties() {
686        let catalog = new_memory_catalog().await;
687        let namespace_ident = NamespaceIdent::new("abc".into());
688
689        let mut properties: HashMap<String, String> = HashMap::new();
690        properties.insert("k".into(), "v".into());
691
692        assert_eq!(
693            catalog
694                .create_namespace(&namespace_ident, properties.clone())
695                .await
696                .unwrap(),
697            Namespace::with_properties(namespace_ident.clone(), properties.clone())
698        );
699
700        assert_eq!(
701            catalog.get_namespace(&namespace_ident).await.unwrap(),
702            Namespace::with_properties(namespace_ident, properties)
703        );
704    }
705
706    #[tokio::test]
707    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
708        let catalog = new_memory_catalog().await;
709        let namespace_ident = NamespaceIdent::new("a".into());
710        create_namespace(&catalog, &namespace_ident).await;
711
712        assert_eq!(
713            catalog
714                .create_namespace(&namespace_ident, HashMap::new())
715                .await
716                .unwrap_err()
717                .to_string(),
718            format!(
719                "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
720                &namespace_ident
721            )
722        );
723
724        assert_eq!(
725            catalog.get_namespace(&namespace_ident).await.unwrap(),
726            Namespace::with_properties(namespace_ident, HashMap::new())
727        );
728    }
729
730    #[tokio::test]
731    async fn test_create_nested_namespace() {
732        let catalog = new_memory_catalog().await;
733        let parent_namespace_ident = NamespaceIdent::new("a".into());
734        create_namespace(&catalog, &parent_namespace_ident).await;
735
736        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
737
738        assert_eq!(
739            catalog
740                .create_namespace(&child_namespace_ident, HashMap::new())
741                .await
742                .unwrap(),
743            Namespace::new(child_namespace_ident.clone())
744        );
745
746        assert_eq!(
747            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
748            Namespace::with_properties(child_namespace_ident, HashMap::new())
749        );
750    }
751
752    #[tokio::test]
753    async fn test_create_deeply_nested_namespace() {
754        let catalog = new_memory_catalog().await;
755        let namespace_ident_a = NamespaceIdent::new("a".into());
756        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
757        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
758
759        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
760
761        assert_eq!(
762            catalog
763                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
764                .await
765                .unwrap(),
766            Namespace::new(namespace_ident_a_b_c.clone())
767        );
768
769        assert_eq!(
770            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
771            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
772        );
773    }
774
775    #[tokio::test]
776    async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
777        let catalog = new_memory_catalog().await;
778
779        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
780
781        assert_eq!(
782            catalog
783                .create_namespace(&nested_namespace_ident, HashMap::new())
784                .await
785                .unwrap_err()
786                .to_string(),
787            format!(
788                "NamespaceNotFound => No such namespace: {:?}",
789                NamespaceIdent::new("a".into())
790            )
791        );
792
793        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
794    }
795
796    #[tokio::test]
797    async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
798     {
799        let catalog = new_memory_catalog().await;
800
801        let namespace_ident_a = NamespaceIdent::new("a".into());
802        create_namespace(&catalog, &namespace_ident_a).await;
803
804        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
805
806        assert_eq!(
807            catalog
808                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
809                .await
810                .unwrap_err()
811                .to_string(),
812            format!(
813                "NamespaceNotFound => No such namespace: {:?}",
814                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
815            )
816        );
817
818        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
819            namespace_ident_a.clone()
820        ]);
821
822        assert_eq!(
823            catalog
824                .list_namespaces(Some(&namespace_ident_a))
825                .await
826                .unwrap(),
827            vec![]
828        );
829    }
830
831    #[tokio::test]
832    async fn test_get_namespace() {
833        let catalog = new_memory_catalog().await;
834        let namespace_ident = NamespaceIdent::new("abc".into());
835
836        let mut properties: HashMap<String, String> = HashMap::new();
837        properties.insert("k".into(), "v".into());
838        let _ = catalog
839            .create_namespace(&namespace_ident, properties.clone())
840            .await
841            .unwrap();
842
843        assert_eq!(
844            catalog.get_namespace(&namespace_ident).await.unwrap(),
845            Namespace::with_properties(namespace_ident, properties)
846        )
847    }
848
849    #[tokio::test]
850    async fn test_get_nested_namespace() {
851        let catalog = new_memory_catalog().await;
852        let namespace_ident_a = NamespaceIdent::new("a".into());
853        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
854        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
855
856        assert_eq!(
857            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
858            Namespace::with_properties(namespace_ident_a_b, HashMap::new())
859        );
860    }
861
862    #[tokio::test]
863    async fn test_get_deeply_nested_namespace() {
864        let catalog = new_memory_catalog().await;
865        let namespace_ident_a = NamespaceIdent::new("a".into());
866        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
867        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
868        create_namespaces(&catalog, &vec![
869            &namespace_ident_a,
870            &namespace_ident_a_b,
871            &namespace_ident_a_b_c,
872        ])
873        .await;
874
875        assert_eq!(
876            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
877            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
878        );
879    }
880
881    #[tokio::test]
882    async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
883        let catalog = new_memory_catalog().await;
884        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
885
886        let non_existent_namespace_ident = NamespaceIdent::new("b".into());
887        assert_eq!(
888            catalog
889                .get_namespace(&non_existent_namespace_ident)
890                .await
891                .unwrap_err()
892                .to_string(),
893            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
894        )
895    }
896
897    #[tokio::test]
898    async fn test_update_namespace() {
899        let catalog = new_memory_catalog().await;
900        let namespace_ident = NamespaceIdent::new("abc".into());
901        create_namespace(&catalog, &namespace_ident).await;
902
903        let mut new_properties: HashMap<String, String> = HashMap::new();
904        new_properties.insert("k".into(), "v".into());
905
906        catalog
907            .update_namespace(&namespace_ident, new_properties.clone())
908            .await
909            .unwrap();
910
911        assert_eq!(
912            catalog.get_namespace(&namespace_ident).await.unwrap(),
913            Namespace::with_properties(namespace_ident, new_properties)
914        )
915    }
916
917    #[tokio::test]
918    async fn test_update_nested_namespace() {
919        let catalog = new_memory_catalog().await;
920        let namespace_ident_a = NamespaceIdent::new("a".into());
921        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
922        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
923
924        let mut new_properties = HashMap::new();
925        new_properties.insert("k".into(), "v".into());
926
927        catalog
928            .update_namespace(&namespace_ident_a_b, new_properties.clone())
929            .await
930            .unwrap();
931
932        assert_eq!(
933            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
934            Namespace::with_properties(namespace_ident_a_b, new_properties)
935        );
936    }
937
938    #[tokio::test]
939    async fn test_update_deeply_nested_namespace() {
940        let catalog = new_memory_catalog().await;
941        let namespace_ident_a = NamespaceIdent::new("a".into());
942        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
943        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
944        create_namespaces(&catalog, &vec![
945            &namespace_ident_a,
946            &namespace_ident_a_b,
947            &namespace_ident_a_b_c,
948        ])
949        .await;
950
951        let mut new_properties = HashMap::new();
952        new_properties.insert("k".into(), "v".into());
953
954        catalog
955            .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
956            .await
957            .unwrap();
958
959        assert_eq!(
960            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
961            Namespace::with_properties(namespace_ident_a_b_c, new_properties)
962        );
963    }
964
965    #[tokio::test]
966    async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
967        let catalog = new_memory_catalog().await;
968        create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
969
970        let non_existent_namespace_ident = NamespaceIdent::new("def".into());
971        assert_eq!(
972            catalog
973                .update_namespace(&non_existent_namespace_ident, HashMap::new())
974                .await
975                .unwrap_err()
976                .to_string(),
977            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
978        )
979    }
980
981    #[tokio::test]
982    async fn test_drop_namespace() {
983        let catalog = new_memory_catalog().await;
984        let namespace_ident = NamespaceIdent::new("abc".into());
985        create_namespace(&catalog, &namespace_ident).await;
986
987        catalog.drop_namespace(&namespace_ident).await.unwrap();
988
989        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
990    }
991
992    #[tokio::test]
993    async fn test_drop_nested_namespace() {
994        let catalog = new_memory_catalog().await;
995        let namespace_ident_a = NamespaceIdent::new("a".into());
996        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
997        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
998
999        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1000
1001        assert!(
1002            !catalog
1003                .namespace_exists(&namespace_ident_a_b)
1004                .await
1005                .unwrap()
1006        );
1007
1008        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1009    }
1010
1011    #[tokio::test]
1012    async fn test_drop_deeply_nested_namespace() {
1013        let catalog = new_memory_catalog().await;
1014        let namespace_ident_a = NamespaceIdent::new("a".into());
1015        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1016        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1017        create_namespaces(&catalog, &vec![
1018            &namespace_ident_a,
1019            &namespace_ident_a_b,
1020            &namespace_ident_a_b_c,
1021        ])
1022        .await;
1023
1024        catalog
1025            .drop_namespace(&namespace_ident_a_b_c)
1026            .await
1027            .unwrap();
1028
1029        assert!(
1030            !catalog
1031                .namespace_exists(&namespace_ident_a_b_c)
1032                .await
1033                .unwrap()
1034        );
1035
1036        assert!(
1037            catalog
1038                .namespace_exists(&namespace_ident_a_b)
1039                .await
1040                .unwrap()
1041        );
1042
1043        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1044    }
1045
1046    #[tokio::test]
1047    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1048        let catalog = new_memory_catalog().await;
1049
1050        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1051        assert_eq!(
1052            catalog
1053                .drop_namespace(&non_existent_namespace_ident)
1054                .await
1055                .unwrap_err()
1056                .to_string(),
1057            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1058        )
1059    }
1060
1061    #[tokio::test]
1062    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1063        let catalog = new_memory_catalog().await;
1064        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1065
1066        let non_existent_namespace_ident =
1067            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1068        assert_eq!(
1069            catalog
1070                .drop_namespace(&non_existent_namespace_ident)
1071                .await
1072                .unwrap_err()
1073                .to_string(),
1074            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1075        )
1076    }
1077
1078    #[tokio::test]
1079    async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1080        let catalog = new_memory_catalog().await;
1081        let namespace_ident_a = NamespaceIdent::new("a".into());
1082        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1083        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1084
1085        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1086
1087        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1088
1089        assert!(
1090            !catalog
1091                .namespace_exists(&namespace_ident_a_b)
1092                .await
1093                .unwrap()
1094        );
1095    }
1096
1097    #[tokio::test]
1098    async fn test_create_table_with_location() {
1099        let tmp_dir = TempDir::new().unwrap();
1100        let catalog = new_memory_catalog().await;
1101        let namespace_ident = NamespaceIdent::new("a".into());
1102        create_namespace(&catalog, &namespace_ident).await;
1103
1104        let table_name = "abc";
1105        let location = tmp_dir.path().to_str().unwrap().to_string();
1106        let table_creation = TableCreation::builder()
1107            .name(table_name.into())
1108            .location(location.clone())
1109            .schema(simple_table_schema())
1110            .build();
1111
1112        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1113
1114        assert_table_eq(
1115            &catalog
1116                .create_table(&namespace_ident, table_creation)
1117                .await
1118                .unwrap(),
1119            &expected_table_ident,
1120            &simple_table_schema(),
1121        );
1122
1123        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1124
1125        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1126
1127        assert!(
1128            table
1129                .metadata_location()
1130                .unwrap()
1131                .to_string()
1132                .starts_with(&location)
1133        )
1134    }
1135
1136    #[tokio::test]
1137    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1138        let warehouse_location = temp_path();
1139        let catalog = MemoryCatalogBuilder::default()
1140            .load(
1141                "memory",
1142                HashMap::from([(
1143                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1144                    warehouse_location.clone(),
1145                )]),
1146            )
1147            .await
1148            .unwrap();
1149
1150        let namespace_ident = NamespaceIdent::new("a".into());
1151        let mut namespace_properties = HashMap::new();
1152        let namespace_location = temp_path();
1153        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1154        catalog
1155            .create_namespace(&namespace_ident, namespace_properties)
1156            .await
1157            .unwrap();
1158
1159        let table_name = "tbl1";
1160        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1161        let expected_table_metadata_location_regex =
1162            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1163
1164        let table = catalog
1165            .create_table(
1166                &namespace_ident,
1167                TableCreation::builder()
1168                    .name(table_name.into())
1169                    .schema(simple_table_schema())
1170                    // no location specified for table
1171                    .build(),
1172            )
1173            .await
1174            .unwrap();
1175        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1176        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1177
1178        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1179        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1180        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1181    }
1182
1183    #[tokio::test]
1184    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1185     {
1186        let warehouse_location = temp_path();
1187        let catalog = MemoryCatalogBuilder::default()
1188            .load(
1189                "memory",
1190                HashMap::from([(
1191                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1192                    warehouse_location.clone(),
1193                )]),
1194            )
1195            .await
1196            .unwrap();
1197
1198        let namespace_ident = NamespaceIdent::new("a".into());
1199        let mut namespace_properties = HashMap::new();
1200        let namespace_location = temp_path();
1201        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1202        catalog
1203            .create_namespace(&namespace_ident, namespace_properties)
1204            .await
1205            .unwrap();
1206
1207        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1208        let mut nested_namespace_properties = HashMap::new();
1209        let nested_namespace_location = temp_path();
1210        nested_namespace_properties
1211            .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1212        catalog
1213            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1214            .await
1215            .unwrap();
1216
1217        let table_name = "tbl1";
1218        let expected_table_ident =
1219            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1220        let expected_table_metadata_location_regex = format!(
1221            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1222        );
1223
1224        let table = catalog
1225            .create_table(
1226                &nested_namespace_ident,
1227                TableCreation::builder()
1228                    .name(table_name.into())
1229                    .schema(simple_table_schema())
1230                    // no location specified for table
1231                    .build(),
1232            )
1233            .await
1234            .unwrap();
1235        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1236        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1237
1238        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1239        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1240        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1241    }
1242
1243    #[tokio::test]
1244    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1245     {
1246        let warehouse_location = temp_path();
1247        let catalog = MemoryCatalogBuilder::default()
1248            .load(
1249                "memory",
1250                HashMap::from([(
1251                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1252                    warehouse_location.clone(),
1253                )]),
1254            )
1255            .await
1256            .unwrap();
1257
1258        let namespace_ident = NamespaceIdent::new("a".into());
1259        // note: no location specified in namespace_properties
1260        let namespace_properties = HashMap::new();
1261        catalog
1262            .create_namespace(&namespace_ident, namespace_properties)
1263            .await
1264            .unwrap();
1265
1266        let table_name = "tbl1";
1267        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1268        let expected_table_metadata_location_regex =
1269            format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1270
1271        let table = catalog
1272            .create_table(
1273                &namespace_ident,
1274                TableCreation::builder()
1275                    .name(table_name.into())
1276                    .schema(simple_table_schema())
1277                    // no location specified for table
1278                    .build(),
1279            )
1280            .await
1281            .unwrap();
1282        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1283        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1284
1285        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1286        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1287        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1288    }
1289
1290    #[tokio::test]
1291    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1292     {
1293        let warehouse_location = temp_path();
1294        let catalog = MemoryCatalogBuilder::default()
1295            .load(
1296                "memory",
1297                HashMap::from([(
1298                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1299                    warehouse_location.clone(),
1300                )]),
1301            )
1302            .await
1303            .unwrap();
1304
1305        let namespace_ident = NamespaceIdent::new("a".into());
1306        catalog
1307            // note: no location specified in namespace_properties
1308            .create_namespace(&namespace_ident, HashMap::new())
1309            .await
1310            .unwrap();
1311
1312        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1313        catalog
1314            // note: no location specified in namespace_properties
1315            .create_namespace(&nested_namespace_ident, HashMap::new())
1316            .await
1317            .unwrap();
1318
1319        let table_name = "tbl1";
1320        let expected_table_ident =
1321            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1322        let expected_table_metadata_location_regex = format!(
1323            "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1324        );
1325
1326        let table = catalog
1327            .create_table(
1328                &nested_namespace_ident,
1329                TableCreation::builder()
1330                    .name(table_name.into())
1331                    .schema(simple_table_schema())
1332                    // no location specified for table
1333                    .build(),
1334            )
1335            .await
1336            .unwrap();
1337        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1338        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1339
1340        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1341        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1342        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1343    }
1344
1345    #[tokio::test]
1346    async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1347     {
1348        let catalog = MemoryCatalogBuilder::default()
1349            .load("memory", HashMap::from([]))
1350            .await;
1351
1352        assert!(catalog.is_err());
1353        assert_eq!(
1354            catalog.unwrap_err().to_string(),
1355            "DataInvalid => Catalog warehouse is required"
1356        );
1357    }
1358
1359    #[tokio::test]
1360    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1361        let catalog = new_memory_catalog().await;
1362        let namespace_ident = NamespaceIdent::new("a".into());
1363        create_namespace(&catalog, &namespace_ident).await;
1364        let table_name = "tbl1";
1365        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1366        create_table(&catalog, &table_ident).await;
1367
1368        let tmp_dir = TempDir::new().unwrap();
1369        let location = tmp_dir.path().to_str().unwrap().to_string();
1370
1371        assert_eq!(
1372            catalog
1373                .create_table(
1374                    &namespace_ident,
1375                    TableCreation::builder()
1376                        .name(table_name.into())
1377                        .schema(simple_table_schema())
1378                        .location(location)
1379                        .build()
1380                )
1381                .await
1382                .unwrap_err()
1383                .to_string(),
1384            format!(
1385                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1386                &table_ident
1387            )
1388        );
1389    }
1390
1391    #[tokio::test]
1392    async fn test_list_tables_returns_empty_vector() {
1393        let catalog = new_memory_catalog().await;
1394        let namespace_ident = NamespaceIdent::new("a".into());
1395        create_namespace(&catalog, &namespace_ident).await;
1396
1397        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1398    }
1399
1400    #[tokio::test]
1401    async fn test_list_tables_returns_a_single_table() {
1402        let catalog = new_memory_catalog().await;
1403        let namespace_ident = NamespaceIdent::new("n1".into());
1404        create_namespace(&catalog, &namespace_ident).await;
1405
1406        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1407        create_table(&catalog, &table_ident).await;
1408
1409        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1410            table_ident
1411        ]);
1412    }
1413
1414    #[tokio::test]
1415    async fn test_list_tables_returns_multiple_tables() {
1416        let catalog = new_memory_catalog().await;
1417        let namespace_ident = NamespaceIdent::new("n1".into());
1418        create_namespace(&catalog, &namespace_ident).await;
1419
1420        let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1421        let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1422        let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1423
1424        assert_eq!(
1425            to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1426            to_set(vec![table_ident_1, table_ident_2])
1427        );
1428    }
1429
1430    #[tokio::test]
1431    async fn test_list_tables_returns_tables_from_correct_namespace() {
1432        let catalog = new_memory_catalog().await;
1433        let namespace_ident_1 = NamespaceIdent::new("n1".into());
1434        let namespace_ident_2 = NamespaceIdent::new("n2".into());
1435        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1436
1437        let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1438        let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1439        let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1440        let _ = create_tables(&catalog, vec![
1441            &table_ident_1,
1442            &table_ident_2,
1443            &table_ident_3,
1444        ])
1445        .await;
1446
1447        assert_eq!(
1448            to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1449            to_set(vec![table_ident_1, table_ident_2])
1450        );
1451
1452        assert_eq!(
1453            to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1454            to_set(vec![table_ident_3])
1455        );
1456    }
1457
1458    #[tokio::test]
1459    async fn test_list_tables_returns_table_under_nested_namespace() {
1460        let catalog = new_memory_catalog().await;
1461        let namespace_ident_a = NamespaceIdent::new("a".into());
1462        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1463        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1464
1465        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1466        create_table(&catalog, &table_ident).await;
1467
1468        assert_eq!(
1469            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1470            vec![table_ident]
1471        );
1472    }
1473
1474    #[tokio::test]
1475    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1476        let catalog = new_memory_catalog().await;
1477
1478        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1479
1480        assert_eq!(
1481            catalog
1482                .list_tables(&non_existent_namespace_ident)
1483                .await
1484                .unwrap_err()
1485                .to_string(),
1486            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1487        );
1488    }
1489
1490    #[tokio::test]
1491    async fn test_drop_table() {
1492        let catalog = new_memory_catalog().await;
1493        let namespace_ident = NamespaceIdent::new("n1".into());
1494        create_namespace(&catalog, &namespace_ident).await;
1495        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1496        create_table(&catalog, &table_ident).await;
1497
1498        catalog.drop_table(&table_ident).await.unwrap();
1499    }
1500
1501    #[tokio::test]
1502    async fn test_drop_table_drops_table_under_nested_namespace() {
1503        let catalog = new_memory_catalog().await;
1504        let namespace_ident_a = NamespaceIdent::new("a".into());
1505        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1506        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1507
1508        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1509        create_table(&catalog, &table_ident).await;
1510
1511        catalog.drop_table(&table_ident).await.unwrap();
1512
1513        assert_eq!(
1514            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1515            vec![]
1516        );
1517    }
1518
1519    #[tokio::test]
1520    async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1521        let catalog = new_memory_catalog().await;
1522
1523        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1524        let non_existent_table_ident =
1525            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1526
1527        assert_eq!(
1528            catalog
1529                .drop_table(&non_existent_table_ident)
1530                .await
1531                .unwrap_err()
1532                .to_string(),
1533            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1534        );
1535    }
1536
1537    #[tokio::test]
1538    async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1539        let catalog = new_memory_catalog().await;
1540        let namespace_ident = NamespaceIdent::new("n1".into());
1541        create_namespace(&catalog, &namespace_ident).await;
1542
1543        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1544
1545        assert_eq!(
1546            catalog
1547                .drop_table(&non_existent_table_ident)
1548                .await
1549                .unwrap_err()
1550                .to_string(),
1551            format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1552        );
1553    }
1554
1555    #[tokio::test]
1556    async fn test_table_exists_returns_true() {
1557        let catalog = new_memory_catalog().await;
1558        let namespace_ident = NamespaceIdent::new("n1".into());
1559        create_namespace(&catalog, &namespace_ident).await;
1560        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1561        create_table(&catalog, &table_ident).await;
1562
1563        assert!(catalog.table_exists(&table_ident).await.unwrap());
1564    }
1565
1566    #[tokio::test]
1567    async fn test_table_exists_returns_false() {
1568        let catalog = new_memory_catalog().await;
1569        let namespace_ident = NamespaceIdent::new("n1".into());
1570        create_namespace(&catalog, &namespace_ident).await;
1571        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1572
1573        assert!(
1574            !catalog
1575                .table_exists(&non_existent_table_ident)
1576                .await
1577                .unwrap()
1578        );
1579    }
1580
1581    #[tokio::test]
1582    async fn test_table_exists_under_nested_namespace() {
1583        let catalog = new_memory_catalog().await;
1584        let namespace_ident_a = NamespaceIdent::new("a".into());
1585        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1586        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1587
1588        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1589        create_table(&catalog, &table_ident).await;
1590
1591        assert!(catalog.table_exists(&table_ident).await.unwrap());
1592
1593        let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1594        assert!(
1595            !catalog
1596                .table_exists(&non_existent_table_ident)
1597                .await
1598                .unwrap()
1599        );
1600    }
1601
1602    #[tokio::test]
1603    async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1604        let catalog = new_memory_catalog().await;
1605
1606        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1607        let non_existent_table_ident =
1608            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1609
1610        assert_eq!(
1611            catalog
1612                .table_exists(&non_existent_table_ident)
1613                .await
1614                .unwrap_err()
1615                .to_string(),
1616            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1617        );
1618    }
1619
1620    #[tokio::test]
1621    async fn test_rename_table_in_same_namespace() {
1622        let catalog = new_memory_catalog().await;
1623        let namespace_ident = NamespaceIdent::new("n1".into());
1624        create_namespace(&catalog, &namespace_ident).await;
1625        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1626        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1627        create_table(&catalog, &src_table_ident).await;
1628
1629        catalog
1630            .rename_table(&src_table_ident, &dst_table_ident)
1631            .await
1632            .unwrap();
1633
1634        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1635            dst_table_ident
1636        ],);
1637    }
1638
1639    #[tokio::test]
1640    async fn test_rename_table_across_namespaces() {
1641        let catalog = new_memory_catalog().await;
1642        let src_namespace_ident = NamespaceIdent::new("a".into());
1643        let dst_namespace_ident = NamespaceIdent::new("b".into());
1644        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1645        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1646        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1647        create_table(&catalog, &src_table_ident).await;
1648
1649        catalog
1650            .rename_table(&src_table_ident, &dst_table_ident)
1651            .await
1652            .unwrap();
1653
1654        assert_eq!(
1655            catalog.list_tables(&src_namespace_ident).await.unwrap(),
1656            vec![],
1657        );
1658
1659        assert_eq!(
1660            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1661            vec![dst_table_ident],
1662        );
1663    }
1664
1665    #[tokio::test]
1666    async fn test_rename_table_src_table_is_same_as_dst_table() {
1667        let catalog = new_memory_catalog().await;
1668        let namespace_ident = NamespaceIdent::new("n1".into());
1669        create_namespace(&catalog, &namespace_ident).await;
1670        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1671        create_table(&catalog, &table_ident).await;
1672
1673        catalog
1674            .rename_table(&table_ident, &table_ident)
1675            .await
1676            .unwrap();
1677
1678        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1679            table_ident
1680        ],);
1681    }
1682
1683    #[tokio::test]
1684    async fn test_rename_table_across_nested_namespaces() {
1685        let catalog = new_memory_catalog().await;
1686        let namespace_ident_a = NamespaceIdent::new("a".into());
1687        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1688        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1689        create_namespaces(&catalog, &vec![
1690            &namespace_ident_a,
1691            &namespace_ident_a_b,
1692            &namespace_ident_a_b_c,
1693        ])
1694        .await;
1695
1696        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1697        create_tables(&catalog, vec![&src_table_ident]).await;
1698
1699        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1700        catalog
1701            .rename_table(&src_table_ident, &dst_table_ident)
1702            .await
1703            .unwrap();
1704
1705        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1706
1707        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1708    }
1709
1710    #[tokio::test]
1711    async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1712        let catalog = new_memory_catalog().await;
1713
1714        let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1715        let src_table_ident =
1716            TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1717
1718        let dst_namespace_ident = NamespaceIdent::new("n2".into());
1719        create_namespace(&catalog, &dst_namespace_ident).await;
1720        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1721
1722        assert_eq!(
1723            catalog
1724                .rename_table(&src_table_ident, &dst_table_ident)
1725                .await
1726                .unwrap_err()
1727                .to_string(),
1728            format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1729        );
1730    }
1731
1732    #[tokio::test]
1733    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1734        let catalog = new_memory_catalog().await;
1735        let src_namespace_ident = NamespaceIdent::new("n1".into());
1736        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1737        create_namespace(&catalog, &src_namespace_ident).await;
1738        create_table(&catalog, &src_table_ident).await;
1739
1740        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1741        let dst_table_ident =
1742            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1743        assert_eq!(
1744            catalog
1745                .rename_table(&src_table_ident, &dst_table_ident)
1746                .await
1747                .unwrap_err()
1748                .to_string(),
1749            format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1750        );
1751    }
1752
1753    #[tokio::test]
1754    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1755        let catalog = new_memory_catalog().await;
1756        let namespace_ident = NamespaceIdent::new("n1".into());
1757        create_namespace(&catalog, &namespace_ident).await;
1758        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1759        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1760
1761        assert_eq!(
1762            catalog
1763                .rename_table(&src_table_ident, &dst_table_ident)
1764                .await
1765                .unwrap_err()
1766                .to_string(),
1767            format!("TableNotFound => No such table: {src_table_ident:?}"),
1768        );
1769    }
1770
1771    #[tokio::test]
1772    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1773        let catalog = new_memory_catalog().await;
1774        let namespace_ident = NamespaceIdent::new("n1".into());
1775        create_namespace(&catalog, &namespace_ident).await;
1776        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1777        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1778        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1779
1780        assert_eq!(
1781            catalog
1782                .rename_table(&src_table_ident, &dst_table_ident)
1783                .await
1784                .unwrap_err()
1785                .to_string(),
1786            format!(
1787                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1788                &dst_table_ident
1789            ),
1790        );
1791    }
1792
1793    #[tokio::test]
1794    async fn test_register_table() {
1795        // Create a catalog and namespace
1796        let catalog = new_memory_catalog().await;
1797        let namespace_ident = NamespaceIdent::new("test_namespace".into());
1798        create_namespace(&catalog, &namespace_ident).await;
1799
1800        // Create a table to get a valid metadata file
1801        let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1802        create_table(&catalog, &source_table_ident).await;
1803
1804        // Get the metadata location from the source table
1805        let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1806        let metadata_location = source_table.metadata_location().unwrap().to_string();
1807
1808        // Register a new table using the same metadata location
1809        let register_table_ident =
1810            TableIdent::new(namespace_ident.clone(), "register_table".into());
1811        let registered_table = catalog
1812            .register_table(&register_table_ident, metadata_location.clone())
1813            .await
1814            .unwrap();
1815
1816        // Verify the registered table has the correct identifier
1817        assert_eq!(registered_table.identifier(), &register_table_ident);
1818
1819        // Verify the registered table has the correct metadata location
1820        assert_eq!(
1821            registered_table.metadata_location().unwrap().to_string(),
1822            metadata_location
1823        );
1824
1825        // Verify the table exists in the catalog
1826        assert!(catalog.table_exists(&register_table_ident).await.unwrap());
1827
1828        // Verify we can load the registered table
1829        let loaded_table = catalog.load_table(&register_table_ident).await.unwrap();
1830        assert_eq!(loaded_table.identifier(), &register_table_ident);
1831        assert_eq!(
1832            loaded_table.metadata_location().unwrap().to_string(),
1833            metadata_location
1834        );
1835    }
1836
1837    #[tokio::test]
1838    async fn test_update_table() {
1839        let catalog = new_memory_catalog().await;
1840
1841        let table = create_table_with_namespace(&catalog).await;
1842
1843        // Assert the table doesn't contain the update yet
1844        assert!(!table.metadata().properties().contains_key("key"));
1845
1846        // Update table metadata
1847        let tx = Transaction::new(&table);
1848        let updated_table = tx
1849            .update_table_properties()
1850            .set("key".to_string(), "value".to_string())
1851            .apply(tx)
1852            .unwrap()
1853            .commit(&catalog)
1854            .await
1855            .unwrap();
1856
1857        assert_eq!(
1858            updated_table.metadata().properties().get("key").unwrap(),
1859            "value"
1860        );
1861
1862        assert_eq!(table.identifier(), updated_table.identifier());
1863        assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1864        assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1865        assert_ne!(table.metadata_location(), updated_table.metadata_location());
1866
1867        assert!(
1868            table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1869        );
1870    }
1871
1872    #[tokio::test]
1873    async fn test_update_table_fails_if_table_doesnt_exist() {
1874        let catalog = new_memory_catalog().await;
1875
1876        let namespace_ident = NamespaceIdent::new("a".into());
1877        create_namespace(&catalog, &namespace_ident).await;
1878
1879        // This table is not known to the catalog.
1880        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1881        let table = build_table(table_ident);
1882
1883        let tx = Transaction::new(&table);
1884        let err = tx
1885            .update_table_properties()
1886            .set("key".to_string(), "value".to_string())
1887            .apply(tx)
1888            .unwrap()
1889            .commit(&catalog)
1890            .await
1891            .unwrap_err();
1892        assert_eq!(err.kind(), ErrorKind::TableNotFound);
1893    }
1894
1895    fn build_table(ident: TableIdent) -> Table {
1896        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1897
1898        let temp_dir = TempDir::new().unwrap();
1899        let location = temp_dir.path().to_str().unwrap().to_string();
1900
1901        let table_creation = TableCreation::builder()
1902            .name(ident.name().to_string())
1903            .schema(simple_table_schema())
1904            .location(location)
1905            .build();
1906        let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1907            .unwrap()
1908            .build()
1909            .unwrap()
1910            .metadata;
1911
1912        Table::builder()
1913            .identifier(ident)
1914            .metadata(metadata)
1915            .file_io(file_io)
1916            .build()
1917            .unwrap()
1918    }
1919}