iceberg_catalog_sql/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
25use iceberg::spec::{TableMetadata, TableMetadataBuilder};
26use iceberg::table::Table;
27use iceberg::{
28    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
29    Runtime, TableCommit, TableCreation, TableIdent,
30};
31use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
32use sqlx::{Any, AnyPool, Row, Transaction};
33
34use crate::error::{
35    from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
36};
37
38/// catalog URI
39pub const SQL_CATALOG_PROP_URI: &str = "uri";
40/// catalog warehouse location
41pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
42/// catalog sql bind style
43pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";
44
45static CATALOG_TABLE_NAME: &str = "iceberg_tables";
46static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
47static CATALOG_FIELD_TABLE_NAME: &str = "table_name";
48static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
49static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
50static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
51static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
52static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";
53
54static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
55static NAMESPACE_FIELD_NAME: &str = "namespace";
56static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
57static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";
58
59static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
60
61static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if not provided
62static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed
63static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning
64
/// Builder for [`SqlCatalog`].
///
/// Settings are collected through the builder methods and finalized by
/// `CatalogBuilder::load`, which constructs the catalog.
#[derive(Debug)]
pub struct SqlCatalogBuilder {
    /// Catalog settings accumulated so far (URI, name, warehouse, bind style, extra props).
    config: SqlCatalogConfig,
    /// Storage factory passed on to the catalog; `None` until `with_storage_factory` is called.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Explicit runtime; when `None`, `load` falls back to `Runtime::try_current()`.
    runtime: Option<Runtime>,
}
72
73impl Default for SqlCatalogBuilder {
74    fn default() -> Self {
75        Self {
76            config: SqlCatalogConfig {
77                uri: "".to_string(),
78                name: "".to_string(),
79                warehouse_location: "".to_string(),
80                sql_bind_style: SqlBindStyle::DollarNumeric,
81                props: HashMap::new(),
82            },
83            storage_factory: None,
84            runtime: None,
85        }
86    }
87}
88
89impl SqlCatalogBuilder {
90    /// Configure the database URI
91    ///
92    /// If `SQL_CATALOG_PROP_URI` has a value set in `props` during `SqlCatalogBuilder::load`,
93    /// that value takes precedence, and the value specified by this method will not be used.
94    pub fn uri(mut self, uri: impl Into<String>) -> Self {
95        self.config.uri = uri.into();
96        self
97    }
98
99    /// Configure the warehouse location
100    ///
101    /// If `SQL_CATALOG_PROP_WAREHOUSE` has a value set in `props` during `SqlCatalogBuilder::load`,
102    /// that value takes precedence, and the value specified by this method will not be used.
103    pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
104        self.config.warehouse_location = location.into();
105        self
106    }
107
108    /// Configure the bound SQL Statement
109    ///
110    /// If `SQL_CATALOG_PROP_BIND_STYLE` has a value set in `props` during `SqlCatalogBuilder::load`,
111    /// that value takes precedence, and the value specified by this method will not be used.
112    pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
113        self.config.sql_bind_style = sql_bind_style;
114        self
115    }
116
117    /// Configure the any properties
118    ///
119    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
120    /// those values will take precedence.
121    pub fn props(mut self, props: HashMap<String, String>) -> Self {
122        for (k, v) in props {
123            self.config.props.insert(k, v);
124        }
125        self
126    }
127
128    /// Set a new property on the property to be configured.
129    /// When multiple methods are executed with the same key,
130    /// the later-set value takes precedence.
131    ///
132    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
133    /// those values will take precedence.
134    pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
135        self.config.props.insert(key.into(), value.into());
136        self
137    }
138}
139
140impl CatalogBuilder for SqlCatalogBuilder {
141    type C = SqlCatalog;
142
143    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
144        self.storage_factory = Some(storage_factory);
145        self
146    }
147
148    fn with_runtime(mut self, runtime: Runtime) -> Self {
149        self.runtime = Some(runtime);
150        self
151    }
152
153    fn load(
154        mut self,
155        name: impl Into<String>,
156        props: HashMap<String, String>,
157    ) -> impl Future<Output = Result<Self::C>> + Send {
158        for (k, v) in props {
159            self.config.props.insert(k, v);
160        }
161
162        if let Some(uri) = self.config.props.remove(SQL_CATALOG_PROP_URI) {
163            self.config.uri = uri;
164        }
165        if let Some(warehouse_location) = self.config.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
166            self.config.warehouse_location = warehouse_location;
167        }
168
169        let name = name.into();
170
171        let mut valid_sql_bind_style = true;
172        if let Some(sql_bind_style) = self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
173            if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
174                self.config.sql_bind_style = sql_bind_style;
175            } else {
176                valid_sql_bind_style = false;
177            }
178        }
179
180        let valid_name = !name.trim().is_empty();
181
182        async move {
183            if !valid_name {
184                Err(Error::new(
185                    ErrorKind::DataInvalid,
186                    "Catalog name cannot be empty",
187                ))
188            } else if !valid_sql_bind_style {
189                Err(Error::new(
190                    ErrorKind::DataInvalid,
191                    format!(
192                        "`{}` values are valid only if they're `{}` or `{}`",
193                        SQL_CATALOG_PROP_BIND_STYLE,
194                        SqlBindStyle::DollarNumeric,
195                        SqlBindStyle::QMark
196                    ),
197                ))
198            } else {
199                self.config.name = name;
200                let runtime = match self.runtime {
201                    Some(rt) => rt,
202                    None => Runtime::try_current()?,
203                };
204                SqlCatalog::new(self.config, self.storage_factory, runtime).await
205            }
206        }
207    }
208}
209
/// A struct representing the SQL catalog configuration.
///
/// This struct contains the parameters used to configure a SQL catalog:
/// the database URI, the catalog name, the warehouse location, the SQL bind
/// style, and any additional properties.
/// You are required to provide a `SqlBindStyle`, which determines how SQL statements will be bound to values in the catalog.
/// The options available for this parameter include:
/// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases.
/// - `SqlBindStyle::QMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases.
#[derive(Debug)]
struct SqlCatalogConfig {
    /// Database connection URI handed to the connection pool.
    uri: String,
    /// Catalog name; used as the `catalog_name` column value in every query.
    name: String,
    /// Base location used when deriving default table locations.
    warehouse_location: String,
    /// Placeholder style used when binding statement parameters.
    sql_bind_style: SqlBindStyle,
    /// Remaining properties, e.g. the `pool.*` connection-pool settings.
    props: HashMap<String, String>,
}
226
#[derive(Debug)]
/// Sql catalog implementation.
///
/// Table and namespace records are kept in two SQL tables reached through a
/// `sqlx` connection pool.
pub struct SqlCatalog {
    /// Catalog name; every query is filtered by it.
    name: String,
    /// Connection pool for the backing database.
    connection: AnyPool,
    /// Base location used when deriving default table locations.
    warehouse_location: String,
    /// File IO used to read table metadata.
    fileio: FileIO,
    /// Placeholder style applied to statements before execution.
    sql_bind_style: SqlBindStyle,
    /// Runtime handed to every loaded `Table`.
    runtime: Runtime,
}
237
#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
/// Set the SQL parameter bind style to either `$1..$N` (Postgres style) or `?` (SQLite/MySQL/MariaDB)
pub enum SqlBindStyle {
    /// DollarNumeric uses parameters of the form `$1..$N`, which is the Postgres style
    DollarNumeric,
    /// QMark uses parameters of the form `?` which is the style for other dialects (SQLite/MySQL/MariaDB)
    QMark,
}
246
247impl SqlCatalog {
248    /// Create new sql catalog instance
249    async fn new(
250        config: SqlCatalogConfig,
251        storage_factory: Option<Arc<dyn StorageFactory>>,
252        runtime: Runtime,
253    ) -> Result<Self> {
254        let factory = storage_factory.ok_or_else(|| {
255            Error::new(
256                ErrorKind::Unexpected,
257                "StorageFactory must be provided for SqlCatalog. Use `with_storage_factory` to configure it.",
258            )
259        })?;
260        let fileio = FileIOBuilder::new(factory).build();
261
262        install_default_drivers();
263        let max_connections: u32 = config
264            .props
265            .get("pool.max-connections")
266            .map(|v| v.parse().unwrap())
267            .unwrap_or(MAX_CONNECTIONS);
268        let idle_timeout: u64 = config
269            .props
270            .get("pool.idle-timeout")
271            .map(|v| v.parse().unwrap())
272            .unwrap_or(IDLE_TIMEOUT);
273        let test_before_acquire: bool = config
274            .props
275            .get("pool.test-before-acquire")
276            .map(|v| v.parse().unwrap())
277            .unwrap_or(TEST_BEFORE_ACQUIRE);
278
279        let pool = AnyPoolOptions::new()
280            .max_connections(max_connections)
281            .idle_timeout(Duration::from_secs(idle_timeout))
282            .test_before_acquire(test_before_acquire)
283            .connect(&config.uri)
284            .await
285            .map_err(from_sqlx_error)?;
286
287        sqlx::query(&format!(
288            "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
289                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
290                {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
291                {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
292                {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
293                {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
294                {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
295                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
296        ))
297        .execute(&pool)
298        .await
299        .map_err(from_sqlx_error)?;
300
301        sqlx::query(&format!(
302            "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
303                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
304                {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
305                {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
306                {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
307                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
308        ))
309        .execute(&pool)
310        .await
311        .map_err(from_sqlx_error)?;
312
313        Ok(SqlCatalog {
314            name: config.name.to_owned(),
315            connection: pool,
316            warehouse_location: config.warehouse_location,
317            fileio,
318            sql_bind_style: config.sql_bind_style,
319            runtime,
320        })
321    }
322
323    /// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
324    fn replace_placeholders(&self, query: &str) -> String {
325        match self.sql_bind_style {
326            SqlBindStyle::DollarNumeric => {
327                let mut count = 1;
328                query
329                    .chars()
330                    .fold(String::with_capacity(query.len()), |mut acc, c| {
331                        if c == '?' {
332                            acc.push('$');
333                            acc.push_str(&count.to_string());
334                            count += 1;
335                        } else {
336                            acc.push(c);
337                        }
338                        acc
339                    })
340            }
341            _ => query.to_owned(),
342        }
343    }
344
345    /// Fetch a vec of AnyRows from a given query
346    async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
347        let query_with_placeholders = self.replace_placeholders(query);
348
349        let mut sqlx_query = sqlx::query(&query_with_placeholders);
350        for arg in args {
351            sqlx_query = sqlx_query.bind(arg);
352        }
353
354        sqlx_query
355            .fetch_all(&self.connection)
356            .await
357            .map_err(from_sqlx_error)
358    }
359
360    /// Execute statements in a transaction, provided or not
361    async fn execute(
362        &self,
363        query: &str,
364        args: Vec<Option<&str>>,
365        transaction: Option<&mut Transaction<'_, Any>>,
366    ) -> Result<AnyQueryResult> {
367        let query_with_placeholders = self.replace_placeholders(query);
368
369        let mut sqlx_query = sqlx::query(&query_with_placeholders);
370        for arg in args {
371            sqlx_query = sqlx_query.bind(arg);
372        }
373
374        match transaction {
375            Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
376            None => {
377                let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
378                let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
379                let _ = tx.commit().await.map_err(from_sqlx_error);
380                result
381            }
382        }
383    }
384}
385
386#[async_trait]
387impl Catalog for SqlCatalog {
388    async fn list_namespaces(
389        &self,
390        parent: Option<&NamespaceIdent>,
391    ) -> Result<Vec<NamespaceIdent>> {
392        // UNION will remove duplicates.
393        let all_namespaces_stmt = format!(
394            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
395             FROM {CATALOG_TABLE_NAME}
396             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
397             UNION
398             SELECT {NAMESPACE_FIELD_NAME}
399             FROM {NAMESPACE_TABLE_NAME}
400             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
401        );
402
403        let namespace_rows = self
404            .fetch_rows(&all_namespaces_stmt, vec![
405                Some(&self.name),
406                Some(&self.name),
407            ])
408            .await?;
409
410        let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
411
412        if let Some(parent) = parent {
413            if self.namespace_exists(parent).await? {
414                let parent_str = parent.join(".");
415
416                for row in namespace_rows.iter() {
417                    let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
418                    // if parent = a, then we only want to see a.b, a.c returned.
419                    if nsp != parent_str && nsp.starts_with(&parent_str) {
420                        namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
421                    }
422                }
423
424                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
425            } else {
426                no_such_namespace_err(parent)
427            }
428        } else {
429            for row in namespace_rows.iter() {
430                let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
431                let mut levels = nsp.split(".").collect::<Vec<&str>>();
432                if !levels.is_empty() {
433                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
434                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
435                }
436            }
437
438            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
439        }
440    }
441
442    async fn create_namespace(
443        &self,
444        namespace: &NamespaceIdent,
445        properties: HashMap<String, String>,
446    ) -> Result<Namespace> {
447        let exists = self.namespace_exists(namespace).await?;
448
449        if exists {
450            return Err(Error::new(
451                iceberg::ErrorKind::NamespaceAlreadyExists,
452                format!("Namespace {namespace:?} already exists"),
453            ));
454        }
455
456        let namespace_str = namespace.join(".");
457        let insert = format!(
458            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
459             VALUES (?, ?, ?, ?)");
460        if !properties.is_empty() {
461            let mut insert_properties = properties.clone();
462            insert_properties.insert("exists".to_string(), "true".to_string());
463
464            let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
465            let mut insert_stmt = insert.clone();
466            for (index, (key, value)) in insert_properties.iter().enumerate() {
467                query_args.extend_from_slice(&[
468                    Some(self.name.as_str()),
469                    Some(namespace_str.as_str()),
470                    Some(key.as_str()),
471                    Some(value.as_str()),
472                ]);
473                if index > 0 {
474                    insert_stmt.push_str(", (?, ?, ?, ?)");
475                }
476            }
477
478            self.execute(&insert_stmt, query_args, None).await?;
479
480            Ok(Namespace::with_properties(
481                namespace.clone(),
482                insert_properties,
483            ))
484        } else {
485            // set a default property of exists = true
486            self.execute(
487                &insert,
488                vec![
489                    Some(&self.name),
490                    Some(&namespace_str),
491                    Some("exists"),
492                    Some("true"),
493                ],
494                None,
495            )
496            .await?;
497            Ok(Namespace::with_properties(namespace.clone(), properties))
498        }
499    }
500
501    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
502        let exists = self.namespace_exists(namespace).await?;
503        if exists {
504            let namespace_props = self
505                .fetch_rows(
506                    &format!(
507                        "SELECT
508                            {NAMESPACE_FIELD_NAME},
509                            {NAMESPACE_FIELD_PROPERTY_KEY},
510                            {NAMESPACE_FIELD_PROPERTY_VALUE}
511                            FROM {NAMESPACE_TABLE_NAME}
512                            WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
513                            AND {NAMESPACE_FIELD_NAME} = ?"
514                    ),
515                    vec![Some(&self.name), Some(&namespace.join("."))],
516                )
517                .await?;
518
519            let mut properties = HashMap::with_capacity(namespace_props.len());
520
521            for row in namespace_props {
522                let key = row
523                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
524                    .map_err(from_sqlx_error)?;
525                let value = row
526                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
527                    .map_err(from_sqlx_error)?;
528
529                properties.insert(key, value);
530            }
531
532            Ok(Namespace::with_properties(namespace.clone(), properties))
533        } else {
534            no_such_namespace_err(namespace)
535        }
536    }
537
538    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
539        let namespace_str = namespace.join(".");
540
541        let table_namespaces = self
542            .fetch_rows(
543                &format!(
544                    "SELECT 1 FROM {CATALOG_TABLE_NAME}
545                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
546                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
547                     LIMIT 1"
548                ),
549                vec![Some(&self.name), Some(&namespace_str)],
550            )
551            .await?;
552
553        if !table_namespaces.is_empty() {
554            Ok(true)
555        } else {
556            let namespaces = self
557                .fetch_rows(
558                    &format!(
559                        "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
560                         WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
561                          AND {NAMESPACE_FIELD_NAME} = ?
562                         LIMIT 1"
563                    ),
564                    vec![Some(&self.name), Some(&namespace_str)],
565                )
566                .await?;
567            if !namespaces.is_empty() {
568                Ok(true)
569            } else {
570                Ok(false)
571            }
572        }
573    }
574
575    async fn update_namespace(
576        &self,
577        namespace: &NamespaceIdent,
578        properties: HashMap<String, String>,
579    ) -> Result<()> {
580        let exists = self.namespace_exists(namespace).await?;
581        if exists {
582            let existing_properties = self.get_namespace(namespace).await?.properties().clone();
583            let namespace_str = namespace.join(".");
584
585            let mut updates = vec![];
586            let mut inserts = vec![];
587
588            for (key, value) in properties.iter() {
589                if existing_properties.contains_key(key) {
590                    if existing_properties.get(key) != Some(value) {
591                        updates.push((key, value));
592                    }
593                } else {
594                    inserts.push((key, value));
595                }
596            }
597
598            let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
599            let update_stmt = format!(
600                "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
601                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ? 
602                 AND {NAMESPACE_FIELD_NAME} = ?
603                 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
604            );
605
606            let insert_stmt = format!(
607                "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
608                 VALUES (?, ?, ?, ?)"
609            );
610
611            for (key, value) in updates {
612                self.execute(
613                    &update_stmt,
614                    vec![
615                        Some(value),
616                        Some(&self.name),
617                        Some(&namespace_str),
618                        Some(key),
619                    ],
620                    Some(&mut tx),
621                )
622                .await?;
623            }
624
625            for (key, value) in inserts {
626                self.execute(
627                    &insert_stmt,
628                    vec![
629                        Some(&self.name),
630                        Some(&namespace_str),
631                        Some(key),
632                        Some(value),
633                    ],
634                    Some(&mut tx),
635                )
636                .await?;
637            }
638
639            let _ = tx.commit().await.map_err(from_sqlx_error)?;
640
641            Ok(())
642        } else {
643            no_such_namespace_err(namespace)
644        }
645    }
646
647    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
648        let exists = self.namespace_exists(namespace).await?;
649        if exists {
650            // if there are tables in the namespace, don't allow drop.
651            let tables = self.list_tables(namespace).await?;
652            if !tables.is_empty() {
653                return Err(Error::new(
654                    iceberg::ErrorKind::Unexpected,
655                    format!(
656                        "Namespace {:?} is not empty. {} tables exist.",
657                        namespace,
658                        tables.len()
659                    ),
660                ));
661            }
662
663            self.execute(
664                &format!(
665                    "DELETE FROM {NAMESPACE_TABLE_NAME}
666                     WHERE {NAMESPACE_FIELD_NAME} = ?
667                      AND {CATALOG_FIELD_CATALOG_NAME} = ?"
668                ),
669                vec![Some(&namespace.join(".")), Some(&self.name)],
670                None,
671            )
672            .await?;
673
674            Ok(())
675        } else {
676            no_such_namespace_err(namespace)
677        }
678    }
679
680    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
681        let exists = self.namespace_exists(namespace).await?;
682        if exists {
683            let rows = self
684                .fetch_rows(
685                    &format!(
686                        "SELECT {CATALOG_FIELD_TABLE_NAME},
687                                {CATALOG_FIELD_TABLE_NAMESPACE}
688                         FROM {CATALOG_TABLE_NAME}
689                         WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
690                          AND {CATALOG_FIELD_CATALOG_NAME} = ?
691                          AND (
692                                {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
693                                OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
694                          )",
695                    ),
696                    vec![Some(&namespace.join(".")), Some(&self.name)],
697                )
698                .await?;
699
700            let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
701
702            for row in rows.iter() {
703                let tbl = row
704                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
705                    .map_err(from_sqlx_error)?;
706                let ns_strs = row
707                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
708                    .map_err(from_sqlx_error)?;
709                let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
710                tables.insert(TableIdent::new(ns, tbl));
711            }
712
713            Ok(tables.into_iter().collect::<Vec<TableIdent>>())
714        } else {
715            no_such_namespace_err(namespace)
716        }
717    }
718
719    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
720        let namespace = identifier.namespace().join(".");
721        let table_name = identifier.name();
722        let table_counts = self
723            .fetch_rows(
724                &format!(
725                    "SELECT 1
726                     FROM {CATALOG_TABLE_NAME}
727                     WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
728                      AND {CATALOG_FIELD_CATALOG_NAME} = ?
729                      AND {CATALOG_FIELD_TABLE_NAME} = ?
730                      AND (
731                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
732                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
733                      )"
734                ),
735                vec![Some(&namespace), Some(&self.name), Some(table_name)],
736            )
737            .await?;
738
739        if !table_counts.is_empty() {
740            Ok(true)
741        } else {
742            Ok(false)
743        }
744    }
745
746    async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
747        if !self.table_exists(identifier).await? {
748            return no_such_table_err(identifier);
749        }
750
751        self.execute(
752            &format!(
753                "DELETE FROM {CATALOG_TABLE_NAME}
754                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
755                  AND {CATALOG_FIELD_TABLE_NAME} = ?
756                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
757                  AND (
758                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
759                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
760                  )"
761            ),
762            vec![
763                Some(&self.name),
764                Some(identifier.name()),
765                Some(&identifier.namespace().join(".")),
766            ],
767            None,
768        )
769        .await?;
770
771        Ok(())
772    }
773
774    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
775        let table_info = self.load_table(table).await?;
776        self.drop_table(table).await?;
777        iceberg::drop_table_data(
778            table_info.file_io(),
779            table_info.metadata(),
780            table_info.metadata_location(),
781        )
782        .await
783    }
784
785    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
786        if !self.table_exists(identifier).await? {
787            return no_such_table_err(identifier);
788        }
789
790        let rows = self
791            .fetch_rows(
792                &format!(
793                    "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
794                     FROM {CATALOG_TABLE_NAME}
795                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
796                      AND {CATALOG_FIELD_TABLE_NAME} = ?
797                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
798                      AND (
799                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
800                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
801                      )"
802                ),
803                vec![
804                    Some(&self.name),
805                    Some(identifier.name()),
806                    Some(&identifier.namespace().join(".")),
807                ],
808            )
809            .await?;
810
811        if rows.is_empty() {
812            return no_such_table_err(identifier);
813        }
814
815        let row = &rows[0];
816        let tbl_metadata_location = row
817            .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
818            .map_err(from_sqlx_error)?;
819
820        let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
821
822        Ok(Table::builder()
823            .file_io(self.fileio.clone())
824            .identifier(identifier.clone())
825            .metadata_location(tbl_metadata_location)
826            .metadata(metadata)
827            .runtime(self.runtime.clone())
828            .build()?)
829    }
830
831    async fn create_table(
832        &self,
833        namespace: &NamespaceIdent,
834        creation: TableCreation,
835    ) -> Result<Table> {
836        if !self.namespace_exists(namespace).await? {
837            return no_such_namespace_err(namespace);
838        }
839
840        let tbl_name = creation.name.clone();
841        let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
842
843        if self.table_exists(&tbl_ident).await? {
844            return table_already_exists_err(&tbl_ident);
845        }
846
847        let (tbl_creation, location) = match creation.location.clone() {
848            Some(location) => (creation, location),
849            None => {
850                // fall back to namespace-specific location
851                // and then to warehouse location
852                let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
853                let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
854                    Some(location) => location.clone(),
855                    None => {
856                        format!(
857                            "{}/{}",
858                            self.warehouse_location.clone(),
859                            namespace.join("/")
860                        )
861                    }
862                };
863
864                let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
865
866                (
867                    TableCreation {
868                        location: Some(tbl_location.clone()),
869                        ..creation
870                    },
871                    tbl_location,
872                )
873            }
874        };
875
876        let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
877            .build()?
878            .metadata;
879        let tbl_metadata_location =
880            MetadataLocation::new_with_metadata(location.clone(), &tbl_metadata);
881
882        tbl_metadata
883            .write_to(&self.fileio, &tbl_metadata_location)
884            .await?;
885
886        let tbl_metadata_location_str = tbl_metadata_location.to_string();
887        self.execute(&format!(
888            "INSERT INTO {CATALOG_TABLE_NAME}
889             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
890             VALUES (?, ?, ?, ?, ?)
891            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location_str), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
892
893        Ok(Table::builder()
894            .file_io(self.fileio.clone())
895            .metadata_location(tbl_metadata_location_str)
896            .identifier(tbl_ident)
897            .metadata(tbl_metadata)
898            .runtime(self.runtime.clone())
899            .build()?)
900    }
901
902    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
903        if src == dest {
904            return Ok(());
905        }
906
907        if !self.table_exists(src).await? {
908            return no_such_table_err(src);
909        }
910
911        if !self.namespace_exists(dest.namespace()).await? {
912            return no_such_namespace_err(dest.namespace());
913        }
914
915        if self.table_exists(dest).await? {
916            return table_already_exists_err(dest);
917        }
918
919        self.execute(
920            &format!(
921                "UPDATE {CATALOG_TABLE_NAME}
922                 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
923                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
924                  AND {CATALOG_FIELD_TABLE_NAME} = ?
925                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
926                  AND (
927                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
928                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
929                )"
930            ),
931            vec![
932                Some(dest.name()),
933                Some(&dest.namespace().join(".")),
934                Some(&self.name),
935                Some(src.name()),
936                Some(&src.namespace().join(".")),
937            ],
938            None,
939        )
940        .await?;
941
942        Ok(())
943    }
944
945    async fn register_table(
946        &self,
947        table_ident: &TableIdent,
948        metadata_location: String,
949    ) -> Result<Table> {
950        if self.table_exists(table_ident).await? {
951            return table_already_exists_err(table_ident);
952        }
953
954        let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
955
956        let namespace = table_ident.namespace();
957        let tbl_name = table_ident.name().to_string();
958
959        self.execute(&format!(
960            "INSERT INTO {CATALOG_TABLE_NAME}
961             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
962             VALUES (?, ?, ?, ?, ?)
963            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
964
965        Ok(Table::builder()
966            .identifier(table_ident.clone())
967            .metadata_location(metadata_location)
968            .metadata(metadata)
969            .file_io(self.fileio.clone())
970            .runtime(self.runtime.clone())
971            .build()?)
972    }
973
974    /// Updates an existing table within the SQL catalog.
975    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
976        let table_ident = commit.identifier().clone();
977        let current_table = self.load_table(&table_ident).await?;
978        let current_metadata_location = current_table.metadata_location_result()?.to_string();
979
980        let staged_table = commit.apply(current_table)?;
981        let staged_metadata_location_str = staged_table.metadata_location_result()?;
982        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
983
984        staged_table
985            .metadata()
986            .write_to(staged_table.file_io(), &staged_metadata_location)
987            .await?;
988
989        let staged_metadata_location_str = staged_metadata_location.to_string();
990        let update_result = self
991            .execute(
992                &format!(
993                    "UPDATE {CATALOG_TABLE_NAME}
994                     SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
995                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
996                      AND {CATALOG_FIELD_TABLE_NAME} = ?
997                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
998                      AND (
999                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
1000                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
1001                      )
1002                      AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
1003                ),
1004                vec![
1005                    Some(&staged_metadata_location_str),
1006                    Some(current_metadata_location.as_str()),
1007                    Some(&self.name),
1008                    Some(table_ident.name()),
1009                    Some(&table_ident.namespace().join(".")),
1010                    Some(current_metadata_location.as_str()),
1011                ],
1012                None,
1013            )
1014            .await?;
1015
1016        if update_result.rows_affected() == 0 {
1017            return Err(Error::new(
1018                ErrorKind::CatalogCommitConflicts,
1019                format!("Commit conflicted for table: {table_ident}"),
1020            )
1021            .with_retryable(true));
1022        }
1023
1024        Ok(staged_table)
1025    }
1026}
1027
1028#[cfg(test)]
1029mod tests {
1030    use std::collections::{HashMap, HashSet};
1031    use std::hash::Hash;
1032    use std::sync::Arc;
1033
1034    use iceberg::io::LocalFsStorageFactory;
1035    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
1036    use iceberg::table::Table;
1037    use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
1038    use itertools::Itertools;
1039    use regex::Regex;
1040    use sqlx::migrate::MigrateDatabase;
1041    use tempfile::TempDir;
1042
1043    use crate::catalog::{
1044        NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
1045        SQL_CATALOG_PROP_WAREHOUSE,
1046    };
1047    use crate::{SqlBindStyle, SqlCatalogBuilder};
1048
    /// Regex source matching a hyphenated UUID written with lowercase hex
    /// digits, in groups of 8-4-4-4-12 characters.
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
1050
1051    fn temp_path() -> String {
1052        let temp_dir = TempDir::new().unwrap();
1053        temp_dir.path().to_str().unwrap().to_string()
1054    }
1055
1056    fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1057        HashSet::from_iter(vec)
1058    }
1059
1060    fn default_properties() -> HashMap<String, String> {
1061        HashMap::from([("exists".to_string(), "true".to_string())])
1062    }
1063
1064    /// Create a new SQLite catalog for testing. If name is not specified it defaults to "iceberg".
1065    async fn new_sql_catalog(
1066        warehouse_location: String,
1067        name: Option<impl ToString>,
1068    ) -> impl Catalog {
1069        let name = if let Some(name) = name {
1070            name.to_string()
1071        } else {
1072            "iceberg".to_string()
1073        };
1074        let sql_lite_uri = format!("sqlite:{}", temp_path());
1075        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1076
1077        let props = HashMap::from_iter([
1078            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1079            (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1080            (
1081                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1082                SqlBindStyle::DollarNumeric.to_string(),
1083            ),
1084        ]);
1085        SqlCatalogBuilder::default()
1086            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1087            .load(&name, props)
1088            .await
1089            .unwrap()
1090    }
1091
1092    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1093        let _ = catalog
1094            .create_namespace(namespace_ident, HashMap::new())
1095            .await
1096            .unwrap();
1097    }
1098
1099    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1100        for namespace_ident in namespace_idents {
1101            let _ = create_namespace(catalog, namespace_ident).await;
1102        }
1103    }
1104
1105    fn simple_table_schema() -> Schema {
1106        Schema::builder()
1107            .with_fields(vec![
1108                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1109            ])
1110            .build()
1111            .unwrap()
1112    }
1113
1114    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1115        let _ = catalog
1116            .create_table(
1117                &table_ident.namespace,
1118                TableCreation::builder()
1119                    .name(table_ident.name().into())
1120                    .schema(simple_table_schema())
1121                    .location(temp_path())
1122                    .build(),
1123            )
1124            .await
1125            .unwrap();
1126    }
1127
1128    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1129        for table_ident in table_idents {
1130            create_table(catalog, table_ident).await;
1131        }
1132    }
1133
1134    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1135        assert_eq!(table.identifier(), expected_table_ident);
1136
1137        let metadata = table.metadata();
1138
1139        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1140
1141        let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1142            .with_spec_id(0)
1143            .build()
1144            .unwrap();
1145
1146        assert_eq!(
1147            metadata
1148                .partition_specs_iter()
1149                .map(|p| p.as_ref())
1150                .collect_vec(),
1151            vec![&expected_partition_spec]
1152        );
1153
1154        let expected_sorted_order = SortOrder::builder()
1155            .with_order_id(0)
1156            .with_fields(vec![])
1157            .build(expected_schema)
1158            .unwrap();
1159
1160        assert_eq!(
1161            metadata
1162                .sort_orders_iter()
1163                .map(|s| s.as_ref())
1164                .collect_vec(),
1165            vec![&expected_sorted_order]
1166        );
1167
1168        assert_eq!(metadata.properties(), &HashMap::new());
1169
1170        assert!(!table.readonly());
1171    }
1172
1173    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1174        let actual = table.metadata_location().unwrap().to_string();
1175        let regex = Regex::new(regex_str).unwrap();
1176        assert!(regex.is_match(&actual))
1177    }
1178
1179    #[tokio::test]
1180    async fn test_initialized() {
1181        let warehouse_loc = temp_path();
1182        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1183        // catalog instantiation should not fail even if tables exist
1184        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1185        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1186    }
1187
1188    #[tokio::test]
1189    async fn test_builder_method() {
1190        let sql_lite_uri = format!("sqlite:{}", temp_path());
1191        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1192        let warehouse_location = temp_path();
1193
1194        let catalog = SqlCatalogBuilder::default()
1195            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1196            .uri(sql_lite_uri.to_string())
1197            .warehouse_location(warehouse_location.clone())
1198            .sql_bind_style(SqlBindStyle::QMark)
1199            .load("iceberg", HashMap::default())
1200            .await;
1201        assert!(catalog.is_ok());
1202
1203        let catalog = catalog.unwrap();
1204        assert!(catalog.warehouse_location == warehouse_location);
1205        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1206    }
1207
1208    /// Overwriting an sqlite database with a non-existent path causes
1209    /// catalog generation to fail
1210    #[tokio::test]
1211    async fn test_builder_props_non_existent_path_fails() {
1212        let sql_lite_uri = format!("sqlite:{}", temp_path());
1213        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1214        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1215        let warehouse_location = temp_path();
1216
1217        let catalog = SqlCatalogBuilder::default()
1218            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1219            .uri(sql_lite_uri)
1220            .warehouse_location(warehouse_location)
1221            .load(
1222                "iceberg",
1223                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1224            )
1225            .await;
1226        assert!(catalog.is_err());
1227    }
1228
1229    /// Even when an invalid URI is specified in a builder method,
1230    /// it can be successfully overridden with a valid URI in props
1231    /// for catalog generation to succeed.
1232    #[tokio::test]
1233    async fn test_builder_props_set_valid_uri() {
1234        let sql_lite_uri = format!("sqlite:{}", temp_path());
1235        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1236        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1237        let warehouse_location = temp_path();
1238
1239        let catalog = SqlCatalogBuilder::default()
1240            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1241            .uri(sql_lite_uri2)
1242            .warehouse_location(warehouse_location)
1243            .load(
1244                "iceberg",
1245                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1246            )
1247            .await;
1248        assert!(catalog.is_ok());
1249    }
1250
1251    /// values assigned via props take precedence
1252    #[tokio::test]
1253    async fn test_builder_props_take_precedence() {
1254        let sql_lite_uri = format!("sqlite:{}", temp_path());
1255        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1256        let warehouse_location = temp_path();
1257        let warehouse_location2 = temp_path();
1258
1259        let catalog = SqlCatalogBuilder::default()
1260            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1261            .warehouse_location(warehouse_location2)
1262            .sql_bind_style(SqlBindStyle::DollarNumeric)
1263            .load(
1264                "iceberg",
1265                HashMap::from_iter([
1266                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1267                    (
1268                        SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1269                        warehouse_location.clone(),
1270                    ),
1271                    (
1272                        SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1273                        SqlBindStyle::QMark.to_string(),
1274                    ),
1275                ]),
1276            )
1277            .await;
1278
1279        assert!(catalog.is_ok());
1280
1281        let catalog = catalog.unwrap();
1282        assert!(catalog.warehouse_location == warehouse_location);
1283        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1284    }
1285
1286    /// values assigned via props take precedence
1287    #[tokio::test]
1288    async fn test_builder_props_take_precedence_props() {
1289        let sql_lite_uri = format!("sqlite:{}", temp_path());
1290        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1291        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1292        let warehouse_location = temp_path();
1293        let warehouse_location2 = temp_path();
1294
1295        let props = HashMap::from_iter([
1296            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1297            (
1298                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1299                warehouse_location.clone(),
1300            ),
1301            (
1302                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1303                SqlBindStyle::QMark.to_string(),
1304            ),
1305        ]);
1306        let props2 = HashMap::from_iter([
1307            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1308            (
1309                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1310                warehouse_location2.clone(),
1311            ),
1312            (
1313                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1314                SqlBindStyle::DollarNumeric.to_string(),
1315            ),
1316        ]);
1317
1318        let catalog = SqlCatalogBuilder::default()
1319            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1320            .props(props2)
1321            .load("iceberg", props)
1322            .await;
1323
1324        assert!(catalog.is_ok());
1325
1326        let catalog = catalog.unwrap();
1327        assert!(catalog.warehouse_location == warehouse_location);
1328        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1329    }
1330
1331    /// values assigned via props take precedence
1332    #[tokio::test]
1333    async fn test_builder_props_take_precedence_prop() {
1334        let sql_lite_uri = format!("sqlite:{}", temp_path());
1335        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1336        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1337        let warehouse_location = temp_path();
1338        let warehouse_location2 = temp_path();
1339
1340        let props = HashMap::from_iter([
1341            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1342            (
1343                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1344                warehouse_location.clone(),
1345            ),
1346            (
1347                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1348                SqlBindStyle::QMark.to_string(),
1349            ),
1350        ]);
1351
1352        let catalog = SqlCatalogBuilder::default()
1353            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1354            .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1355            .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1356            .prop(
1357                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1358                SqlBindStyle::DollarNumeric.to_string(),
1359            )
1360            .load("iceberg", props)
1361            .await;
1362
1363        assert!(catalog.is_ok());
1364
1365        let catalog = catalog.unwrap();
1366        assert!(catalog.warehouse_location == warehouse_location);
1367        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1368    }
1369
1370    /// invalid value for `SqlBindStyle` causes catalog creation to fail
1371    #[tokio::test]
1372    async fn test_builder_props_invalid_bind_style_fails() {
1373        let sql_lite_uri = format!("sqlite:{}", temp_path());
1374        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1375        let warehouse_location = temp_path();
1376
1377        let catalog = SqlCatalogBuilder::default()
1378            .with_storage_factory(Arc::new(LocalFsStorageFactory))
1379            .load(
1380                "iceberg",
1381                HashMap::from_iter([
1382                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1383                    (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1384                    (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1385                ]),
1386            )
1387            .await;
1388
1389        assert!(catalog.is_err());
1390    }
1391
1392    #[tokio::test]
1393    async fn test_list_namespaces_returns_empty_vector() {
1394        let warehouse_loc = temp_path();
1395        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1396
1397        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1398    }
1399
1400    #[tokio::test]
1401    async fn test_list_namespaces_returns_empty_different_name() {
1402        let warehouse_loc = temp_path();
1403        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1404        let namespace_ident_1 = NamespaceIdent::new("a".into());
1405        let namespace_ident_2 = NamespaceIdent::new("b".into());
1406        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1407        assert_eq!(
1408            to_set(catalog.list_namespaces(None).await.unwrap()),
1409            to_set(vec![namespace_ident_1, namespace_ident_2])
1410        );
1411
1412        let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await;
1413        assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
1414    }
1415
1416    #[tokio::test]
1417    async fn test_list_namespaces_returns_only_top_level_namespaces() {
1418        let warehouse_loc = temp_path();
1419        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1420        let namespace_ident_1 = NamespaceIdent::new("a".into());
1421        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1422        let namespace_ident_3 = NamespaceIdent::new("b".into());
1423        create_namespaces(&catalog, &vec![
1424            &namespace_ident_1,
1425            &namespace_ident_2,
1426            &namespace_ident_3,
1427        ])
1428        .await;
1429
1430        assert_eq!(
1431            to_set(catalog.list_namespaces(None).await.unwrap()),
1432            to_set(vec![namespace_ident_1, namespace_ident_3])
1433        );
1434    }
1435
1436    #[tokio::test]
1437    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1438        let warehouse_loc = temp_path();
1439        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1440        let namespace_ident_1 = NamespaceIdent::new("a".into());
1441        let namespace_ident_2 = NamespaceIdent::new("b".into());
1442        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1443
1444        assert_eq!(
1445            catalog
1446                .list_namespaces(Some(&namespace_ident_1))
1447                .await
1448                .unwrap(),
1449            vec![]
1450        );
1451    }
1452
1453    #[tokio::test]
1454    async fn test_list_namespaces_returns_namespace_under_parent() {
1455        let warehouse_loc = temp_path();
1456        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1457        let namespace_ident_1 = NamespaceIdent::new("a".into());
1458        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1459        let namespace_ident_3 = NamespaceIdent::new("c".into());
1460        create_namespaces(&catalog, &vec![
1461            &namespace_ident_1,
1462            &namespace_ident_2,
1463            &namespace_ident_3,
1464        ])
1465        .await;
1466
1467        assert_eq!(
1468            to_set(catalog.list_namespaces(None).await.unwrap()),
1469            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1470        );
1471
1472        assert_eq!(
1473            catalog
1474                .list_namespaces(Some(&namespace_ident_1))
1475                .await
1476                .unwrap(),
1477            vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1478        );
1479    }
1480
1481    #[tokio::test]
1482    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1483        let warehouse_loc = temp_path();
1484        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1485        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1486        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1487        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1488        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1489        let namespace_ident_5 = NamespaceIdent::new("b".into());
1490        create_namespaces(&catalog, &vec![
1491            &namespace_ident_1,
1492            &namespace_ident_2,
1493            &namespace_ident_3,
1494            &namespace_ident_4,
1495            &namespace_ident_5,
1496        ])
1497        .await;
1498
1499        assert_eq!(
1500            to_set(
1501                catalog
1502                    .list_namespaces(Some(&namespace_ident_1))
1503                    .await
1504                    .unwrap()
1505            ),
1506            to_set(vec![
1507                NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1508                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1509                NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1510            ])
1511        );
1512    }
1513
1514    #[tokio::test]
1515    async fn test_namespace_exists_returns_false() {
1516        let warehouse_loc = temp_path();
1517        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1518        let namespace_ident = NamespaceIdent::new("a".into());
1519        create_namespace(&catalog, &namespace_ident).await;
1520
1521        assert!(
1522            !catalog
1523                .namespace_exists(&NamespaceIdent::new("b".into()))
1524                .await
1525                .unwrap()
1526        );
1527    }
1528
1529    #[tokio::test]
1530    async fn test_namespace_exists_returns_true() {
1531        let warehouse_loc = temp_path();
1532        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1533        let namespace_ident = NamespaceIdent::new("a".into());
1534        create_namespace(&catalog, &namespace_ident).await;
1535
1536        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1537    }
1538
1539    #[tokio::test]
1540    async fn test_create_namespace_with_properties() {
1541        let warehouse_loc = temp_path();
1542        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1543        let namespace_ident = NamespaceIdent::new("abc".into());
1544
1545        let mut properties = default_properties();
1546        properties.insert("k".into(), "v".into());
1547
1548        assert_eq!(
1549            catalog
1550                .create_namespace(&namespace_ident, properties.clone())
1551                .await
1552                .unwrap(),
1553            Namespace::with_properties(namespace_ident.clone(), properties.clone())
1554        );
1555
1556        assert_eq!(
1557            catalog.get_namespace(&namespace_ident).await.unwrap(),
1558            Namespace::with_properties(namespace_ident, properties)
1559        );
1560    }
1561
1562    #[tokio::test]
1563    async fn test_create_nested_namespace() {
1564        let warehouse_loc = temp_path();
1565        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1566        let parent_namespace_ident = NamespaceIdent::new("a".into());
1567        create_namespace(&catalog, &parent_namespace_ident).await;
1568
1569        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1570
1571        assert_eq!(
1572            catalog
1573                .create_namespace(&child_namespace_ident, HashMap::new())
1574                .await
1575                .unwrap(),
1576            Namespace::new(child_namespace_ident.clone())
1577        );
1578
1579        assert_eq!(
1580            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1581            Namespace::with_properties(child_namespace_ident, default_properties())
1582        );
1583    }
1584
1585    #[tokio::test]
1586    async fn test_create_deeply_nested_namespace() {
1587        let warehouse_loc = temp_path();
1588        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1589        let namespace_ident_a = NamespaceIdent::new("a".into());
1590        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1591        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1592
1593        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1594
1595        assert_eq!(
1596            catalog
1597                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1598                .await
1599                .unwrap(),
1600            Namespace::new(namespace_ident_a_b_c.clone())
1601        );
1602
1603        assert_eq!(
1604            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1605            Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1606        );
1607    }
1608
1609    #[tokio::test]
1610    async fn test_update_namespace_noop() {
1611        let warehouse_loc = temp_path();
1612        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1613        let namespace_ident = NamespaceIdent::new("a".into());
1614        create_namespace(&catalog, &namespace_ident).await;
1615
1616        catalog
1617            .update_namespace(&namespace_ident, HashMap::new())
1618            .await
1619            .unwrap();
1620
1621        assert_eq!(
1622            *catalog
1623                .get_namespace(&namespace_ident)
1624                .await
1625                .unwrap()
1626                .properties(),
1627            HashMap::from_iter([("exists".to_string(), "true".to_string())])
1628        )
1629    }
1630
1631    #[tokio::test]
1632    async fn test_update_nested_namespace() {
1633        let warehouse_loc = temp_path();
1634        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1635        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1636        create_namespace(&catalog, &namespace_ident).await;
1637
1638        let mut props = HashMap::from_iter([
1639            ("prop1".to_string(), "val1".to_string()),
1640            ("prop2".into(), "val2".into()),
1641        ]);
1642
1643        catalog
1644            .update_namespace(&namespace_ident, props.clone())
1645            .await
1646            .unwrap();
1647
1648        props.insert("exists".into(), "true".into());
1649
1650        assert_eq!(
1651            *catalog
1652                .get_namespace(&namespace_ident)
1653                .await
1654                .unwrap()
1655                .properties(),
1656            props
1657        )
1658    }
1659
1660    #[tokio::test]
1661    async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1662        let warehouse_loc = temp_path();
1663        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1664        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1665
1666        let props = HashMap::from_iter([
1667            ("prop1".to_string(), "val1".to_string()),
1668            ("prop2".into(), "val2".into()),
1669        ]);
1670
1671        let err = catalog
1672            .update_namespace(&namespace_ident, props)
1673            .await
1674            .unwrap_err();
1675
1676        assert_eq!(
1677            err.message(),
1678            format!("No such namespace: {namespace_ident:?}")
1679        );
1680    }
1681
1682    #[tokio::test]
1683    async fn test_drop_nested_namespace() {
1684        let warehouse_loc = temp_path();
1685        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1686        let namespace_ident_a = NamespaceIdent::new("a".into());
1687        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1688        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1689
1690        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1691
1692        assert!(
1693            !catalog
1694                .namespace_exists(&namespace_ident_a_b)
1695                .await
1696                .unwrap()
1697        );
1698
1699        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1700    }
1701
1702    #[tokio::test]
1703    async fn test_drop_deeply_nested_namespace() {
1704        let warehouse_loc = temp_path();
1705        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1706        let namespace_ident_a = NamespaceIdent::new("a".into());
1707        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1708        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1709        create_namespaces(&catalog, &vec![
1710            &namespace_ident_a,
1711            &namespace_ident_a_b,
1712            &namespace_ident_a_b_c,
1713        ])
1714        .await;
1715
1716        catalog
1717            .drop_namespace(&namespace_ident_a_b_c)
1718            .await
1719            .unwrap();
1720
1721        assert!(
1722            !catalog
1723                .namespace_exists(&namespace_ident_a_b_c)
1724                .await
1725                .unwrap()
1726        );
1727
1728        assert!(
1729            catalog
1730                .namespace_exists(&namespace_ident_a_b)
1731                .await
1732                .unwrap()
1733        );
1734
1735        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1736    }
1737
1738    #[tokio::test]
1739    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1740        let warehouse_loc = temp_path();
1741        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1742        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1743
1744        let non_existent_namespace_ident =
1745            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1746        assert_eq!(
1747            catalog
1748                .drop_namespace(&non_existent_namespace_ident)
1749                .await
1750                .unwrap_err()
1751                .to_string(),
1752            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1753        )
1754    }
1755
1756    #[tokio::test]
1757    async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1758        let warehouse_loc = temp_path();
1759        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1760        let namespace_ident_a = NamespaceIdent::new("a".into());
1761        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1762        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1763
1764        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1765
1766        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1767
1768        assert!(
1769            catalog
1770                .namespace_exists(&namespace_ident_a_b)
1771                .await
1772                .unwrap()
1773        );
1774    }
1775
1776    #[tokio::test]
1777    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1778        let warehouse_loc = temp_path();
1779        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1780
1781        let namespace_ident = NamespaceIdent::new("a".into());
1782        let mut namespace_properties = HashMap::new();
1783        let namespace_location = temp_path();
1784        namespace_properties.insert(
1785            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1786            namespace_location.to_string(),
1787        );
1788        catalog
1789            .create_namespace(&namespace_ident, namespace_properties)
1790            .await
1791            .unwrap();
1792
1793        let table_name = "tbl1";
1794        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1795        let expected_table_metadata_location_regex =
1796            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1797
1798        let table = catalog
1799            .create_table(
1800                &namespace_ident,
1801                TableCreation::builder()
1802                    .name(table_name.into())
1803                    .schema(simple_table_schema())
1804                    // no location specified for table
1805                    .build(),
1806            )
1807            .await
1808            .unwrap();
1809        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1810        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1811
1812        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1813        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1814        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1815    }
1816
1817    #[tokio::test]
1818    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1819     {
1820        let warehouse_loc = temp_path();
1821        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1822
1823        let namespace_ident = NamespaceIdent::new("a".into());
1824        let mut namespace_properties = HashMap::new();
1825        let namespace_location = temp_path();
1826        namespace_properties.insert(
1827            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1828            namespace_location.to_string(),
1829        );
1830        catalog
1831            .create_namespace(&namespace_ident, namespace_properties)
1832            .await
1833            .unwrap();
1834
1835        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1836        let mut nested_namespace_properties = HashMap::new();
1837        let nested_namespace_location = temp_path();
1838        nested_namespace_properties.insert(
1839            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1840            nested_namespace_location.to_string(),
1841        );
1842        catalog
1843            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1844            .await
1845            .unwrap();
1846
1847        let table_name = "tbl1";
1848        let expected_table_ident =
1849            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1850        let expected_table_metadata_location_regex = format!(
1851            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1852        );
1853
1854        let table = catalog
1855            .create_table(
1856                &nested_namespace_ident,
1857                TableCreation::builder()
1858                    .name(table_name.into())
1859                    .schema(simple_table_schema())
1860                    // no location specified for table
1861                    .build(),
1862            )
1863            .await
1864            .unwrap();
1865        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1866        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1867
1868        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1869        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1870        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1871    }
1872
1873    #[tokio::test]
1874    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1875     {
1876        let warehouse_loc = temp_path();
1877        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1878
1879        let namespace_ident = NamespaceIdent::new("a".into());
1880        // note: no location specified in namespace_properties
1881        let namespace_properties = HashMap::new();
1882        catalog
1883            .create_namespace(&namespace_ident, namespace_properties)
1884            .await
1885            .unwrap();
1886
1887        let table_name = "tbl1";
1888        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1889        let expected_table_metadata_location_regex =
1890            format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1891
1892        let table = catalog
1893            .create_table(
1894                &namespace_ident,
1895                TableCreation::builder()
1896                    .name(table_name.into())
1897                    .schema(simple_table_schema())
1898                    // no location specified for table
1899                    .build(),
1900            )
1901            .await
1902            .unwrap();
1903        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1904        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1905
1906        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1907        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1908        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1909    }
1910
1911    #[tokio::test]
1912    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1913     {
1914        let warehouse_loc = temp_path();
1915        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1916
1917        let namespace_ident = NamespaceIdent::new("a".into());
1918        create_namespace(&catalog, &namespace_ident).await;
1919
1920        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1921        create_namespace(&catalog, &nested_namespace_ident).await;
1922
1923        let table_name = "tbl1";
1924        let expected_table_ident =
1925            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1926        let expected_table_metadata_location_regex =
1927            format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1928
1929        let table = catalog
1930            .create_table(
1931                &nested_namespace_ident,
1932                TableCreation::builder()
1933                    .name(table_name.into())
1934                    .schema(simple_table_schema())
1935                    // no location specified for table
1936                    .build(),
1937            )
1938            .await
1939            .unwrap();
1940        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1941        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1942
1943        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1944        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1945        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1946    }
1947
1948    #[tokio::test]
1949    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1950        let warehouse_loc = temp_path();
1951        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1952        let namespace_ident = NamespaceIdent::new("a".into());
1953        create_namespace(&catalog, &namespace_ident).await;
1954        let table_name = "tbl1";
1955        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1956        create_table(&catalog, &table_ident).await;
1957
1958        let tmp_dir = TempDir::new().unwrap();
1959        let location = tmp_dir.path().to_str().unwrap().to_string();
1960
1961        assert_eq!(
1962            catalog
1963                .create_table(
1964                    &namespace_ident,
1965                    TableCreation::builder()
1966                        .name(table_name.into())
1967                        .schema(simple_table_schema())
1968                        .location(location)
1969                        .build()
1970                )
1971                .await
1972                .unwrap_err()
1973                .to_string(),
1974            format!(
1975                "TableAlreadyExists => Table {:?} already exists.",
1976                &table_ident
1977            )
1978        );
1979    }
1980
1981    #[tokio::test]
1982    async fn test_rename_table_src_table_is_same_as_dst_table() {
1983        let warehouse_loc = temp_path();
1984        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1985        let namespace_ident = NamespaceIdent::new("n1".into());
1986        create_namespace(&catalog, &namespace_ident).await;
1987        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1988        create_table(&catalog, &table_ident).await;
1989
1990        catalog
1991            .rename_table(&table_ident, &table_ident)
1992            .await
1993            .unwrap();
1994
1995        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1996            table_ident
1997        ],);
1998    }
1999
2000    #[tokio::test]
2001    async fn test_rename_table_across_nested_namespaces() {
2002        let warehouse_loc = temp_path();
2003        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2004        let namespace_ident_a = NamespaceIdent::new("a".into());
2005        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2006        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2007        create_namespaces(&catalog, &vec![
2008            &namespace_ident_a,
2009            &namespace_ident_a_b,
2010            &namespace_ident_a_b_c,
2011        ])
2012        .await;
2013
2014        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2015        create_tables(&catalog, vec![&src_table_ident]).await;
2016
2017        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2018        catalog
2019            .rename_table(&src_table_ident, &dst_table_ident)
2020            .await
2021            .unwrap();
2022
2023        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2024
2025        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2026    }
2027
2028    #[tokio::test]
2029    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2030        let warehouse_loc = temp_path();
2031        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2032        let src_namespace_ident = NamespaceIdent::new("n1".into());
2033        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2034        create_namespace(&catalog, &src_namespace_ident).await;
2035        create_table(&catalog, &src_table_ident).await;
2036
2037        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2038        let dst_table_ident =
2039            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2040        assert_eq!(
2041            catalog
2042                .rename_table(&src_table_ident, &dst_table_ident)
2043                .await
2044                .unwrap_err()
2045                .to_string(),
2046            format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
2047        );
2048    }
2049}