iceberg_catalog_sql/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
25use iceberg::spec::{TableMetadata, TableMetadataBuilder};
26use iceberg::table::Table;
27use iceberg::{
28    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
29    Runtime, TableCommit, TableCreation, TableIdent,
30};
31use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
32use sqlx::{Any, AnyPool, Row, Transaction};
33
34use crate::error::{
35    from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
36};
37
/// Property key for the database connection URI.
pub const SQL_CATALOG_PROP_URI: &str = "uri";
/// Property key for the warehouse root location.
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key selecting the SQL parameter bind style (see [`SqlBindStyle`]).
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";

// Catalog table holding one row per (catalog, namespace, table) and its column names.
const CATALOG_TABLE_NAME: &str = "iceberg_tables";
const CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
const CATALOG_FIELD_TABLE_NAME: &str = "table_name";
const CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
const CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
const CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
const CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
// Record type marking table rows; queries also treat a NULL record type as a table.
const CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Namespace-properties table (one row per namespace property) and its column names.
const NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
const NAMESPACE_FIELD_NAME: &str = "namespace";
const NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
const NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

// Namespace property holding the default location for tables created in that namespace.
const NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";

// Connection-pool defaults, used when the matching `pool.*` property is not provided.
const MAX_CONNECTIONS: u32 = 10; // upper bound on pooled connections
const IDLE_TIMEOUT: u64 = 10; // seconds a connection may stay idle before it is closed
const TEST_BEFORE_ACQUIRE: bool = true; // health-check each connection before handing it out
64
65/// Builder for [`SqlCatalog`]
66#[derive(Debug)]
67pub struct SqlCatalogBuilder {
68    config: SqlCatalogConfig,
69    storage_factory: Option<Arc<dyn StorageFactory>>,
70    runtime: Option<Runtime>,
71}
72
73impl Default for SqlCatalogBuilder {
74    fn default() -> Self {
75        Self {
76            config: SqlCatalogConfig {
77                uri: "".to_string(),
78                name: "".to_string(),
79                warehouse_location: "".to_string(),
80                sql_bind_style: SqlBindStyle::DollarNumeric,
81                props: HashMap::new(),
82            },
83            storage_factory: None,
84            runtime: None,
85        }
86    }
87}
88
89impl SqlCatalogBuilder {
90    /// Configure the database URI
91    ///
92    /// If `SQL_CATALOG_PROP_URI` has a value set in `props` during `SqlCatalogBuilder::load`,
93    /// that value takes precedence, and the value specified by this method will not be used.
94    pub fn uri(mut self, uri: impl Into<String>) -> Self {
95        self.config.uri = uri.into();
96        self
97    }
98
99    /// Configure the warehouse location
100    ///
101    /// If `SQL_CATALOG_PROP_WAREHOUSE` has a value set in `props` during `SqlCatalogBuilder::load`,
102    /// that value takes precedence, and the value specified by this method will not be used.
103    pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
104        self.config.warehouse_location = location.into();
105        self
106    }
107
108    /// Configure the bound SQL Statement
109    ///
110    /// If `SQL_CATALOG_PROP_BIND_STYLE` has a value set in `props` during `SqlCatalogBuilder::load`,
111    /// that value takes precedence, and the value specified by this method will not be used.
112    pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
113        self.config.sql_bind_style = sql_bind_style;
114        self
115    }
116
117    /// Configure the any properties
118    ///
119    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
120    /// those values will take precedence.
121    pub fn props(mut self, props: HashMap<String, String>) -> Self {
122        for (k, v) in props {
123            self.config.props.insert(k, v);
124        }
125        self
126    }
127
128    /// Set a new property on the property to be configured.
129    /// When multiple methods are executed with the same key,
130    /// the later-set value takes precedence.
131    ///
132    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
133    /// those values will take precedence.
134    pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
135        self.config.props.insert(key.into(), value.into());
136        self
137    }
138}
139
140impl CatalogBuilder for SqlCatalogBuilder {
141    type C = SqlCatalog;
142
143    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
144        self.storage_factory = Some(storage_factory);
145        self
146    }
147
148    fn with_runtime(mut self, runtime: Runtime) -> Self {
149        self.runtime = Some(runtime);
150        self
151    }
152
153    fn load(
154        mut self,
155        name: impl Into<String>,
156        props: HashMap<String, String>,
157    ) -> impl Future<Output = Result<Self::C>> + Send {
158        for (k, v) in props {
159            self.config.props.insert(k, v);
160        }
161
162        if let Some(uri) = self.config.props.remove(SQL_CATALOG_PROP_URI) {
163            self.config.uri = uri;
164        }
165        if let Some(warehouse_location) = self.config.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
166            self.config.warehouse_location = warehouse_location;
167        }
168
169        let name = name.into();
170
171        let mut valid_sql_bind_style = true;
172        if let Some(sql_bind_style) = self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
173            if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
174                self.config.sql_bind_style = sql_bind_style;
175            } else {
176                valid_sql_bind_style = false;
177            }
178        }
179
180        let valid_name = !name.trim().is_empty();
181
182        async move {
183            if !valid_name {
184                Err(Error::new(
185                    ErrorKind::DataInvalid,
186                    "Catalog name cannot be empty",
187                ))
188            } else if !valid_sql_bind_style {
189                Err(Error::new(
190                    ErrorKind::DataInvalid,
191                    format!(
192                        "`{}` values are valid only if they're `{}` or `{}`",
193                        SQL_CATALOG_PROP_BIND_STYLE,
194                        SqlBindStyle::DollarNumeric,
195                        SqlBindStyle::QMark
196                    ),
197                ))
198            } else {
199                self.config.name = name;
200                let runtime = match self.runtime {
201                    Some(rt) => rt,
202                    None => Runtime::try_current()?,
203                };
204                SqlCatalog::new(self.config, self.storage_factory, runtime).await
205            }
206        }
207    }
208}
209
210/// A struct representing the SQL catalog configuration.
211///
212/// This struct contains various parameters that are used to configure a SQL catalog,
213/// such as the database URI, warehouse location, and file I/O settings.
214/// You are required to provide a `SqlBindStyle`, which determines how SQL statements will be bound to values in the catalog.
215/// The options available for this parameter include:
216/// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases.
217/// - `SqlBindStyle::QuestionMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases.
218#[derive(Debug)]
219struct SqlCatalogConfig {
220    uri: String,
221    name: String,
222    warehouse_location: String,
223    sql_bind_style: SqlBindStyle,
224    props: HashMap<String, String>,
225}
226
227#[derive(Debug)]
228/// Sql catalog implementation.
229pub struct SqlCatalog {
230    name: String,
231    connection: AnyPool,
232    warehouse_location: String,
233    fileio: FileIO,
234    sql_bind_style: SqlBindStyle,
235    runtime: Runtime,
236}
237
238#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
239/// Set the SQL parameter bind style to either $1..$N (Postgres style) or ? (SQLite/MySQL/MariaDB)
240pub enum SqlBindStyle {
241    /// DollarNumeric uses parameters of the form `$1..$N``, which is the Postgres style
242    DollarNumeric,
243    /// QMark uses parameters of the form `?` which is the style for other dialects (SQLite/MySQL/MariaDB)
244    QMark,
245}
246
247impl SqlCatalog {
248    /// Create new sql catalog instance
249    async fn new(
250        config: SqlCatalogConfig,
251        storage_factory: Option<Arc<dyn StorageFactory>>,
252        runtime: Runtime,
253    ) -> Result<Self> {
254        let factory = storage_factory.ok_or_else(|| {
255            Error::new(
256                ErrorKind::Unexpected,
257                "StorageFactory must be provided for SqlCatalog. Use `with_storage_factory` to configure it.",
258            )
259        })?;
260        // Forward catalog props so storage-backend keys reach the FileIO.
261        // Unrecognized keys are ignored by backends.
262        let fileio = FileIOBuilder::new(factory)
263            .with_props(config.props.clone())
264            .build();
265
266        install_default_drivers();
267        let max_connections: u32 = config
268            .props
269            .get("pool.max-connections")
270            .map(|v| v.parse().unwrap())
271            .unwrap_or(MAX_CONNECTIONS);
272        let idle_timeout: u64 = config
273            .props
274            .get("pool.idle-timeout")
275            .map(|v| v.parse().unwrap())
276            .unwrap_or(IDLE_TIMEOUT);
277        let test_before_acquire: bool = config
278            .props
279            .get("pool.test-before-acquire")
280            .map(|v| v.parse().unwrap())
281            .unwrap_or(TEST_BEFORE_ACQUIRE);
282
283        let pool = AnyPoolOptions::new()
284            .max_connections(max_connections)
285            .idle_timeout(Duration::from_secs(idle_timeout))
286            .test_before_acquire(test_before_acquire)
287            .connect(&config.uri)
288            .await
289            .map_err(from_sqlx_error)?;
290
291        sqlx::query(&format!(
292            "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
293                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
294                {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
295                {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
296                {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
297                {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
298                {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
299                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
300        ))
301        .execute(&pool)
302        .await
303        .map_err(from_sqlx_error)?;
304
305        sqlx::query(&format!(
306            "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
307                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
308                {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
309                {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
310                {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
311                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
312        ))
313        .execute(&pool)
314        .await
315        .map_err(from_sqlx_error)?;
316
317        Ok(SqlCatalog {
318            name: config.name.to_owned(),
319            connection: pool,
320            warehouse_location: config.warehouse_location,
321            fileio,
322            sql_bind_style: config.sql_bind_style,
323            runtime,
324        })
325    }
326
327    /// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
328    fn replace_placeholders(&self, query: &str) -> String {
329        match self.sql_bind_style {
330            SqlBindStyle::DollarNumeric => {
331                let mut count = 1;
332                query
333                    .chars()
334                    .fold(String::with_capacity(query.len()), |mut acc, c| {
335                        if c == '?' {
336                            acc.push('$');
337                            acc.push_str(&count.to_string());
338                            count += 1;
339                        } else {
340                            acc.push(c);
341                        }
342                        acc
343                    })
344            }
345            _ => query.to_owned(),
346        }
347    }
348
349    /// Fetch a vec of AnyRows from a given query
350    async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
351        let query_with_placeholders = self.replace_placeholders(query);
352
353        let mut sqlx_query = sqlx::query(&query_with_placeholders);
354        for arg in args {
355            sqlx_query = sqlx_query.bind(arg);
356        }
357
358        sqlx_query
359            .fetch_all(&self.connection)
360            .await
361            .map_err(from_sqlx_error)
362    }
363
364    /// Execute statements in a transaction, provided or not
365    async fn execute(
366        &self,
367        query: &str,
368        args: Vec<Option<&str>>,
369        transaction: Option<&mut Transaction<'_, Any>>,
370    ) -> Result<AnyQueryResult> {
371        let query_with_placeholders = self.replace_placeholders(query);
372
373        let mut sqlx_query = sqlx::query(&query_with_placeholders);
374        for arg in args {
375            sqlx_query = sqlx_query.bind(arg);
376        }
377
378        match transaction {
379            Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
380            None => {
381                let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
382                let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
383                let _ = tx.commit().await.map_err(from_sqlx_error);
384                result
385            }
386        }
387    }
388}
389
390#[async_trait]
391impl Catalog for SqlCatalog {
392    async fn list_namespaces(
393        &self,
394        parent: Option<&NamespaceIdent>,
395    ) -> Result<Vec<NamespaceIdent>> {
396        // UNION will remove duplicates.
397        let all_namespaces_stmt = format!(
398            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
399             FROM {CATALOG_TABLE_NAME}
400             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
401             UNION
402             SELECT {NAMESPACE_FIELD_NAME}
403             FROM {NAMESPACE_TABLE_NAME}
404             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
405        );
406
407        let namespace_rows = self
408            .fetch_rows(&all_namespaces_stmt, vec![
409                Some(&self.name),
410                Some(&self.name),
411            ])
412            .await?;
413
414        let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
415
416        if let Some(parent) = parent {
417            if self.namespace_exists(parent).await? {
418                let parent_str = parent.join(".");
419
420                for row in namespace_rows.iter() {
421                    let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
422                    // if parent = a, then we only want to see a.b, a.c returned.
423                    if nsp != parent_str && nsp.starts_with(&parent_str) {
424                        namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
425                    }
426                }
427
428                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
429            } else {
430                no_such_namespace_err(parent)
431            }
432        } else {
433            for row in namespace_rows.iter() {
434                let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
435                let mut levels = nsp.split(".").collect::<Vec<&str>>();
436                if !levels.is_empty() {
437                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
438                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
439                }
440            }
441
442            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
443        }
444    }
445
446    async fn create_namespace(
447        &self,
448        namespace: &NamespaceIdent,
449        properties: HashMap<String, String>,
450    ) -> Result<Namespace> {
451        let exists = self.namespace_exists(namespace).await?;
452
453        if exists {
454            return Err(Error::new(
455                iceberg::ErrorKind::NamespaceAlreadyExists,
456                format!("Namespace {namespace:?} already exists"),
457            ));
458        }
459
460        let namespace_str = namespace.join(".");
461        let insert = format!(
462            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
463             VALUES (?, ?, ?, ?)");
464        if !properties.is_empty() {
465            let mut insert_properties = properties.clone();
466            insert_properties.insert("exists".to_string(), "true".to_string());
467
468            let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
469            let mut insert_stmt = insert.clone();
470            for (index, (key, value)) in insert_properties.iter().enumerate() {
471                query_args.extend_from_slice(&[
472                    Some(self.name.as_str()),
473                    Some(namespace_str.as_str()),
474                    Some(key.as_str()),
475                    Some(value.as_str()),
476                ]);
477                if index > 0 {
478                    insert_stmt.push_str(", (?, ?, ?, ?)");
479                }
480            }
481
482            self.execute(&insert_stmt, query_args, None).await?;
483
484            Ok(Namespace::with_properties(
485                namespace.clone(),
486                insert_properties,
487            ))
488        } else {
489            // set a default property of exists = true
490            self.execute(
491                &insert,
492                vec![
493                    Some(&self.name),
494                    Some(&namespace_str),
495                    Some("exists"),
496                    Some("true"),
497                ],
498                None,
499            )
500            .await?;
501            Ok(Namespace::with_properties(namespace.clone(), properties))
502        }
503    }
504
505    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
506        let exists = self.namespace_exists(namespace).await?;
507        if exists {
508            let namespace_props = self
509                .fetch_rows(
510                    &format!(
511                        "SELECT
512                            {NAMESPACE_FIELD_NAME},
513                            {NAMESPACE_FIELD_PROPERTY_KEY},
514                            {NAMESPACE_FIELD_PROPERTY_VALUE}
515                            FROM {NAMESPACE_TABLE_NAME}
516                            WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
517                            AND {NAMESPACE_FIELD_NAME} = ?"
518                    ),
519                    vec![Some(&self.name), Some(&namespace.join("."))],
520                )
521                .await?;
522
523            let mut properties = HashMap::with_capacity(namespace_props.len());
524
525            for row in namespace_props {
526                let key = row
527                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
528                    .map_err(from_sqlx_error)?;
529                let value = row
530                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
531                    .map_err(from_sqlx_error)?;
532
533                properties.insert(key, value);
534            }
535
536            Ok(Namespace::with_properties(namespace.clone(), properties))
537        } else {
538            no_such_namespace_err(namespace)
539        }
540    }
541
542    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
543        let namespace_str = namespace.join(".");
544
545        let table_namespaces = self
546            .fetch_rows(
547                &format!(
548                    "SELECT 1 FROM {CATALOG_TABLE_NAME}
549                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
550                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
551                     LIMIT 1"
552                ),
553                vec![Some(&self.name), Some(&namespace_str)],
554            )
555            .await?;
556
557        if !table_namespaces.is_empty() {
558            Ok(true)
559        } else {
560            let namespaces = self
561                .fetch_rows(
562                    &format!(
563                        "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
564                         WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
565                          AND {NAMESPACE_FIELD_NAME} = ?
566                         LIMIT 1"
567                    ),
568                    vec![Some(&self.name), Some(&namespace_str)],
569                )
570                .await?;
571            if !namespaces.is_empty() {
572                Ok(true)
573            } else {
574                Ok(false)
575            }
576        }
577    }
578
579    async fn update_namespace(
580        &self,
581        namespace: &NamespaceIdent,
582        properties: HashMap<String, String>,
583    ) -> Result<()> {
584        let exists = self.namespace_exists(namespace).await?;
585        if exists {
586            let existing_properties = self.get_namespace(namespace).await?.properties().clone();
587            let namespace_str = namespace.join(".");
588
589            let mut updates = vec![];
590            let mut inserts = vec![];
591
592            for (key, value) in properties.iter() {
593                if existing_properties.contains_key(key) {
594                    if existing_properties.get(key) != Some(value) {
595                        updates.push((key, value));
596                    }
597                } else {
598                    inserts.push((key, value));
599                }
600            }
601
602            let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
603            let update_stmt = format!(
604                "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
605                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
606                 AND {NAMESPACE_FIELD_NAME} = ?
607                 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
608            );
609
610            let insert_stmt = format!(
611                "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
612                 VALUES (?, ?, ?, ?)"
613            );
614
615            for (key, value) in updates {
616                self.execute(
617                    &update_stmt,
618                    vec![
619                        Some(value),
620                        Some(&self.name),
621                        Some(&namespace_str),
622                        Some(key),
623                    ],
624                    Some(&mut tx),
625                )
626                .await?;
627            }
628
629            for (key, value) in inserts {
630                self.execute(
631                    &insert_stmt,
632                    vec![
633                        Some(&self.name),
634                        Some(&namespace_str),
635                        Some(key),
636                        Some(value),
637                    ],
638                    Some(&mut tx),
639                )
640                .await?;
641            }
642
643            let _ = tx.commit().await.map_err(from_sqlx_error)?;
644
645            Ok(())
646        } else {
647            no_such_namespace_err(namespace)
648        }
649    }
650
651    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
652        let exists = self.namespace_exists(namespace).await?;
653        if exists {
654            // if there are tables in the namespace, don't allow drop.
655            let tables = self.list_tables(namespace).await?;
656            if !tables.is_empty() {
657                return Err(Error::new(
658                    iceberg::ErrorKind::Unexpected,
659                    format!(
660                        "Namespace {:?} is not empty. {} tables exist.",
661                        namespace,
662                        tables.len()
663                    ),
664                ));
665            }
666
667            self.execute(
668                &format!(
669                    "DELETE FROM {NAMESPACE_TABLE_NAME}
670                     WHERE {NAMESPACE_FIELD_NAME} = ?
671                      AND {CATALOG_FIELD_CATALOG_NAME} = ?"
672                ),
673                vec![Some(&namespace.join(".")), Some(&self.name)],
674                None,
675            )
676            .await?;
677
678            Ok(())
679        } else {
680            no_such_namespace_err(namespace)
681        }
682    }
683
684    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
685        let exists = self.namespace_exists(namespace).await?;
686        if exists {
687            let rows = self
688                .fetch_rows(
689                    &format!(
690                        "SELECT {CATALOG_FIELD_TABLE_NAME},
691                                {CATALOG_FIELD_TABLE_NAMESPACE}
692                         FROM {CATALOG_TABLE_NAME}
693                         WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
694                          AND {CATALOG_FIELD_CATALOG_NAME} = ?
695                          AND (
696                                {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
697                                OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
698                          )",
699                    ),
700                    vec![Some(&namespace.join(".")), Some(&self.name)],
701                )
702                .await?;
703
704            let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
705
706            for row in rows.iter() {
707                let tbl = row
708                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
709                    .map_err(from_sqlx_error)?;
710                let ns_strs = row
711                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
712                    .map_err(from_sqlx_error)?;
713                let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
714                tables.insert(TableIdent::new(ns, tbl));
715            }
716
717            Ok(tables.into_iter().collect::<Vec<TableIdent>>())
718        } else {
719            no_such_namespace_err(namespace)
720        }
721    }
722
723    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
724        let namespace = identifier.namespace().join(".");
725        let table_name = identifier.name();
726        let table_counts = self
727            .fetch_rows(
728                &format!(
729                    "SELECT 1
730                     FROM {CATALOG_TABLE_NAME}
731                     WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
732                      AND {CATALOG_FIELD_CATALOG_NAME} = ?
733                      AND {CATALOG_FIELD_TABLE_NAME} = ?
734                      AND (
735                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
736                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
737                      )"
738                ),
739                vec![Some(&namespace), Some(&self.name), Some(table_name)],
740            )
741            .await?;
742
743        if !table_counts.is_empty() {
744            Ok(true)
745        } else {
746            Ok(false)
747        }
748    }
749
750    async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
751        if !self.table_exists(identifier).await? {
752            return no_such_table_err(identifier);
753        }
754
755        self.execute(
756            &format!(
757                "DELETE FROM {CATALOG_TABLE_NAME}
758                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
759                  AND {CATALOG_FIELD_TABLE_NAME} = ?
760                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
761                  AND (
762                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
763                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
764                  )"
765            ),
766            vec![
767                Some(&self.name),
768                Some(identifier.name()),
769                Some(&identifier.namespace().join(".")),
770            ],
771            None,
772        )
773        .await?;
774
775        Ok(())
776    }
777
778    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
779        let table_info = self.load_table(table).await?;
780        self.drop_table(table).await?;
781        iceberg::drop_table_data(&table_info).await
782    }
783
784    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
785        if !self.table_exists(identifier).await? {
786            return no_such_table_err(identifier);
787        }
788
789        let rows = self
790            .fetch_rows(
791                &format!(
792                    "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
793                     FROM {CATALOG_TABLE_NAME}
794                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
795                      AND {CATALOG_FIELD_TABLE_NAME} = ?
796                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
797                      AND (
798                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
799                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
800                      )"
801                ),
802                vec![
803                    Some(&self.name),
804                    Some(identifier.name()),
805                    Some(&identifier.namespace().join(".")),
806                ],
807            )
808            .await?;
809
810        if rows.is_empty() {
811            return no_such_table_err(identifier);
812        }
813
814        let row = &rows[0];
815        let tbl_metadata_location = row
816            .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
817            .map_err(from_sqlx_error)?;
818
819        let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
820
821        Ok(Table::builder()
822            .file_io(self.fileio.clone())
823            .identifier(identifier.clone())
824            .metadata_location(tbl_metadata_location)
825            .metadata(metadata)
826            .runtime(self.runtime.clone())
827            .build()?)
828    }
829
830    async fn create_table(
831        &self,
832        namespace: &NamespaceIdent,
833        creation: TableCreation,
834    ) -> Result<Table> {
835        if !self.namespace_exists(namespace).await? {
836            return no_such_namespace_err(namespace);
837        }
838
839        let tbl_name = creation.name.clone();
840        let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
841
842        if self.table_exists(&tbl_ident).await? {
843            return table_already_exists_err(&tbl_ident);
844        }
845
846        let (tbl_creation, location) = match creation.location.clone() {
847            Some(location) => (creation, location),
848            None => {
849                // fall back to namespace-specific location
850                // and then to warehouse location
851                let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
852                let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
853                    Some(location) => location.clone(),
854                    None => {
855                        format!(
856                            "{}/{}",
857                            self.warehouse_location.clone(),
858                            namespace.join("/")
859                        )
860                    }
861                };
862
863                let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
864
865                (
866                    TableCreation {
867                        location: Some(tbl_location.clone()),
868                        ..creation
869                    },
870                    tbl_location,
871                )
872            }
873        };
874
875        let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
876            .build()?
877            .metadata;
878        let tbl_metadata_location =
879            MetadataLocation::new_with_metadata(location.clone(), &tbl_metadata);
880
881        tbl_metadata
882            .write_to(&self.fileio, &tbl_metadata_location)
883            .await?;
884
885        let tbl_metadata_location_str = tbl_metadata_location.to_string();
886        self.execute(&format!(
887            "INSERT INTO {CATALOG_TABLE_NAME}
888             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
889             VALUES (?, ?, ?, ?, ?)
890            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location_str), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
891
892        Ok(Table::builder()
893            .file_io(self.fileio.clone())
894            .metadata_location(tbl_metadata_location_str)
895            .identifier(tbl_ident)
896            .metadata(tbl_metadata)
897            .runtime(self.runtime.clone())
898            .build()?)
899    }
900
901    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
902        if src == dest {
903            return Ok(());
904        }
905
906        if !self.table_exists(src).await? {
907            return no_such_table_err(src);
908        }
909
910        if !self.namespace_exists(dest.namespace()).await? {
911            return no_such_namespace_err(dest.namespace());
912        }
913
914        if self.table_exists(dest).await? {
915            return table_already_exists_err(dest);
916        }
917
918        self.execute(
919            &format!(
920                "UPDATE {CATALOG_TABLE_NAME}
921                 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
922                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
923                  AND {CATALOG_FIELD_TABLE_NAME} = ?
924                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
925                  AND (
926                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
927                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
928                )"
929            ),
930            vec![
931                Some(dest.name()),
932                Some(&dest.namespace().join(".")),
933                Some(&self.name),
934                Some(src.name()),
935                Some(&src.namespace().join(".")),
936            ],
937            None,
938        )
939        .await?;
940
941        Ok(())
942    }
943
944    async fn register_table(
945        &self,
946        table_ident: &TableIdent,
947        metadata_location: String,
948    ) -> Result<Table> {
949        if self.table_exists(table_ident).await? {
950            return table_already_exists_err(table_ident);
951        }
952
953        let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
954
955        let namespace = table_ident.namespace();
956        let tbl_name = table_ident.name().to_string();
957
958        self.execute(&format!(
959            "INSERT INTO {CATALOG_TABLE_NAME}
960             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
961             VALUES (?, ?, ?, ?, ?)
962            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
963
964        Ok(Table::builder()
965            .identifier(table_ident.clone())
966            .metadata_location(metadata_location)
967            .metadata(metadata)
968            .file_io(self.fileio.clone())
969            .runtime(self.runtime.clone())
970            .build()?)
971    }
972
973    /// Updates an existing table within the SQL catalog.
974    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
975        let table_ident = commit.identifier().clone();
976        let current_table = self.load_table(&table_ident).await?;
977        let current_metadata_location = current_table.metadata_location_result()?.to_string();
978
979        let staged_table = commit.apply(current_table)?;
980        let staged_metadata_location_str = staged_table.metadata_location_result()?;
981        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
982
983        staged_table
984            .metadata()
985            .write_to(staged_table.file_io(), &staged_metadata_location)
986            .await?;
987
988        let staged_metadata_location_str = staged_metadata_location.to_string();
989        let update_result = self
990            .execute(
991                &format!(
992                    "UPDATE {CATALOG_TABLE_NAME}
993                     SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
994                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
995                      AND {CATALOG_FIELD_TABLE_NAME} = ?
996                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
997                      AND (
998                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
999                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
1000                      )
1001                      AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
1002                ),
1003                vec![
1004                    Some(&staged_metadata_location_str),
1005                    Some(current_metadata_location.as_str()),
1006                    Some(&self.name),
1007                    Some(table_ident.name()),
1008                    Some(&table_ident.namespace().join(".")),
1009                    Some(current_metadata_location.as_str()),
1010                ],
1011                None,
1012            )
1013            .await?;
1014
1015        if update_result.rows_affected() == 0 {
1016            return Err(Error::new(
1017                ErrorKind::CatalogCommitConflicts,
1018                format!("Commit conflicted for table: {table_ident}"),
1019            )
1020            .with_retryable(true));
1021        }
1022
1023        Ok(staged_table)
1024    }
1025}
1026
1027#[cfg(test)]
1028mod tests {
1029    use std::collections::{HashMap, HashSet};
1030    use std::hash::Hash;
1031    use std::sync::Arc;
1032
1033    use iceberg::io::LocalFsStorageFactory;
1034    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
1035    use iceberg::table::Table;
1036    use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
1037    use itertools::Itertools;
1038    use regex::Regex;
1039    use sqlx::migrate::MigrateDatabase;
1040    use tempfile::TempDir;
1041
1042    use crate::catalog::{
1043        NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
1044        SQL_CATALOG_PROP_WAREHOUSE,
1045    };
1046    use crate::{SqlBindStyle, SqlCatalogBuilder};
1047
1048    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
1049
1050    fn temp_path() -> String {
1051        let temp_dir = TempDir::new().unwrap();
1052        temp_dir.path().to_str().unwrap().to_string()
1053    }
1054
1055    fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1056        HashSet::from_iter(vec)
1057    }
1058
1059    fn default_properties() -> HashMap<String, String> {
1060        HashMap::from([("exists".to_string(), "true".to_string())])
1061    }
1062
1063    /// Create a new SQLite catalog for testing. If name is not specified it defaults to "iceberg".
1064    async fn new_sql_catalog(
1065        warehouse_location: String,
1066        name: Option<impl ToString>,
1067    ) -> impl Catalog {
1068        let name = if let Some(name) = name {
1069            name.to_string()
1070        } else {
1071            "iceberg".to_string()
1072        };
1073        let sql_lite_uri = format!("sqlite:{}", temp_path());
1074        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1075
1076        let props = HashMap::from_iter([
1077            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1078            (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1079            (
1080                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1081                SqlBindStyle::DollarNumeric.to_string(),
1082            ),
1083        ]);
1084        SqlCatalogBuilder::default()
1085            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1086            .load(&name, props)
1087            .await
1088            .unwrap()
1089    }
1090
1091    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1092        let _ = catalog
1093            .create_namespace(namespace_ident, HashMap::new())
1094            .await
1095            .unwrap();
1096    }
1097
1098    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1099        for namespace_ident in namespace_idents {
1100            let _ = create_namespace(catalog, namespace_ident).await;
1101        }
1102    }
1103
1104    fn simple_table_schema() -> Schema {
1105        Schema::builder()
1106            .with_fields(vec![
1107                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1108            ])
1109            .build()
1110            .unwrap()
1111    }
1112
1113    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1114        let _ = catalog
1115            .create_table(
1116                &table_ident.namespace,
1117                TableCreation::builder()
1118                    .name(table_ident.name().into())
1119                    .schema(simple_table_schema())
1120                    .location(temp_path())
1121                    .build(),
1122            )
1123            .await
1124            .unwrap();
1125    }
1126
1127    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1128        for table_ident in table_idents {
1129            create_table(catalog, table_ident).await;
1130        }
1131    }
1132
1133    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1134        assert_eq!(table.identifier(), expected_table_ident);
1135
1136        let metadata = table.metadata();
1137
1138        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1139
1140        let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1141            .with_spec_id(0)
1142            .build()
1143            .unwrap();
1144
1145        assert_eq!(
1146            metadata
1147                .partition_specs_iter()
1148                .map(|p| p.as_ref())
1149                .collect_vec(),
1150            vec![&expected_partition_spec]
1151        );
1152
1153        let expected_sorted_order = SortOrder::builder()
1154            .with_order_id(0)
1155            .with_fields(vec![])
1156            .build(expected_schema)
1157            .unwrap();
1158
1159        assert_eq!(
1160            metadata
1161                .sort_orders_iter()
1162                .map(|s| s.as_ref())
1163                .collect_vec(),
1164            vec![&expected_sorted_order]
1165        );
1166
1167        assert_eq!(metadata.properties(), &HashMap::new());
1168
1169        assert!(!table.readonly());
1170    }
1171
1172    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1173        let actual = table.metadata_location().unwrap().to_string();
1174        let regex = Regex::new(regex_str).unwrap();
1175        assert!(regex.is_match(&actual))
1176    }
1177
1178    #[tokio::test]
1179    async fn test_initialized() {
1180        let warehouse_loc = temp_path();
1181        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1182        // catalog instantiation should not fail even if tables exist
1183        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1184        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1185    }
1186
1187    // Regression test: storage-backend props set on the catalog must reach
1188    // the FileIO; otherwise authenticated backends fail with 401s on writes.
1189    #[tokio::test]
1190    async fn test_storage_props_propagate_to_file_io() {
1191        let sql_lite_uri = format!("sqlite:{}", temp_path());
1192        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1193        let warehouse_location = temp_path();
1194
1195        let catalog = SqlCatalogBuilder::default()
1196            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1197            .load(
1198                "iceberg",
1199                HashMap::from_iter([
1200                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1201                    (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1202                    ("s3.region".to_string(), "us-east-1".to_string()),
1203                    ("hf.token".to_string(), "hf_test_token".to_string()),
1204                ]),
1205            )
1206            .await
1207            .unwrap();
1208
1209        let props = catalog.fileio.config().props();
1210        assert_eq!(props.get("s3.region"), Some(&"us-east-1".to_string()));
1211        assert_eq!(props.get("hf.token"), Some(&"hf_test_token".to_string()));
1212    }
1213
1214    #[tokio::test]
1215    async fn test_builder_method() {
1216        let sql_lite_uri = format!("sqlite:{}", temp_path());
1217        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1218        let warehouse_location = temp_path();
1219
1220        let catalog = SqlCatalogBuilder::default()
1221            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1222            .uri(sql_lite_uri.to_string())
1223            .warehouse_location(warehouse_location.clone())
1224            .sql_bind_style(SqlBindStyle::QMark)
1225            .load("iceberg", HashMap::default())
1226            .await;
1227        assert!(catalog.is_ok());
1228
1229        let catalog = catalog.unwrap();
1230        assert!(catalog.warehouse_location == warehouse_location);
1231        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1232    }
1233
1234    /// Overwriting an sqlite database with a non-existent path causes
1235    /// catalog generation to fail
1236    #[tokio::test]
1237    async fn test_builder_props_non_existent_path_fails() {
1238        let sql_lite_uri = format!("sqlite:{}", temp_path());
1239        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1240        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1241        let warehouse_location = temp_path();
1242
1243        let catalog = SqlCatalogBuilder::default()
1244            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1245            .uri(sql_lite_uri)
1246            .warehouse_location(warehouse_location)
1247            .load(
1248                "iceberg",
1249                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1250            )
1251            .await;
1252        assert!(catalog.is_err());
1253    }
1254
1255    /// Even when an invalid URI is specified in a builder method,
1256    /// it can be successfully overridden with a valid URI in props
1257    /// for catalog generation to succeed.
1258    #[tokio::test]
1259    async fn test_builder_props_set_valid_uri() {
1260        let sql_lite_uri = format!("sqlite:{}", temp_path());
1261        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1262        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1263        let warehouse_location = temp_path();
1264
1265        let catalog = SqlCatalogBuilder::default()
1266            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1267            .uri(sql_lite_uri2)
1268            .warehouse_location(warehouse_location)
1269            .load(
1270                "iceberg",
1271                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1272            )
1273            .await;
1274        assert!(catalog.is_ok());
1275    }
1276
1277    /// values assigned via props take precedence
1278    #[tokio::test]
1279    async fn test_builder_props_take_precedence() {
1280        let sql_lite_uri = format!("sqlite:{}", temp_path());
1281        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1282        let warehouse_location = temp_path();
1283        let warehouse_location2 = temp_path();
1284
1285        let catalog = SqlCatalogBuilder::default()
1286            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1287            .warehouse_location(warehouse_location2)
1288            .sql_bind_style(SqlBindStyle::DollarNumeric)
1289            .load(
1290                "iceberg",
1291                HashMap::from_iter([
1292                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1293                    (
1294                        SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1295                        warehouse_location.clone(),
1296                    ),
1297                    (
1298                        SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1299                        SqlBindStyle::QMark.to_string(),
1300                    ),
1301                ]),
1302            )
1303            .await;
1304
1305        assert!(catalog.is_ok());
1306
1307        let catalog = catalog.unwrap();
1308        assert!(catalog.warehouse_location == warehouse_location);
1309        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1310    }
1311
1312    /// values assigned via props take precedence
1313    #[tokio::test]
1314    async fn test_builder_props_take_precedence_props() {
1315        let sql_lite_uri = format!("sqlite:{}", temp_path());
1316        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1317        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1318        let warehouse_location = temp_path();
1319        let warehouse_location2 = temp_path();
1320
1321        let props = HashMap::from_iter([
1322            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1323            (
1324                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1325                warehouse_location.clone(),
1326            ),
1327            (
1328                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1329                SqlBindStyle::QMark.to_string(),
1330            ),
1331        ]);
1332        let props2 = HashMap::from_iter([
1333            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1334            (
1335                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1336                warehouse_location2.clone(),
1337            ),
1338            (
1339                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1340                SqlBindStyle::DollarNumeric.to_string(),
1341            ),
1342        ]);
1343
1344        let catalog = SqlCatalogBuilder::default()
1345            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1346            .props(props2)
1347            .load("iceberg", props)
1348            .await;
1349
1350        assert!(catalog.is_ok());
1351
1352        let catalog = catalog.unwrap();
1353        assert!(catalog.warehouse_location == warehouse_location);
1354        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1355    }
1356
1357    /// values assigned via props take precedence
1358    #[tokio::test]
1359    async fn test_builder_props_take_precedence_prop() {
1360        let sql_lite_uri = format!("sqlite:{}", temp_path());
1361        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1362        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1363        let warehouse_location = temp_path();
1364        let warehouse_location2 = temp_path();
1365
1366        let props = HashMap::from_iter([
1367            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1368            (
1369                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1370                warehouse_location.clone(),
1371            ),
1372            (
1373                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1374                SqlBindStyle::QMark.to_string(),
1375            ),
1376        ]);
1377
1378        let catalog = SqlCatalogBuilder::default()
1379            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1380            .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1381            .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1382            .prop(
1383                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1384                SqlBindStyle::DollarNumeric.to_string(),
1385            )
1386            .load("iceberg", props)
1387            .await;
1388
1389        assert!(catalog.is_ok());
1390
1391        let catalog = catalog.unwrap();
1392        assert!(catalog.warehouse_location == warehouse_location);
1393        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1394    }
1395
1396    /// invalid value for `SqlBindStyle` causes catalog creation to fail
1397    #[tokio::test]
1398    async fn test_builder_props_invalid_bind_style_fails() {
1399        let sql_lite_uri = format!("sqlite:{}", temp_path());
1400        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1401        let warehouse_location = temp_path();
1402
1403        let catalog = SqlCatalogBuilder::default()
1404            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1405            .load(
1406                "iceberg",
1407                HashMap::from_iter([
1408                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1409                    (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1410                    (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1411                ]),
1412            )
1413            .await;
1414
1415        assert!(catalog.is_err());
1416    }
1417
1418    #[tokio::test]
1419    async fn test_list_namespaces_returns_empty_vector() {
1420        let warehouse_loc = temp_path();
1421        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1422
1423        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1424    }
1425
1426    #[tokio::test]
1427    async fn test_list_namespaces_returns_empty_different_name() {
1428        let warehouse_loc = temp_path();
1429        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1430        let namespace_ident_1 = NamespaceIdent::new("a".into());
1431        let namespace_ident_2 = NamespaceIdent::new("b".into());
1432        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1433        assert_eq!(
1434            to_set(catalog.list_namespaces(None).await.unwrap()),
1435            to_set(vec![namespace_ident_1, namespace_ident_2])
1436        );
1437
1438        let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await;
1439        assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
1440    }
1441
1442    #[tokio::test]
1443    async fn test_list_namespaces_returns_only_top_level_namespaces() {
1444        let warehouse_loc = temp_path();
1445        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1446        let namespace_ident_1 = NamespaceIdent::new("a".into());
1447        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1448        let namespace_ident_3 = NamespaceIdent::new("b".into());
1449        create_namespaces(&catalog, &vec![
1450            &namespace_ident_1,
1451            &namespace_ident_2,
1452            &namespace_ident_3,
1453        ])
1454        .await;
1455
1456        assert_eq!(
1457            to_set(catalog.list_namespaces(None).await.unwrap()),
1458            to_set(vec![namespace_ident_1, namespace_ident_3])
1459        );
1460    }
1461
1462    #[tokio::test]
1463    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1464        let warehouse_loc = temp_path();
1465        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1466        let namespace_ident_1 = NamespaceIdent::new("a".into());
1467        let namespace_ident_2 = NamespaceIdent::new("b".into());
1468        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1469
1470        assert_eq!(
1471            catalog
1472                .list_namespaces(Some(&namespace_ident_1))
1473                .await
1474                .unwrap(),
1475            vec![]
1476        );
1477    }
1478
1479    #[tokio::test]
1480    async fn test_list_namespaces_returns_namespace_under_parent() {
1481        let warehouse_loc = temp_path();
1482        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1483        let namespace_ident_1 = NamespaceIdent::new("a".into());
1484        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1485        let namespace_ident_3 = NamespaceIdent::new("c".into());
1486        create_namespaces(&catalog, &vec![
1487            &namespace_ident_1,
1488            &namespace_ident_2,
1489            &namespace_ident_3,
1490        ])
1491        .await;
1492
1493        assert_eq!(
1494            to_set(catalog.list_namespaces(None).await.unwrap()),
1495            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1496        );
1497
1498        assert_eq!(
1499            catalog
1500                .list_namespaces(Some(&namespace_ident_1))
1501                .await
1502                .unwrap(),
1503            vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1504        );
1505    }
1506
1507    #[tokio::test]
1508    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1509        let warehouse_loc = temp_path();
1510        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1511        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1512        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1513        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1514        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1515        let namespace_ident_5 = NamespaceIdent::new("b".into());
1516        create_namespaces(&catalog, &vec![
1517            &namespace_ident_1,
1518            &namespace_ident_2,
1519            &namespace_ident_3,
1520            &namespace_ident_4,
1521            &namespace_ident_5,
1522        ])
1523        .await;
1524
1525        assert_eq!(
1526            to_set(
1527                catalog
1528                    .list_namespaces(Some(&namespace_ident_1))
1529                    .await
1530                    .unwrap()
1531            ),
1532            to_set(vec![
1533                NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1534                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1535                NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1536            ])
1537        );
1538    }
1539
1540    #[tokio::test]
1541    async fn test_namespace_exists_returns_false() {
1542        let warehouse_loc = temp_path();
1543        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1544        let namespace_ident = NamespaceIdent::new("a".into());
1545        create_namespace(&catalog, &namespace_ident).await;
1546
1547        assert!(
1548            !catalog
1549                .namespace_exists(&NamespaceIdent::new("b".into()))
1550                .await
1551                .unwrap()
1552        );
1553    }
1554
1555    #[tokio::test]
1556    async fn test_namespace_exists_returns_true() {
1557        let warehouse_loc = temp_path();
1558        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1559        let namespace_ident = NamespaceIdent::new("a".into());
1560        create_namespace(&catalog, &namespace_ident).await;
1561
1562        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1563    }
1564
1565    #[tokio::test]
1566    async fn test_create_namespace_with_properties() {
1567        let warehouse_loc = temp_path();
1568        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1569        let namespace_ident = NamespaceIdent::new("abc".into());
1570
1571        let mut properties = default_properties();
1572        properties.insert("k".into(), "v".into());
1573
1574        assert_eq!(
1575            catalog
1576                .create_namespace(&namespace_ident, properties.clone())
1577                .await
1578                .unwrap(),
1579            Namespace::with_properties(namespace_ident.clone(), properties.clone())
1580        );
1581
1582        assert_eq!(
1583            catalog.get_namespace(&namespace_ident).await.unwrap(),
1584            Namespace::with_properties(namespace_ident, properties)
1585        );
1586    }
1587
1588    #[tokio::test]
1589    async fn test_create_nested_namespace() {
1590        let warehouse_loc = temp_path();
1591        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1592        let parent_namespace_ident = NamespaceIdent::new("a".into());
1593        create_namespace(&catalog, &parent_namespace_ident).await;
1594
1595        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1596
1597        assert_eq!(
1598            catalog
1599                .create_namespace(&child_namespace_ident, HashMap::new())
1600                .await
1601                .unwrap(),
1602            Namespace::new(child_namespace_ident.clone())
1603        );
1604
1605        assert_eq!(
1606            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1607            Namespace::with_properties(child_namespace_ident, default_properties())
1608        );
1609    }
1610
1611    #[tokio::test]
1612    async fn test_create_deeply_nested_namespace() {
1613        let warehouse_loc = temp_path();
1614        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1615        let namespace_ident_a = NamespaceIdent::new("a".into());
1616        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1617        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1618
1619        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1620
1621        assert_eq!(
1622            catalog
1623                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1624                .await
1625                .unwrap(),
1626            Namespace::new(namespace_ident_a_b_c.clone())
1627        );
1628
1629        assert_eq!(
1630            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1631            Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1632        );
1633    }
1634
1635    #[tokio::test]
1636    async fn test_update_namespace_noop() {
1637        let warehouse_loc = temp_path();
1638        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1639        let namespace_ident = NamespaceIdent::new("a".into());
1640        create_namespace(&catalog, &namespace_ident).await;
1641
1642        catalog
1643            .update_namespace(&namespace_ident, HashMap::new())
1644            .await
1645            .unwrap();
1646
1647        assert_eq!(
1648            *catalog
1649                .get_namespace(&namespace_ident)
1650                .await
1651                .unwrap()
1652                .properties(),
1653            HashMap::from_iter([("exists".to_string(), "true".to_string())])
1654        )
1655    }
1656
1657    #[tokio::test]
1658    async fn test_update_nested_namespace() {
1659        let warehouse_loc = temp_path();
1660        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1661        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1662        create_namespace(&catalog, &namespace_ident).await;
1663
1664        let mut props = HashMap::from_iter([
1665            ("prop1".to_string(), "val1".to_string()),
1666            ("prop2".into(), "val2".into()),
1667        ]);
1668
1669        catalog
1670            .update_namespace(&namespace_ident, props.clone())
1671            .await
1672            .unwrap();
1673
1674        props.insert("exists".into(), "true".into());
1675
1676        assert_eq!(
1677            *catalog
1678                .get_namespace(&namespace_ident)
1679                .await
1680                .unwrap()
1681                .properties(),
1682            props
1683        )
1684    }
1685
1686    #[tokio::test]
1687    async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1688        let warehouse_loc = temp_path();
1689        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1690        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1691
1692        let props = HashMap::from_iter([
1693            ("prop1".to_string(), "val1".to_string()),
1694            ("prop2".into(), "val2".into()),
1695        ]);
1696
1697        let err = catalog
1698            .update_namespace(&namespace_ident, props)
1699            .await
1700            .unwrap_err();
1701
1702        assert_eq!(
1703            err.message(),
1704            format!("No such namespace: {namespace_ident:?}")
1705        );
1706    }
1707
1708    #[tokio::test]
1709    async fn test_drop_nested_namespace() {
1710        let warehouse_loc = temp_path();
1711        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1712        let namespace_ident_a = NamespaceIdent::new("a".into());
1713        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1714        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1715
1716        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1717
1718        assert!(
1719            !catalog
1720                .namespace_exists(&namespace_ident_a_b)
1721                .await
1722                .unwrap()
1723        );
1724
1725        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1726    }
1727
1728    #[tokio::test]
1729    async fn test_drop_deeply_nested_namespace() {
1730        let warehouse_loc = temp_path();
1731        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1732        let namespace_ident_a = NamespaceIdent::new("a".into());
1733        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1734        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1735        create_namespaces(&catalog, &vec![
1736            &namespace_ident_a,
1737            &namespace_ident_a_b,
1738            &namespace_ident_a_b_c,
1739        ])
1740        .await;
1741
1742        catalog
1743            .drop_namespace(&namespace_ident_a_b_c)
1744            .await
1745            .unwrap();
1746
1747        assert!(
1748            !catalog
1749                .namespace_exists(&namespace_ident_a_b_c)
1750                .await
1751                .unwrap()
1752        );
1753
1754        assert!(
1755            catalog
1756                .namespace_exists(&namespace_ident_a_b)
1757                .await
1758                .unwrap()
1759        );
1760
1761        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1762    }
1763
1764    #[tokio::test]
1765    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1766        let warehouse_loc = temp_path();
1767        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1768        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1769
1770        let non_existent_namespace_ident =
1771            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1772        assert_eq!(
1773            catalog
1774                .drop_namespace(&non_existent_namespace_ident)
1775                .await
1776                .unwrap_err()
1777                .to_string(),
1778            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1779        )
1780    }
1781
1782    #[tokio::test]
1783    async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1784        let warehouse_loc = temp_path();
1785        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1786        let namespace_ident_a = NamespaceIdent::new("a".into());
1787        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1788        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1789
1790        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1791
1792        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1793
1794        assert!(
1795            catalog
1796                .namespace_exists(&namespace_ident_a_b)
1797                .await
1798                .unwrap()
1799        );
1800    }
1801
1802    #[tokio::test]
1803    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1804        let warehouse_loc = temp_path();
1805        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1806
1807        let namespace_ident = NamespaceIdent::new("a".into());
1808        let mut namespace_properties = HashMap::new();
1809        let namespace_location = temp_path();
1810        namespace_properties.insert(
1811            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1812            namespace_location.to_string(),
1813        );
1814        catalog
1815            .create_namespace(&namespace_ident, namespace_properties)
1816            .await
1817            .unwrap();
1818
1819        let table_name = "tbl1";
1820        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1821        let expected_table_metadata_location_regex =
1822            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1823
1824        let table = catalog
1825            .create_table(
1826                &namespace_ident,
1827                TableCreation::builder()
1828                    .name(table_name.into())
1829                    .schema(simple_table_schema())
1830                    // no location specified for table
1831                    .build(),
1832            )
1833            .await
1834            .unwrap();
1835        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1836        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1837
1838        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1839        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1840        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1841    }
1842
1843    #[tokio::test]
1844    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1845     {
1846        let warehouse_loc = temp_path();
1847        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1848
1849        let namespace_ident = NamespaceIdent::new("a".into());
1850        let mut namespace_properties = HashMap::new();
1851        let namespace_location = temp_path();
1852        namespace_properties.insert(
1853            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1854            namespace_location.to_string(),
1855        );
1856        catalog
1857            .create_namespace(&namespace_ident, namespace_properties)
1858            .await
1859            .unwrap();
1860
1861        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1862        let mut nested_namespace_properties = HashMap::new();
1863        let nested_namespace_location = temp_path();
1864        nested_namespace_properties.insert(
1865            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1866            nested_namespace_location.to_string(),
1867        );
1868        catalog
1869            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1870            .await
1871            .unwrap();
1872
1873        let table_name = "tbl1";
1874        let expected_table_ident =
1875            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1876        let expected_table_metadata_location_regex = format!(
1877            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1878        );
1879
1880        let table = catalog
1881            .create_table(
1882                &nested_namespace_ident,
1883                TableCreation::builder()
1884                    .name(table_name.into())
1885                    .schema(simple_table_schema())
1886                    // no location specified for table
1887                    .build(),
1888            )
1889            .await
1890            .unwrap();
1891        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1892        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1893
1894        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1895        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1896        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1897    }
1898
1899    #[tokio::test]
1900    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1901     {
1902        let warehouse_loc = temp_path();
1903        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1904
1905        let namespace_ident = NamespaceIdent::new("a".into());
1906        // note: no location specified in namespace_properties
1907        let namespace_properties = HashMap::new();
1908        catalog
1909            .create_namespace(&namespace_ident, namespace_properties)
1910            .await
1911            .unwrap();
1912
1913        let table_name = "tbl1";
1914        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1915        let expected_table_metadata_location_regex =
1916            format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1917
1918        let table = catalog
1919            .create_table(
1920                &namespace_ident,
1921                TableCreation::builder()
1922                    .name(table_name.into())
1923                    .schema(simple_table_schema())
1924                    // no location specified for table
1925                    .build(),
1926            )
1927            .await
1928            .unwrap();
1929        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1930        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1931
1932        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1933        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1934        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1935    }
1936
1937    #[tokio::test]
1938    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1939     {
1940        let warehouse_loc = temp_path();
1941        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1942
1943        let namespace_ident = NamespaceIdent::new("a".into());
1944        create_namespace(&catalog, &namespace_ident).await;
1945
1946        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1947        create_namespace(&catalog, &nested_namespace_ident).await;
1948
1949        let table_name = "tbl1";
1950        let expected_table_ident =
1951            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1952        let expected_table_metadata_location_regex =
1953            format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1954
1955        let table = catalog
1956            .create_table(
1957                &nested_namespace_ident,
1958                TableCreation::builder()
1959                    .name(table_name.into())
1960                    .schema(simple_table_schema())
1961                    // no location specified for table
1962                    .build(),
1963            )
1964            .await
1965            .unwrap();
1966        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1967        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1968
1969        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1970        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1971        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1972    }
1973
1974    #[tokio::test]
1975    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1976        let warehouse_loc = temp_path();
1977        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1978        let namespace_ident = NamespaceIdent::new("a".into());
1979        create_namespace(&catalog, &namespace_ident).await;
1980        let table_name = "tbl1";
1981        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1982        create_table(&catalog, &table_ident).await;
1983
1984        let tmp_dir = TempDir::new().unwrap();
1985        let location = tmp_dir.path().to_str().unwrap().to_string();
1986
1987        assert_eq!(
1988            catalog
1989                .create_table(
1990                    &namespace_ident,
1991                    TableCreation::builder()
1992                        .name(table_name.into())
1993                        .schema(simple_table_schema())
1994                        .location(location)
1995                        .build()
1996                )
1997                .await
1998                .unwrap_err()
1999                .to_string(),
2000            format!(
2001                "TableAlreadyExists => Table {:?} already exists.",
2002                &table_ident
2003            )
2004        );
2005    }
2006
2007    #[tokio::test]
2008    async fn test_rename_table_src_table_is_same_as_dst_table() {
2009        let warehouse_loc = temp_path();
2010        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2011        let namespace_ident = NamespaceIdent::new("n1".into());
2012        create_namespace(&catalog, &namespace_ident).await;
2013        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
2014        create_table(&catalog, &table_ident).await;
2015
2016        catalog
2017            .rename_table(&table_ident, &table_ident)
2018            .await
2019            .unwrap();
2020
2021        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2022            table_ident
2023        ],);
2024    }
2025
2026    #[tokio::test]
2027    async fn test_rename_table_across_nested_namespaces() {
2028        let warehouse_loc = temp_path();
2029        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2030        let namespace_ident_a = NamespaceIdent::new("a".into());
2031        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2032        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2033        create_namespaces(&catalog, &vec![
2034            &namespace_ident_a,
2035            &namespace_ident_a_b,
2036            &namespace_ident_a_b_c,
2037        ])
2038        .await;
2039
2040        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2041        create_tables(&catalog, vec![&src_table_ident]).await;
2042
2043        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2044        catalog
2045            .rename_table(&src_table_ident, &dst_table_ident)
2046            .await
2047            .unwrap();
2048
2049        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2050
2051        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2052    }
2053
2054    #[tokio::test]
2055    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2056        let warehouse_loc = temp_path();
2057        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2058        let src_namespace_ident = NamespaceIdent::new("n1".into());
2059        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2060        create_namespace(&catalog, &src_namespace_ident).await;
2061        create_table(&catalog, &src_table_ident).await;
2062
2063        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2064        let dst_table_ident =
2065            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2066        assert_eq!(
2067            catalog
2068                .rename_table(&src_table_ident, &dst_table_ident)
2069                .await
2070                .unwrap_err()
2071                .to_string(),
2072            format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
2073        );
2074    }
2075}