iceberg_catalog_sql/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
25use iceberg::spec::{TableMetadata, TableMetadataBuilder};
26use iceberg::table::Table;
27use iceberg::{
28    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
29    TableCommit, TableCreation, TableIdent,
30};
31use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
32use sqlx::{Any, AnyPool, Row, Transaction};
33
34use crate::error::{
35    from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
36};
37
38/// catalog URI
39pub const SQL_CATALOG_PROP_URI: &str = "uri";
40/// catalog warehouse location
41pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
42/// catalog sql bind style
43pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";
44
45static CATALOG_TABLE_NAME: &str = "iceberg_tables";
46static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
47static CATALOG_FIELD_TABLE_NAME: &str = "table_name";
48static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
49static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
50static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
51static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
52static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";
53
54static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
55static NAMESPACE_FIELD_NAME: &str = "namespace";
56static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
57static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";
58
59static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
60
61static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if not provided
62static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed
63static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning
64
65/// Builder for [`SqlCatalog`]
66#[derive(Debug)]
67pub struct SqlCatalogBuilder {
68    config: SqlCatalogConfig,
69    storage_factory: Option<Arc<dyn StorageFactory>>,
70}
71
72impl Default for SqlCatalogBuilder {
73    fn default() -> Self {
74        Self {
75            config: SqlCatalogConfig {
76                uri: "".to_string(),
77                name: "".to_string(),
78                warehouse_location: "".to_string(),
79                sql_bind_style: SqlBindStyle::DollarNumeric,
80                props: HashMap::new(),
81            },
82            storage_factory: None,
83        }
84    }
85}
86
87impl SqlCatalogBuilder {
88    /// Configure the database URI
89    ///
90    /// If `SQL_CATALOG_PROP_URI` has a value set in `props` during `SqlCatalogBuilder::load`,
91    /// that value takes precedence, and the value specified by this method will not be used.
92    pub fn uri(mut self, uri: impl Into<String>) -> Self {
93        self.config.uri = uri.into();
94        self
95    }
96
97    /// Configure the warehouse location
98    ///
99    /// If `SQL_CATALOG_PROP_WAREHOUSE` has a value set in `props` during `SqlCatalogBuilder::load`,
100    /// that value takes precedence, and the value specified by this method will not be used.
101    pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
102        self.config.warehouse_location = location.into();
103        self
104    }
105
106    /// Configure the bound SQL Statement
107    ///
108    /// If `SQL_CATALOG_PROP_BIND_STYLE` has a value set in `props` during `SqlCatalogBuilder::load`,
109    /// that value takes precedence, and the value specified by this method will not be used.
110    pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
111        self.config.sql_bind_style = sql_bind_style;
112        self
113    }
114
115    /// Configure the any properties
116    ///
117    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
118    /// those values will take precedence.
119    pub fn props(mut self, props: HashMap<String, String>) -> Self {
120        for (k, v) in props {
121            self.config.props.insert(k, v);
122        }
123        self
124    }
125
126    /// Set a new property on the property to be configured.
127    /// When multiple methods are executed with the same key,
128    /// the later-set value takes precedence.
129    ///
130    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
131    /// those values will take precedence.
132    pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
133        self.config.props.insert(key.into(), value.into());
134        self
135    }
136}
137
138impl CatalogBuilder for SqlCatalogBuilder {
139    type C = SqlCatalog;
140
141    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
142        self.storage_factory = Some(storage_factory);
143        self
144    }
145
146    fn load(
147        mut self,
148        name: impl Into<String>,
149        props: HashMap<String, String>,
150    ) -> impl Future<Output = Result<Self::C>> + Send {
151        for (k, v) in props {
152            self.config.props.insert(k, v);
153        }
154
155        if let Some(uri) = self.config.props.remove(SQL_CATALOG_PROP_URI) {
156            self.config.uri = uri;
157        }
158        if let Some(warehouse_location) = self.config.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
159            self.config.warehouse_location = warehouse_location;
160        }
161
162        let name = name.into();
163
164        let mut valid_sql_bind_style = true;
165        if let Some(sql_bind_style) = self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
166            if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
167                self.config.sql_bind_style = sql_bind_style;
168            } else {
169                valid_sql_bind_style = false;
170            }
171        }
172
173        let valid_name = !name.trim().is_empty();
174
175        async move {
176            if !valid_name {
177                Err(Error::new(
178                    ErrorKind::DataInvalid,
179                    "Catalog name cannot be empty",
180                ))
181            } else if !valid_sql_bind_style {
182                Err(Error::new(
183                    ErrorKind::DataInvalid,
184                    format!(
185                        "`{}` values are valid only if they're `{}` or `{}`",
186                        SQL_CATALOG_PROP_BIND_STYLE,
187                        SqlBindStyle::DollarNumeric,
188                        SqlBindStyle::QMark
189                    ),
190                ))
191            } else {
192                self.config.name = name;
193                SqlCatalog::new(self.config, self.storage_factory).await
194            }
195        }
196    }
197}
198
199/// A struct representing the SQL catalog configuration.
200///
201/// This struct contains various parameters that are used to configure a SQL catalog,
202/// such as the database URI, warehouse location, and file I/O settings.
203/// You are required to provide a `SqlBindStyle`, which determines how SQL statements will be bound to values in the catalog.
204/// The options available for this parameter include:
205/// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases.
206/// - `SqlBindStyle::QuestionMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases.
207#[derive(Debug)]
208struct SqlCatalogConfig {
209    uri: String,
210    name: String,
211    warehouse_location: String,
212    sql_bind_style: SqlBindStyle,
213    props: HashMap<String, String>,
214}
215
216#[derive(Debug)]
217/// Sql catalog implementation.
218pub struct SqlCatalog {
219    name: String,
220    connection: AnyPool,
221    warehouse_location: String,
222    fileio: FileIO,
223    sql_bind_style: SqlBindStyle,
224}
225
226#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
227/// Set the SQL parameter bind style to either $1..$N (Postgres style) or ? (SQLite/MySQL/MariaDB)
228pub enum SqlBindStyle {
229    /// DollarNumeric uses parameters of the form `$1..$N``, which is the Postgres style
230    DollarNumeric,
231    /// QMark uses parameters of the form `?` which is the style for other dialects (SQLite/MySQL/MariaDB)
232    QMark,
233}
234
235impl SqlCatalog {
236    /// Create new sql catalog instance
237    async fn new(
238        config: SqlCatalogConfig,
239        storage_factory: Option<Arc<dyn StorageFactory>>,
240    ) -> Result<Self> {
241        let factory = storage_factory.ok_or_else(|| {
242            Error::new(
243                ErrorKind::Unexpected,
244                "StorageFactory must be provided for SqlCatalog. Use `with_storage_factory` to configure it.",
245            )
246        })?;
247        let fileio = FileIOBuilder::new(factory).build();
248
249        install_default_drivers();
250        let max_connections: u32 = config
251            .props
252            .get("pool.max-connections")
253            .map(|v| v.parse().unwrap())
254            .unwrap_or(MAX_CONNECTIONS);
255        let idle_timeout: u64 = config
256            .props
257            .get("pool.idle-timeout")
258            .map(|v| v.parse().unwrap())
259            .unwrap_or(IDLE_TIMEOUT);
260        let test_before_acquire: bool = config
261            .props
262            .get("pool.test-before-acquire")
263            .map(|v| v.parse().unwrap())
264            .unwrap_or(TEST_BEFORE_ACQUIRE);
265
266        let pool = AnyPoolOptions::new()
267            .max_connections(max_connections)
268            .idle_timeout(Duration::from_secs(idle_timeout))
269            .test_before_acquire(test_before_acquire)
270            .connect(&config.uri)
271            .await
272            .map_err(from_sqlx_error)?;
273
274        sqlx::query(&format!(
275            "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
276                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
277                {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
278                {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
279                {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
280                {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
281                {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
282                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
283        ))
284        .execute(&pool)
285        .await
286        .map_err(from_sqlx_error)?;
287
288        sqlx::query(&format!(
289            "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
290                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
291                {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
292                {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
293                {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
294                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
295        ))
296        .execute(&pool)
297        .await
298        .map_err(from_sqlx_error)?;
299
300        Ok(SqlCatalog {
301            name: config.name.to_owned(),
302            connection: pool,
303            warehouse_location: config.warehouse_location,
304            fileio,
305            sql_bind_style: config.sql_bind_style,
306        })
307    }
308
309    /// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
310    fn replace_placeholders(&self, query: &str) -> String {
311        match self.sql_bind_style {
312            SqlBindStyle::DollarNumeric => {
313                let mut count = 1;
314                query
315                    .chars()
316                    .fold(String::with_capacity(query.len()), |mut acc, c| {
317                        if c == '?' {
318                            acc.push('$');
319                            acc.push_str(&count.to_string());
320                            count += 1;
321                        } else {
322                            acc.push(c);
323                        }
324                        acc
325                    })
326            }
327            _ => query.to_owned(),
328        }
329    }
330
331    /// Fetch a vec of AnyRows from a given query
332    async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
333        let query_with_placeholders = self.replace_placeholders(query);
334
335        let mut sqlx_query = sqlx::query(&query_with_placeholders);
336        for arg in args {
337            sqlx_query = sqlx_query.bind(arg);
338        }
339
340        sqlx_query
341            .fetch_all(&self.connection)
342            .await
343            .map_err(from_sqlx_error)
344    }
345
346    /// Execute statements in a transaction, provided or not
347    async fn execute(
348        &self,
349        query: &str,
350        args: Vec<Option<&str>>,
351        transaction: Option<&mut Transaction<'_, Any>>,
352    ) -> Result<AnyQueryResult> {
353        let query_with_placeholders = self.replace_placeholders(query);
354
355        let mut sqlx_query = sqlx::query(&query_with_placeholders);
356        for arg in args {
357            sqlx_query = sqlx_query.bind(arg);
358        }
359
360        match transaction {
361            Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
362            None => {
363                let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
364                let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
365                let _ = tx.commit().await.map_err(from_sqlx_error);
366                result
367            }
368        }
369    }
370}
371
372#[async_trait]
373impl Catalog for SqlCatalog {
374    async fn list_namespaces(
375        &self,
376        parent: Option<&NamespaceIdent>,
377    ) -> Result<Vec<NamespaceIdent>> {
378        // UNION will remove duplicates.
379        let all_namespaces_stmt = format!(
380            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
381             FROM {CATALOG_TABLE_NAME}
382             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
383             UNION
384             SELECT {NAMESPACE_FIELD_NAME}
385             FROM {NAMESPACE_TABLE_NAME}
386             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
387        );
388
389        let namespace_rows = self
390            .fetch_rows(&all_namespaces_stmt, vec![
391                Some(&self.name),
392                Some(&self.name),
393            ])
394            .await?;
395
396        let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
397
398        if let Some(parent) = parent {
399            if self.namespace_exists(parent).await? {
400                let parent_str = parent.join(".");
401
402                for row in namespace_rows.iter() {
403                    let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
404                    // if parent = a, then we only want to see a.b, a.c returned.
405                    if nsp != parent_str && nsp.starts_with(&parent_str) {
406                        namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
407                    }
408                }
409
410                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
411            } else {
412                no_such_namespace_err(parent)
413            }
414        } else {
415            for row in namespace_rows.iter() {
416                let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
417                let mut levels = nsp.split(".").collect::<Vec<&str>>();
418                if !levels.is_empty() {
419                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
420                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
421                }
422            }
423
424            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
425        }
426    }
427
428    async fn create_namespace(
429        &self,
430        namespace: &NamespaceIdent,
431        properties: HashMap<String, String>,
432    ) -> Result<Namespace> {
433        let exists = self.namespace_exists(namespace).await?;
434
435        if exists {
436            return Err(Error::new(
437                iceberg::ErrorKind::NamespaceAlreadyExists,
438                format!("Namespace {namespace:?} already exists"),
439            ));
440        }
441
442        let namespace_str = namespace.join(".");
443        let insert = format!(
444            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
445             VALUES (?, ?, ?, ?)");
446        if !properties.is_empty() {
447            let mut insert_properties = properties.clone();
448            insert_properties.insert("exists".to_string(), "true".to_string());
449
450            let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
451            let mut insert_stmt = insert.clone();
452            for (index, (key, value)) in insert_properties.iter().enumerate() {
453                query_args.extend_from_slice(&[
454                    Some(self.name.as_str()),
455                    Some(namespace_str.as_str()),
456                    Some(key.as_str()),
457                    Some(value.as_str()),
458                ]);
459                if index > 0 {
460                    insert_stmt.push_str(", (?, ?, ?, ?)");
461                }
462            }
463
464            self.execute(&insert_stmt, query_args, None).await?;
465
466            Ok(Namespace::with_properties(
467                namespace.clone(),
468                insert_properties,
469            ))
470        } else {
471            // set a default property of exists = true
472            self.execute(
473                &insert,
474                vec![
475                    Some(&self.name),
476                    Some(&namespace_str),
477                    Some("exists"),
478                    Some("true"),
479                ],
480                None,
481            )
482            .await?;
483            Ok(Namespace::with_properties(namespace.clone(), properties))
484        }
485    }
486
487    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
488        let exists = self.namespace_exists(namespace).await?;
489        if exists {
490            let namespace_props = self
491                .fetch_rows(
492                    &format!(
493                        "SELECT
494                            {NAMESPACE_FIELD_NAME},
495                            {NAMESPACE_FIELD_PROPERTY_KEY},
496                            {NAMESPACE_FIELD_PROPERTY_VALUE}
497                            FROM {NAMESPACE_TABLE_NAME}
498                            WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
499                            AND {NAMESPACE_FIELD_NAME} = ?"
500                    ),
501                    vec![Some(&self.name), Some(&namespace.join("."))],
502                )
503                .await?;
504
505            let mut properties = HashMap::with_capacity(namespace_props.len());
506
507            for row in namespace_props {
508                let key = row
509                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
510                    .map_err(from_sqlx_error)?;
511                let value = row
512                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
513                    .map_err(from_sqlx_error)?;
514
515                properties.insert(key, value);
516            }
517
518            Ok(Namespace::with_properties(namespace.clone(), properties))
519        } else {
520            no_such_namespace_err(namespace)
521        }
522    }
523
524    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
525        let namespace_str = namespace.join(".");
526
527        let table_namespaces = self
528            .fetch_rows(
529                &format!(
530                    "SELECT 1 FROM {CATALOG_TABLE_NAME}
531                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
532                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
533                     LIMIT 1"
534                ),
535                vec![Some(&self.name), Some(&namespace_str)],
536            )
537            .await?;
538
539        if !table_namespaces.is_empty() {
540            Ok(true)
541        } else {
542            let namespaces = self
543                .fetch_rows(
544                    &format!(
545                        "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
546                         WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
547                          AND {NAMESPACE_FIELD_NAME} = ?
548                         LIMIT 1"
549                    ),
550                    vec![Some(&self.name), Some(&namespace_str)],
551                )
552                .await?;
553            if !namespaces.is_empty() {
554                Ok(true)
555            } else {
556                Ok(false)
557            }
558        }
559    }
560
561    async fn update_namespace(
562        &self,
563        namespace: &NamespaceIdent,
564        properties: HashMap<String, String>,
565    ) -> Result<()> {
566        let exists = self.namespace_exists(namespace).await?;
567        if exists {
568            let existing_properties = self.get_namespace(namespace).await?.properties().clone();
569            let namespace_str = namespace.join(".");
570
571            let mut updates = vec![];
572            let mut inserts = vec![];
573
574            for (key, value) in properties.iter() {
575                if existing_properties.contains_key(key) {
576                    if existing_properties.get(key) != Some(value) {
577                        updates.push((key, value));
578                    }
579                } else {
580                    inserts.push((key, value));
581                }
582            }
583
584            let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
585            let update_stmt = format!(
586                "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
587                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ? 
588                 AND {NAMESPACE_FIELD_NAME} = ?
589                 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
590            );
591
592            let insert_stmt = format!(
593                "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
594                 VALUES (?, ?, ?, ?)"
595            );
596
597            for (key, value) in updates {
598                self.execute(
599                    &update_stmt,
600                    vec![
601                        Some(value),
602                        Some(&self.name),
603                        Some(&namespace_str),
604                        Some(key),
605                    ],
606                    Some(&mut tx),
607                )
608                .await?;
609            }
610
611            for (key, value) in inserts {
612                self.execute(
613                    &insert_stmt,
614                    vec![
615                        Some(&self.name),
616                        Some(&namespace_str),
617                        Some(key),
618                        Some(value),
619                    ],
620                    Some(&mut tx),
621                )
622                .await?;
623            }
624
625            let _ = tx.commit().await.map_err(from_sqlx_error)?;
626
627            Ok(())
628        } else {
629            no_such_namespace_err(namespace)
630        }
631    }
632
633    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
634        let exists = self.namespace_exists(namespace).await?;
635        if exists {
636            // if there are tables in the namespace, don't allow drop.
637            let tables = self.list_tables(namespace).await?;
638            if !tables.is_empty() {
639                return Err(Error::new(
640                    iceberg::ErrorKind::Unexpected,
641                    format!(
642                        "Namespace {:?} is not empty. {} tables exist.",
643                        namespace,
644                        tables.len()
645                    ),
646                ));
647            }
648
649            self.execute(
650                &format!(
651                    "DELETE FROM {NAMESPACE_TABLE_NAME}
652                     WHERE {NAMESPACE_FIELD_NAME} = ?
653                      AND {CATALOG_FIELD_CATALOG_NAME} = ?"
654                ),
655                vec![Some(&namespace.join(".")), Some(&self.name)],
656                None,
657            )
658            .await?;
659
660            Ok(())
661        } else {
662            no_such_namespace_err(namespace)
663        }
664    }
665
666    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
667        let exists = self.namespace_exists(namespace).await?;
668        if exists {
669            let rows = self
670                .fetch_rows(
671                    &format!(
672                        "SELECT {CATALOG_FIELD_TABLE_NAME},
673                                {CATALOG_FIELD_TABLE_NAMESPACE}
674                         FROM {CATALOG_TABLE_NAME}
675                         WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
676                          AND {CATALOG_FIELD_CATALOG_NAME} = ?
677                          AND (
678                                {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
679                                OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
680                          )",
681                    ),
682                    vec![Some(&namespace.join(".")), Some(&self.name)],
683                )
684                .await?;
685
686            let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
687
688            for row in rows.iter() {
689                let tbl = row
690                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
691                    .map_err(from_sqlx_error)?;
692                let ns_strs = row
693                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
694                    .map_err(from_sqlx_error)?;
695                let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
696                tables.insert(TableIdent::new(ns, tbl));
697            }
698
699            Ok(tables.into_iter().collect::<Vec<TableIdent>>())
700        } else {
701            no_such_namespace_err(namespace)
702        }
703    }
704
705    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
706        let namespace = identifier.namespace().join(".");
707        let table_name = identifier.name();
708        let table_counts = self
709            .fetch_rows(
710                &format!(
711                    "SELECT 1
712                     FROM {CATALOG_TABLE_NAME}
713                     WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
714                      AND {CATALOG_FIELD_CATALOG_NAME} = ?
715                      AND {CATALOG_FIELD_TABLE_NAME} = ?
716                      AND (
717                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
718                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
719                      )"
720                ),
721                vec![Some(&namespace), Some(&self.name), Some(table_name)],
722            )
723            .await?;
724
725        if !table_counts.is_empty() {
726            Ok(true)
727        } else {
728            Ok(false)
729        }
730    }
731
732    async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
733        if !self.table_exists(identifier).await? {
734            return no_such_table_err(identifier);
735        }
736
737        self.execute(
738            &format!(
739                "DELETE FROM {CATALOG_TABLE_NAME}
740                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
741                  AND {CATALOG_FIELD_TABLE_NAME} = ?
742                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
743                  AND (
744                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
745                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
746                  )"
747            ),
748            vec![
749                Some(&self.name),
750                Some(identifier.name()),
751                Some(&identifier.namespace().join(".")),
752            ],
753            None,
754        )
755        .await?;
756
757        Ok(())
758    }
759
760    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
761        let table_info = self.load_table(table).await?;
762        self.drop_table(table).await?;
763        iceberg::drop_table_data(
764            table_info.file_io(),
765            table_info.metadata(),
766            table_info.metadata_location(),
767        )
768        .await
769    }
770
771    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
772        if !self.table_exists(identifier).await? {
773            return no_such_table_err(identifier);
774        }
775
776        let rows = self
777            .fetch_rows(
778                &format!(
779                    "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
780                     FROM {CATALOG_TABLE_NAME}
781                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
782                      AND {CATALOG_FIELD_TABLE_NAME} = ?
783                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
784                      AND (
785                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
786                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
787                      )"
788                ),
789                vec![
790                    Some(&self.name),
791                    Some(identifier.name()),
792                    Some(&identifier.namespace().join(".")),
793                ],
794            )
795            .await?;
796
797        if rows.is_empty() {
798            return no_such_table_err(identifier);
799        }
800
801        let row = &rows[0];
802        let tbl_metadata_location = row
803            .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
804            .map_err(from_sqlx_error)?;
805
806        let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
807
808        Ok(Table::builder()
809            .file_io(self.fileio.clone())
810            .identifier(identifier.clone())
811            .metadata_location(tbl_metadata_location)
812            .metadata(metadata)
813            .build()?)
814    }
815
816    async fn create_table(
817        &self,
818        namespace: &NamespaceIdent,
819        creation: TableCreation,
820    ) -> Result<Table> {
821        if !self.namespace_exists(namespace).await? {
822            return no_such_namespace_err(namespace);
823        }
824
825        let tbl_name = creation.name.clone();
826        let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
827
828        if self.table_exists(&tbl_ident).await? {
829            return table_already_exists_err(&tbl_ident);
830        }
831
832        let (tbl_creation, location) = match creation.location.clone() {
833            Some(location) => (creation, location),
834            None => {
835                // fall back to namespace-specific location
836                // and then to warehouse location
837                let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
838                let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
839                    Some(location) => location.clone(),
840                    None => {
841                        format!(
842                            "{}/{}",
843                            self.warehouse_location.clone(),
844                            namespace.join("/")
845                        )
846                    }
847                };
848
849                let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
850
851                (
852                    TableCreation {
853                        location: Some(tbl_location.clone()),
854                        ..creation
855                    },
856                    tbl_location,
857                )
858            }
859        };
860
861        let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
862            .build()?
863            .metadata;
864        let tbl_metadata_location =
865            MetadataLocation::new_with_metadata(location.clone(), &tbl_metadata);
866
867        tbl_metadata
868            .write_to(&self.fileio, &tbl_metadata_location)
869            .await?;
870
871        let tbl_metadata_location_str = tbl_metadata_location.to_string();
872        self.execute(&format!(
873            "INSERT INTO {CATALOG_TABLE_NAME}
874             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
875             VALUES (?, ?, ?, ?, ?)
876            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location_str), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
877
878        Ok(Table::builder()
879            .file_io(self.fileio.clone())
880            .metadata_location(tbl_metadata_location_str)
881            .identifier(tbl_ident)
882            .metadata(tbl_metadata)
883            .build()?)
884    }
885
886    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
887        if src == dest {
888            return Ok(());
889        }
890
891        if !self.table_exists(src).await? {
892            return no_such_table_err(src);
893        }
894
895        if !self.namespace_exists(dest.namespace()).await? {
896            return no_such_namespace_err(dest.namespace());
897        }
898
899        if self.table_exists(dest).await? {
900            return table_already_exists_err(dest);
901        }
902
903        self.execute(
904            &format!(
905                "UPDATE {CATALOG_TABLE_NAME}
906                 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
907                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
908                  AND {CATALOG_FIELD_TABLE_NAME} = ?
909                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
910                  AND (
911                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
912                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
913                )"
914            ),
915            vec![
916                Some(dest.name()),
917                Some(&dest.namespace().join(".")),
918                Some(&self.name),
919                Some(src.name()),
920                Some(&src.namespace().join(".")),
921            ],
922            None,
923        )
924        .await?;
925
926        Ok(())
927    }
928
929    async fn register_table(
930        &self,
931        table_ident: &TableIdent,
932        metadata_location: String,
933    ) -> Result<Table> {
934        if self.table_exists(table_ident).await? {
935            return table_already_exists_err(table_ident);
936        }
937
938        let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
939
940        let namespace = table_ident.namespace();
941        let tbl_name = table_ident.name().to_string();
942
943        self.execute(&format!(
944            "INSERT INTO {CATALOG_TABLE_NAME}
945             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
946             VALUES (?, ?, ?, ?, ?)
947            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
948
949        Ok(Table::builder()
950            .identifier(table_ident.clone())
951            .metadata_location(metadata_location)
952            .metadata(metadata)
953            .file_io(self.fileio.clone())
954            .build()?)
955    }
956
957    /// Updates an existing table within the SQL catalog.
958    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
959        let table_ident = commit.identifier().clone();
960        let current_table = self.load_table(&table_ident).await?;
961        let current_metadata_location = current_table.metadata_location_result()?.to_string();
962
963        let staged_table = commit.apply(current_table)?;
964        let staged_metadata_location_str = staged_table.metadata_location_result()?;
965        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
966
967        staged_table
968            .metadata()
969            .write_to(staged_table.file_io(), &staged_metadata_location)
970            .await?;
971
972        let staged_metadata_location_str = staged_metadata_location.to_string();
973        let update_result = self
974            .execute(
975                &format!(
976                    "UPDATE {CATALOG_TABLE_NAME}
977                     SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
978                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
979                      AND {CATALOG_FIELD_TABLE_NAME} = ?
980                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
981                      AND (
982                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
983                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
984                      )
985                      AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
986                ),
987                vec![
988                    Some(&staged_metadata_location_str),
989                    Some(current_metadata_location.as_str()),
990                    Some(&self.name),
991                    Some(table_ident.name()),
992                    Some(&table_ident.namespace().join(".")),
993                    Some(current_metadata_location.as_str()),
994                ],
995                None,
996            )
997            .await?;
998
999        if update_result.rows_affected() == 0 {
1000            return Err(Error::new(
1001                ErrorKind::CatalogCommitConflicts,
1002                format!("Commit conflicted for table: {table_ident}"),
1003            )
1004            .with_retryable(true));
1005        }
1006
1007        Ok(staged_table)
1008    }
1009}
1010
1011#[cfg(test)]
1012mod tests {
1013    use std::collections::{HashMap, HashSet};
1014    use std::hash::Hash;
1015    use std::sync::Arc;
1016
1017    use iceberg::io::LocalFsStorageFactory;
1018    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
1019    use iceberg::table::Table;
1020    use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
1021    use itertools::Itertools;
1022    use regex::Regex;
1023    use sqlx::migrate::MigrateDatabase;
1024    use tempfile::TempDir;
1025
1026    use crate::catalog::{
1027        NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
1028        SQL_CATALOG_PROP_WAREHOUSE,
1029    };
1030    use crate::{SqlBindStyle, SqlCatalogBuilder};
1031
1032    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
1033
1034    fn temp_path() -> String {
1035        let temp_dir = TempDir::new().unwrap();
1036        temp_dir.path().to_str().unwrap().to_string()
1037    }
1038
1039    fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1040        HashSet::from_iter(vec)
1041    }
1042
1043    fn default_properties() -> HashMap<String, String> {
1044        HashMap::from([("exists".to_string(), "true".to_string())])
1045    }
1046
1047    /// Create a new SQLite catalog for testing. If name is not specified it defaults to "iceberg".
1048    async fn new_sql_catalog(
1049        warehouse_location: String,
1050        name: Option<impl ToString>,
1051    ) -> impl Catalog {
1052        let name = if let Some(name) = name {
1053            name.to_string()
1054        } else {
1055            "iceberg".to_string()
1056        };
1057        let sql_lite_uri = format!("sqlite:{}", temp_path());
1058        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1059
1060        let props = HashMap::from_iter([
1061            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1062            (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1063            (
1064                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1065                SqlBindStyle::DollarNumeric.to_string(),
1066            ),
1067        ]);
1068        SqlCatalogBuilder::default()
1069            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1070            .load(&name, props)
1071            .await
1072            .unwrap()
1073    }
1074
1075    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1076        let _ = catalog
1077            .create_namespace(namespace_ident, HashMap::new())
1078            .await
1079            .unwrap();
1080    }
1081
1082    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1083        for namespace_ident in namespace_idents {
1084            let _ = create_namespace(catalog, namespace_ident).await;
1085        }
1086    }
1087
1088    fn simple_table_schema() -> Schema {
1089        Schema::builder()
1090            .with_fields(vec![
1091                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1092            ])
1093            .build()
1094            .unwrap()
1095    }
1096
1097    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1098        let _ = catalog
1099            .create_table(
1100                &table_ident.namespace,
1101                TableCreation::builder()
1102                    .name(table_ident.name().into())
1103                    .schema(simple_table_schema())
1104                    .location(temp_path())
1105                    .build(),
1106            )
1107            .await
1108            .unwrap();
1109    }
1110
1111    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1112        for table_ident in table_idents {
1113            create_table(catalog, table_ident).await;
1114        }
1115    }
1116
1117    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1118        assert_eq!(table.identifier(), expected_table_ident);
1119
1120        let metadata = table.metadata();
1121
1122        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1123
1124        let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1125            .with_spec_id(0)
1126            .build()
1127            .unwrap();
1128
1129        assert_eq!(
1130            metadata
1131                .partition_specs_iter()
1132                .map(|p| p.as_ref())
1133                .collect_vec(),
1134            vec![&expected_partition_spec]
1135        );
1136
1137        let expected_sorted_order = SortOrder::builder()
1138            .with_order_id(0)
1139            .with_fields(vec![])
1140            .build(expected_schema)
1141            .unwrap();
1142
1143        assert_eq!(
1144            metadata
1145                .sort_orders_iter()
1146                .map(|s| s.as_ref())
1147                .collect_vec(),
1148            vec![&expected_sorted_order]
1149        );
1150
1151        assert_eq!(metadata.properties(), &HashMap::new());
1152
1153        assert!(!table.readonly());
1154    }
1155
1156    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1157        let actual = table.metadata_location().unwrap().to_string();
1158        let regex = Regex::new(regex_str).unwrap();
1159        assert!(regex.is_match(&actual))
1160    }
1161
1162    #[tokio::test]
1163    async fn test_initialized() {
1164        let warehouse_loc = temp_path();
1165        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1166        // catalog instantiation should not fail even if tables exist
1167        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1168        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1169    }
1170
1171    #[tokio::test]
1172    async fn test_builder_method() {
1173        let sql_lite_uri = format!("sqlite:{}", temp_path());
1174        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1175        let warehouse_location = temp_path();
1176
1177        let catalog = SqlCatalogBuilder::default()
1178            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1179            .uri(sql_lite_uri.to_string())
1180            .warehouse_location(warehouse_location.clone())
1181            .sql_bind_style(SqlBindStyle::QMark)
1182            .load("iceberg", HashMap::default())
1183            .await;
1184        assert!(catalog.is_ok());
1185
1186        let catalog = catalog.unwrap();
1187        assert!(catalog.warehouse_location == warehouse_location);
1188        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1189    }
1190
1191    /// Overwriting an sqlite database with a non-existent path causes
1192    /// catalog generation to fail
1193    #[tokio::test]
1194    async fn test_builder_props_non_existent_path_fails() {
1195        let sql_lite_uri = format!("sqlite:{}", temp_path());
1196        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1197        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1198        let warehouse_location = temp_path();
1199
1200        let catalog = SqlCatalogBuilder::default()
1201            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1202            .uri(sql_lite_uri)
1203            .warehouse_location(warehouse_location)
1204            .load(
1205                "iceberg",
1206                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1207            )
1208            .await;
1209        assert!(catalog.is_err());
1210    }
1211
1212    /// Even when an invalid URI is specified in a builder method,
1213    /// it can be successfully overridden with a valid URI in props
1214    /// for catalog generation to succeed.
1215    #[tokio::test]
1216    async fn test_builder_props_set_valid_uri() {
1217        let sql_lite_uri = format!("sqlite:{}", temp_path());
1218        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1219        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1220        let warehouse_location = temp_path();
1221
1222        let catalog = SqlCatalogBuilder::default()
1223            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1224            .uri(sql_lite_uri2)
1225            .warehouse_location(warehouse_location)
1226            .load(
1227                "iceberg",
1228                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1229            )
1230            .await;
1231        assert!(catalog.is_ok());
1232    }
1233
1234    /// values assigned via props take precedence
1235    #[tokio::test]
1236    async fn test_builder_props_take_precedence() {
1237        let sql_lite_uri = format!("sqlite:{}", temp_path());
1238        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1239        let warehouse_location = temp_path();
1240        let warehouse_location2 = temp_path();
1241
1242        let catalog = SqlCatalogBuilder::default()
1243            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1244            .warehouse_location(warehouse_location2)
1245            .sql_bind_style(SqlBindStyle::DollarNumeric)
1246            .load(
1247                "iceberg",
1248                HashMap::from_iter([
1249                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1250                    (
1251                        SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1252                        warehouse_location.clone(),
1253                    ),
1254                    (
1255                        SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1256                        SqlBindStyle::QMark.to_string(),
1257                    ),
1258                ]),
1259            )
1260            .await;
1261
1262        assert!(catalog.is_ok());
1263
1264        let catalog = catalog.unwrap();
1265        assert!(catalog.warehouse_location == warehouse_location);
1266        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1267    }
1268
1269    /// values assigned via props take precedence
1270    #[tokio::test]
1271    async fn test_builder_props_take_precedence_props() {
1272        let sql_lite_uri = format!("sqlite:{}", temp_path());
1273        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1274        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1275        let warehouse_location = temp_path();
1276        let warehouse_location2 = temp_path();
1277
1278        let props = HashMap::from_iter([
1279            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1280            (
1281                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1282                warehouse_location.clone(),
1283            ),
1284            (
1285                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1286                SqlBindStyle::QMark.to_string(),
1287            ),
1288        ]);
1289        let props2 = HashMap::from_iter([
1290            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1291            (
1292                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1293                warehouse_location2.clone(),
1294            ),
1295            (
1296                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1297                SqlBindStyle::DollarNumeric.to_string(),
1298            ),
1299        ]);
1300
1301        let catalog = SqlCatalogBuilder::default()
1302            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1303            .props(props2)
1304            .load("iceberg", props)
1305            .await;
1306
1307        assert!(catalog.is_ok());
1308
1309        let catalog = catalog.unwrap();
1310        assert!(catalog.warehouse_location == warehouse_location);
1311        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1312    }
1313
1314    /// values assigned via props take precedence
1315    #[tokio::test]
1316    async fn test_builder_props_take_precedence_prop() {
1317        let sql_lite_uri = format!("sqlite:{}", temp_path());
1318        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1319        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1320        let warehouse_location = temp_path();
1321        let warehouse_location2 = temp_path();
1322
1323        let props = HashMap::from_iter([
1324            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1325            (
1326                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1327                warehouse_location.clone(),
1328            ),
1329            (
1330                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1331                SqlBindStyle::QMark.to_string(),
1332            ),
1333        ]);
1334
1335        let catalog = SqlCatalogBuilder::default()
1336            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1337            .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1338            .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1339            .prop(
1340                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1341                SqlBindStyle::DollarNumeric.to_string(),
1342            )
1343            .load("iceberg", props)
1344            .await;
1345
1346        assert!(catalog.is_ok());
1347
1348        let catalog = catalog.unwrap();
1349        assert!(catalog.warehouse_location == warehouse_location);
1350        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1351    }
1352
1353    /// invalid value for `SqlBindStyle` causes catalog creation to fail
1354    #[tokio::test]
1355    async fn test_builder_props_invalid_bind_style_fails() {
1356        let sql_lite_uri = format!("sqlite:{}", temp_path());
1357        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1358        let warehouse_location = temp_path();
1359
1360        let catalog = SqlCatalogBuilder::default()
1361            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1362            .load(
1363                "iceberg",
1364                HashMap::from_iter([
1365                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1366                    (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1367                    (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1368                ]),
1369            )
1370            .await;
1371
1372        assert!(catalog.is_err());
1373    }
1374
1375    #[tokio::test]
1376    async fn test_list_namespaces_returns_empty_vector() {
1377        let warehouse_loc = temp_path();
1378        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1379
1380        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1381    }
1382
1383    #[tokio::test]
1384    async fn test_list_namespaces_returns_empty_different_name() {
1385        let warehouse_loc = temp_path();
1386        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1387        let namespace_ident_1 = NamespaceIdent::new("a".into());
1388        let namespace_ident_2 = NamespaceIdent::new("b".into());
1389        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1390        assert_eq!(
1391            to_set(catalog.list_namespaces(None).await.unwrap()),
1392            to_set(vec![namespace_ident_1, namespace_ident_2])
1393        );
1394
1395        let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await;
1396        assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
1397    }
1398
1399    #[tokio::test]
1400    async fn test_list_namespaces_returns_only_top_level_namespaces() {
1401        let warehouse_loc = temp_path();
1402        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1403        let namespace_ident_1 = NamespaceIdent::new("a".into());
1404        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1405        let namespace_ident_3 = NamespaceIdent::new("b".into());
1406        create_namespaces(&catalog, &vec![
1407            &namespace_ident_1,
1408            &namespace_ident_2,
1409            &namespace_ident_3,
1410        ])
1411        .await;
1412
1413        assert_eq!(
1414            to_set(catalog.list_namespaces(None).await.unwrap()),
1415            to_set(vec![namespace_ident_1, namespace_ident_3])
1416        );
1417    }
1418
1419    #[tokio::test]
1420    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1421        let warehouse_loc = temp_path();
1422        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1423        let namespace_ident_1 = NamespaceIdent::new("a".into());
1424        let namespace_ident_2 = NamespaceIdent::new("b".into());
1425        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1426
1427        assert_eq!(
1428            catalog
1429                .list_namespaces(Some(&namespace_ident_1))
1430                .await
1431                .unwrap(),
1432            vec![]
1433        );
1434    }
1435
1436    #[tokio::test]
1437    async fn test_list_namespaces_returns_namespace_under_parent() {
1438        let warehouse_loc = temp_path();
1439        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1440        let namespace_ident_1 = NamespaceIdent::new("a".into());
1441        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1442        let namespace_ident_3 = NamespaceIdent::new("c".into());
1443        create_namespaces(&catalog, &vec![
1444            &namespace_ident_1,
1445            &namespace_ident_2,
1446            &namespace_ident_3,
1447        ])
1448        .await;
1449
1450        assert_eq!(
1451            to_set(catalog.list_namespaces(None).await.unwrap()),
1452            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1453        );
1454
1455        assert_eq!(
1456            catalog
1457                .list_namespaces(Some(&namespace_ident_1))
1458                .await
1459                .unwrap(),
1460            vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1461        );
1462    }
1463
1464    #[tokio::test]
1465    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1466        let warehouse_loc = temp_path();
1467        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1468        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1469        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1470        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1471        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1472        let namespace_ident_5 = NamespaceIdent::new("b".into());
1473        create_namespaces(&catalog, &vec![
1474            &namespace_ident_1,
1475            &namespace_ident_2,
1476            &namespace_ident_3,
1477            &namespace_ident_4,
1478            &namespace_ident_5,
1479        ])
1480        .await;
1481
1482        assert_eq!(
1483            to_set(
1484                catalog
1485                    .list_namespaces(Some(&namespace_ident_1))
1486                    .await
1487                    .unwrap()
1488            ),
1489            to_set(vec![
1490                NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1491                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1492                NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1493            ])
1494        );
1495    }
1496
1497    #[tokio::test]
1498    async fn test_namespace_exists_returns_false() {
1499        let warehouse_loc = temp_path();
1500        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1501        let namespace_ident = NamespaceIdent::new("a".into());
1502        create_namespace(&catalog, &namespace_ident).await;
1503
1504        assert!(
1505            !catalog
1506                .namespace_exists(&NamespaceIdent::new("b".into()))
1507                .await
1508                .unwrap()
1509        );
1510    }
1511
1512    #[tokio::test]
1513    async fn test_namespace_exists_returns_true() {
1514        let warehouse_loc = temp_path();
1515        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1516        let namespace_ident = NamespaceIdent::new("a".into());
1517        create_namespace(&catalog, &namespace_ident).await;
1518
1519        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1520    }
1521
1522    #[tokio::test]
1523    async fn test_create_namespace_with_properties() {
1524        let warehouse_loc = temp_path();
1525        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1526        let namespace_ident = NamespaceIdent::new("abc".into());
1527
1528        let mut properties = default_properties();
1529        properties.insert("k".into(), "v".into());
1530
1531        assert_eq!(
1532            catalog
1533                .create_namespace(&namespace_ident, properties.clone())
1534                .await
1535                .unwrap(),
1536            Namespace::with_properties(namespace_ident.clone(), properties.clone())
1537        );
1538
1539        assert_eq!(
1540            catalog.get_namespace(&namespace_ident).await.unwrap(),
1541            Namespace::with_properties(namespace_ident, properties)
1542        );
1543    }
1544
1545    #[tokio::test]
1546    async fn test_create_nested_namespace() {
1547        let warehouse_loc = temp_path();
1548        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1549        let parent_namespace_ident = NamespaceIdent::new("a".into());
1550        create_namespace(&catalog, &parent_namespace_ident).await;
1551
1552        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1553
1554        assert_eq!(
1555            catalog
1556                .create_namespace(&child_namespace_ident, HashMap::new())
1557                .await
1558                .unwrap(),
1559            Namespace::new(child_namespace_ident.clone())
1560        );
1561
1562        assert_eq!(
1563            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1564            Namespace::with_properties(child_namespace_ident, default_properties())
1565        );
1566    }
1567
1568    #[tokio::test]
1569    async fn test_create_deeply_nested_namespace() {
1570        let warehouse_loc = temp_path();
1571        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1572        let namespace_ident_a = NamespaceIdent::new("a".into());
1573        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1574        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1575
1576        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1577
1578        assert_eq!(
1579            catalog
1580                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1581                .await
1582                .unwrap(),
1583            Namespace::new(namespace_ident_a_b_c.clone())
1584        );
1585
1586        assert_eq!(
1587            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1588            Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1589        );
1590    }
1591
1592    #[tokio::test]
1593    async fn test_update_namespace_noop() {
1594        let warehouse_loc = temp_path();
1595        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1596        let namespace_ident = NamespaceIdent::new("a".into());
1597        create_namespace(&catalog, &namespace_ident).await;
1598
1599        catalog
1600            .update_namespace(&namespace_ident, HashMap::new())
1601            .await
1602            .unwrap();
1603
1604        assert_eq!(
1605            *catalog
1606                .get_namespace(&namespace_ident)
1607                .await
1608                .unwrap()
1609                .properties(),
1610            HashMap::from_iter([("exists".to_string(), "true".to_string())])
1611        )
1612    }
1613
1614    #[tokio::test]
1615    async fn test_update_nested_namespace() {
1616        let warehouse_loc = temp_path();
1617        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1618        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1619        create_namespace(&catalog, &namespace_ident).await;
1620
1621        let mut props = HashMap::from_iter([
1622            ("prop1".to_string(), "val1".to_string()),
1623            ("prop2".into(), "val2".into()),
1624        ]);
1625
1626        catalog
1627            .update_namespace(&namespace_ident, props.clone())
1628            .await
1629            .unwrap();
1630
1631        props.insert("exists".into(), "true".into());
1632
1633        assert_eq!(
1634            *catalog
1635                .get_namespace(&namespace_ident)
1636                .await
1637                .unwrap()
1638                .properties(),
1639            props
1640        )
1641    }
1642
1643    #[tokio::test]
1644    async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1645        let warehouse_loc = temp_path();
1646        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1647        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1648
1649        let props = HashMap::from_iter([
1650            ("prop1".to_string(), "val1".to_string()),
1651            ("prop2".into(), "val2".into()),
1652        ]);
1653
1654        let err = catalog
1655            .update_namespace(&namespace_ident, props)
1656            .await
1657            .unwrap_err();
1658
1659        assert_eq!(
1660            err.message(),
1661            format!("No such namespace: {namespace_ident:?}")
1662        );
1663    }
1664
1665    #[tokio::test]
1666    async fn test_drop_nested_namespace() {
1667        let warehouse_loc = temp_path();
1668        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1669        let namespace_ident_a = NamespaceIdent::new("a".into());
1670        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1671        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1672
1673        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1674
1675        assert!(
1676            !catalog
1677                .namespace_exists(&namespace_ident_a_b)
1678                .await
1679                .unwrap()
1680        );
1681
1682        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1683    }
1684
1685    #[tokio::test]
1686    async fn test_drop_deeply_nested_namespace() {
1687        let warehouse_loc = temp_path();
1688        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1689        let namespace_ident_a = NamespaceIdent::new("a".into());
1690        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1691        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1692        create_namespaces(&catalog, &vec![
1693            &namespace_ident_a,
1694            &namespace_ident_a_b,
1695            &namespace_ident_a_b_c,
1696        ])
1697        .await;
1698
1699        catalog
1700            .drop_namespace(&namespace_ident_a_b_c)
1701            .await
1702            .unwrap();
1703
1704        assert!(
1705            !catalog
1706                .namespace_exists(&namespace_ident_a_b_c)
1707                .await
1708                .unwrap()
1709        );
1710
1711        assert!(
1712            catalog
1713                .namespace_exists(&namespace_ident_a_b)
1714                .await
1715                .unwrap()
1716        );
1717
1718        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1719    }
1720
1721    #[tokio::test]
1722    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1723        let warehouse_loc = temp_path();
1724        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1725        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1726
1727        let non_existent_namespace_ident =
1728            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1729        assert_eq!(
1730            catalog
1731                .drop_namespace(&non_existent_namespace_ident)
1732                .await
1733                .unwrap_err()
1734                .to_string(),
1735            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1736        )
1737    }
1738
1739    #[tokio::test]
1740    async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1741        let warehouse_loc = temp_path();
1742        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1743        let namespace_ident_a = NamespaceIdent::new("a".into());
1744        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1745        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1746
1747        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1748
1749        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1750
1751        assert!(
1752            catalog
1753                .namespace_exists(&namespace_ident_a_b)
1754                .await
1755                .unwrap()
1756        );
1757    }
1758
1759    #[tokio::test]
1760    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1761        let warehouse_loc = temp_path();
1762        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1763
1764        let namespace_ident = NamespaceIdent::new("a".into());
1765        let mut namespace_properties = HashMap::new();
1766        let namespace_location = temp_path();
1767        namespace_properties.insert(
1768            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1769            namespace_location.to_string(),
1770        );
1771        catalog
1772            .create_namespace(&namespace_ident, namespace_properties)
1773            .await
1774            .unwrap();
1775
1776        let table_name = "tbl1";
1777        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1778        let expected_table_metadata_location_regex =
1779            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1780
1781        let table = catalog
1782            .create_table(
1783                &namespace_ident,
1784                TableCreation::builder()
1785                    .name(table_name.into())
1786                    .schema(simple_table_schema())
1787                    // no location specified for table
1788                    .build(),
1789            )
1790            .await
1791            .unwrap();
1792        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1793        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1794
1795        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1796        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1797        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1798    }
1799
1800    #[tokio::test]
1801    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1802     {
1803        let warehouse_loc = temp_path();
1804        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1805
1806        let namespace_ident = NamespaceIdent::new("a".into());
1807        let mut namespace_properties = HashMap::new();
1808        let namespace_location = temp_path();
1809        namespace_properties.insert(
1810            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1811            namespace_location.to_string(),
1812        );
1813        catalog
1814            .create_namespace(&namespace_ident, namespace_properties)
1815            .await
1816            .unwrap();
1817
1818        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1819        let mut nested_namespace_properties = HashMap::new();
1820        let nested_namespace_location = temp_path();
1821        nested_namespace_properties.insert(
1822            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1823            nested_namespace_location.to_string(),
1824        );
1825        catalog
1826            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1827            .await
1828            .unwrap();
1829
1830        let table_name = "tbl1";
1831        let expected_table_ident =
1832            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1833        let expected_table_metadata_location_regex = format!(
1834            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1835        );
1836
1837        let table = catalog
1838            .create_table(
1839                &nested_namespace_ident,
1840                TableCreation::builder()
1841                    .name(table_name.into())
1842                    .schema(simple_table_schema())
1843                    // no location specified for table
1844                    .build(),
1845            )
1846            .await
1847            .unwrap();
1848        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1849        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1850
1851        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1852        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1853        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1854    }
1855
1856    #[tokio::test]
1857    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1858     {
1859        let warehouse_loc = temp_path();
1860        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1861
1862        let namespace_ident = NamespaceIdent::new("a".into());
1863        // note: no location specified in namespace_properties
1864        let namespace_properties = HashMap::new();
1865        catalog
1866            .create_namespace(&namespace_ident, namespace_properties)
1867            .await
1868            .unwrap();
1869
1870        let table_name = "tbl1";
1871        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1872        let expected_table_metadata_location_regex =
1873            format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1874
1875        let table = catalog
1876            .create_table(
1877                &namespace_ident,
1878                TableCreation::builder()
1879                    .name(table_name.into())
1880                    .schema(simple_table_schema())
1881                    // no location specified for table
1882                    .build(),
1883            )
1884            .await
1885            .unwrap();
1886        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1887        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1888
1889        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1890        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1891        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1892    }
1893
1894    #[tokio::test]
1895    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1896     {
1897        let warehouse_loc = temp_path();
1898        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1899
1900        let namespace_ident = NamespaceIdent::new("a".into());
1901        create_namespace(&catalog, &namespace_ident).await;
1902
1903        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1904        create_namespace(&catalog, &nested_namespace_ident).await;
1905
1906        let table_name = "tbl1";
1907        let expected_table_ident =
1908            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1909        let expected_table_metadata_location_regex =
1910            format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1911
1912        let table = catalog
1913            .create_table(
1914                &nested_namespace_ident,
1915                TableCreation::builder()
1916                    .name(table_name.into())
1917                    .schema(simple_table_schema())
1918                    // no location specified for table
1919                    .build(),
1920            )
1921            .await
1922            .unwrap();
1923        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1924        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1925
1926        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1927        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1928        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1929    }
1930
1931    #[tokio::test]
1932    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1933        let warehouse_loc = temp_path();
1934        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1935        let namespace_ident = NamespaceIdent::new("a".into());
1936        create_namespace(&catalog, &namespace_ident).await;
1937        let table_name = "tbl1";
1938        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1939        create_table(&catalog, &table_ident).await;
1940
1941        let tmp_dir = TempDir::new().unwrap();
1942        let location = tmp_dir.path().to_str().unwrap().to_string();
1943
1944        assert_eq!(
1945            catalog
1946                .create_table(
1947                    &namespace_ident,
1948                    TableCreation::builder()
1949                        .name(table_name.into())
1950                        .schema(simple_table_schema())
1951                        .location(location)
1952                        .build()
1953                )
1954                .await
1955                .unwrap_err()
1956                .to_string(),
1957            format!(
1958                "TableAlreadyExists => Table {:?} already exists.",
1959                &table_ident
1960            )
1961        );
1962    }
1963
1964    #[tokio::test]
1965    async fn test_rename_table_src_table_is_same_as_dst_table() {
1966        let warehouse_loc = temp_path();
1967        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1968        let namespace_ident = NamespaceIdent::new("n1".into());
1969        create_namespace(&catalog, &namespace_ident).await;
1970        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1971        create_table(&catalog, &table_ident).await;
1972
1973        catalog
1974            .rename_table(&table_ident, &table_ident)
1975            .await
1976            .unwrap();
1977
1978        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1979            table_ident
1980        ],);
1981    }
1982
1983    #[tokio::test]
1984    async fn test_rename_table_across_nested_namespaces() {
1985        let warehouse_loc = temp_path();
1986        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1987        let namespace_ident_a = NamespaceIdent::new("a".into());
1988        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1989        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1990        create_namespaces(&catalog, &vec![
1991            &namespace_ident_a,
1992            &namespace_ident_a_b,
1993            &namespace_ident_a_b_c,
1994        ])
1995        .await;
1996
1997        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1998        create_tables(&catalog, vec![&src_table_ident]).await;
1999
2000        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2001        catalog
2002            .rename_table(&src_table_ident, &dst_table_ident)
2003            .await
2004            .unwrap();
2005
2006        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2007
2008        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2009    }
2010
2011    #[tokio::test]
2012    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2013        let warehouse_loc = temp_path();
2014        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2015        let src_namespace_ident = NamespaceIdent::new("n1".into());
2016        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2017        create_namespace(&catalog, &src_namespace_ident).await;
2018        create_table(&catalog, &src_table_ident).await;
2019
2020        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2021        let dst_table_ident =
2022            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2023        assert_eq!(
2024            catalog
2025                .rename_table(&src_table_ident, &dst_table_ident)
2026                .await
2027                .unwrap_err()
2028                .to_string(),
2029            format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
2030        );
2031    }
2032}