iceberg/catalog/memory/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains memory catalog implementation.
19
20use std::collections::HashMap;
21use std::str::FromStr;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use futures::lock::{Mutex, MutexGuard};
26use itertools::Itertools;
27
28use super::namespace_state::NamespaceState;
29use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
30use crate::runtime::Runtime;
31use crate::spec::{TableMetadata, TableMetadataBuilder};
32use crate::table::Table;
33use crate::{
34    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35    TableCommit, TableCreation, TableIdent,
36};
37
/// Property key under which the warehouse location is passed to
/// [`MemoryCatalogBuilder`]; the builder rejects an empty or missing value.
pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";

/// Namespace property key holding the namespace's location. When present, it is
/// used as the prefix of default table locations instead of the warehouse path.
const LOCATION: &str = "location";
43
/// Builder for [`MemoryCatalog`].
///
/// Collects the catalog configuration plus an optional storage factory and
/// runtime; the catalog itself is produced by `load`.
#[derive(Debug)]
pub struct MemoryCatalogBuilder {
    /// Name, warehouse location and remaining properties gathered so far.
    config: MemoryCatalogConfig,
    /// Storage backend for the catalog's `FileIO`; `None` selects the in-memory default.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Runtime handed to created tables; `None` falls back to `Runtime::current`.
    runtime: Option<Runtime>,
}
51
52impl Default for MemoryCatalogBuilder {
53    fn default() -> Self {
54        Self {
55            config: MemoryCatalogConfig {
56                name: None,
57                warehouse: "".to_string(),
58                props: HashMap::new(),
59            },
60            storage_factory: None,
61            runtime: None,
62        }
63    }
64}
65
66impl CatalogBuilder for MemoryCatalogBuilder {
67    type C = MemoryCatalog;
68
69    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
70        self.storage_factory = Some(storage_factory);
71        self
72    }
73
74    fn with_runtime(mut self, runtime: Runtime) -> Self {
75        self.runtime = Some(runtime);
76        self
77    }
78
79    fn load(
80        mut self,
81        name: impl Into<String>,
82        props: HashMap<String, String>,
83    ) -> impl Future<Output = Result<Self::C>> + Send {
84        self.config.name = Some(name.into());
85
86        if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
87            self.config.warehouse = props
88                .get(MEMORY_CATALOG_WAREHOUSE)
89                .cloned()
90                .unwrap_or_default()
91        }
92
93        // Collect other remaining properties
94        self.config.props = props
95            .into_iter()
96            .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
97            .collect();
98
99        let result = {
100            if self.config.name.is_none() {
101                Err(Error::new(
102                    ErrorKind::DataInvalid,
103                    "Catalog name is required",
104                ))
105            } else if self.config.warehouse.is_empty() {
106                Err(Error::new(
107                    ErrorKind::DataInvalid,
108                    "Catalog warehouse is required",
109                ))
110            } else {
111                let runtime = self.runtime.unwrap_or_else(Runtime::current);
112                MemoryCatalog::new(self.config, self.storage_factory, runtime)
113            }
114        };
115
116        std::future::ready(result)
117    }
118}
119
/// Configuration gathered by [`MemoryCatalogBuilder`] before the catalog is built.
#[derive(Clone, Debug)]
pub(crate) struct MemoryCatalogConfig {
    /// Catalog name; set by `load`.
    name: Option<String>,
    /// Warehouse location; an empty string means "not provided".
    warehouse: String,
    /// Remaining properties, forwarded to the `FileIO` builder.
    props: HashMap<String, String>,
}
126
/// Memory catalog implementation.
///
/// Namespace and table bookkeeping lives in a [`NamespaceState`] guarded by an
/// async mutex; table metadata files are read and written through `file_io`.
#[derive(Debug)]
pub struct MemoryCatalog {
    /// Namespace tree and table-to-metadata-location mapping.
    root_namespace_state: Mutex<NamespaceState>,
    /// File IO used to read and write table metadata.
    file_io: FileIO,
    /// Prefix for default table locations when a namespace has no `location` property.
    warehouse_location: String,
    /// Runtime attached to every table handed out by this catalog.
    runtime: Runtime,
}
135
136impl MemoryCatalog {
137    /// Creates a memory catalog.
138    fn new(
139        config: MemoryCatalogConfig,
140        storage_factory: Option<Arc<dyn StorageFactory>>,
141        runtime: Runtime,
142    ) -> Result<Self> {
143        // Use provided factory or default to MemoryStorageFactory
144        let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory));
145
146        Ok(Self {
147            root_namespace_state: Mutex::new(NamespaceState::default()),
148            file_io: FileIOBuilder::new(factory).with_props(config.props).build(),
149            warehouse_location: config.warehouse,
150            runtime,
151        })
152    }
153
154    /// Loads a table from the locked namespace state.
155    async fn load_table_from_locked_state(
156        &self,
157        table_ident: &TableIdent,
158        root_namespace_state: &MutexGuard<'_, NamespaceState>,
159    ) -> Result<Table> {
160        let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
161        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
162
163        Table::builder()
164            .identifier(table_ident.clone())
165            .metadata(metadata)
166            .metadata_location(metadata_location.to_string())
167            .file_io(self.file_io.clone())
168            .runtime(self.runtime.clone())
169            .build()
170    }
171}
172
173#[async_trait]
174impl Catalog for MemoryCatalog {
175    /// List namespaces inside the catalog.
176    async fn list_namespaces(
177        &self,
178        maybe_parent: Option<&NamespaceIdent>,
179    ) -> Result<Vec<NamespaceIdent>> {
180        let root_namespace_state = self.root_namespace_state.lock().await;
181
182        match maybe_parent {
183            None => {
184                let namespaces = root_namespace_state
185                    .list_top_level_namespaces()
186                    .into_iter()
187                    .map(|str| NamespaceIdent::new(str.to_string()))
188                    .collect_vec();
189
190                Ok(namespaces)
191            }
192            Some(parent_namespace_ident) => {
193                let namespaces = root_namespace_state
194                    .list_namespaces_under(parent_namespace_ident)?
195                    .into_iter()
196                    .map(|name| {
197                        let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
198                        names.push(name.to_string());
199                        NamespaceIdent::from_vec(names)
200                    })
201                    .collect::<Result<Vec<_>>>()?;
202
203                Ok(namespaces)
204            }
205        }
206    }
207
208    /// Create a new namespace inside the catalog.
209    async fn create_namespace(
210        &self,
211        namespace_ident: &NamespaceIdent,
212        properties: HashMap<String, String>,
213    ) -> Result<Namespace> {
214        let mut root_namespace_state = self.root_namespace_state.lock().await;
215
216        root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
217        let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
218
219        Ok(namespace)
220    }
221
222    /// Get a namespace information from the catalog.
223    async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
224        let root_namespace_state = self.root_namespace_state.lock().await;
225
226        let namespace = Namespace::with_properties(
227            namespace_ident.clone(),
228            root_namespace_state
229                .get_properties(namespace_ident)?
230                .clone(),
231        );
232
233        Ok(namespace)
234    }
235
236    /// Check if namespace exists in catalog.
237    async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
238        let guarded_namespaces = self.root_namespace_state.lock().await;
239
240        Ok(guarded_namespaces.namespace_exists(namespace_ident))
241    }
242
243    /// Update a namespace inside the catalog.
244    ///
245    /// # Behavior
246    ///
247    /// The properties must be the full set of namespace.
248    async fn update_namespace(
249        &self,
250        namespace_ident: &NamespaceIdent,
251        properties: HashMap<String, String>,
252    ) -> Result<()> {
253        let mut root_namespace_state = self.root_namespace_state.lock().await;
254
255        root_namespace_state.replace_properties(namespace_ident, properties)
256    }
257
258    /// Drop a namespace from the catalog.
259    async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
260        let mut root_namespace_state = self.root_namespace_state.lock().await;
261
262        root_namespace_state.remove_existing_namespace(namespace_ident)
263    }
264
265    /// List tables from namespace.
266    async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
267        let root_namespace_state = self.root_namespace_state.lock().await;
268
269        let table_names = root_namespace_state.list_tables(namespace_ident)?;
270        let table_idents = table_names
271            .into_iter()
272            .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
273            .collect_vec();
274
275        Ok(table_idents)
276    }
277
278    /// Create a new table inside the namespace.
279    async fn create_table(
280        &self,
281        namespace_ident: &NamespaceIdent,
282        table_creation: TableCreation,
283    ) -> Result<Table> {
284        let mut root_namespace_state = self.root_namespace_state.lock().await;
285
286        let table_name = table_creation.name.clone();
287        let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
288
289        let (table_creation, location) = match table_creation.location.clone() {
290            Some(location) => (table_creation, location),
291            None => {
292                let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
293                let location_prefix = match namespace_properties.get(LOCATION) {
294                    Some(namespace_location) => namespace_location.clone(),
295                    None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
296                };
297
298                let location = format!("{}/{}", location_prefix, table_ident.name());
299
300                let new_table_creation = TableCreation {
301                    location: Some(location.clone()),
302                    ..table_creation
303                };
304
305                (new_table_creation, location)
306            }
307        };
308
309        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
310            .build()?
311            .metadata;
312        let metadata_location = MetadataLocation::new_with_metadata(location, &metadata);
313
314        metadata.write_to(&self.file_io, &metadata_location).await?;
315
316        root_namespace_state.insert_new_table(&table_ident, metadata_location.to_string())?;
317
318        Table::builder()
319            .file_io(self.file_io.clone())
320            .metadata_location(metadata_location.to_string())
321            .metadata(metadata)
322            .identifier(table_ident)
323            .runtime(self.runtime.clone())
324            .build()
325    }
326
327    /// Load table from the catalog.
328    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
329        let root_namespace_state = self.root_namespace_state.lock().await;
330
331        self.load_table_from_locked_state(table_ident, &root_namespace_state)
332            .await
333    }
334
335    /// Drop a table from the catalog.
336    async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
337        let mut root_namespace_state = self.root_namespace_state.lock().await;
338
339        root_namespace_state.remove_existing_table(table_ident)?;
340        Ok(())
341    }
342
343    async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> {
344        let table_info = self.load_table(table_ident).await?;
345        self.drop_table(table_ident).await?;
346        crate::catalog::utils::drop_table_data(&table_info).await
347    }
348
349    /// Check if a table exists in the catalog.
350    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
351        let root_namespace_state = self.root_namespace_state.lock().await;
352
353        root_namespace_state.table_exists(table_ident)
354    }
355
356    /// Rename a table in the catalog.
357    async fn rename_table(
358        &self,
359        src_table_ident: &TableIdent,
360        dst_table_ident: &TableIdent,
361    ) -> Result<()> {
362        let mut root_namespace_state = self.root_namespace_state.lock().await;
363
364        let mut new_root_namespace_state = root_namespace_state.clone();
365        let metadata_location = new_root_namespace_state
366            .get_existing_table_location(src_table_ident)?
367            .clone();
368        new_root_namespace_state.remove_existing_table(src_table_ident)?;
369        new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
370        *root_namespace_state = new_root_namespace_state;
371
372        Ok(())
373    }
374
375    async fn register_table(
376        &self,
377        table_ident: &TableIdent,
378        metadata_location: String,
379    ) -> Result<Table> {
380        let mut root_namespace_state = self.root_namespace_state.lock().await;
381        root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
382
383        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
384
385        Table::builder()
386            .file_io(self.file_io.clone())
387            .metadata_location(metadata_location)
388            .metadata(metadata)
389            .identifier(table_ident.clone())
390            .runtime(self.runtime.clone())
391            .build()
392    }
393
394    /// Update a table in the catalog.
395    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
396        let mut root_namespace_state = self.root_namespace_state.lock().await;
397
398        let current_table = self
399            .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
400            .await?;
401
402        // Apply TableCommit to get staged table
403        let staged_table = commit.apply(current_table)?;
404
405        // Write table metadata to the new location
406        let metadata_location =
407            MetadataLocation::from_str(staged_table.metadata_location_result()?)?;
408        staged_table
409            .metadata()
410            .write_to(staged_table.file_io(), &metadata_location)
411            .await?;
412
413        // Flip the pointer to reference the new metadata file.
414        let updated_table = root_namespace_state.commit_table_update(staged_table)?;
415
416        Ok(updated_table)
417    }
418}
419
420#[cfg(test)]
421pub(crate) mod tests {
422    use std::collections::HashSet;
423    use std::hash::Hash;
424    use std::iter::FromIterator;
425    use std::vec;
426
427    use regex::Regex;
428    use tempfile::TempDir;
429
430    use super::*;
431    use crate::io::FileIO;
432    use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
433    use crate::test_utils::test_runtime;
434    use crate::transaction::{ApplyTransactionAction, Transaction};
435
436    fn temp_path() -> String {
437        let temp_dir = TempDir::new().unwrap();
438        temp_dir.path().to_str().unwrap().to_string()
439    }
440
441    pub(crate) async fn new_memory_catalog() -> impl Catalog {
442        let warehouse_location = temp_path();
443        MemoryCatalogBuilder::default()
444            .load(
445                "memory",
446                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
447            )
448            .await
449            .unwrap()
450    }
451
452    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
453        let _ = catalog
454            .create_namespace(namespace_ident, HashMap::new())
455            .await
456            .unwrap();
457    }
458
459    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
460        for namespace_ident in namespace_idents {
461            let _ = create_namespace(catalog, namespace_ident).await;
462        }
463    }
464
465    fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
466        HashSet::from_iter(vec)
467    }
468
469    fn simple_table_schema() -> Schema {
470        Schema::builder()
471            .with_fields(vec![
472                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
473            ])
474            .build()
475            .unwrap()
476    }
477
478    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
479        catalog
480            .create_table(
481                &table_ident.namespace,
482                TableCreation::builder()
483                    .name(table_ident.name().into())
484                    .schema(simple_table_schema())
485                    .build(),
486            )
487            .await
488            .unwrap()
489    }
490
491    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
492        for table_ident in table_idents {
493            create_table(catalog, table_ident).await;
494        }
495    }
496
497    async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
498        let namespace_ident = NamespaceIdent::new("abc".into());
499        create_namespace(catalog, &namespace_ident).await;
500
501        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
502        create_table(catalog, &table_ident).await
503    }
504
505    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
506        assert_eq!(table.identifier(), expected_table_ident);
507
508        let metadata = table.metadata();
509
510        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
511
512        let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
513            .with_spec_id(0)
514            .build()
515            .unwrap();
516
517        assert_eq!(
518            metadata
519                .partition_specs_iter()
520                .map(|p| p.as_ref())
521                .collect_vec(),
522            vec![&expected_partition_spec]
523        );
524
525        let expected_sorted_order = SortOrder::builder()
526            .with_order_id(0)
527            .with_fields(vec![])
528            .build(expected_schema)
529            .unwrap();
530
531        assert_eq!(
532            metadata
533                .sort_orders_iter()
534                .map(|s| s.as_ref())
535                .collect_vec(),
536            vec![&expected_sorted_order]
537        );
538
539        assert_eq!(metadata.properties(), &HashMap::new());
540
541        assert!(!table.readonly());
542    }
543
    /// Regex fragment matching a lowercase, hyphenated UUID (8-4-4-4-12 hex digits).
    /// It carries no anchors of its own.
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
545
546    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
547        let actual = table.metadata_location().unwrap().to_string();
548        let regex = Regex::new(regex_str).unwrap();
549        assert!(
550            regex.is_match(&actual),
551            "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
552        )
553    }
554
555    #[tokio::test]
556    async fn test_list_namespaces_returns_empty_vector() {
557        let catalog = new_memory_catalog().await;
558
559        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
560    }
561
562    #[tokio::test]
563    async fn test_list_namespaces_returns_single_namespace() {
564        let catalog = new_memory_catalog().await;
565        let namespace_ident = NamespaceIdent::new("abc".into());
566        create_namespace(&catalog, &namespace_ident).await;
567
568        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
569            namespace_ident
570        ]);
571    }
572
573    #[tokio::test]
574    async fn test_list_namespaces_returns_multiple_namespaces() {
575        let catalog = new_memory_catalog().await;
576        let namespace_ident_1 = NamespaceIdent::new("a".into());
577        let namespace_ident_2 = NamespaceIdent::new("b".into());
578        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
579
580        assert_eq!(
581            to_set(catalog.list_namespaces(None).await.unwrap()),
582            to_set(vec![namespace_ident_1, namespace_ident_2])
583        );
584    }
585
586    #[tokio::test]
587    async fn test_list_namespaces_returns_only_top_level_namespaces() {
588        let catalog = new_memory_catalog().await;
589        let namespace_ident_1 = NamespaceIdent::new("a".into());
590        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
591        let namespace_ident_3 = NamespaceIdent::new("b".into());
592        create_namespaces(&catalog, &vec![
593            &namespace_ident_1,
594            &namespace_ident_2,
595            &namespace_ident_3,
596        ])
597        .await;
598
599        assert_eq!(
600            to_set(catalog.list_namespaces(None).await.unwrap()),
601            to_set(vec![namespace_ident_1, namespace_ident_3])
602        );
603    }
604
605    #[tokio::test]
606    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
607        let catalog = new_memory_catalog().await;
608        let namespace_ident_1 = NamespaceIdent::new("a".into());
609        let namespace_ident_2 = NamespaceIdent::new("b".into());
610        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
611
612        assert_eq!(
613            catalog
614                .list_namespaces(Some(&namespace_ident_1))
615                .await
616                .unwrap(),
617            vec![]
618        );
619    }
620
621    #[tokio::test]
622    async fn test_list_namespaces_returns_namespace_under_parent() {
623        let catalog = new_memory_catalog().await;
624        let namespace_ident_1 = NamespaceIdent::new("a".into());
625        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
626        let namespace_ident_3 = NamespaceIdent::new("c".into());
627        create_namespaces(&catalog, &vec![
628            &namespace_ident_1,
629            &namespace_ident_2,
630            &namespace_ident_3,
631        ])
632        .await;
633
634        assert_eq!(
635            to_set(catalog.list_namespaces(None).await.unwrap()),
636            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
637        );
638
639        assert_eq!(
640            catalog
641                .list_namespaces(Some(&namespace_ident_1))
642                .await
643                .unwrap(),
644            vec![namespace_ident_2]
645        );
646    }
647
648    #[tokio::test]
649    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
650        let catalog = new_memory_catalog().await;
651        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
652        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
653        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
654        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
655        let namespace_ident_5 = NamespaceIdent::new("b".into());
656        create_namespaces(&catalog, &vec![
657            &namespace_ident_1,
658            &namespace_ident_2,
659            &namespace_ident_3,
660            &namespace_ident_4,
661            &namespace_ident_5,
662        ])
663        .await;
664
665        assert_eq!(
666            to_set(
667                catalog
668                    .list_namespaces(Some(&namespace_ident_1))
669                    .await
670                    .unwrap()
671            ),
672            to_set(vec![
673                namespace_ident_2,
674                namespace_ident_3,
675                namespace_ident_4,
676            ])
677        );
678    }
679
680    #[tokio::test]
681    async fn test_namespace_exists_returns_false() {
682        let catalog = new_memory_catalog().await;
683        let namespace_ident = NamespaceIdent::new("a".into());
684        create_namespace(&catalog, &namespace_ident).await;
685
686        assert!(
687            !catalog
688                .namespace_exists(&NamespaceIdent::new("b".into()))
689                .await
690                .unwrap()
691        );
692    }
693
694    #[tokio::test]
695    async fn test_namespace_exists_returns_true() {
696        let catalog = new_memory_catalog().await;
697        let namespace_ident = NamespaceIdent::new("a".into());
698        create_namespace(&catalog, &namespace_ident).await;
699
700        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
701    }
702
703    #[tokio::test]
704    async fn test_create_namespace_with_empty_properties() {
705        let catalog = new_memory_catalog().await;
706        let namespace_ident = NamespaceIdent::new("a".into());
707
708        assert_eq!(
709            catalog
710                .create_namespace(&namespace_ident, HashMap::new())
711                .await
712                .unwrap(),
713            Namespace::new(namespace_ident.clone())
714        );
715
716        assert_eq!(
717            catalog.get_namespace(&namespace_ident).await.unwrap(),
718            Namespace::with_properties(namespace_ident, HashMap::new())
719        );
720    }
721
722    #[tokio::test]
723    async fn test_create_namespace_with_properties() {
724        let catalog = new_memory_catalog().await;
725        let namespace_ident = NamespaceIdent::new("abc".into());
726
727        let mut properties: HashMap<String, String> = HashMap::new();
728        properties.insert("k".into(), "v".into());
729
730        assert_eq!(
731            catalog
732                .create_namespace(&namespace_ident, properties.clone())
733                .await
734                .unwrap(),
735            Namespace::with_properties(namespace_ident.clone(), properties.clone())
736        );
737
738        assert_eq!(
739            catalog.get_namespace(&namespace_ident).await.unwrap(),
740            Namespace::with_properties(namespace_ident, properties)
741        );
742    }
743
744    #[tokio::test]
745    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
746        let catalog = new_memory_catalog().await;
747        let namespace_ident = NamespaceIdent::new("a".into());
748        create_namespace(&catalog, &namespace_ident).await;
749
750        assert_eq!(
751            catalog
752                .create_namespace(&namespace_ident, HashMap::new())
753                .await
754                .unwrap_err()
755                .to_string(),
756            format!(
757                "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
758                &namespace_ident
759            )
760        );
761
762        assert_eq!(
763            catalog.get_namespace(&namespace_ident).await.unwrap(),
764            Namespace::with_properties(namespace_ident, HashMap::new())
765        );
766    }
767
768    #[tokio::test]
769    async fn test_create_nested_namespace() {
770        let catalog = new_memory_catalog().await;
771        let parent_namespace_ident = NamespaceIdent::new("a".into());
772        create_namespace(&catalog, &parent_namespace_ident).await;
773
774        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
775
776        assert_eq!(
777            catalog
778                .create_namespace(&child_namespace_ident, HashMap::new())
779                .await
780                .unwrap(),
781            Namespace::new(child_namespace_ident.clone())
782        );
783
784        assert_eq!(
785            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
786            Namespace::with_properties(child_namespace_ident, HashMap::new())
787        );
788    }
789
790    #[tokio::test]
791    async fn test_create_deeply_nested_namespace() {
792        let catalog = new_memory_catalog().await;
793        let namespace_ident_a = NamespaceIdent::new("a".into());
794        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
795        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
796
797        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
798
799        assert_eq!(
800            catalog
801                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
802                .await
803                .unwrap(),
804            Namespace::new(namespace_ident_a_b_c.clone())
805        );
806
807        assert_eq!(
808            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
809            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
810        );
811    }
812
813    #[tokio::test]
814    async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
815        let catalog = new_memory_catalog().await;
816
817        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
818
819        assert_eq!(
820            catalog
821                .create_namespace(&nested_namespace_ident, HashMap::new())
822                .await
823                .unwrap_err()
824                .to_string(),
825            format!(
826                "NamespaceNotFound => No such namespace: {:?}",
827                NamespaceIdent::new("a".into())
828            )
829        );
830
831        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
832    }
833
834    #[tokio::test]
835    async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
836     {
837        let catalog = new_memory_catalog().await;
838
839        let namespace_ident_a = NamespaceIdent::new("a".into());
840        create_namespace(&catalog, &namespace_ident_a).await;
841
842        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
843
844        assert_eq!(
845            catalog
846                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
847                .await
848                .unwrap_err()
849                .to_string(),
850            format!(
851                "NamespaceNotFound => No such namespace: {:?}",
852                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
853            )
854        );
855
856        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
857            namespace_ident_a.clone()
858        ]);
859
860        assert_eq!(
861            catalog
862                .list_namespaces(Some(&namespace_ident_a))
863                .await
864                .unwrap(),
865            vec![]
866        );
867    }
868
869    #[tokio::test]
870    async fn test_get_namespace() {
871        let catalog = new_memory_catalog().await;
872        let namespace_ident = NamespaceIdent::new("abc".into());
873
874        let mut properties: HashMap<String, String> = HashMap::new();
875        properties.insert("k".into(), "v".into());
876        let _ = catalog
877            .create_namespace(&namespace_ident, properties.clone())
878            .await
879            .unwrap();
880
881        assert_eq!(
882            catalog.get_namespace(&namespace_ident).await.unwrap(),
883            Namespace::with_properties(namespace_ident, properties)
884        )
885    }
886
887    #[tokio::test]
888    async fn test_get_nested_namespace() {
889        let catalog = new_memory_catalog().await;
890        let namespace_ident_a = NamespaceIdent::new("a".into());
891        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
892        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
893
894        assert_eq!(
895            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
896            Namespace::with_properties(namespace_ident_a_b, HashMap::new())
897        );
898    }
899
900    #[tokio::test]
901    async fn test_get_deeply_nested_namespace() {
902        let catalog = new_memory_catalog().await;
903        let namespace_ident_a = NamespaceIdent::new("a".into());
904        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
905        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
906        create_namespaces(&catalog, &vec![
907            &namespace_ident_a,
908            &namespace_ident_a_b,
909            &namespace_ident_a_b_c,
910        ])
911        .await;
912
913        assert_eq!(
914            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
915            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
916        );
917    }
918
919    #[tokio::test]
920    async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
921        let catalog = new_memory_catalog().await;
922        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
923
924        let non_existent_namespace_ident = NamespaceIdent::new("b".into());
925        assert_eq!(
926            catalog
927                .get_namespace(&non_existent_namespace_ident)
928                .await
929                .unwrap_err()
930                .to_string(),
931            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
932        )
933    }
934
935    #[tokio::test]
936    async fn test_update_namespace() {
937        let catalog = new_memory_catalog().await;
938        let namespace_ident = NamespaceIdent::new("abc".into());
939        create_namespace(&catalog, &namespace_ident).await;
940
941        let mut new_properties: HashMap<String, String> = HashMap::new();
942        new_properties.insert("k".into(), "v".into());
943
944        catalog
945            .update_namespace(&namespace_ident, new_properties.clone())
946            .await
947            .unwrap();
948
949        assert_eq!(
950            catalog.get_namespace(&namespace_ident).await.unwrap(),
951            Namespace::with_properties(namespace_ident, new_properties)
952        )
953    }
954
955    #[tokio::test]
956    async fn test_update_nested_namespace() {
957        let catalog = new_memory_catalog().await;
958        let namespace_ident_a = NamespaceIdent::new("a".into());
959        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
960        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
961
962        let mut new_properties = HashMap::new();
963        new_properties.insert("k".into(), "v".into());
964
965        catalog
966            .update_namespace(&namespace_ident_a_b, new_properties.clone())
967            .await
968            .unwrap();
969
970        assert_eq!(
971            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
972            Namespace::with_properties(namespace_ident_a_b, new_properties)
973        );
974    }
975
976    #[tokio::test]
977    async fn test_update_deeply_nested_namespace() {
978        let catalog = new_memory_catalog().await;
979        let namespace_ident_a = NamespaceIdent::new("a".into());
980        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
981        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
982        create_namespaces(&catalog, &vec![
983            &namespace_ident_a,
984            &namespace_ident_a_b,
985            &namespace_ident_a_b_c,
986        ])
987        .await;
988
989        let mut new_properties = HashMap::new();
990        new_properties.insert("k".into(), "v".into());
991
992        catalog
993            .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
994            .await
995            .unwrap();
996
997        assert_eq!(
998            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
999            Namespace::with_properties(namespace_ident_a_b_c, new_properties)
1000        );
1001    }
1002
1003    #[tokio::test]
1004    async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
1005        let catalog = new_memory_catalog().await;
1006        create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
1007
1008        let non_existent_namespace_ident = NamespaceIdent::new("def".into());
1009        assert_eq!(
1010            catalog
1011                .update_namespace(&non_existent_namespace_ident, HashMap::new())
1012                .await
1013                .unwrap_err()
1014                .to_string(),
1015            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1016        )
1017    }
1018
1019    #[tokio::test]
1020    async fn test_drop_namespace() {
1021        let catalog = new_memory_catalog().await;
1022        let namespace_ident = NamespaceIdent::new("abc".into());
1023        create_namespace(&catalog, &namespace_ident).await;
1024
1025        catalog.drop_namespace(&namespace_ident).await.unwrap();
1026
1027        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1028    }
1029
1030    #[tokio::test]
1031    async fn test_drop_nested_namespace() {
1032        let catalog = new_memory_catalog().await;
1033        let namespace_ident_a = NamespaceIdent::new("a".into());
1034        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1035        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1036
1037        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1038
1039        assert!(
1040            !catalog
1041                .namespace_exists(&namespace_ident_a_b)
1042                .await
1043                .unwrap()
1044        );
1045
1046        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1047    }
1048
1049    #[tokio::test]
1050    async fn test_drop_deeply_nested_namespace() {
1051        let catalog = new_memory_catalog().await;
1052        let namespace_ident_a = NamespaceIdent::new("a".into());
1053        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1054        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1055        create_namespaces(&catalog, &vec![
1056            &namespace_ident_a,
1057            &namespace_ident_a_b,
1058            &namespace_ident_a_b_c,
1059        ])
1060        .await;
1061
1062        catalog
1063            .drop_namespace(&namespace_ident_a_b_c)
1064            .await
1065            .unwrap();
1066
1067        assert!(
1068            !catalog
1069                .namespace_exists(&namespace_ident_a_b_c)
1070                .await
1071                .unwrap()
1072        );
1073
1074        assert!(
1075            catalog
1076                .namespace_exists(&namespace_ident_a_b)
1077                .await
1078                .unwrap()
1079        );
1080
1081        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1082    }
1083
1084    #[tokio::test]
1085    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1086        let catalog = new_memory_catalog().await;
1087
1088        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1089        assert_eq!(
1090            catalog
1091                .drop_namespace(&non_existent_namespace_ident)
1092                .await
1093                .unwrap_err()
1094                .to_string(),
1095            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1096        )
1097    }
1098
1099    #[tokio::test]
1100    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1101        let catalog = new_memory_catalog().await;
1102        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1103
1104        let non_existent_namespace_ident =
1105            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1106        assert_eq!(
1107            catalog
1108                .drop_namespace(&non_existent_namespace_ident)
1109                .await
1110                .unwrap_err()
1111                .to_string(),
1112            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1113        )
1114    }
1115
1116    #[tokio::test]
1117    async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1118        let catalog = new_memory_catalog().await;
1119        let namespace_ident_a = NamespaceIdent::new("a".into());
1120        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1121        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1122
1123        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1124
1125        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1126
1127        assert!(
1128            !catalog
1129                .namespace_exists(&namespace_ident_a_b)
1130                .await
1131                .unwrap()
1132        );
1133    }
1134
1135    #[tokio::test]
1136    async fn test_create_table_with_location() {
1137        let tmp_dir = TempDir::new().unwrap();
1138        let catalog = new_memory_catalog().await;
1139        let namespace_ident = NamespaceIdent::new("a".into());
1140        create_namespace(&catalog, &namespace_ident).await;
1141
1142        let table_name = "abc";
1143        let location = tmp_dir.path().to_str().unwrap().to_string();
1144        let table_creation = TableCreation::builder()
1145            .name(table_name.into())
1146            .location(location.clone())
1147            .schema(simple_table_schema())
1148            .build();
1149
1150        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1151
1152        assert_table_eq(
1153            &catalog
1154                .create_table(&namespace_ident, table_creation)
1155                .await
1156                .unwrap(),
1157            &expected_table_ident,
1158            &simple_table_schema(),
1159        );
1160
1161        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1162
1163        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1164
1165        assert!(
1166            table
1167                .metadata_location()
1168                .unwrap()
1169                .to_string()
1170                .starts_with(&location)
1171        )
1172    }
1173
1174    #[tokio::test]
1175    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1176        let warehouse_location = temp_path();
1177        let catalog = MemoryCatalogBuilder::default()
1178            .load(
1179                "memory",
1180                HashMap::from([(
1181                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1182                    warehouse_location.clone(),
1183                )]),
1184            )
1185            .await
1186            .unwrap();
1187
1188        let namespace_ident = NamespaceIdent::new("a".into());
1189        let mut namespace_properties = HashMap::new();
1190        let namespace_location = temp_path();
1191        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1192        catalog
1193            .create_namespace(&namespace_ident, namespace_properties)
1194            .await
1195            .unwrap();
1196
1197        let table_name = "tbl1";
1198        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1199        let expected_table_metadata_location_regex =
1200            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1201
1202        let table = catalog
1203            .create_table(
1204                &namespace_ident,
1205                TableCreation::builder()
1206                    .name(table_name.into())
1207                    .schema(simple_table_schema())
1208                    // no location specified for table
1209                    .build(),
1210            )
1211            .await
1212            .unwrap();
1213        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1214        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1215
1216        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1217        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1218        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1219    }
1220
1221    #[tokio::test]
1222    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1223     {
1224        let warehouse_location = temp_path();
1225        let catalog = MemoryCatalogBuilder::default()
1226            .load(
1227                "memory",
1228                HashMap::from([(
1229                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1230                    warehouse_location.clone(),
1231                )]),
1232            )
1233            .await
1234            .unwrap();
1235
1236        let namespace_ident = NamespaceIdent::new("a".into());
1237        let mut namespace_properties = HashMap::new();
1238        let namespace_location = temp_path();
1239        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1240        catalog
1241            .create_namespace(&namespace_ident, namespace_properties)
1242            .await
1243            .unwrap();
1244
1245        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1246        let mut nested_namespace_properties = HashMap::new();
1247        let nested_namespace_location = temp_path();
1248        nested_namespace_properties
1249            .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1250        catalog
1251            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1252            .await
1253            .unwrap();
1254
1255        let table_name = "tbl1";
1256        let expected_table_ident =
1257            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1258        let expected_table_metadata_location_regex = format!(
1259            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1260        );
1261
1262        let table = catalog
1263            .create_table(
1264                &nested_namespace_ident,
1265                TableCreation::builder()
1266                    .name(table_name.into())
1267                    .schema(simple_table_schema())
1268                    // no location specified for table
1269                    .build(),
1270            )
1271            .await
1272            .unwrap();
1273        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1274        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1275
1276        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1277        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1278        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1279    }
1280
1281    #[tokio::test]
1282    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1283     {
1284        let warehouse_location = temp_path();
1285        let catalog = MemoryCatalogBuilder::default()
1286            .load(
1287                "memory",
1288                HashMap::from([(
1289                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1290                    warehouse_location.clone(),
1291                )]),
1292            )
1293            .await
1294            .unwrap();
1295
1296        let namespace_ident = NamespaceIdent::new("a".into());
1297        // note: no location specified in namespace_properties
1298        let namespace_properties = HashMap::new();
1299        catalog
1300            .create_namespace(&namespace_ident, namespace_properties)
1301            .await
1302            .unwrap();
1303
1304        let table_name = "tbl1";
1305        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1306        let expected_table_metadata_location_regex =
1307            format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1308
1309        let table = catalog
1310            .create_table(
1311                &namespace_ident,
1312                TableCreation::builder()
1313                    .name(table_name.into())
1314                    .schema(simple_table_schema())
1315                    // no location specified for table
1316                    .build(),
1317            )
1318            .await
1319            .unwrap();
1320        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1321        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1322
1323        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1324        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1325        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1326    }
1327
1328    #[tokio::test]
1329    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1330     {
1331        let warehouse_location = temp_path();
1332        let catalog = MemoryCatalogBuilder::default()
1333            .load(
1334                "memory",
1335                HashMap::from([(
1336                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1337                    warehouse_location.clone(),
1338                )]),
1339            )
1340            .await
1341            .unwrap();
1342
1343        let namespace_ident = NamespaceIdent::new("a".into());
1344        catalog
1345            // note: no location specified in namespace_properties
1346            .create_namespace(&namespace_ident, HashMap::new())
1347            .await
1348            .unwrap();
1349
1350        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1351        catalog
1352            // note: no location specified in namespace_properties
1353            .create_namespace(&nested_namespace_ident, HashMap::new())
1354            .await
1355            .unwrap();
1356
1357        let table_name = "tbl1";
1358        let expected_table_ident =
1359            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1360        let expected_table_metadata_location_regex = format!(
1361            "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1362        );
1363
1364        let table = catalog
1365            .create_table(
1366                &nested_namespace_ident,
1367                TableCreation::builder()
1368                    .name(table_name.into())
1369                    .schema(simple_table_schema())
1370                    // no location specified for table
1371                    .build(),
1372            )
1373            .await
1374            .unwrap();
1375        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1376        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1377
1378        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1379        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1380        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1381    }
1382
1383    #[tokio::test]
1384    async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1385     {
1386        let catalog = MemoryCatalogBuilder::default()
1387            .load("memory", HashMap::from([]))
1388            .await;
1389
1390        assert!(catalog.is_err());
1391        assert_eq!(
1392            catalog.unwrap_err().to_string(),
1393            "DataInvalid => Catalog warehouse is required"
1394        );
1395    }
1396
1397    #[tokio::test]
1398    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1399        let catalog = new_memory_catalog().await;
1400        let namespace_ident = NamespaceIdent::new("a".into());
1401        create_namespace(&catalog, &namespace_ident).await;
1402        let table_name = "tbl1";
1403        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1404        create_table(&catalog, &table_ident).await;
1405
1406        let tmp_dir = TempDir::new().unwrap();
1407        let location = tmp_dir.path().to_str().unwrap().to_string();
1408
1409        assert_eq!(
1410            catalog
1411                .create_table(
1412                    &namespace_ident,
1413                    TableCreation::builder()
1414                        .name(table_name.into())
1415                        .schema(simple_table_schema())
1416                        .location(location)
1417                        .build()
1418                )
1419                .await
1420                .unwrap_err()
1421                .to_string(),
1422            format!(
1423                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1424                &table_ident
1425            )
1426        );
1427    }
1428
1429    #[tokio::test]
1430    async fn test_list_tables_returns_empty_vector() {
1431        let catalog = new_memory_catalog().await;
1432        let namespace_ident = NamespaceIdent::new("a".into());
1433        create_namespace(&catalog, &namespace_ident).await;
1434
1435        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1436    }
1437
1438    #[tokio::test]
1439    async fn test_list_tables_returns_a_single_table() {
1440        let catalog = new_memory_catalog().await;
1441        let namespace_ident = NamespaceIdent::new("n1".into());
1442        create_namespace(&catalog, &namespace_ident).await;
1443
1444        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1445        create_table(&catalog, &table_ident).await;
1446
1447        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1448            table_ident
1449        ]);
1450    }
1451
1452    #[tokio::test]
1453    async fn test_list_tables_returns_multiple_tables() {
1454        let catalog = new_memory_catalog().await;
1455        let namespace_ident = NamespaceIdent::new("n1".into());
1456        create_namespace(&catalog, &namespace_ident).await;
1457
1458        let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1459        let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1460        let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1461
1462        assert_eq!(
1463            to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1464            to_set(vec![table_ident_1, table_ident_2])
1465        );
1466    }
1467
1468    #[tokio::test]
1469    async fn test_list_tables_returns_tables_from_correct_namespace() {
1470        let catalog = new_memory_catalog().await;
1471        let namespace_ident_1 = NamespaceIdent::new("n1".into());
1472        let namespace_ident_2 = NamespaceIdent::new("n2".into());
1473        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1474
1475        let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1476        let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1477        let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1478        let _ = create_tables(&catalog, vec![
1479            &table_ident_1,
1480            &table_ident_2,
1481            &table_ident_3,
1482        ])
1483        .await;
1484
1485        assert_eq!(
1486            to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1487            to_set(vec![table_ident_1, table_ident_2])
1488        );
1489
1490        assert_eq!(
1491            to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1492            to_set(vec![table_ident_3])
1493        );
1494    }
1495
1496    #[tokio::test]
1497    async fn test_list_tables_returns_table_under_nested_namespace() {
1498        let catalog = new_memory_catalog().await;
1499        let namespace_ident_a = NamespaceIdent::new("a".into());
1500        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1501        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1502
1503        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1504        create_table(&catalog, &table_ident).await;
1505
1506        assert_eq!(
1507            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1508            vec![table_ident]
1509        );
1510    }
1511
1512    #[tokio::test]
1513    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1514        let catalog = new_memory_catalog().await;
1515
1516        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1517
1518        assert_eq!(
1519            catalog
1520                .list_tables(&non_existent_namespace_ident)
1521                .await
1522                .unwrap_err()
1523                .to_string(),
1524            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1525        );
1526    }
1527
1528    #[tokio::test]
1529    async fn test_drop_table() {
1530        let catalog = new_memory_catalog().await;
1531        let namespace_ident = NamespaceIdent::new("n1".into());
1532        create_namespace(&catalog, &namespace_ident).await;
1533        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1534        create_table(&catalog, &table_ident).await;
1535
1536        catalog.drop_table(&table_ident).await.unwrap();
1537    }
1538
1539    #[tokio::test]
1540    async fn test_drop_table_drops_table_under_nested_namespace() {
1541        let catalog = new_memory_catalog().await;
1542        let namespace_ident_a = NamespaceIdent::new("a".into());
1543        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1544        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1545
1546        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1547        create_table(&catalog, &table_ident).await;
1548
1549        catalog.drop_table(&table_ident).await.unwrap();
1550
1551        assert_eq!(
1552            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1553            vec![]
1554        );
1555    }
1556
1557    #[tokio::test]
1558    async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1559        let catalog = new_memory_catalog().await;
1560
1561        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1562        let non_existent_table_ident =
1563            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1564
1565        assert_eq!(
1566            catalog
1567                .drop_table(&non_existent_table_ident)
1568                .await
1569                .unwrap_err()
1570                .to_string(),
1571            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1572        );
1573    }
1574
1575    #[tokio::test]
1576    async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1577        let catalog = new_memory_catalog().await;
1578        let namespace_ident = NamespaceIdent::new("n1".into());
1579        create_namespace(&catalog, &namespace_ident).await;
1580
1581        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1582
1583        assert_eq!(
1584            catalog
1585                .drop_table(&non_existent_table_ident)
1586                .await
1587                .unwrap_err()
1588                .to_string(),
1589            format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1590        );
1591    }
1592
1593    #[tokio::test]
1594    async fn test_table_exists_returns_true() {
1595        let catalog = new_memory_catalog().await;
1596        let namespace_ident = NamespaceIdent::new("n1".into());
1597        create_namespace(&catalog, &namespace_ident).await;
1598        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1599        create_table(&catalog, &table_ident).await;
1600
1601        assert!(catalog.table_exists(&table_ident).await.unwrap());
1602    }
1603
1604    #[tokio::test]
1605    async fn test_table_exists_returns_false() {
1606        let catalog = new_memory_catalog().await;
1607        let namespace_ident = NamespaceIdent::new("n1".into());
1608        create_namespace(&catalog, &namespace_ident).await;
1609        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1610
1611        assert!(
1612            !catalog
1613                .table_exists(&non_existent_table_ident)
1614                .await
1615                .unwrap()
1616        );
1617    }
1618
1619    #[tokio::test]
1620    async fn test_table_exists_under_nested_namespace() {
1621        let catalog = new_memory_catalog().await;
1622        let namespace_ident_a = NamespaceIdent::new("a".into());
1623        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1624        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1625
1626        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1627        create_table(&catalog, &table_ident).await;
1628
1629        assert!(catalog.table_exists(&table_ident).await.unwrap());
1630
1631        let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1632        assert!(
1633            !catalog
1634                .table_exists(&non_existent_table_ident)
1635                .await
1636                .unwrap()
1637        );
1638    }
1639
1640    #[tokio::test]
1641    async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1642        let catalog = new_memory_catalog().await;
1643
1644        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1645        let non_existent_table_ident =
1646            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1647
1648        assert_eq!(
1649            catalog
1650                .table_exists(&non_existent_table_ident)
1651                .await
1652                .unwrap_err()
1653                .to_string(),
1654            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1655        );
1656    }
1657
1658    #[tokio::test]
1659    async fn test_rename_table_in_same_namespace() {
1660        let catalog = new_memory_catalog().await;
1661        let namespace_ident = NamespaceIdent::new("n1".into());
1662        create_namespace(&catalog, &namespace_ident).await;
1663        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1664        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1665        create_table(&catalog, &src_table_ident).await;
1666
1667        catalog
1668            .rename_table(&src_table_ident, &dst_table_ident)
1669            .await
1670            .unwrap();
1671
1672        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1673            dst_table_ident
1674        ],);
1675    }
1676
1677    #[tokio::test]
1678    async fn test_rename_table_across_namespaces() {
1679        let catalog = new_memory_catalog().await;
1680        let src_namespace_ident = NamespaceIdent::new("a".into());
1681        let dst_namespace_ident = NamespaceIdent::new("b".into());
1682        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1683        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1684        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1685        create_table(&catalog, &src_table_ident).await;
1686
1687        catalog
1688            .rename_table(&src_table_ident, &dst_table_ident)
1689            .await
1690            .unwrap();
1691
1692        assert_eq!(
1693            catalog.list_tables(&src_namespace_ident).await.unwrap(),
1694            vec![],
1695        );
1696
1697        assert_eq!(
1698            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1699            vec![dst_table_ident],
1700        );
1701    }
1702
1703    #[tokio::test]
1704    async fn test_rename_table_src_table_is_same_as_dst_table() {
1705        let catalog = new_memory_catalog().await;
1706        let namespace_ident = NamespaceIdent::new("n1".into());
1707        create_namespace(&catalog, &namespace_ident).await;
1708        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1709        create_table(&catalog, &table_ident).await;
1710
1711        catalog
1712            .rename_table(&table_ident, &table_ident)
1713            .await
1714            .unwrap();
1715
1716        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1717            table_ident
1718        ],);
1719    }
1720
1721    #[tokio::test]
1722    async fn test_rename_table_across_nested_namespaces() {
1723        let catalog = new_memory_catalog().await;
1724        let namespace_ident_a = NamespaceIdent::new("a".into());
1725        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1726        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1727        create_namespaces(&catalog, &vec![
1728            &namespace_ident_a,
1729            &namespace_ident_a_b,
1730            &namespace_ident_a_b_c,
1731        ])
1732        .await;
1733
1734        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1735        create_tables(&catalog, vec![&src_table_ident]).await;
1736
1737        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1738        catalog
1739            .rename_table(&src_table_ident, &dst_table_ident)
1740            .await
1741            .unwrap();
1742
1743        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1744
1745        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1746    }
1747
1748    #[tokio::test]
1749    async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1750        let catalog = new_memory_catalog().await;
1751
1752        let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1753        let src_table_ident =
1754            TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1755
1756        let dst_namespace_ident = NamespaceIdent::new("n2".into());
1757        create_namespace(&catalog, &dst_namespace_ident).await;
1758        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1759
1760        assert_eq!(
1761            catalog
1762                .rename_table(&src_table_ident, &dst_table_ident)
1763                .await
1764                .unwrap_err()
1765                .to_string(),
1766            format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1767        );
1768    }
1769
1770    #[tokio::test]
1771    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1772        let catalog = new_memory_catalog().await;
1773        let src_namespace_ident = NamespaceIdent::new("n1".into());
1774        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1775        create_namespace(&catalog, &src_namespace_ident).await;
1776        create_table(&catalog, &src_table_ident).await;
1777
1778        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1779        let dst_table_ident =
1780            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1781        assert_eq!(
1782            catalog
1783                .rename_table(&src_table_ident, &dst_table_ident)
1784                .await
1785                .unwrap_err()
1786                .to_string(),
1787            format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1788        );
1789    }
1790
1791    #[tokio::test]
1792    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1793        let catalog = new_memory_catalog().await;
1794        let namespace_ident = NamespaceIdent::new("n1".into());
1795        create_namespace(&catalog, &namespace_ident).await;
1796        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1797        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1798
1799        assert_eq!(
1800            catalog
1801                .rename_table(&src_table_ident, &dst_table_ident)
1802                .await
1803                .unwrap_err()
1804                .to_string(),
1805            format!("TableNotFound => No such table: {src_table_ident:?}"),
1806        );
1807    }
1808
1809    #[tokio::test]
1810    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1811        let catalog = new_memory_catalog().await;
1812        let namespace_ident = NamespaceIdent::new("n1".into());
1813        create_namespace(&catalog, &namespace_ident).await;
1814        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1815        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1816        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1817
1818        assert_eq!(
1819            catalog
1820                .rename_table(&src_table_ident, &dst_table_ident)
1821                .await
1822                .unwrap_err()
1823                .to_string(),
1824            format!(
1825                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1826                &dst_table_ident
1827            ),
1828        );
1829    }
1830
1831    #[tokio::test]
1832    async fn test_register_table() {
1833        // Create a catalog and namespace
1834        let catalog = new_memory_catalog().await;
1835        let namespace_ident = NamespaceIdent::new("test_namespace".into());
1836        create_namespace(&catalog, &namespace_ident).await;
1837
1838        // Create a table to get a valid metadata file
1839        let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1840        create_table(&catalog, &source_table_ident).await;
1841
1842        // Get the metadata location from the source table
1843        let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1844        let metadata_location = source_table.metadata_location().unwrap().to_string();
1845
1846        // Register a new table using the same metadata location
1847        let register_table_ident =
1848            TableIdent::new(namespace_ident.clone(), "register_table".into());
1849        let registered_table = catalog
1850            .register_table(&register_table_ident, metadata_location.clone())
1851            .await
1852            .unwrap();
1853
1854        // Verify the registered table has the correct identifier
1855        assert_eq!(registered_table.identifier(), &register_table_ident);
1856
1857        // Verify the registered table has the correct metadata location
1858        assert_eq!(
1859            registered_table.metadata_location().unwrap().to_string(),
1860            metadata_location
1861        );
1862
1863        // Verify the table exists in the catalog
1864        assert!(catalog.table_exists(&register_table_ident).await.unwrap());
1865
1866        // Verify we can load the registered table
1867        let loaded_table = catalog.load_table(&register_table_ident).await.unwrap();
1868        assert_eq!(loaded_table.identifier(), &register_table_ident);
1869        assert_eq!(
1870            loaded_table.metadata_location().unwrap().to_string(),
1871            metadata_location
1872        );
1873    }
1874
1875    #[tokio::test]
1876    async fn test_update_table() {
1877        let catalog = new_memory_catalog().await;
1878
1879        let table = create_table_with_namespace(&catalog).await;
1880
1881        // Assert the table doesn't contain the update yet
1882        assert!(!table.metadata().properties().contains_key("key"));
1883
1884        // `last_updated_ms` is millisecond-resolution `chrono::Utc::now()`.
1885        // Without this sleep, create and commit can land in the same
1886        // millisecond and the `<` assertion below flakes.
1887        tokio::time::sleep(std::time::Duration::from_millis(2)).await;
1888
1889        // Update table metadata
1890        let tx = Transaction::new(&table);
1891        let updated_table = tx
1892            .update_table_properties()
1893            .set("key".to_string(), "value".to_string())
1894            .apply(tx)
1895            .unwrap()
1896            .commit(&catalog)
1897            .await
1898            .unwrap();
1899
1900        assert_eq!(
1901            updated_table.metadata().properties().get("key").unwrap(),
1902            "value"
1903        );
1904
1905        assert_eq!(table.identifier(), updated_table.identifier());
1906        assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1907        assert_ne!(table.metadata_location(), updated_table.metadata_location());
1908
1909        assert!(
1910            table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1911        );
1912    }
1913
1914    #[tokio::test]
1915    async fn test_update_table_fails_if_table_doesnt_exist() {
1916        let catalog = new_memory_catalog().await;
1917
1918        let namespace_ident = NamespaceIdent::new("a".into());
1919        create_namespace(&catalog, &namespace_ident).await;
1920
1921        // This table is not known to the catalog.
1922        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1923        let table = build_table(table_ident);
1924
1925        let tx = Transaction::new(&table);
1926        let err = tx
1927            .update_table_properties()
1928            .set("key".to_string(), "value".to_string())
1929            .apply(tx)
1930            .unwrap()
1931            .commit(&catalog)
1932            .await
1933            .unwrap_err();
1934        assert_eq!(err.kind(), ErrorKind::TableNotFound);
1935    }
1936
1937    fn build_table(ident: TableIdent) -> Table {
1938        let file_io = FileIO::new_with_fs();
1939
1940        let temp_dir = TempDir::new().unwrap();
1941        let location = temp_dir.path().to_str().unwrap().to_string();
1942
1943        let table_creation = TableCreation::builder()
1944            .name(ident.name().to_string())
1945            .schema(simple_table_schema())
1946            .location(location)
1947            .build();
1948        let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1949            .unwrap()
1950            .build()
1951            .unwrap()
1952            .metadata;
1953
1954        Table::builder()
1955            .identifier(ident)
1956            .metadata(metadata)
1957            .file_io(file_io)
1958            .runtime(test_runtime())
1959            .build()
1960            .unwrap()
1961    }
1962}