iceberg/catalog/memory/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains memory catalog implementation.
19
20use std::collections::HashMap;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use futures::lock::{Mutex, MutexGuard};
26use itertools::Itertools;
27
28use super::namespace_state::NamespaceState;
29use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
30use crate::runtime::Runtime;
31use crate::spec::{TableMetadata, TableMetadataBuilder};
32use crate::table::Table;
33use crate::{
34    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35    TableCommit, TableCreation, TableIdent,
36};
37
38/// Memory catalog warehouse location
39pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";
40
41/// namespace `location` property
42const LOCATION: &str = "location";
43
44/// Builder for [`MemoryCatalog`].
45#[derive(Debug)]
46pub struct MemoryCatalogBuilder {
47    config: MemoryCatalogConfig,
48    storage_factory: Option<Arc<dyn StorageFactory>>,
49    runtime: Option<Runtime>,
50}
51
52impl Default for MemoryCatalogBuilder {
53    fn default() -> Self {
54        Self {
55            config: MemoryCatalogConfig {
56                name: None,
57                warehouse: "".to_string(),
58                props: HashMap::new(),
59            },
60            storage_factory: None,
61            runtime: None,
62        }
63    }
64}
65
66impl CatalogBuilder for MemoryCatalogBuilder {
67    type C = MemoryCatalog;
68
69    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
70        self.storage_factory = Some(storage_factory);
71        self
72    }
73
74    fn with_runtime(mut self, runtime: Runtime) -> Self {
75        self.runtime = Some(runtime);
76        self
77    }
78
79    fn load(
80        mut self,
81        name: impl Into<String>,
82        props: HashMap<String, String>,
83    ) -> impl Future<Output = Result<Self::C>> + Send {
84        self.config.name = Some(name.into());
85
86        if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
87            self.config.warehouse = props
88                .get(MEMORY_CATALOG_WAREHOUSE)
89                .cloned()
90                .unwrap_or_default()
91        }
92
93        // Collect other remaining properties
94        self.config.props = props
95            .into_iter()
96            .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
97            .collect();
98
99        let result = {
100            if self.config.name.is_none() {
101                Err(Error::new(
102                    ErrorKind::DataInvalid,
103                    "Catalog name is required",
104                ))
105            } else if self.config.warehouse.is_empty() {
106                Err(Error::new(
107                    ErrorKind::DataInvalid,
108                    "Catalog warehouse is required",
109                ))
110            } else {
111                let runtime = self.runtime.unwrap_or_else(Runtime::current);
112                MemoryCatalog::new(self.config, self.storage_factory, runtime)
113            }
114        };
115
116        std::future::ready(result)
117    }
118}
119
120#[derive(Clone, Debug)]
121pub(crate) struct MemoryCatalogConfig {
122    name: Option<String>,
123    warehouse: String,
124    props: HashMap<String, String>,
125}
126
127/// Memory catalog implementation.
128#[derive(Debug)]
129pub struct MemoryCatalog {
130    root_namespace_state: Mutex<NamespaceState>,
131    file_io: FileIO,
132    warehouse_location: String,
133    runtime: Runtime,
134}
135
136impl MemoryCatalog {
137    /// Creates a memory catalog.
138    fn new(
139        config: MemoryCatalogConfig,
140        storage_factory: Option<Arc<dyn StorageFactory>>,
141        runtime: Runtime,
142    ) -> Result<Self> {
143        // Use provided factory or default to MemoryStorageFactory
144        let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory));
145
146        Ok(Self {
147            root_namespace_state: Mutex::new(NamespaceState::default()),
148            file_io: FileIOBuilder::new(factory).with_props(config.props).build(),
149            warehouse_location: config.warehouse,
150            runtime,
151        })
152    }
153
154    /// Loads a table from the locked namespace state.
155    async fn load_table_from_locked_state(
156        &self,
157        table_ident: &TableIdent,
158        root_namespace_state: &MutexGuard<'_, NamespaceState>,
159    ) -> Result<Table> {
160        let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
161        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
162
163        Table::builder()
164            .identifier(table_ident.clone())
165            .metadata(metadata)
166            .metadata_location(metadata_location.to_string())
167            .file_io(self.file_io.clone())
168            .runtime(self.runtime.clone())
169            .build()
170    }
171}
172
173#[async_trait]
174impl Catalog for MemoryCatalog {
175    /// List namespaces inside the catalog.
176    async fn list_namespaces(
177        &self,
178        maybe_parent: Option<&NamespaceIdent>,
179    ) -> Result<Vec<NamespaceIdent>> {
180        let root_namespace_state = self.root_namespace_state.lock().await;
181
182        match maybe_parent {
183            None => {
184                let namespaces = root_namespace_state
185                    .list_top_level_namespaces()
186                    .into_iter()
187                    .map(|str| NamespaceIdent::new(str.to_string()))
188                    .collect_vec();
189
190                Ok(namespaces)
191            }
192            Some(parent_namespace_ident) => {
193                let namespaces = root_namespace_state
194                    .list_namespaces_under(parent_namespace_ident)?
195                    .into_iter()
196                    .map(|name| {
197                        let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
198                        names.push(name.to_string());
199                        NamespaceIdent::from_vec(names)
200                    })
201                    .collect::<Result<Vec<_>>>()?;
202
203                Ok(namespaces)
204            }
205        }
206    }
207
208    /// Create a new namespace inside the catalog.
209    async fn create_namespace(
210        &self,
211        namespace_ident: &NamespaceIdent,
212        properties: HashMap<String, String>,
213    ) -> Result<Namespace> {
214        let mut root_namespace_state = self.root_namespace_state.lock().await;
215
216        root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
217        let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
218
219        Ok(namespace)
220    }
221
222    /// Get a namespace information from the catalog.
223    async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
224        let root_namespace_state = self.root_namespace_state.lock().await;
225
226        let namespace = Namespace::with_properties(
227            namespace_ident.clone(),
228            root_namespace_state
229                .get_properties(namespace_ident)?
230                .clone(),
231        );
232
233        Ok(namespace)
234    }
235
236    /// Check if namespace exists in catalog.
237    async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
238        let guarded_namespaces = self.root_namespace_state.lock().await;
239
240        Ok(guarded_namespaces.namespace_exists(namespace_ident))
241    }
242
243    /// Update a namespace inside the catalog.
244    ///
245    /// # Behavior
246    ///
247    /// The properties must be the full set of namespace.
248    async fn update_namespace(
249        &self,
250        namespace_ident: &NamespaceIdent,
251        properties: HashMap<String, String>,
252    ) -> Result<()> {
253        let mut root_namespace_state = self.root_namespace_state.lock().await;
254
255        root_namespace_state.replace_properties(namespace_ident, properties)
256    }
257
258    /// Drop a namespace from the catalog.
259    async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
260        let mut root_namespace_state = self.root_namespace_state.lock().await;
261
262        root_namespace_state.remove_existing_namespace(namespace_ident)
263    }
264
265    /// List tables from namespace.
266    async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
267        let root_namespace_state = self.root_namespace_state.lock().await;
268
269        let table_names = root_namespace_state.list_tables(namespace_ident)?;
270        let table_idents = table_names
271            .into_iter()
272            .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
273            .collect_vec();
274
275        Ok(table_idents)
276    }
277
278    /// Create a new table inside the namespace.
279    async fn create_table(
280        &self,
281        namespace_ident: &NamespaceIdent,
282        table_creation: TableCreation,
283    ) -> Result<Table> {
284        let mut root_namespace_state = self.root_namespace_state.lock().await;
285
286        let table_name = table_creation.name.clone();
287        let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
288
289        let (table_creation, location) = match table_creation.location.clone() {
290            Some(location) => (table_creation, location),
291            None => {
292                let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
293                let location_prefix = match namespace_properties.get(LOCATION) {
294                    Some(namespace_location) => namespace_location.clone(),
295                    None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
296                };
297
298                let location = format!("{}/{}", location_prefix, table_ident.name());
299
300                let new_table_creation = TableCreation {
301                    location: Some(location.clone()),
302                    ..table_creation
303                };
304
305                (new_table_creation, location)
306            }
307        };
308
309        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
310            .build()?
311            .metadata;
312        let metadata_location = MetadataLocation::new_with_metadata(location, &metadata);
313
314        metadata.write_to(&self.file_io, &metadata_location).await?;
315
316        root_namespace_state.insert_new_table(&table_ident, metadata_location.to_string())?;
317
318        Table::builder()
319            .file_io(self.file_io.clone())
320            .metadata_location(metadata_location.to_string())
321            .metadata(metadata)
322            .identifier(table_ident)
323            .runtime(self.runtime.clone())
324            .build()
325    }
326
327    /// Load table from the catalog.
328    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
329        let root_namespace_state = self.root_namespace_state.lock().await;
330
331        self.load_table_from_locked_state(table_ident, &root_namespace_state)
332            .await
333    }
334
335    /// Drop a table from the catalog.
336    async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
337        let mut root_namespace_state = self.root_namespace_state.lock().await;
338
339        root_namespace_state.remove_existing_table(table_ident)?;
340        Ok(())
341    }
342
343    async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> {
344        let table_info = self.load_table(table_ident).await?;
345        self.drop_table(table_ident).await?;
346        crate::catalog::utils::drop_table_data(
347            table_info.file_io(),
348            table_info.metadata(),
349            table_info.metadata_location(),
350        )
351        .await
352    }
353
354    /// Check if a table exists in the catalog.
355    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
356        let root_namespace_state = self.root_namespace_state.lock().await;
357
358        root_namespace_state.table_exists(table_ident)
359    }
360
361    /// Rename a table in the catalog.
362    async fn rename_table(
363        &self,
364        src_table_ident: &TableIdent,
365        dst_table_ident: &TableIdent,
366    ) -> Result<()> {
367        let mut root_namespace_state = self.root_namespace_state.lock().await;
368
369        let mut new_root_namespace_state = root_namespace_state.clone();
370        let metadata_location = new_root_namespace_state
371            .get_existing_table_location(src_table_ident)?
372            .clone();
373        new_root_namespace_state.remove_existing_table(src_table_ident)?;
374        new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
375        *root_namespace_state = new_root_namespace_state;
376
377        Ok(())
378    }
379
380    async fn register_table(
381        &self,
382        table_ident: &TableIdent,
383        metadata_location: String,
384    ) -> Result<Table> {
385        let mut root_namespace_state = self.root_namespace_state.lock().await;
386        root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
387
388        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
389
390        Table::builder()
391            .file_io(self.file_io.clone())
392            .metadata_location(metadata_location)
393            .metadata(metadata)
394            .identifier(table_ident.clone())
395            .runtime(self.runtime.clone())
396            .build()
397    }
398
399    /// Update a table in the catalog.
400    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
401        let mut root_namespace_state = self.root_namespace_state.lock().await;
402
403        let current_table = self
404            .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
405            .await?;
406
407        // Apply TableCommit to get staged table
408        let staged_table = commit.apply(current_table)?;
409
410        // Write table metadata to the new location
411        let metadata_location =
412            MetadataLocation::from_str(staged_table.metadata_location_result()?)?;
413        staged_table
414            .metadata()
415            .write_to(staged_table.file_io(), &metadata_location)
416            .await?;
417
418        // Flip the pointer to reference the new metadata file.
419        let updated_table = root_namespace_state.commit_table_update(staged_table)?;
420
421        Ok(updated_table)
422    }
423}
424
425#[cfg(test)]
426pub(crate) mod tests {
427    use std::collections::HashSet;
428    use std::hash::Hash;
429    use std::iter::FromIterator;
430    use std::vec;
431
432    use regex::Regex;
433    use tempfile::TempDir;
434
435    use super::*;
436    use crate::io::FileIO;
437    use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
438    use crate::test_utils::test_runtime;
439    use crate::transaction::{ApplyTransactionAction, Transaction};
440
441    fn temp_path() -> String {
442        let temp_dir = TempDir::new().unwrap();
443        temp_dir.path().to_str().unwrap().to_string()
444    }
445
446    pub(crate) async fn new_memory_catalog() -> impl Catalog {
447        let warehouse_location = temp_path();
448        MemoryCatalogBuilder::default()
449            .load(
450                "memory",
451                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
452            )
453            .await
454            .unwrap()
455    }
456
457    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
458        let _ = catalog
459            .create_namespace(namespace_ident, HashMap::new())
460            .await
461            .unwrap();
462    }
463
464    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
465        for namespace_ident in namespace_idents {
466            let _ = create_namespace(catalog, namespace_ident).await;
467        }
468    }
469
470    fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
471        HashSet::from_iter(vec)
472    }
473
474    fn simple_table_schema() -> Schema {
475        Schema::builder()
476            .with_fields(vec![
477                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
478            ])
479            .build()
480            .unwrap()
481    }
482
483    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
484        catalog
485            .create_table(
486                &table_ident.namespace,
487                TableCreation::builder()
488                    .name(table_ident.name().into())
489                    .schema(simple_table_schema())
490                    .build(),
491            )
492            .await
493            .unwrap()
494    }
495
496    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
497        for table_ident in table_idents {
498            create_table(catalog, table_ident).await;
499        }
500    }
501
502    async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
503        let namespace_ident = NamespaceIdent::new("abc".into());
504        create_namespace(catalog, &namespace_ident).await;
505
506        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
507        create_table(catalog, &table_ident).await
508    }
509
510    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
511        assert_eq!(table.identifier(), expected_table_ident);
512
513        let metadata = table.metadata();
514
515        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
516
517        let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
518            .with_spec_id(0)
519            .build()
520            .unwrap();
521
522        assert_eq!(
523            metadata
524                .partition_specs_iter()
525                .map(|p| p.as_ref())
526                .collect_vec(),
527            vec![&expected_partition_spec]
528        );
529
530        let expected_sorted_order = SortOrder::builder()
531            .with_order_id(0)
532            .with_fields(vec![])
533            .build(expected_schema)
534            .unwrap();
535
536        assert_eq!(
537            metadata
538                .sort_orders_iter()
539                .map(|s| s.as_ref())
540                .collect_vec(),
541            vec![&expected_sorted_order]
542        );
543
544        assert_eq!(metadata.properties(), &HashMap::new());
545
546        assert!(!table.readonly());
547    }
548
549    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
550
551    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
552        let actual = table.metadata_location().unwrap().to_string();
553        let regex = Regex::new(regex_str).unwrap();
554        assert!(
555            regex.is_match(&actual),
556            "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
557        )
558    }
559
560    #[tokio::test]
561    async fn test_list_namespaces_returns_empty_vector() {
562        let catalog = new_memory_catalog().await;
563
564        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
565    }
566
567    #[tokio::test]
568    async fn test_list_namespaces_returns_single_namespace() {
569        let catalog = new_memory_catalog().await;
570        let namespace_ident = NamespaceIdent::new("abc".into());
571        create_namespace(&catalog, &namespace_ident).await;
572
573        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
574            namespace_ident
575        ]);
576    }
577
578    #[tokio::test]
579    async fn test_list_namespaces_returns_multiple_namespaces() {
580        let catalog = new_memory_catalog().await;
581        let namespace_ident_1 = NamespaceIdent::new("a".into());
582        let namespace_ident_2 = NamespaceIdent::new("b".into());
583        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
584
585        assert_eq!(
586            to_set(catalog.list_namespaces(None).await.unwrap()),
587            to_set(vec![namespace_ident_1, namespace_ident_2])
588        );
589    }
590
591    #[tokio::test]
592    async fn test_list_namespaces_returns_only_top_level_namespaces() {
593        let catalog = new_memory_catalog().await;
594        let namespace_ident_1 = NamespaceIdent::new("a".into());
595        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
596        let namespace_ident_3 = NamespaceIdent::new("b".into());
597        create_namespaces(&catalog, &vec![
598            &namespace_ident_1,
599            &namespace_ident_2,
600            &namespace_ident_3,
601        ])
602        .await;
603
604        assert_eq!(
605            to_set(catalog.list_namespaces(None).await.unwrap()),
606            to_set(vec![namespace_ident_1, namespace_ident_3])
607        );
608    }
609
610    #[tokio::test]
611    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
612        let catalog = new_memory_catalog().await;
613        let namespace_ident_1 = NamespaceIdent::new("a".into());
614        let namespace_ident_2 = NamespaceIdent::new("b".into());
615        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
616
617        assert_eq!(
618            catalog
619                .list_namespaces(Some(&namespace_ident_1))
620                .await
621                .unwrap(),
622            vec![]
623        );
624    }
625
626    #[tokio::test]
627    async fn test_list_namespaces_returns_namespace_under_parent() {
628        let catalog = new_memory_catalog().await;
629        let namespace_ident_1 = NamespaceIdent::new("a".into());
630        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
631        let namespace_ident_3 = NamespaceIdent::new("c".into());
632        create_namespaces(&catalog, &vec![
633            &namespace_ident_1,
634            &namespace_ident_2,
635            &namespace_ident_3,
636        ])
637        .await;
638
639        assert_eq!(
640            to_set(catalog.list_namespaces(None).await.unwrap()),
641            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
642        );
643
644        assert_eq!(
645            catalog
646                .list_namespaces(Some(&namespace_ident_1))
647                .await
648                .unwrap(),
649            vec![namespace_ident_2]
650        );
651    }
652
653    #[tokio::test]
654    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
655        let catalog = new_memory_catalog().await;
656        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
657        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
658        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
659        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
660        let namespace_ident_5 = NamespaceIdent::new("b".into());
661        create_namespaces(&catalog, &vec![
662            &namespace_ident_1,
663            &namespace_ident_2,
664            &namespace_ident_3,
665            &namespace_ident_4,
666            &namespace_ident_5,
667        ])
668        .await;
669
670        assert_eq!(
671            to_set(
672                catalog
673                    .list_namespaces(Some(&namespace_ident_1))
674                    .await
675                    .unwrap()
676            ),
677            to_set(vec![
678                namespace_ident_2,
679                namespace_ident_3,
680                namespace_ident_4,
681            ])
682        );
683    }
684
685    #[tokio::test]
686    async fn test_namespace_exists_returns_false() {
687        let catalog = new_memory_catalog().await;
688        let namespace_ident = NamespaceIdent::new("a".into());
689        create_namespace(&catalog, &namespace_ident).await;
690
691        assert!(
692            !catalog
693                .namespace_exists(&NamespaceIdent::new("b".into()))
694                .await
695                .unwrap()
696        );
697    }
698
699    #[tokio::test]
700    async fn test_namespace_exists_returns_true() {
701        let catalog = new_memory_catalog().await;
702        let namespace_ident = NamespaceIdent::new("a".into());
703        create_namespace(&catalog, &namespace_ident).await;
704
705        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
706    }
707
708    #[tokio::test]
709    async fn test_create_namespace_with_empty_properties() {
710        let catalog = new_memory_catalog().await;
711        let namespace_ident = NamespaceIdent::new("a".into());
712
713        assert_eq!(
714            catalog
715                .create_namespace(&namespace_ident, HashMap::new())
716                .await
717                .unwrap(),
718            Namespace::new(namespace_ident.clone())
719        );
720
721        assert_eq!(
722            catalog.get_namespace(&namespace_ident).await.unwrap(),
723            Namespace::with_properties(namespace_ident, HashMap::new())
724        );
725    }
726
727    #[tokio::test]
728    async fn test_create_namespace_with_properties() {
729        let catalog = new_memory_catalog().await;
730        let namespace_ident = NamespaceIdent::new("abc".into());
731
732        let mut properties: HashMap<String, String> = HashMap::new();
733        properties.insert("k".into(), "v".into());
734
735        assert_eq!(
736            catalog
737                .create_namespace(&namespace_ident, properties.clone())
738                .await
739                .unwrap(),
740            Namespace::with_properties(namespace_ident.clone(), properties.clone())
741        );
742
743        assert_eq!(
744            catalog.get_namespace(&namespace_ident).await.unwrap(),
745            Namespace::with_properties(namespace_ident, properties)
746        );
747    }
748
749    #[tokio::test]
750    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
751        let catalog = new_memory_catalog().await;
752        let namespace_ident = NamespaceIdent::new("a".into());
753        create_namespace(&catalog, &namespace_ident).await;
754
755        assert_eq!(
756            catalog
757                .create_namespace(&namespace_ident, HashMap::new())
758                .await
759                .unwrap_err()
760                .to_string(),
761            format!(
762                "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
763                &namespace_ident
764            )
765        );
766
767        assert_eq!(
768            catalog.get_namespace(&namespace_ident).await.unwrap(),
769            Namespace::with_properties(namespace_ident, HashMap::new())
770        );
771    }
772
773    #[tokio::test]
774    async fn test_create_nested_namespace() {
775        let catalog = new_memory_catalog().await;
776        let parent_namespace_ident = NamespaceIdent::new("a".into());
777        create_namespace(&catalog, &parent_namespace_ident).await;
778
779        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
780
781        assert_eq!(
782            catalog
783                .create_namespace(&child_namespace_ident, HashMap::new())
784                .await
785                .unwrap(),
786            Namespace::new(child_namespace_ident.clone())
787        );
788
789        assert_eq!(
790            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
791            Namespace::with_properties(child_namespace_ident, HashMap::new())
792        );
793    }
794
795    #[tokio::test]
796    async fn test_create_deeply_nested_namespace() {
797        let catalog = new_memory_catalog().await;
798        let namespace_ident_a = NamespaceIdent::new("a".into());
799        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
800        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
801
802        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
803
804        assert_eq!(
805            catalog
806                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
807                .await
808                .unwrap(),
809            Namespace::new(namespace_ident_a_b_c.clone())
810        );
811
812        assert_eq!(
813            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
814            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
815        );
816    }
817
818    #[tokio::test]
819    async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
820        let catalog = new_memory_catalog().await;
821
822        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
823
824        assert_eq!(
825            catalog
826                .create_namespace(&nested_namespace_ident, HashMap::new())
827                .await
828                .unwrap_err()
829                .to_string(),
830            format!(
831                "NamespaceNotFound => No such namespace: {:?}",
832                NamespaceIdent::new("a".into())
833            )
834        );
835
836        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
837    }
838
839    #[tokio::test]
840    async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
841     {
842        let catalog = new_memory_catalog().await;
843
844        let namespace_ident_a = NamespaceIdent::new("a".into());
845        create_namespace(&catalog, &namespace_ident_a).await;
846
847        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
848
849        assert_eq!(
850            catalog
851                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
852                .await
853                .unwrap_err()
854                .to_string(),
855            format!(
856                "NamespaceNotFound => No such namespace: {:?}",
857                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
858            )
859        );
860
861        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
862            namespace_ident_a.clone()
863        ]);
864
865        assert_eq!(
866            catalog
867                .list_namespaces(Some(&namespace_ident_a))
868                .await
869                .unwrap(),
870            vec![]
871        );
872    }
873
874    #[tokio::test]
875    async fn test_get_namespace() {
876        let catalog = new_memory_catalog().await;
877        let namespace_ident = NamespaceIdent::new("abc".into());
878
879        let mut properties: HashMap<String, String> = HashMap::new();
880        properties.insert("k".into(), "v".into());
881        let _ = catalog
882            .create_namespace(&namespace_ident, properties.clone())
883            .await
884            .unwrap();
885
886        assert_eq!(
887            catalog.get_namespace(&namespace_ident).await.unwrap(),
888            Namespace::with_properties(namespace_ident, properties)
889        )
890    }
891
892    #[tokio::test]
893    async fn test_get_nested_namespace() {
894        let catalog = new_memory_catalog().await;
895        let namespace_ident_a = NamespaceIdent::new("a".into());
896        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
897        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
898
899        assert_eq!(
900            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
901            Namespace::with_properties(namespace_ident_a_b, HashMap::new())
902        );
903    }
904
905    #[tokio::test]
906    async fn test_get_deeply_nested_namespace() {
907        let catalog = new_memory_catalog().await;
908        let namespace_ident_a = NamespaceIdent::new("a".into());
909        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
910        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
911        create_namespaces(&catalog, &vec![
912            &namespace_ident_a,
913            &namespace_ident_a_b,
914            &namespace_ident_a_b_c,
915        ])
916        .await;
917
918        assert_eq!(
919            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
920            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
921        );
922    }
923
924    #[tokio::test]
925    async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
926        let catalog = new_memory_catalog().await;
927        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
928
929        let non_existent_namespace_ident = NamespaceIdent::new("b".into());
930        assert_eq!(
931            catalog
932                .get_namespace(&non_existent_namespace_ident)
933                .await
934                .unwrap_err()
935                .to_string(),
936            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
937        )
938    }
939
940    #[tokio::test]
941    async fn test_update_namespace() {
942        let catalog = new_memory_catalog().await;
943        let namespace_ident = NamespaceIdent::new("abc".into());
944        create_namespace(&catalog, &namespace_ident).await;
945
946        let mut new_properties: HashMap<String, String> = HashMap::new();
947        new_properties.insert("k".into(), "v".into());
948
949        catalog
950            .update_namespace(&namespace_ident, new_properties.clone())
951            .await
952            .unwrap();
953
954        assert_eq!(
955            catalog.get_namespace(&namespace_ident).await.unwrap(),
956            Namespace::with_properties(namespace_ident, new_properties)
957        )
958    }
959
960    #[tokio::test]
961    async fn test_update_nested_namespace() {
962        let catalog = new_memory_catalog().await;
963        let namespace_ident_a = NamespaceIdent::new("a".into());
964        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
965        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
966
967        let mut new_properties = HashMap::new();
968        new_properties.insert("k".into(), "v".into());
969
970        catalog
971            .update_namespace(&namespace_ident_a_b, new_properties.clone())
972            .await
973            .unwrap();
974
975        assert_eq!(
976            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
977            Namespace::with_properties(namespace_ident_a_b, new_properties)
978        );
979    }
980
981    #[tokio::test]
982    async fn test_update_deeply_nested_namespace() {
983        let catalog = new_memory_catalog().await;
984        let namespace_ident_a = NamespaceIdent::new("a".into());
985        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
986        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
987        create_namespaces(&catalog, &vec![
988            &namespace_ident_a,
989            &namespace_ident_a_b,
990            &namespace_ident_a_b_c,
991        ])
992        .await;
993
994        let mut new_properties = HashMap::new();
995        new_properties.insert("k".into(), "v".into());
996
997        catalog
998            .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
999            .await
1000            .unwrap();
1001
1002        assert_eq!(
1003            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1004            Namespace::with_properties(namespace_ident_a_b_c, new_properties)
1005        );
1006    }
1007
1008    #[tokio::test]
1009    async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
1010        let catalog = new_memory_catalog().await;
1011        create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
1012
1013        let non_existent_namespace_ident = NamespaceIdent::new("def".into());
1014        assert_eq!(
1015            catalog
1016                .update_namespace(&non_existent_namespace_ident, HashMap::new())
1017                .await
1018                .unwrap_err()
1019                .to_string(),
1020            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1021        )
1022    }
1023
1024    #[tokio::test]
1025    async fn test_drop_namespace() {
1026        let catalog = new_memory_catalog().await;
1027        let namespace_ident = NamespaceIdent::new("abc".into());
1028        create_namespace(&catalog, &namespace_ident).await;
1029
1030        catalog.drop_namespace(&namespace_ident).await.unwrap();
1031
1032        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1033    }
1034
1035    #[tokio::test]
1036    async fn test_drop_nested_namespace() {
1037        let catalog = new_memory_catalog().await;
1038        let namespace_ident_a = NamespaceIdent::new("a".into());
1039        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1040        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1041
1042        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1043
1044        assert!(
1045            !catalog
1046                .namespace_exists(&namespace_ident_a_b)
1047                .await
1048                .unwrap()
1049        );
1050
1051        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1052    }
1053
1054    #[tokio::test]
1055    async fn test_drop_deeply_nested_namespace() {
1056        let catalog = new_memory_catalog().await;
1057        let namespace_ident_a = NamespaceIdent::new("a".into());
1058        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1059        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1060        create_namespaces(&catalog, &vec![
1061            &namespace_ident_a,
1062            &namespace_ident_a_b,
1063            &namespace_ident_a_b_c,
1064        ])
1065        .await;
1066
1067        catalog
1068            .drop_namespace(&namespace_ident_a_b_c)
1069            .await
1070            .unwrap();
1071
1072        assert!(
1073            !catalog
1074                .namespace_exists(&namespace_ident_a_b_c)
1075                .await
1076                .unwrap()
1077        );
1078
1079        assert!(
1080            catalog
1081                .namespace_exists(&namespace_ident_a_b)
1082                .await
1083                .unwrap()
1084        );
1085
1086        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1087    }
1088
1089    #[tokio::test]
1090    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1091        let catalog = new_memory_catalog().await;
1092
1093        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1094        assert_eq!(
1095            catalog
1096                .drop_namespace(&non_existent_namespace_ident)
1097                .await
1098                .unwrap_err()
1099                .to_string(),
1100            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1101        )
1102    }
1103
1104    #[tokio::test]
1105    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1106        let catalog = new_memory_catalog().await;
1107        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1108
1109        let non_existent_namespace_ident =
1110            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1111        assert_eq!(
1112            catalog
1113                .drop_namespace(&non_existent_namespace_ident)
1114                .await
1115                .unwrap_err()
1116                .to_string(),
1117            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1118        )
1119    }
1120
1121    #[tokio::test]
1122    async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1123        let catalog = new_memory_catalog().await;
1124        let namespace_ident_a = NamespaceIdent::new("a".into());
1125        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1126        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1127
1128        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1129
1130        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1131
1132        assert!(
1133            !catalog
1134                .namespace_exists(&namespace_ident_a_b)
1135                .await
1136                .unwrap()
1137        );
1138    }
1139
1140    #[tokio::test]
1141    async fn test_create_table_with_location() {
1142        let tmp_dir = TempDir::new().unwrap();
1143        let catalog = new_memory_catalog().await;
1144        let namespace_ident = NamespaceIdent::new("a".into());
1145        create_namespace(&catalog, &namespace_ident).await;
1146
1147        let table_name = "abc";
1148        let location = tmp_dir.path().to_str().unwrap().to_string();
1149        let table_creation = TableCreation::builder()
1150            .name(table_name.into())
1151            .location(location.clone())
1152            .schema(simple_table_schema())
1153            .build();
1154
1155        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1156
1157        assert_table_eq(
1158            &catalog
1159                .create_table(&namespace_ident, table_creation)
1160                .await
1161                .unwrap(),
1162            &expected_table_ident,
1163            &simple_table_schema(),
1164        );
1165
1166        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1167
1168        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1169
1170        assert!(
1171            table
1172                .metadata_location()
1173                .unwrap()
1174                .to_string()
1175                .starts_with(&location)
1176        )
1177    }
1178
1179    #[tokio::test]
1180    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1181        let warehouse_location = temp_path();
1182        let catalog = MemoryCatalogBuilder::default()
1183            .load(
1184                "memory",
1185                HashMap::from([(
1186                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1187                    warehouse_location.clone(),
1188                )]),
1189            )
1190            .await
1191            .unwrap();
1192
1193        let namespace_ident = NamespaceIdent::new("a".into());
1194        let mut namespace_properties = HashMap::new();
1195        let namespace_location = temp_path();
1196        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1197        catalog
1198            .create_namespace(&namespace_ident, namespace_properties)
1199            .await
1200            .unwrap();
1201
1202        let table_name = "tbl1";
1203        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1204        let expected_table_metadata_location_regex =
1205            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1206
1207        let table = catalog
1208            .create_table(
1209                &namespace_ident,
1210                TableCreation::builder()
1211                    .name(table_name.into())
1212                    .schema(simple_table_schema())
1213                    // no location specified for table
1214                    .build(),
1215            )
1216            .await
1217            .unwrap();
1218        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1219        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1220
1221        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1222        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1223        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1224    }
1225
1226    #[tokio::test]
1227    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1228     {
1229        let warehouse_location = temp_path();
1230        let catalog = MemoryCatalogBuilder::default()
1231            .load(
1232                "memory",
1233                HashMap::from([(
1234                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1235                    warehouse_location.clone(),
1236                )]),
1237            )
1238            .await
1239            .unwrap();
1240
1241        let namespace_ident = NamespaceIdent::new("a".into());
1242        let mut namespace_properties = HashMap::new();
1243        let namespace_location = temp_path();
1244        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1245        catalog
1246            .create_namespace(&namespace_ident, namespace_properties)
1247            .await
1248            .unwrap();
1249
1250        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1251        let mut nested_namespace_properties = HashMap::new();
1252        let nested_namespace_location = temp_path();
1253        nested_namespace_properties
1254            .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1255        catalog
1256            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1257            .await
1258            .unwrap();
1259
1260        let table_name = "tbl1";
1261        let expected_table_ident =
1262            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1263        let expected_table_metadata_location_regex = format!(
1264            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1265        );
1266
1267        let table = catalog
1268            .create_table(
1269                &nested_namespace_ident,
1270                TableCreation::builder()
1271                    .name(table_name.into())
1272                    .schema(simple_table_schema())
1273                    // no location specified for table
1274                    .build(),
1275            )
1276            .await
1277            .unwrap();
1278        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1279        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1280
1281        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1282        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1283        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1284    }
1285
1286    #[tokio::test]
1287    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1288     {
1289        let warehouse_location = temp_path();
1290        let catalog = MemoryCatalogBuilder::default()
1291            .load(
1292                "memory",
1293                HashMap::from([(
1294                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1295                    warehouse_location.clone(),
1296                )]),
1297            )
1298            .await
1299            .unwrap();
1300
1301        let namespace_ident = NamespaceIdent::new("a".into());
1302        // note: no location specified in namespace_properties
1303        let namespace_properties = HashMap::new();
1304        catalog
1305            .create_namespace(&namespace_ident, namespace_properties)
1306            .await
1307            .unwrap();
1308
1309        let table_name = "tbl1";
1310        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1311        let expected_table_metadata_location_regex =
1312            format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1313
1314        let table = catalog
1315            .create_table(
1316                &namespace_ident,
1317                TableCreation::builder()
1318                    .name(table_name.into())
1319                    .schema(simple_table_schema())
1320                    // no location specified for table
1321                    .build(),
1322            )
1323            .await
1324            .unwrap();
1325        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1326        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1327
1328        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1329        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1330        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1331    }
1332
1333    #[tokio::test]
1334    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1335     {
1336        let warehouse_location = temp_path();
1337        let catalog = MemoryCatalogBuilder::default()
1338            .load(
1339                "memory",
1340                HashMap::from([(
1341                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1342                    warehouse_location.clone(),
1343                )]),
1344            )
1345            .await
1346            .unwrap();
1347
1348        let namespace_ident = NamespaceIdent::new("a".into());
1349        catalog
1350            // note: no location specified in namespace_properties
1351            .create_namespace(&namespace_ident, HashMap::new())
1352            .await
1353            .unwrap();
1354
1355        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1356        catalog
1357            // note: no location specified in namespace_properties
1358            .create_namespace(&nested_namespace_ident, HashMap::new())
1359            .await
1360            .unwrap();
1361
1362        let table_name = "tbl1";
1363        let expected_table_ident =
1364            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1365        let expected_table_metadata_location_regex = format!(
1366            "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1367        );
1368
1369        let table = catalog
1370            .create_table(
1371                &nested_namespace_ident,
1372                TableCreation::builder()
1373                    .name(table_name.into())
1374                    .schema(simple_table_schema())
1375                    // no location specified for table
1376                    .build(),
1377            )
1378            .await
1379            .unwrap();
1380        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1381        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1382
1383        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1384        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1385        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1386    }
1387
1388    #[tokio::test]
1389    async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1390     {
1391        let catalog = MemoryCatalogBuilder::default()
1392            .load("memory", HashMap::from([]))
1393            .await;
1394
1395        assert!(catalog.is_err());
1396        assert_eq!(
1397            catalog.unwrap_err().to_string(),
1398            "DataInvalid => Catalog warehouse is required"
1399        );
1400    }
1401
1402    #[tokio::test]
1403    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1404        let catalog = new_memory_catalog().await;
1405        let namespace_ident = NamespaceIdent::new("a".into());
1406        create_namespace(&catalog, &namespace_ident).await;
1407        let table_name = "tbl1";
1408        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1409        create_table(&catalog, &table_ident).await;
1410
1411        let tmp_dir = TempDir::new().unwrap();
1412        let location = tmp_dir.path().to_str().unwrap().to_string();
1413
1414        assert_eq!(
1415            catalog
1416                .create_table(
1417                    &namespace_ident,
1418                    TableCreation::builder()
1419                        .name(table_name.into())
1420                        .schema(simple_table_schema())
1421                        .location(location)
1422                        .build()
1423                )
1424                .await
1425                .unwrap_err()
1426                .to_string(),
1427            format!(
1428                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1429                &table_ident
1430            )
1431        );
1432    }
1433
1434    #[tokio::test]
1435    async fn test_list_tables_returns_empty_vector() {
1436        let catalog = new_memory_catalog().await;
1437        let namespace_ident = NamespaceIdent::new("a".into());
1438        create_namespace(&catalog, &namespace_ident).await;
1439
1440        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1441    }
1442
1443    #[tokio::test]
1444    async fn test_list_tables_returns_a_single_table() {
1445        let catalog = new_memory_catalog().await;
1446        let namespace_ident = NamespaceIdent::new("n1".into());
1447        create_namespace(&catalog, &namespace_ident).await;
1448
1449        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1450        create_table(&catalog, &table_ident).await;
1451
1452        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1453            table_ident
1454        ]);
1455    }
1456
1457    #[tokio::test]
1458    async fn test_list_tables_returns_multiple_tables() {
1459        let catalog = new_memory_catalog().await;
1460        let namespace_ident = NamespaceIdent::new("n1".into());
1461        create_namespace(&catalog, &namespace_ident).await;
1462
1463        let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1464        let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1465        let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1466
1467        assert_eq!(
1468            to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1469            to_set(vec![table_ident_1, table_ident_2])
1470        );
1471    }
1472
1473    #[tokio::test]
1474    async fn test_list_tables_returns_tables_from_correct_namespace() {
1475        let catalog = new_memory_catalog().await;
1476        let namespace_ident_1 = NamespaceIdent::new("n1".into());
1477        let namespace_ident_2 = NamespaceIdent::new("n2".into());
1478        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1479
1480        let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1481        let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1482        let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1483        let _ = create_tables(&catalog, vec![
1484            &table_ident_1,
1485            &table_ident_2,
1486            &table_ident_3,
1487        ])
1488        .await;
1489
1490        assert_eq!(
1491            to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1492            to_set(vec![table_ident_1, table_ident_2])
1493        );
1494
1495        assert_eq!(
1496            to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1497            to_set(vec![table_ident_3])
1498        );
1499    }
1500
1501    #[tokio::test]
1502    async fn test_list_tables_returns_table_under_nested_namespace() {
1503        let catalog = new_memory_catalog().await;
1504        let namespace_ident_a = NamespaceIdent::new("a".into());
1505        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1506        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1507
1508        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1509        create_table(&catalog, &table_ident).await;
1510
1511        assert_eq!(
1512            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1513            vec![table_ident]
1514        );
1515    }
1516
1517    #[tokio::test]
1518    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1519        let catalog = new_memory_catalog().await;
1520
1521        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1522
1523        assert_eq!(
1524            catalog
1525                .list_tables(&non_existent_namespace_ident)
1526                .await
1527                .unwrap_err()
1528                .to_string(),
1529            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1530        );
1531    }
1532
1533    #[tokio::test]
1534    async fn test_drop_table() {
1535        let catalog = new_memory_catalog().await;
1536        let namespace_ident = NamespaceIdent::new("n1".into());
1537        create_namespace(&catalog, &namespace_ident).await;
1538        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1539        create_table(&catalog, &table_ident).await;
1540
1541        catalog.drop_table(&table_ident).await.unwrap();
1542    }
1543
1544    #[tokio::test]
1545    async fn test_drop_table_drops_table_under_nested_namespace() {
1546        let catalog = new_memory_catalog().await;
1547        let namespace_ident_a = NamespaceIdent::new("a".into());
1548        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1549        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1550
1551        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1552        create_table(&catalog, &table_ident).await;
1553
1554        catalog.drop_table(&table_ident).await.unwrap();
1555
1556        assert_eq!(
1557            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1558            vec![]
1559        );
1560    }
1561
1562    #[tokio::test]
1563    async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1564        let catalog = new_memory_catalog().await;
1565
1566        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1567        let non_existent_table_ident =
1568            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1569
1570        assert_eq!(
1571            catalog
1572                .drop_table(&non_existent_table_ident)
1573                .await
1574                .unwrap_err()
1575                .to_string(),
1576            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1577        );
1578    }
1579
1580    #[tokio::test]
1581    async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1582        let catalog = new_memory_catalog().await;
1583        let namespace_ident = NamespaceIdent::new("n1".into());
1584        create_namespace(&catalog, &namespace_ident).await;
1585
1586        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1587
1588        assert_eq!(
1589            catalog
1590                .drop_table(&non_existent_table_ident)
1591                .await
1592                .unwrap_err()
1593                .to_string(),
1594            format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1595        );
1596    }
1597
1598    #[tokio::test]
1599    async fn test_table_exists_returns_true() {
1600        let catalog = new_memory_catalog().await;
1601        let namespace_ident = NamespaceIdent::new("n1".into());
1602        create_namespace(&catalog, &namespace_ident).await;
1603        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1604        create_table(&catalog, &table_ident).await;
1605
1606        assert!(catalog.table_exists(&table_ident).await.unwrap());
1607    }
1608
1609    #[tokio::test]
1610    async fn test_table_exists_returns_false() {
1611        let catalog = new_memory_catalog().await;
1612        let namespace_ident = NamespaceIdent::new("n1".into());
1613        create_namespace(&catalog, &namespace_ident).await;
1614        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1615
1616        assert!(
1617            !catalog
1618                .table_exists(&non_existent_table_ident)
1619                .await
1620                .unwrap()
1621        );
1622    }
1623
1624    #[tokio::test]
1625    async fn test_table_exists_under_nested_namespace() {
1626        let catalog = new_memory_catalog().await;
1627        let namespace_ident_a = NamespaceIdent::new("a".into());
1628        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1629        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1630
1631        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1632        create_table(&catalog, &table_ident).await;
1633
1634        assert!(catalog.table_exists(&table_ident).await.unwrap());
1635
1636        let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1637        assert!(
1638            !catalog
1639                .table_exists(&non_existent_table_ident)
1640                .await
1641                .unwrap()
1642        );
1643    }
1644
1645    #[tokio::test]
1646    async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1647        let catalog = new_memory_catalog().await;
1648
1649        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1650        let non_existent_table_ident =
1651            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1652
1653        assert_eq!(
1654            catalog
1655                .table_exists(&non_existent_table_ident)
1656                .await
1657                .unwrap_err()
1658                .to_string(),
1659            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1660        );
1661    }
1662
1663    #[tokio::test]
1664    async fn test_rename_table_in_same_namespace() {
1665        let catalog = new_memory_catalog().await;
1666        let namespace_ident = NamespaceIdent::new("n1".into());
1667        create_namespace(&catalog, &namespace_ident).await;
1668        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1669        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1670        create_table(&catalog, &src_table_ident).await;
1671
1672        catalog
1673            .rename_table(&src_table_ident, &dst_table_ident)
1674            .await
1675            .unwrap();
1676
1677        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1678            dst_table_ident
1679        ],);
1680    }
1681
1682    #[tokio::test]
1683    async fn test_rename_table_across_namespaces() {
1684        let catalog = new_memory_catalog().await;
1685        let src_namespace_ident = NamespaceIdent::new("a".into());
1686        let dst_namespace_ident = NamespaceIdent::new("b".into());
1687        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1688        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1689        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1690        create_table(&catalog, &src_table_ident).await;
1691
1692        catalog
1693            .rename_table(&src_table_ident, &dst_table_ident)
1694            .await
1695            .unwrap();
1696
1697        assert_eq!(
1698            catalog.list_tables(&src_namespace_ident).await.unwrap(),
1699            vec![],
1700        );
1701
1702        assert_eq!(
1703            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1704            vec![dst_table_ident],
1705        );
1706    }
1707
1708    #[tokio::test]
1709    async fn test_rename_table_src_table_is_same_as_dst_table() {
1710        let catalog = new_memory_catalog().await;
1711        let namespace_ident = NamespaceIdent::new("n1".into());
1712        create_namespace(&catalog, &namespace_ident).await;
1713        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1714        create_table(&catalog, &table_ident).await;
1715
1716        catalog
1717            .rename_table(&table_ident, &table_ident)
1718            .await
1719            .unwrap();
1720
1721        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1722            table_ident
1723        ],);
1724    }
1725
1726    #[tokio::test]
1727    async fn test_rename_table_across_nested_namespaces() {
1728        let catalog = new_memory_catalog().await;
1729        let namespace_ident_a = NamespaceIdent::new("a".into());
1730        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1731        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1732        create_namespaces(&catalog, &vec![
1733            &namespace_ident_a,
1734            &namespace_ident_a_b,
1735            &namespace_ident_a_b_c,
1736        ])
1737        .await;
1738
1739        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1740        create_tables(&catalog, vec![&src_table_ident]).await;
1741
1742        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1743        catalog
1744            .rename_table(&src_table_ident, &dst_table_ident)
1745            .await
1746            .unwrap();
1747
1748        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1749
1750        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1751    }
1752
1753    #[tokio::test]
1754    async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1755        let catalog = new_memory_catalog().await;
1756
1757        let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1758        let src_table_ident =
1759            TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1760
1761        let dst_namespace_ident = NamespaceIdent::new("n2".into());
1762        create_namespace(&catalog, &dst_namespace_ident).await;
1763        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1764
1765        assert_eq!(
1766            catalog
1767                .rename_table(&src_table_ident, &dst_table_ident)
1768                .await
1769                .unwrap_err()
1770                .to_string(),
1771            format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1772        );
1773    }
1774
1775    #[tokio::test]
1776    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1777        let catalog = new_memory_catalog().await;
1778        let src_namespace_ident = NamespaceIdent::new("n1".into());
1779        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1780        create_namespace(&catalog, &src_namespace_ident).await;
1781        create_table(&catalog, &src_table_ident).await;
1782
1783        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1784        let dst_table_ident =
1785            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1786        assert_eq!(
1787            catalog
1788                .rename_table(&src_table_ident, &dst_table_ident)
1789                .await
1790                .unwrap_err()
1791                .to_string(),
1792            format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1793        );
1794    }
1795
1796    #[tokio::test]
1797    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1798        let catalog = new_memory_catalog().await;
1799        let namespace_ident = NamespaceIdent::new("n1".into());
1800        create_namespace(&catalog, &namespace_ident).await;
1801        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1802        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1803
1804        assert_eq!(
1805            catalog
1806                .rename_table(&src_table_ident, &dst_table_ident)
1807                .await
1808                .unwrap_err()
1809                .to_string(),
1810            format!("TableNotFound => No such table: {src_table_ident:?}"),
1811        );
1812    }
1813
1814    #[tokio::test]
1815    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1816        let catalog = new_memory_catalog().await;
1817        let namespace_ident = NamespaceIdent::new("n1".into());
1818        create_namespace(&catalog, &namespace_ident).await;
1819        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1820        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1821        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1822
1823        assert_eq!(
1824            catalog
1825                .rename_table(&src_table_ident, &dst_table_ident)
1826                .await
1827                .unwrap_err()
1828                .to_string(),
1829            format!(
1830                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1831                &dst_table_ident
1832            ),
1833        );
1834    }
1835
1836    #[tokio::test]
1837    async fn test_register_table() {
1838        // Create a catalog and namespace
1839        let catalog = new_memory_catalog().await;
1840        let namespace_ident = NamespaceIdent::new("test_namespace".into());
1841        create_namespace(&catalog, &namespace_ident).await;
1842
1843        // Create a table to get a valid metadata file
1844        let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1845        create_table(&catalog, &source_table_ident).await;
1846
1847        // Get the metadata location from the source table
1848        let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1849        let metadata_location = source_table.metadata_location().unwrap().to_string();
1850
1851        // Register a new table using the same metadata location
1852        let register_table_ident =
1853            TableIdent::new(namespace_ident.clone(), "register_table".into());
1854        let registered_table = catalog
1855            .register_table(&register_table_ident, metadata_location.clone())
1856            .await
1857            .unwrap();
1858
1859        // Verify the registered table has the correct identifier
1860        assert_eq!(registered_table.identifier(), &register_table_ident);
1861
1862        // Verify the registered table has the correct metadata location
1863        assert_eq!(
1864            registered_table.metadata_location().unwrap().to_string(),
1865            metadata_location
1866        );
1867
1868        // Verify the table exists in the catalog
1869        assert!(catalog.table_exists(&register_table_ident).await.unwrap());
1870
1871        // Verify we can load the registered table
1872        let loaded_table = catalog.load_table(&register_table_ident).await.unwrap();
1873        assert_eq!(loaded_table.identifier(), &register_table_ident);
1874        assert_eq!(
1875            loaded_table.metadata_location().unwrap().to_string(),
1876            metadata_location
1877        );
1878    }
1879
1880    #[tokio::test]
1881    async fn test_update_table() {
1882        let catalog = new_memory_catalog().await;
1883
1884        let table = create_table_with_namespace(&catalog).await;
1885
1886        // Assert the table doesn't contain the update yet
1887        assert!(!table.metadata().properties().contains_key("key"));
1888
1889        // `last_updated_ms` is millisecond-resolution `chrono::Utc::now()`.
1890        // Without this sleep, create and commit can land in the same
1891        // millisecond and the `<` assertion below flakes.
1892        tokio::time::sleep(std::time::Duration::from_millis(2)).await;
1893
1894        // Update table metadata
1895        let tx = Transaction::new(&table);
1896        let updated_table = tx
1897            .update_table_properties()
1898            .set("key".to_string(), "value".to_string())
1899            .apply(tx)
1900            .unwrap()
1901            .commit(&catalog)
1902            .await
1903            .unwrap();
1904
1905        assert_eq!(
1906            updated_table.metadata().properties().get("key").unwrap(),
1907            "value"
1908        );
1909
1910        assert_eq!(table.identifier(), updated_table.identifier());
1911        assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1912        assert_ne!(table.metadata_location(), updated_table.metadata_location());
1913
1914        assert!(
1915            table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1916        );
1917    }
1918
1919    #[tokio::test]
1920    async fn test_update_table_fails_if_table_doesnt_exist() {
1921        let catalog = new_memory_catalog().await;
1922
1923        let namespace_ident = NamespaceIdent::new("a".into());
1924        create_namespace(&catalog, &namespace_ident).await;
1925
1926        // This table is not known to the catalog.
1927        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1928        let table = build_table(table_ident);
1929
1930        let tx = Transaction::new(&table);
1931        let err = tx
1932            .update_table_properties()
1933            .set("key".to_string(), "value".to_string())
1934            .apply(tx)
1935            .unwrap()
1936            .commit(&catalog)
1937            .await
1938            .unwrap_err();
1939        assert_eq!(err.kind(), ErrorKind::TableNotFound);
1940    }
1941
1942    fn build_table(ident: TableIdent) -> Table {
1943        let file_io = FileIO::new_with_fs();
1944
1945        let temp_dir = TempDir::new().unwrap();
1946        let location = temp_dir.path().to_str().unwrap().to_string();
1947
1948        let table_creation = TableCreation::builder()
1949            .name(ident.name().to_string())
1950            .schema(simple_table_schema())
1951            .location(location)
1952            .build();
1953        let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1954            .unwrap()
1955            .build()
1956            .unwrap()
1957            .metadata;
1958
1959        Table::builder()
1960            .identifier(ident)
1961            .metadata(metadata)
1962            .file_io(file_io)
1963            .runtime(test_runtime())
1964            .build()
1965            .unwrap()
1966    }
1967}