iceberg_catalog_sql/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::time::Duration;
21
22use async_trait::async_trait;
23use iceberg::io::FileIO;
24use iceberg::spec::{TableMetadata, TableMetadataBuilder};
25use iceberg::table::Table;
26use iceberg::{
27    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
28    TableCommit, TableCreation, TableIdent,
29};
30use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
31use sqlx::{Any, AnyPool, Row, Transaction};
32
33use crate::error::{
34    from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
35};
36
/// Property key under which the database URI of the catalog is supplied.
pub const SQL_CATALOG_PROP_URI: &str = "uri";
/// Property key under which the warehouse location of the catalog is supplied.
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key under which the SQL bind style is supplied; the value is parsed into a [`SqlBindStyle`].
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";
43
// Table holding one row per Iceberg table, followed by the names of its columns.
const CATALOG_TABLE_NAME: &str = "iceberg_tables";
const CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
const CATALOG_FIELD_TABLE_NAME: &str = "table_name";
const CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
const CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
const CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
const CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
// Value written to the record-type column for table rows.
const CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Table holding one row per namespace property, followed by the names of its columns.
const NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
const NAMESPACE_FIELD_NAME: &str = "namespace";
const NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
const NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

// Namespace property consulted for a namespace-specific base location.
const NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
59
/// Default the SQL pool to 10 connections if not provided.
const MAX_CONNECTIONS: u32 = 10;
/// Default the maximum idle timeout per connection to 10s before it is closed.
const IDLE_TIMEOUT: u64 = 10;
/// Default the health-check of each connection to enabled prior to returning.
const TEST_BEFORE_ACQUIRE: bool = true;
63
/// Builder for [`SqlCatalog`].
///
/// A newtype around the private `SqlCatalogConfig`; the builder methods fill in
/// that configuration before it is handed to the catalog constructor.
#[derive(Debug)]
pub struct SqlCatalogBuilder(SqlCatalogConfig);
67
68impl Default for SqlCatalogBuilder {
69    fn default() -> Self {
70        Self(SqlCatalogConfig {
71            uri: "".to_string(),
72            name: "".to_string(),
73            warehouse_location: "".to_string(),
74            sql_bind_style: SqlBindStyle::DollarNumeric,
75            props: HashMap::new(),
76        })
77    }
78}
79
80impl SqlCatalogBuilder {
81    /// Configure the database URI
82    ///
83    /// If `SQL_CATALOG_PROP_URI` has a value set in `props` during `SqlCatalogBuilder::load`,
84    /// that value takes precedence, and the value specified by this method will not be used.
85    pub fn uri(mut self, uri: impl Into<String>) -> Self {
86        self.0.uri = uri.into();
87        self
88    }
89
90    /// Configure the warehouse location
91    ///
92    /// If `SQL_CATALOG_PROP_WAREHOUSE` has a value set in `props` during `SqlCatalogBuilder::load`,
93    /// that value takes precedence, and the value specified by this method will not be used.
94    pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
95        self.0.warehouse_location = location.into();
96        self
97    }
98
99    /// Configure the bound SQL Statement
100    ///
101    /// If `SQL_CATALOG_PROP_BIND_STYLE` has a value set in `props` during `SqlCatalogBuilder::load`,
102    /// that value takes precedence, and the value specified by this method will not be used.
103    pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
104        self.0.sql_bind_style = sql_bind_style;
105        self
106    }
107
108    /// Configure the any properties
109    ///
110    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
111    /// those values will take precedence.
112    pub fn props(mut self, props: HashMap<String, String>) -> Self {
113        for (k, v) in props {
114            self.0.props.insert(k, v);
115        }
116        self
117    }
118
119    /// Set a new property on the property to be configured.
120    /// When multiple methods are executed with the same key,
121    /// the later-set value takes precedence.
122    ///
123    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
124    /// those values will take precedence.
125    pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
126        self.0.props.insert(key.into(), value.into());
127        self
128    }
129}
130
131impl CatalogBuilder for SqlCatalogBuilder {
132    type C = SqlCatalog;
133
134    fn load(
135        mut self,
136        name: impl Into<String>,
137        props: HashMap<String, String>,
138    ) -> impl Future<Output = Result<Self::C>> + Send {
139        let name = name.into();
140
141        for (k, v) in props {
142            self.0.props.insert(k, v);
143        }
144
145        if let Some(uri) = self.0.props.remove(SQL_CATALOG_PROP_URI) {
146            self.0.uri = uri;
147        }
148        if let Some(warehouse_location) = self.0.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
149            self.0.warehouse_location = warehouse_location;
150        }
151
152        let mut valid_sql_bind_style = true;
153        if let Some(sql_bind_style) = self.0.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
154            if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
155                self.0.sql_bind_style = sql_bind_style;
156            } else {
157                valid_sql_bind_style = false;
158            }
159        }
160
161        async move {
162            if name.trim().is_empty() {
163                Err(Error::new(
164                    ErrorKind::DataInvalid,
165                    "Catalog name cannot be empty",
166                ))
167            } else if !valid_sql_bind_style {
168                Err(Error::new(
169                    ErrorKind::DataInvalid,
170                    format!(
171                        "`{}` values are valid only if they're `{}` or `{}`",
172                        SQL_CATALOG_PROP_BIND_STYLE,
173                        SqlBindStyle::DollarNumeric,
174                        SqlBindStyle::QMark
175                    ),
176                ))
177            } else {
178                SqlCatalog::new(self.0).await
179            }
180        }
181    }
182}
183
/// A struct representing the SQL catalog configuration.
///
/// This struct contains the parameters that are used to configure a SQL catalog:
/// the database URI, the catalog name, the warehouse location, the SQL bind style
/// and any additional properties.
/// The `sql_bind_style` field determines how SQL statements will be bound to values in the catalog.
/// The options available for this parameter include:
/// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases.
/// - `SqlBindStyle::QMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases.
#[derive(Debug)]
struct SqlCatalogConfig {
    // Database URI the connection pool connects to.
    uri: String,
    // Catalog name, bound to the catalog-name column in every query.
    name: String,
    // Warehouse location; used to build the `FileIO` and default table locations.
    warehouse_location: String,
    // Placeholder style applied to statements before they are executed.
    sql_bind_style: SqlBindStyle,
    // Remaining free-form properties (the `pool.*` settings are read from here).
    props: HashMap<String, String>,
}
200
#[derive(Debug)]
/// Sql catalog implementation.
///
/// Keeps table and namespace records in two SQL tables reached through a
/// connection pool; table metadata files are read and written through `fileio`.
pub struct SqlCatalog {
    // Catalog name, bound to the catalog-name column in every query.
    name: String,
    // Connection pool used for all statements.
    connection: AnyPool,
    // Fallback base location for tables whose namespace defines no location.
    warehouse_location: String,
    // File IO built from the warehouse location.
    fileio: FileIO,
    // Placeholder style the statements are rewritten to before execution.
    sql_bind_style: SqlBindStyle,
}
210
#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
/// Set the SQL parameter bind style to either $1..$N (Postgres style) or ? (SQLite/MySQL/MariaDB)
pub enum SqlBindStyle {
    /// DollarNumeric uses parameters of the form `$1..$N`, which is the Postgres style
    DollarNumeric,
    /// QMark uses parameters of the form `?` which is the style for other dialects (SQLite/MySQL/MariaDB)
    QMark,
}
219
220impl SqlCatalog {
221    /// Create new sql catalog instance
222    async fn new(config: SqlCatalogConfig) -> Result<Self> {
223        let fileio = FileIO::from_path(&config.warehouse_location)?.build()?;
224        install_default_drivers();
225        let max_connections: u32 = config
226            .props
227            .get("pool.max-connections")
228            .map(|v| v.parse().unwrap())
229            .unwrap_or(MAX_CONNECTIONS);
230        let idle_timeout: u64 = config
231            .props
232            .get("pool.idle-timeout")
233            .map(|v| v.parse().unwrap())
234            .unwrap_or(IDLE_TIMEOUT);
235        let test_before_acquire: bool = config
236            .props
237            .get("pool.test-before-acquire")
238            .map(|v| v.parse().unwrap())
239            .unwrap_or(TEST_BEFORE_ACQUIRE);
240
241        let pool = AnyPoolOptions::new()
242            .max_connections(max_connections)
243            .idle_timeout(Duration::from_secs(idle_timeout))
244            .test_before_acquire(test_before_acquire)
245            .connect(&config.uri)
246            .await
247            .map_err(from_sqlx_error)?;
248
249        sqlx::query(&format!(
250            "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
251                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
252                {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
253                {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
254                {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
255                {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
256                {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
257                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
258        ))
259        .execute(&pool)
260        .await
261        .map_err(from_sqlx_error)?;
262
263        sqlx::query(&format!(
264            "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
265                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
266                {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
267                {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
268                {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
269                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
270        ))
271        .execute(&pool)
272        .await
273        .map_err(from_sqlx_error)?;
274
275        Ok(SqlCatalog {
276            name: config.name.to_owned(),
277            connection: pool,
278            warehouse_location: config.warehouse_location,
279            fileio,
280            sql_bind_style: config.sql_bind_style,
281        })
282    }
283
284    /// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
285    fn replace_placeholders(&self, query: &str) -> String {
286        match self.sql_bind_style {
287            SqlBindStyle::DollarNumeric => {
288                let mut count = 1;
289                query
290                    .chars()
291                    .fold(String::with_capacity(query.len()), |mut acc, c| {
292                        if c == '?' {
293                            acc.push('$');
294                            acc.push_str(&count.to_string());
295                            count += 1;
296                        } else {
297                            acc.push(c);
298                        }
299                        acc
300                    })
301            }
302            _ => query.to_owned(),
303        }
304    }
305
306    /// Fetch a vec of AnyRows from a given query
307    async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
308        let query_with_placeholders = self.replace_placeholders(query);
309
310        let mut sqlx_query = sqlx::query(&query_with_placeholders);
311        for arg in args {
312            sqlx_query = sqlx_query.bind(arg);
313        }
314
315        sqlx_query
316            .fetch_all(&self.connection)
317            .await
318            .map_err(from_sqlx_error)
319    }
320
321    /// Execute statements in a transaction, provided or not
322    async fn execute(
323        &self,
324        query: &str,
325        args: Vec<Option<&str>>,
326        transaction: Option<&mut Transaction<'_, Any>>,
327    ) -> Result<AnyQueryResult> {
328        let query_with_placeholders = self.replace_placeholders(query);
329
330        let mut sqlx_query = sqlx::query(&query_with_placeholders);
331        for arg in args {
332            sqlx_query = sqlx_query.bind(arg);
333        }
334
335        match transaction {
336            Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
337            None => {
338                let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
339                let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
340                let _ = tx.commit().await.map_err(from_sqlx_error);
341                result
342            }
343        }
344    }
345}
346
347#[async_trait]
348impl Catalog for SqlCatalog {
349    async fn list_namespaces(
350        &self,
351        parent: Option<&NamespaceIdent>,
352    ) -> Result<Vec<NamespaceIdent>> {
353        // UNION will remove duplicates.
354        let all_namespaces_stmt = format!(
355            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
356             FROM {CATALOG_TABLE_NAME}
357             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
358             UNION
359             SELECT {NAMESPACE_FIELD_NAME}
360             FROM {NAMESPACE_TABLE_NAME}
361             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
362        );
363
364        let namespace_rows = self
365            .fetch_rows(&all_namespaces_stmt, vec![
366                Some(&self.name),
367                Some(&self.name),
368            ])
369            .await?;
370
371        let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
372
373        if let Some(parent) = parent {
374            if self.namespace_exists(parent).await? {
375                let parent_str = parent.join(".");
376
377                for row in namespace_rows.iter() {
378                    let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
379                    // if parent = a, then we only want to see a.b, a.c returned.
380                    if nsp != parent_str && nsp.starts_with(&parent_str) {
381                        namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
382                    }
383                }
384
385                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
386            } else {
387                no_such_namespace_err(parent)
388            }
389        } else {
390            for row in namespace_rows.iter() {
391                let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
392                let mut levels = nsp.split(".").collect::<Vec<&str>>();
393                if !levels.is_empty() {
394                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
395                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
396                }
397            }
398
399            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
400        }
401    }
402
403    async fn create_namespace(
404        &self,
405        namespace: &NamespaceIdent,
406        properties: HashMap<String, String>,
407    ) -> Result<Namespace> {
408        let exists = self.namespace_exists(namespace).await?;
409
410        if exists {
411            return Err(Error::new(
412                iceberg::ErrorKind::Unexpected,
413                format!("Namespace {namespace:?} already exists"),
414            ));
415        }
416
417        let namespace_str = namespace.join(".");
418        let insert = format!(
419            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
420             VALUES (?, ?, ?, ?)");
421        if !properties.is_empty() {
422            let mut insert_properties = properties.clone();
423            insert_properties.insert("exists".to_string(), "true".to_string());
424
425            let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
426            let mut insert_stmt = insert.clone();
427            for (index, (key, value)) in insert_properties.iter().enumerate() {
428                query_args.extend_from_slice(&[
429                    Some(self.name.as_str()),
430                    Some(namespace_str.as_str()),
431                    Some(key.as_str()),
432                    Some(value.as_str()),
433                ]);
434                if index > 0 {
435                    insert_stmt.push_str(", (?, ?, ?, ?)");
436                }
437            }
438
439            self.execute(&insert_stmt, query_args, None).await?;
440
441            Ok(Namespace::with_properties(
442                namespace.clone(),
443                insert_properties,
444            ))
445        } else {
446            // set a default property of exists = true
447            self.execute(
448                &insert,
449                vec![
450                    Some(&self.name),
451                    Some(&namespace_str),
452                    Some("exists"),
453                    Some("true"),
454                ],
455                None,
456            )
457            .await?;
458            Ok(Namespace::with_properties(namespace.clone(), properties))
459        }
460    }
461
462    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
463        let exists = self.namespace_exists(namespace).await?;
464        if exists {
465            let namespace_props = self
466                .fetch_rows(
467                    &format!(
468                        "SELECT
469                            {NAMESPACE_FIELD_NAME},
470                            {NAMESPACE_FIELD_PROPERTY_KEY},
471                            {NAMESPACE_FIELD_PROPERTY_VALUE}
472                            FROM {NAMESPACE_TABLE_NAME}
473                            WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
474                            AND {NAMESPACE_FIELD_NAME} = ?"
475                    ),
476                    vec![Some(&self.name), Some(&namespace.join("."))],
477                )
478                .await?;
479
480            let mut properties = HashMap::with_capacity(namespace_props.len());
481
482            for row in namespace_props {
483                let key = row
484                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
485                    .map_err(from_sqlx_error)?;
486                let value = row
487                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
488                    .map_err(from_sqlx_error)?;
489
490                properties.insert(key, value);
491            }
492
493            Ok(Namespace::with_properties(namespace.clone(), properties))
494        } else {
495            no_such_namespace_err(namespace)
496        }
497    }
498
499    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
500        let namespace_str = namespace.join(".");
501
502        let table_namespaces = self
503            .fetch_rows(
504                &format!(
505                    "SELECT 1 FROM {CATALOG_TABLE_NAME}
506                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
507                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
508                     LIMIT 1"
509                ),
510                vec![Some(&self.name), Some(&namespace_str)],
511            )
512            .await?;
513
514        if !table_namespaces.is_empty() {
515            Ok(true)
516        } else {
517            let namespaces = self
518                .fetch_rows(
519                    &format!(
520                        "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
521                         WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
522                          AND {NAMESPACE_FIELD_NAME} = ?
523                         LIMIT 1"
524                    ),
525                    vec![Some(&self.name), Some(&namespace_str)],
526                )
527                .await?;
528            if !namespaces.is_empty() {
529                Ok(true)
530            } else {
531                Ok(false)
532            }
533        }
534    }
535
536    async fn update_namespace(
537        &self,
538        namespace: &NamespaceIdent,
539        properties: HashMap<String, String>,
540    ) -> Result<()> {
541        let exists = self.namespace_exists(namespace).await?;
542        if exists {
543            let existing_properties = self.get_namespace(namespace).await?.properties().clone();
544            let namespace_str = namespace.join(".");
545
546            let mut updates = vec![];
547            let mut inserts = vec![];
548
549            for (key, value) in properties.iter() {
550                if existing_properties.contains_key(key) {
551                    if existing_properties.get(key) != Some(value) {
552                        updates.push((key, value));
553                    }
554                } else {
555                    inserts.push((key, value));
556                }
557            }
558
559            let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
560            let update_stmt = format!(
561                "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
562                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ? 
563                 AND {NAMESPACE_FIELD_NAME} = ?
564                 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
565            );
566
567            let insert_stmt = format!(
568                "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
569                 VALUES (?, ?, ?, ?)"
570            );
571
572            for (key, value) in updates {
573                self.execute(
574                    &update_stmt,
575                    vec![
576                        Some(value),
577                        Some(&self.name),
578                        Some(&namespace_str),
579                        Some(key),
580                    ],
581                    Some(&mut tx),
582                )
583                .await?;
584            }
585
586            for (key, value) in inserts {
587                self.execute(
588                    &insert_stmt,
589                    vec![
590                        Some(&self.name),
591                        Some(&namespace_str),
592                        Some(key),
593                        Some(value),
594                    ],
595                    Some(&mut tx),
596                )
597                .await?;
598            }
599
600            let _ = tx.commit().await.map_err(from_sqlx_error)?;
601
602            Ok(())
603        } else {
604            no_such_namespace_err(namespace)
605        }
606    }
607
608    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
609        let exists = self.namespace_exists(namespace).await?;
610        if exists {
611            // if there are tables in the namespace, don't allow drop.
612            let tables = self.list_tables(namespace).await?;
613            if !tables.is_empty() {
614                return Err(Error::new(
615                    iceberg::ErrorKind::Unexpected,
616                    format!(
617                        "Namespace {:?} is not empty. {} tables exist.",
618                        namespace,
619                        tables.len()
620                    ),
621                ));
622            }
623
624            self.execute(
625                &format!(
626                    "DELETE FROM {NAMESPACE_TABLE_NAME}
627                     WHERE {NAMESPACE_FIELD_NAME} = ?
628                      AND {CATALOG_FIELD_CATALOG_NAME} = ?"
629                ),
630                vec![Some(&namespace.join(".")), Some(&self.name)],
631                None,
632            )
633            .await?;
634
635            Ok(())
636        } else {
637            no_such_namespace_err(namespace)
638        }
639    }
640
641    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
642        let exists = self.namespace_exists(namespace).await?;
643        if exists {
644            let rows = self
645                .fetch_rows(
646                    &format!(
647                        "SELECT {CATALOG_FIELD_TABLE_NAME},
648                                {CATALOG_FIELD_TABLE_NAMESPACE}
649                         FROM {CATALOG_TABLE_NAME}
650                         WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
651                          AND {CATALOG_FIELD_CATALOG_NAME} = ?
652                          AND (
653                                {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
654                                OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
655                          )",
656                    ),
657                    vec![Some(&namespace.join(".")), Some(&self.name)],
658                )
659                .await?;
660
661            let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
662
663            for row in rows.iter() {
664                let tbl = row
665                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
666                    .map_err(from_sqlx_error)?;
667                let ns_strs = row
668                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
669                    .map_err(from_sqlx_error)?;
670                let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
671                tables.insert(TableIdent::new(ns, tbl));
672            }
673
674            Ok(tables.into_iter().collect::<Vec<TableIdent>>())
675        } else {
676            no_such_namespace_err(namespace)
677        }
678    }
679
680    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
681        let namespace = identifier.namespace().join(".");
682        let table_name = identifier.name();
683        let table_counts = self
684            .fetch_rows(
685                &format!(
686                    "SELECT 1
687                     FROM {CATALOG_TABLE_NAME}
688                     WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
689                      AND {CATALOG_FIELD_CATALOG_NAME} = ?
690                      AND {CATALOG_FIELD_TABLE_NAME} = ?
691                      AND (
692                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
693                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
694                      )"
695                ),
696                vec![Some(&namespace), Some(&self.name), Some(table_name)],
697            )
698            .await?;
699
700        if !table_counts.is_empty() {
701            Ok(true)
702        } else {
703            Ok(false)
704        }
705    }
706
707    async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
708        if !self.table_exists(identifier).await? {
709            return no_such_table_err(identifier);
710        }
711
712        self.execute(
713            &format!(
714                "DELETE FROM {CATALOG_TABLE_NAME}
715                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
716                  AND {CATALOG_FIELD_TABLE_NAME} = ?
717                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
718                  AND (
719                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
720                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
721                  )"
722            ),
723            vec![
724                Some(&self.name),
725                Some(identifier.name()),
726                Some(&identifier.namespace().join(".")),
727            ],
728            None,
729        )
730        .await?;
731
732        Ok(())
733    }
734
735    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
736        if !self.table_exists(identifier).await? {
737            return no_such_table_err(identifier);
738        }
739
740        let rows = self
741            .fetch_rows(
742                &format!(
743                    "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
744                     FROM {CATALOG_TABLE_NAME}
745                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
746                      AND {CATALOG_FIELD_TABLE_NAME} = ?
747                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
748                      AND (
749                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
750                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
751                      )"
752                ),
753                vec![
754                    Some(&self.name),
755                    Some(identifier.name()),
756                    Some(&identifier.namespace().join(".")),
757                ],
758            )
759            .await?;
760
761        if rows.is_empty() {
762            return no_such_table_err(identifier);
763        }
764
765        let row = &rows[0];
766        let tbl_metadata_location = row
767            .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
768            .map_err(from_sqlx_error)?;
769
770        let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
771
772        Ok(Table::builder()
773            .file_io(self.fileio.clone())
774            .identifier(identifier.clone())
775            .metadata_location(tbl_metadata_location)
776            .metadata(metadata)
777            .build()?)
778    }
779
780    async fn create_table(
781        &self,
782        namespace: &NamespaceIdent,
783        creation: TableCreation,
784    ) -> Result<Table> {
785        if !self.namespace_exists(namespace).await? {
786            return no_such_namespace_err(namespace);
787        }
788
789        let tbl_name = creation.name.clone();
790        let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
791
792        if self.table_exists(&tbl_ident).await? {
793            return table_already_exists_err(&tbl_ident);
794        }
795
796        let (tbl_creation, location) = match creation.location.clone() {
797            Some(location) => (creation, location),
798            None => {
799                // fall back to namespace-specific location
800                // and then to warehouse location
801                let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
802                let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
803                    Some(location) => location.clone(),
804                    None => {
805                        format!(
806                            "{}/{}",
807                            self.warehouse_location.clone(),
808                            namespace.join("/")
809                        )
810                    }
811                };
812
813                let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
814
815                (
816                    TableCreation {
817                        location: Some(tbl_location.clone()),
818                        ..creation
819                    },
820                    tbl_location,
821                )
822            }
823        };
824
825        let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
826            .build()?
827            .metadata;
828        let tbl_metadata_location =
829            MetadataLocation::new_with_table_location(location.clone()).to_string();
830
831        tbl_metadata
832            .write_to(&self.fileio, &tbl_metadata_location)
833            .await?;
834
835        self.execute(&format!(
836            "INSERT INTO {CATALOG_TABLE_NAME}
837             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
838             VALUES (?, ?, ?, ?, ?)
839            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
840
841        Ok(Table::builder()
842            .file_io(self.fileio.clone())
843            .metadata_location(tbl_metadata_location)
844            .identifier(tbl_ident)
845            .metadata(tbl_metadata)
846            .build()?)
847    }
848
849    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
850        if src == dest {
851            return Ok(());
852        }
853
854        if !self.table_exists(src).await? {
855            return no_such_table_err(src);
856        }
857
858        if !self.namespace_exists(dest.namespace()).await? {
859            return no_such_namespace_err(dest.namespace());
860        }
861
862        if self.table_exists(dest).await? {
863            return table_already_exists_err(dest);
864        }
865
866        self.execute(
867            &format!(
868                "UPDATE {CATALOG_TABLE_NAME}
869                 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
870                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
871                  AND {CATALOG_FIELD_TABLE_NAME} = ?
872                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
873                  AND (
874                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
875                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
876                )"
877            ),
878            vec![
879                Some(dest.name()),
880                Some(&dest.namespace().join(".")),
881                Some(&self.name),
882                Some(src.name()),
883                Some(&src.namespace().join(".")),
884            ],
885            None,
886        )
887        .await?;
888
889        Ok(())
890    }
891
892    async fn register_table(
893        &self,
894        table_ident: &TableIdent,
895        metadata_location: String,
896    ) -> Result<Table> {
897        if self.table_exists(table_ident).await? {
898            return table_already_exists_err(table_ident);
899        }
900
901        let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
902
903        let namespace = table_ident.namespace();
904        let tbl_name = table_ident.name().to_string();
905
906        self.execute(&format!(
907            "INSERT INTO {CATALOG_TABLE_NAME}
908             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
909             VALUES (?, ?, ?, ?, ?)
910            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
911
912        Ok(Table::builder()
913            .identifier(table_ident.clone())
914            .metadata_location(metadata_location)
915            .metadata(metadata)
916            .file_io(self.fileio.clone())
917            .build()?)
918    }
919
920    /// Updates an existing table within the SQL catalog.
921    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
922        let table_ident = commit.identifier().clone();
923        let current_table = self.load_table(&table_ident).await?;
924        let current_metadata_location = current_table.metadata_location_result()?.to_string();
925
926        let staged_table = commit.apply(current_table)?;
927        let staged_metadata_location = staged_table.metadata_location_result()?;
928
929        staged_table
930            .metadata()
931            .write_to(staged_table.file_io(), &staged_metadata_location)
932            .await?;
933
934        let update_result = self
935            .execute(
936                &format!(
937                    "UPDATE {CATALOG_TABLE_NAME}
938                     SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
939                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
940                      AND {CATALOG_FIELD_TABLE_NAME} = ?
941                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
942                      AND (
943                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
944                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
945                      )
946                      AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
947                ),
948                vec![
949                    Some(staged_metadata_location),
950                    Some(current_metadata_location.as_str()),
951                    Some(&self.name),
952                    Some(table_ident.name()),
953                    Some(&table_ident.namespace().join(".")),
954                    Some(current_metadata_location.as_str()),
955                ],
956                None,
957            )
958            .await?;
959
960        if update_result.rows_affected() == 0 {
961            return Err(Error::new(
962                ErrorKind::CatalogCommitConflicts,
963                format!("Commit conflicted for table: {table_ident}"),
964            )
965            .with_retryable(true));
966        }
967
968        Ok(staged_table)
969    }
970}
971
972#[cfg(test)]
973mod tests {
974    use std::collections::{HashMap, HashSet};
975    use std::hash::Hash;
976
977    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
978    use iceberg::table::Table;
979    use iceberg::transaction::{ApplyTransactionAction, Transaction};
980    use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
981    use itertools::Itertools;
982    use regex::Regex;
983    use sqlx::migrate::MigrateDatabase;
984    use tempfile::TempDir;
985
986    use crate::catalog::{
987        NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
988        SQL_CATALOG_PROP_WAREHOUSE,
989    };
990    use crate::{SqlBindStyle, SqlCatalogBuilder};
991
992    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
993
994    fn temp_path() -> String {
995        let temp_dir = TempDir::new().unwrap();
996        temp_dir.path().to_str().unwrap().to_string()
997    }
998
999    fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1000        HashSet::from_iter(vec)
1001    }
1002
1003    fn default_properties() -> HashMap<String, String> {
1004        HashMap::from([("exists".to_string(), "true".to_string())])
1005    }
1006
1007    async fn new_sql_catalog(warehouse_location: String) -> impl Catalog {
1008        let sql_lite_uri = format!("sqlite:{}", temp_path());
1009        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1010
1011        let props = HashMap::from_iter([
1012            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1013            (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1014            (
1015                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1016                SqlBindStyle::DollarNumeric.to_string(),
1017            ),
1018        ]);
1019        SqlCatalogBuilder::default()
1020            .load("iceberg", props)
1021            .await
1022            .unwrap()
1023    }
1024
1025    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1026        let _ = catalog
1027            .create_namespace(namespace_ident, HashMap::new())
1028            .await
1029            .unwrap();
1030    }
1031
1032    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1033        for namespace_ident in namespace_idents {
1034            let _ = create_namespace(catalog, namespace_ident).await;
1035        }
1036    }
1037
1038    fn simple_table_schema() -> Schema {
1039        Schema::builder()
1040            .with_fields(vec![
1041                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1042            ])
1043            .build()
1044            .unwrap()
1045    }
1046
1047    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1048        let _ = catalog
1049            .create_table(
1050                &table_ident.namespace,
1051                TableCreation::builder()
1052                    .name(table_ident.name().into())
1053                    .schema(simple_table_schema())
1054                    .location(temp_path())
1055                    .build(),
1056            )
1057            .await
1058            .unwrap();
1059    }
1060
1061    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1062        for table_ident in table_idents {
1063            create_table(catalog, table_ident).await;
1064        }
1065    }
1066
1067    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1068        assert_eq!(table.identifier(), expected_table_ident);
1069
1070        let metadata = table.metadata();
1071
1072        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1073
1074        let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1075            .with_spec_id(0)
1076            .build()
1077            .unwrap();
1078
1079        assert_eq!(
1080            metadata
1081                .partition_specs_iter()
1082                .map(|p| p.as_ref())
1083                .collect_vec(),
1084            vec![&expected_partition_spec]
1085        );
1086
1087        let expected_sorted_order = SortOrder::builder()
1088            .with_order_id(0)
1089            .with_fields(vec![])
1090            .build(expected_schema)
1091            .unwrap();
1092
1093        assert_eq!(
1094            metadata
1095                .sort_orders_iter()
1096                .map(|s| s.as_ref())
1097                .collect_vec(),
1098            vec![&expected_sorted_order]
1099        );
1100
1101        assert_eq!(metadata.properties(), &HashMap::new());
1102
1103        assert!(!table.readonly());
1104    }
1105
1106    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1107        let actual = table.metadata_location().unwrap().to_string();
1108        let regex = Regex::new(regex_str).unwrap();
1109        assert!(regex.is_match(&actual))
1110    }
1111
1112    #[tokio::test]
1113    async fn test_initialized() {
1114        let warehouse_loc = temp_path();
1115        new_sql_catalog(warehouse_loc.clone()).await;
1116        // catalog instantiation should not fail even if tables exist
1117        new_sql_catalog(warehouse_loc.clone()).await;
1118        new_sql_catalog(warehouse_loc.clone()).await;
1119    }
1120
1121    #[tokio::test]
1122    async fn test_builder_method() {
1123        let sql_lite_uri = format!("sqlite:{}", temp_path());
1124        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1125        let warehouse_location = temp_path();
1126
1127        let catalog = SqlCatalogBuilder::default()
1128            .uri(sql_lite_uri.to_string())
1129            .warehouse_location(warehouse_location.clone())
1130            .sql_bind_style(SqlBindStyle::QMark)
1131            .load("iceberg", HashMap::default())
1132            .await;
1133        assert!(catalog.is_ok());
1134
1135        let catalog = catalog.unwrap();
1136        assert!(catalog.warehouse_location == warehouse_location);
1137        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1138    }
1139
1140    /// Overwriting an sqlite database with a non-existent path causes
1141    /// catalog generation to fail
1142    #[tokio::test]
1143    async fn test_builder_props_non_existent_path_fails() {
1144        let sql_lite_uri = format!("sqlite:{}", temp_path());
1145        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1146        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1147        let warehouse_location = temp_path();
1148
1149        let catalog = SqlCatalogBuilder::default()
1150            .uri(sql_lite_uri)
1151            .warehouse_location(warehouse_location)
1152            .load(
1153                "iceberg",
1154                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1155            )
1156            .await;
1157        assert!(catalog.is_err());
1158    }
1159
1160    /// Even when an invalid URI is specified in a builder method,
1161    /// it can be successfully overridden with a valid URI in props
1162    /// for catalog generation to succeed.
1163    #[tokio::test]
1164    async fn test_builder_props_set_valid_uri() {
1165        let sql_lite_uri = format!("sqlite:{}", temp_path());
1166        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1167        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1168        let warehouse_location = temp_path();
1169
1170        let catalog = SqlCatalogBuilder::default()
1171            .uri(sql_lite_uri2)
1172            .warehouse_location(warehouse_location)
1173            .load(
1174                "iceberg",
1175                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1176            )
1177            .await;
1178        assert!(catalog.is_ok());
1179    }
1180
1181    /// values assigned via props take precedence
1182    #[tokio::test]
1183    async fn test_builder_props_take_precedence() {
1184        let sql_lite_uri = format!("sqlite:{}", temp_path());
1185        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1186        let warehouse_location = temp_path();
1187        let warehouse_location2 = temp_path();
1188
1189        let catalog = SqlCatalogBuilder::default()
1190            .warehouse_location(warehouse_location2)
1191            .sql_bind_style(SqlBindStyle::DollarNumeric)
1192            .load(
1193                "iceberg",
1194                HashMap::from_iter([
1195                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1196                    (
1197                        SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1198                        warehouse_location.clone(),
1199                    ),
1200                    (
1201                        SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1202                        SqlBindStyle::QMark.to_string(),
1203                    ),
1204                ]),
1205            )
1206            .await;
1207
1208        assert!(catalog.is_ok());
1209
1210        let catalog = catalog.unwrap();
1211        assert!(catalog.warehouse_location == warehouse_location);
1212        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1213    }
1214
1215    /// values assigned via props take precedence
1216    #[tokio::test]
1217    async fn test_builder_props_take_precedence_props() {
1218        let sql_lite_uri = format!("sqlite:{}", temp_path());
1219        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1220        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1221        let warehouse_location = temp_path();
1222        let warehouse_location2 = temp_path();
1223
1224        let props = HashMap::from_iter([
1225            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1226            (
1227                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1228                warehouse_location.clone(),
1229            ),
1230            (
1231                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1232                SqlBindStyle::QMark.to_string(),
1233            ),
1234        ]);
1235        let props2 = HashMap::from_iter([
1236            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1237            (
1238                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1239                warehouse_location2.clone(),
1240            ),
1241            (
1242                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1243                SqlBindStyle::DollarNumeric.to_string(),
1244            ),
1245        ]);
1246
1247        let catalog = SqlCatalogBuilder::default()
1248            .props(props2)
1249            .load("iceberg", props)
1250            .await;
1251
1252        assert!(catalog.is_ok());
1253
1254        let catalog = catalog.unwrap();
1255        assert!(catalog.warehouse_location == warehouse_location);
1256        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1257    }
1258
1259    /// values assigned via props take precedence
1260    #[tokio::test]
1261    async fn test_builder_props_take_precedence_prop() {
1262        let sql_lite_uri = format!("sqlite:{}", temp_path());
1263        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1264        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1265        let warehouse_location = temp_path();
1266        let warehouse_location2 = temp_path();
1267
1268        let props = HashMap::from_iter([
1269            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1270            (
1271                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1272                warehouse_location.clone(),
1273            ),
1274            (
1275                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1276                SqlBindStyle::QMark.to_string(),
1277            ),
1278        ]);
1279
1280        let catalog = SqlCatalogBuilder::default()
1281            .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1282            .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1283            .prop(
1284                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1285                SqlBindStyle::DollarNumeric.to_string(),
1286            )
1287            .load("iceberg", props)
1288            .await;
1289
1290        assert!(catalog.is_ok());
1291
1292        let catalog = catalog.unwrap();
1293        assert!(catalog.warehouse_location == warehouse_location);
1294        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1295    }
1296
1297    /// invalid value for `SqlBindStyle` causes catalog creation to fail
1298    #[tokio::test]
1299    async fn test_builder_props_invalid_bind_style_fails() {
1300        let sql_lite_uri = format!("sqlite:{}", temp_path());
1301        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1302        let warehouse_location = temp_path();
1303
1304        let catalog = SqlCatalogBuilder::default()
1305            .load(
1306                "iceberg",
1307                HashMap::from_iter([
1308                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1309                    (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1310                    (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1311                ]),
1312            )
1313            .await;
1314
1315        assert!(catalog.is_err());
1316    }
1317
1318    #[tokio::test]
1319    async fn test_list_namespaces_returns_empty_vector() {
1320        let warehouse_loc = temp_path();
1321        let catalog = new_sql_catalog(warehouse_loc).await;
1322
1323        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1324    }
1325
1326    #[tokio::test]
1327    async fn test_list_namespaces_returns_multiple_namespaces() {
1328        let warehouse_loc = temp_path();
1329        let catalog = new_sql_catalog(warehouse_loc).await;
1330        let namespace_ident_1 = NamespaceIdent::new("a".into());
1331        let namespace_ident_2 = NamespaceIdent::new("b".into());
1332        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1333
1334        assert_eq!(
1335            to_set(catalog.list_namespaces(None).await.unwrap()),
1336            to_set(vec![namespace_ident_1, namespace_ident_2])
1337        );
1338    }
1339
1340    #[tokio::test]
1341    async fn test_list_namespaces_returns_only_top_level_namespaces() {
1342        let warehouse_loc = temp_path();
1343        let catalog = new_sql_catalog(warehouse_loc).await;
1344        let namespace_ident_1 = NamespaceIdent::new("a".into());
1345        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1346        let namespace_ident_3 = NamespaceIdent::new("b".into());
1347        create_namespaces(&catalog, &vec![
1348            &namespace_ident_1,
1349            &namespace_ident_2,
1350            &namespace_ident_3,
1351        ])
1352        .await;
1353
1354        assert_eq!(
1355            to_set(catalog.list_namespaces(None).await.unwrap()),
1356            to_set(vec![namespace_ident_1, namespace_ident_3])
1357        );
1358    }
1359
1360    #[tokio::test]
1361    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1362        let warehouse_loc = temp_path();
1363        let catalog = new_sql_catalog(warehouse_loc).await;
1364        let namespace_ident_1 = NamespaceIdent::new("a".into());
1365        let namespace_ident_2 = NamespaceIdent::new("b".into());
1366        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1367
1368        assert_eq!(
1369            catalog
1370                .list_namespaces(Some(&namespace_ident_1))
1371                .await
1372                .unwrap(),
1373            vec![]
1374        );
1375    }
1376
1377    #[tokio::test]
1378    async fn test_list_namespaces_returns_namespace_under_parent() {
1379        let warehouse_loc = temp_path();
1380        let catalog = new_sql_catalog(warehouse_loc).await;
1381        let namespace_ident_1 = NamespaceIdent::new("a".into());
1382        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1383        let namespace_ident_3 = NamespaceIdent::new("c".into());
1384        create_namespaces(&catalog, &vec![
1385            &namespace_ident_1,
1386            &namespace_ident_2,
1387            &namespace_ident_3,
1388        ])
1389        .await;
1390
1391        assert_eq!(
1392            to_set(catalog.list_namespaces(None).await.unwrap()),
1393            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1394        );
1395
1396        assert_eq!(
1397            catalog
1398                .list_namespaces(Some(&namespace_ident_1))
1399                .await
1400                .unwrap(),
1401            vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1402        );
1403    }
1404
1405    #[tokio::test]
1406    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1407        let warehouse_loc = temp_path();
1408        let catalog = new_sql_catalog(warehouse_loc).await;
1409        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1410        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1411        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1412        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1413        let namespace_ident_5 = NamespaceIdent::new("b".into());
1414        create_namespaces(&catalog, &vec![
1415            &namespace_ident_1,
1416            &namespace_ident_2,
1417            &namespace_ident_3,
1418            &namespace_ident_4,
1419            &namespace_ident_5,
1420        ])
1421        .await;
1422
1423        assert_eq!(
1424            to_set(
1425                catalog
1426                    .list_namespaces(Some(&namespace_ident_1))
1427                    .await
1428                    .unwrap()
1429            ),
1430            to_set(vec![
1431                NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1432                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1433                NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1434            ])
1435        );
1436    }
1437
1438    #[tokio::test]
1439    async fn test_namespace_exists_returns_false() {
1440        let warehouse_loc = temp_path();
1441        let catalog = new_sql_catalog(warehouse_loc).await;
1442        let namespace_ident = NamespaceIdent::new("a".into());
1443        create_namespace(&catalog, &namespace_ident).await;
1444
1445        assert!(
1446            !catalog
1447                .namespace_exists(&NamespaceIdent::new("b".into()))
1448                .await
1449                .unwrap()
1450        );
1451    }
1452
1453    #[tokio::test]
1454    async fn test_namespace_exists_returns_true() {
1455        let warehouse_loc = temp_path();
1456        let catalog = new_sql_catalog(warehouse_loc).await;
1457        let namespace_ident = NamespaceIdent::new("a".into());
1458        create_namespace(&catalog, &namespace_ident).await;
1459
1460        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1461    }
1462
1463    #[tokio::test]
1464    async fn test_create_namespace_with_properties() {
1465        let warehouse_loc = temp_path();
1466        let catalog = new_sql_catalog(warehouse_loc).await;
1467        let namespace_ident = NamespaceIdent::new("abc".into());
1468
1469        let mut properties = default_properties();
1470        properties.insert("k".into(), "v".into());
1471
1472        assert_eq!(
1473            catalog
1474                .create_namespace(&namespace_ident, properties.clone())
1475                .await
1476                .unwrap(),
1477            Namespace::with_properties(namespace_ident.clone(), properties.clone())
1478        );
1479
1480        assert_eq!(
1481            catalog.get_namespace(&namespace_ident).await.unwrap(),
1482            Namespace::with_properties(namespace_ident, properties)
1483        );
1484    }
1485
1486    #[tokio::test]
1487    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
1488        let warehouse_loc = temp_path();
1489        let catalog = new_sql_catalog(warehouse_loc).await;
1490        let namespace_ident = NamespaceIdent::new("a".into());
1491        create_namespace(&catalog, &namespace_ident).await;
1492
1493        assert_eq!(
1494            catalog
1495                .create_namespace(&namespace_ident, HashMap::new())
1496                .await
1497                .unwrap_err()
1498                .to_string(),
1499            format!(
1500                "Unexpected => Namespace {:?} already exists",
1501                &namespace_ident
1502            )
1503        );
1504
1505        assert_eq!(
1506            catalog.get_namespace(&namespace_ident).await.unwrap(),
1507            Namespace::with_properties(namespace_ident, default_properties())
1508        );
1509    }
1510
1511    #[tokio::test]
1512    async fn test_create_nested_namespace() {
1513        let warehouse_loc = temp_path();
1514        let catalog = new_sql_catalog(warehouse_loc).await;
1515        let parent_namespace_ident = NamespaceIdent::new("a".into());
1516        create_namespace(&catalog, &parent_namespace_ident).await;
1517
1518        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1519
1520        assert_eq!(
1521            catalog
1522                .create_namespace(&child_namespace_ident, HashMap::new())
1523                .await
1524                .unwrap(),
1525            Namespace::new(child_namespace_ident.clone())
1526        );
1527
1528        assert_eq!(
1529            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1530            Namespace::with_properties(child_namespace_ident, default_properties())
1531        );
1532    }
1533
1534    #[tokio::test]
1535    async fn test_create_deeply_nested_namespace() {
1536        let warehouse_loc = temp_path();
1537        let catalog = new_sql_catalog(warehouse_loc).await;
1538        let namespace_ident_a = NamespaceIdent::new("a".into());
1539        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1540        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1541
1542        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1543
1544        assert_eq!(
1545            catalog
1546                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1547                .await
1548                .unwrap(),
1549            Namespace::new(namespace_ident_a_b_c.clone())
1550        );
1551
1552        assert_eq!(
1553            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1554            Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1555        );
1556    }
1557
1558    #[tokio::test]
1559    async fn test_update_namespace_noop() {
1560        let warehouse_loc = temp_path();
1561        let catalog = new_sql_catalog(warehouse_loc).await;
1562        let namespace_ident = NamespaceIdent::new("a".into());
1563        create_namespace(&catalog, &namespace_ident).await;
1564
1565        catalog
1566            .update_namespace(&namespace_ident, HashMap::new())
1567            .await
1568            .unwrap();
1569
1570        assert_eq!(
1571            *catalog
1572                .get_namespace(&namespace_ident)
1573                .await
1574                .unwrap()
1575                .properties(),
1576            HashMap::from_iter([("exists".to_string(), "true".to_string())])
1577        )
1578    }
1579
1580    #[tokio::test]
1581    async fn test_update_namespace() {
1582        let warehouse_loc = temp_path();
1583        let catalog = new_sql_catalog(warehouse_loc).await;
1584        let namespace_ident = NamespaceIdent::new("a".into());
1585        create_namespace(&catalog, &namespace_ident).await;
1586
1587        let mut props = HashMap::from_iter([
1588            ("prop1".to_string(), "val1".to_string()),
1589            ("prop2".into(), "val2".into()),
1590        ]);
1591
1592        catalog
1593            .update_namespace(&namespace_ident, props.clone())
1594            .await
1595            .unwrap();
1596
1597        props.insert("exists".into(), "true".into());
1598
1599        assert_eq!(
1600            *catalog
1601                .get_namespace(&namespace_ident)
1602                .await
1603                .unwrap()
1604                .properties(),
1605            props
1606        )
1607    }
1608
1609    #[tokio::test]
1610    async fn test_update_nested_namespace() {
1611        let warehouse_loc = temp_path();
1612        let catalog = new_sql_catalog(warehouse_loc).await;
1613        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1614        create_namespace(&catalog, &namespace_ident).await;
1615
1616        let mut props = HashMap::from_iter([
1617            ("prop1".to_string(), "val1".to_string()),
1618            ("prop2".into(), "val2".into()),
1619        ]);
1620
1621        catalog
1622            .update_namespace(&namespace_ident, props.clone())
1623            .await
1624            .unwrap();
1625
1626        props.insert("exists".into(), "true".into());
1627
1628        assert_eq!(
1629            *catalog
1630                .get_namespace(&namespace_ident)
1631                .await
1632                .unwrap()
1633                .properties(),
1634            props
1635        )
1636    }
1637
1638    #[tokio::test]
1639    async fn test_update_namespace_errors_if_namespace_doesnt_exist() {
1640        let warehouse_loc = temp_path();
1641        let catalog = new_sql_catalog(warehouse_loc).await;
1642        let namespace_ident = NamespaceIdent::new("a".into());
1643
1644        let props = HashMap::from_iter([
1645            ("prop1".to_string(), "val1".to_string()),
1646            ("prop2".into(), "val2".into()),
1647        ]);
1648
1649        let err = catalog
1650            .update_namespace(&namespace_ident, props)
1651            .await
1652            .unwrap_err();
1653
1654        assert_eq!(
1655            err.message(),
1656            format!("No such namespace: {namespace_ident:?}")
1657        );
1658    }
1659
1660    #[tokio::test]
1661    async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1662        let warehouse_loc = temp_path();
1663        let catalog = new_sql_catalog(warehouse_loc).await;
1664        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1665
1666        let props = HashMap::from_iter([
1667            ("prop1".to_string(), "val1".to_string()),
1668            ("prop2".into(), "val2".into()),
1669        ]);
1670
1671        let err = catalog
1672            .update_namespace(&namespace_ident, props)
1673            .await
1674            .unwrap_err();
1675
1676        assert_eq!(
1677            err.message(),
1678            format!("No such namespace: {namespace_ident:?}")
1679        );
1680    }
1681
1682    #[tokio::test]
1683    async fn test_drop_namespace() {
1684        let warehouse_loc = temp_path();
1685        let catalog = new_sql_catalog(warehouse_loc).await;
1686        let namespace_ident = NamespaceIdent::new("abc".into());
1687        create_namespace(&catalog, &namespace_ident).await;
1688
1689        catalog.drop_namespace(&namespace_ident).await.unwrap();
1690
1691        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1692    }
1693
1694    #[tokio::test]
1695    async fn test_drop_nested_namespace() {
1696        let warehouse_loc = temp_path();
1697        let catalog = new_sql_catalog(warehouse_loc).await;
1698        let namespace_ident_a = NamespaceIdent::new("a".into());
1699        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1700        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1701
1702        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1703
1704        assert!(
1705            !catalog
1706                .namespace_exists(&namespace_ident_a_b)
1707                .await
1708                .unwrap()
1709        );
1710
1711        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1712    }
1713
1714    #[tokio::test]
1715    async fn test_drop_deeply_nested_namespace() {
1716        let warehouse_loc = temp_path();
1717        let catalog = new_sql_catalog(warehouse_loc).await;
1718        let namespace_ident_a = NamespaceIdent::new("a".into());
1719        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1720        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1721        create_namespaces(&catalog, &vec![
1722            &namespace_ident_a,
1723            &namespace_ident_a_b,
1724            &namespace_ident_a_b_c,
1725        ])
1726        .await;
1727
1728        catalog
1729            .drop_namespace(&namespace_ident_a_b_c)
1730            .await
1731            .unwrap();
1732
1733        assert!(
1734            !catalog
1735                .namespace_exists(&namespace_ident_a_b_c)
1736                .await
1737                .unwrap()
1738        );
1739
1740        assert!(
1741            catalog
1742                .namespace_exists(&namespace_ident_a_b)
1743                .await
1744                .unwrap()
1745        );
1746
1747        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1748    }
1749
1750    #[tokio::test]
1751    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1752        let warehouse_loc = temp_path();
1753        let catalog = new_sql_catalog(warehouse_loc).await;
1754
1755        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1756        assert_eq!(
1757            catalog
1758                .drop_namespace(&non_existent_namespace_ident)
1759                .await
1760                .unwrap_err()
1761                .to_string(),
1762            format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1763        )
1764    }
1765
1766    #[tokio::test]
1767    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1768        let warehouse_loc = temp_path();
1769        let catalog = new_sql_catalog(warehouse_loc).await;
1770        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1771
1772        let non_existent_namespace_ident =
1773            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1774        assert_eq!(
1775            catalog
1776                .drop_namespace(&non_existent_namespace_ident)
1777                .await
1778                .unwrap_err()
1779                .to_string(),
1780            format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1781        )
1782    }
1783
1784    #[tokio::test]
1785    async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1786        let warehouse_loc = temp_path();
1787        let catalog = new_sql_catalog(warehouse_loc).await;
1788        let namespace_ident_a = NamespaceIdent::new("a".into());
1789        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1790        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1791
1792        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1793
1794        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1795
1796        assert!(
1797            catalog
1798                .namespace_exists(&namespace_ident_a_b)
1799                .await
1800                .unwrap()
1801        );
1802    }
1803
1804    #[tokio::test]
1805    async fn test_list_tables_returns_empty_vector() {
1806        let warehouse_loc = temp_path();
1807        let catalog = new_sql_catalog(warehouse_loc).await;
1808        let namespace_ident = NamespaceIdent::new("a".into());
1809        create_namespace(&catalog, &namespace_ident).await;
1810
1811        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1812    }
1813
1814    #[tokio::test]
1815    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1816        let warehouse_loc = temp_path();
1817        let catalog = new_sql_catalog(warehouse_loc).await;
1818
1819        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1820
1821        assert_eq!(
1822            catalog
1823                .list_tables(&non_existent_namespace_ident)
1824                .await
1825                .unwrap_err()
1826                .to_string(),
1827            format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}"),
1828        );
1829    }
1830
1831    #[tokio::test]
1832    async fn test_create_table_with_location() {
1833        let warehouse_loc = temp_path();
1834        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1835        let namespace_ident = NamespaceIdent::new("a".into());
1836        create_namespace(&catalog, &namespace_ident).await;
1837
1838        let table_name = "abc";
1839        let location = warehouse_loc.clone();
1840        let table_creation = TableCreation::builder()
1841            .name(table_name.into())
1842            .location(location.clone())
1843            .schema(simple_table_schema())
1844            .build();
1845
1846        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1847
1848        assert_table_eq(
1849            &catalog
1850                .create_table(&namespace_ident, table_creation)
1851                .await
1852                .unwrap(),
1853            &expected_table_ident,
1854            &simple_table_schema(),
1855        );
1856
1857        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1858
1859        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1860
1861        assert!(
1862            table
1863                .metadata_location()
1864                .unwrap()
1865                .to_string()
1866                .starts_with(&location)
1867        )
1868    }
1869
1870    #[tokio::test]
1871    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1872        let warehouse_loc = temp_path();
1873        let catalog = new_sql_catalog(warehouse_loc).await;
1874
1875        let namespace_ident = NamespaceIdent::new("a".into());
1876        let mut namespace_properties = HashMap::new();
1877        let namespace_location = temp_path();
1878        namespace_properties.insert(
1879            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1880            namespace_location.to_string(),
1881        );
1882        catalog
1883            .create_namespace(&namespace_ident, namespace_properties)
1884            .await
1885            .unwrap();
1886
1887        let table_name = "tbl1";
1888        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1889        let expected_table_metadata_location_regex =
1890            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1891
1892        let table = catalog
1893            .create_table(
1894                &namespace_ident,
1895                TableCreation::builder()
1896                    .name(table_name.into())
1897                    .schema(simple_table_schema())
1898                    // no location specified for table
1899                    .build(),
1900            )
1901            .await
1902            .unwrap();
1903        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1904        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1905
1906        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1907        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1908        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1909    }
1910
1911    #[tokio::test]
1912    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1913     {
1914        let warehouse_loc = temp_path();
1915        let catalog = new_sql_catalog(warehouse_loc).await;
1916
1917        let namespace_ident = NamespaceIdent::new("a".into());
1918        let mut namespace_properties = HashMap::new();
1919        let namespace_location = temp_path();
1920        namespace_properties.insert(
1921            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1922            namespace_location.to_string(),
1923        );
1924        catalog
1925            .create_namespace(&namespace_ident, namespace_properties)
1926            .await
1927            .unwrap();
1928
1929        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1930        let mut nested_namespace_properties = HashMap::new();
1931        let nested_namespace_location = temp_path();
1932        nested_namespace_properties.insert(
1933            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1934            nested_namespace_location.to_string(),
1935        );
1936        catalog
1937            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1938            .await
1939            .unwrap();
1940
1941        let table_name = "tbl1";
1942        let expected_table_ident =
1943            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1944        let expected_table_metadata_location_regex = format!(
1945            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1946        );
1947
1948        let table = catalog
1949            .create_table(
1950                &nested_namespace_ident,
1951                TableCreation::builder()
1952                    .name(table_name.into())
1953                    .schema(simple_table_schema())
1954                    // no location specified for table
1955                    .build(),
1956            )
1957            .await
1958            .unwrap();
1959        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1960        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1961
1962        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1963        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1964        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1965    }
1966
1967    #[tokio::test]
1968    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1969     {
1970        let warehouse_loc = temp_path();
1971        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1972
1973        let namespace_ident = NamespaceIdent::new("a".into());
1974        // note: no location specified in namespace_properties
1975        let namespace_properties = HashMap::new();
1976        catalog
1977            .create_namespace(&namespace_ident, namespace_properties)
1978            .await
1979            .unwrap();
1980
1981        let table_name = "tbl1";
1982        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1983        let expected_table_metadata_location_regex =
1984            format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1985
1986        let table = catalog
1987            .create_table(
1988                &namespace_ident,
1989                TableCreation::builder()
1990                    .name(table_name.into())
1991                    .schema(simple_table_schema())
1992                    // no location specified for table
1993                    .build(),
1994            )
1995            .await
1996            .unwrap();
1997        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1998        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1999
2000        let table = catalog.load_table(&expected_table_ident).await.unwrap();
2001        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2002        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2003    }
2004
2005    #[tokio::test]
2006    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
2007     {
2008        let warehouse_loc = temp_path();
2009        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2010
2011        let namespace_ident = NamespaceIdent::new("a".into());
2012        create_namespace(&catalog, &namespace_ident).await;
2013
2014        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2015        create_namespace(&catalog, &nested_namespace_ident).await;
2016
2017        let table_name = "tbl1";
2018        let expected_table_ident =
2019            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
2020        let expected_table_metadata_location_regex =
2021            format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
2022
2023        let table = catalog
2024            .create_table(
2025                &nested_namespace_ident,
2026                TableCreation::builder()
2027                    .name(table_name.into())
2028                    .schema(simple_table_schema())
2029                    // no location specified for table
2030                    .build(),
2031            )
2032            .await
2033            .unwrap();
2034        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2035        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2036
2037        let table = catalog.load_table(&expected_table_ident).await.unwrap();
2038        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2039        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2040    }
2041
2042    #[tokio::test]
2043    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
2044        let warehouse_loc = temp_path();
2045        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2046        let namespace_ident = NamespaceIdent::new("a".into());
2047        create_namespace(&catalog, &namespace_ident).await;
2048        let table_name = "tbl1";
2049        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2050        create_table(&catalog, &table_ident).await;
2051
2052        let tmp_dir = TempDir::new().unwrap();
2053        let location = tmp_dir.path().to_str().unwrap().to_string();
2054
2055        assert_eq!(
2056            catalog
2057                .create_table(
2058                    &namespace_ident,
2059                    TableCreation::builder()
2060                        .name(table_name.into())
2061                        .schema(simple_table_schema())
2062                        .location(location)
2063                        .build()
2064                )
2065                .await
2066                .unwrap_err()
2067                .to_string(),
2068            format!("Unexpected => Table {:?} already exists.", &table_ident)
2069        );
2070    }
2071
2072    #[tokio::test]
2073    async fn test_rename_table_in_same_namespace() {
2074        let warehouse_loc = temp_path();
2075        let catalog = new_sql_catalog(warehouse_loc).await;
2076        let namespace_ident = NamespaceIdent::new("n1".into());
2077        create_namespace(&catalog, &namespace_ident).await;
2078        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2079        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2080        create_table(&catalog, &src_table_ident).await;
2081
2082        catalog
2083            .rename_table(&src_table_ident, &dst_table_ident)
2084            .await
2085            .unwrap();
2086
2087        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2088            dst_table_ident
2089        ],);
2090    }
2091
2092    #[tokio::test]
2093    async fn test_rename_table_across_namespaces() {
2094        let warehouse_loc = temp_path();
2095        let catalog = new_sql_catalog(warehouse_loc).await;
2096        let src_namespace_ident = NamespaceIdent::new("a".into());
2097        let dst_namespace_ident = NamespaceIdent::new("b".into());
2098        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
2099        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2100        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
2101        create_table(&catalog, &src_table_ident).await;
2102
2103        catalog
2104            .rename_table(&src_table_ident, &dst_table_ident)
2105            .await
2106            .unwrap();
2107
2108        assert_eq!(
2109            catalog.list_tables(&src_namespace_ident).await.unwrap(),
2110            vec![],
2111        );
2112
2113        assert_eq!(
2114            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
2115            vec![dst_table_ident],
2116        );
2117    }
2118
2119    #[tokio::test]
2120    async fn test_rename_table_src_table_is_same_as_dst_table() {
2121        let warehouse_loc = temp_path();
2122        let catalog = new_sql_catalog(warehouse_loc).await;
2123        let namespace_ident = NamespaceIdent::new("n1".into());
2124        create_namespace(&catalog, &namespace_ident).await;
2125        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
2126        create_table(&catalog, &table_ident).await;
2127
2128        catalog
2129            .rename_table(&table_ident, &table_ident)
2130            .await
2131            .unwrap();
2132
2133        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2134            table_ident
2135        ],);
2136    }
2137
2138    #[tokio::test]
2139    async fn test_rename_table_across_nested_namespaces() {
2140        let warehouse_loc = temp_path();
2141        let catalog = new_sql_catalog(warehouse_loc).await;
2142        let namespace_ident_a = NamespaceIdent::new("a".into());
2143        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2144        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2145        create_namespaces(&catalog, &vec![
2146            &namespace_ident_a,
2147            &namespace_ident_a_b,
2148            &namespace_ident_a_b_c,
2149        ])
2150        .await;
2151
2152        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2153        create_tables(&catalog, vec![&src_table_ident]).await;
2154
2155        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2156        catalog
2157            .rename_table(&src_table_ident, &dst_table_ident)
2158            .await
2159            .unwrap();
2160
2161        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2162
2163        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2164    }
2165
2166    #[tokio::test]
2167    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2168        let warehouse_loc = temp_path();
2169        let catalog = new_sql_catalog(warehouse_loc).await;
2170        let src_namespace_ident = NamespaceIdent::new("n1".into());
2171        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2172        create_namespace(&catalog, &src_namespace_ident).await;
2173        create_table(&catalog, &src_table_ident).await;
2174
2175        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2176        let dst_table_ident =
2177            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2178        assert_eq!(
2179            catalog
2180                .rename_table(&src_table_ident, &dst_table_ident)
2181                .await
2182                .unwrap_err()
2183                .to_string(),
2184            format!("Unexpected => No such namespace: {non_existent_dst_namespace_ident:?}"),
2185        );
2186    }
2187
2188    #[tokio::test]
2189    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
2190        let warehouse_loc = temp_path();
2191        let catalog = new_sql_catalog(warehouse_loc).await;
2192        let namespace_ident = NamespaceIdent::new("n1".into());
2193        create_namespace(&catalog, &namespace_ident).await;
2194        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2195        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2196
2197        assert_eq!(
2198            catalog
2199                .rename_table(&src_table_ident, &dst_table_ident)
2200                .await
2201                .unwrap_err()
2202                .to_string(),
2203            format!("Unexpected => No such table: {src_table_ident:?}"),
2204        );
2205    }
2206
2207    #[tokio::test]
2208    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
2209        let warehouse_loc = temp_path();
2210        let catalog = new_sql_catalog(warehouse_loc).await;
2211        let namespace_ident = NamespaceIdent::new("n1".into());
2212        create_namespace(&catalog, &namespace_ident).await;
2213        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2214        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2215        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
2216
2217        assert_eq!(
2218            catalog
2219                .rename_table(&src_table_ident, &dst_table_ident)
2220                .await
2221                .unwrap_err()
2222                .to_string(),
2223            format!("Unexpected => Table {:?} already exists.", &dst_table_ident),
2224        );
2225    }
2226
2227    #[tokio::test]
2228    async fn test_drop_table_throws_error_if_table_not_exist() {
2229        let warehouse_loc = temp_path();
2230        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2231        let namespace_ident = NamespaceIdent::new("a".into());
2232        let table_name = "tbl1";
2233        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2234        create_namespace(&catalog, &namespace_ident).await;
2235
2236        let err = catalog
2237            .drop_table(&table_ident)
2238            .await
2239            .unwrap_err()
2240            .to_string();
2241        assert_eq!(
2242            err,
2243            "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2244        );
2245    }
2246
2247    #[tokio::test]
2248    async fn test_drop_table() {
2249        let warehouse_loc = temp_path();
2250        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2251        let namespace_ident = NamespaceIdent::new("a".into());
2252        let table_name = "tbl1";
2253        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2254        create_namespace(&catalog, &namespace_ident).await;
2255
2256        let location = warehouse_loc.clone();
2257        let table_creation = TableCreation::builder()
2258            .name(table_name.into())
2259            .location(location.clone())
2260            .schema(simple_table_schema())
2261            .build();
2262
2263        catalog
2264            .create_table(&namespace_ident, table_creation)
2265            .await
2266            .unwrap();
2267
2268        let table = catalog.load_table(&table_ident).await.unwrap();
2269        assert_table_eq(&table, &table_ident, &simple_table_schema());
2270
2271        catalog.drop_table(&table_ident).await.unwrap();
2272        let err = catalog
2273            .load_table(&table_ident)
2274            .await
2275            .unwrap_err()
2276            .to_string();
2277        assert_eq!(
2278            err,
2279            "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2280        );
2281    }
2282
2283    #[tokio::test]
2284    async fn test_register_table_throws_error_if_table_with_same_name_already_exists() {
2285        let warehouse_loc = temp_path();
2286        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2287        let namespace_ident = NamespaceIdent::new("a".into());
2288        create_namespace(&catalog, &namespace_ident).await;
2289        let table_name = "tbl1";
2290        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2291        create_table(&catalog, &table_ident).await;
2292
2293        assert_eq!(
2294            catalog
2295                .register_table(&table_ident, warehouse_loc)
2296                .await
2297                .unwrap_err()
2298                .to_string(),
2299            format!("Unexpected => Table {:?} already exists.", &table_ident)
2300        );
2301    }
2302
2303    #[tokio::test]
2304    async fn test_register_table() {
2305        let warehouse_loc = temp_path();
2306        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2307        let namespace_ident = NamespaceIdent::new("a".into());
2308        create_namespace(&catalog, &namespace_ident).await;
2309
2310        let table_name = "abc";
2311        let location = warehouse_loc.clone();
2312        let table_creation = TableCreation::builder()
2313            .name(table_name.into())
2314            .location(location.clone())
2315            .schema(simple_table_schema())
2316            .build();
2317
2318        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2319        let expected_table = catalog
2320            .create_table(&namespace_ident, table_creation)
2321            .await
2322            .unwrap();
2323
2324        let metadata_location = expected_table
2325            .metadata_location()
2326            .expect("Expected metadata location to be set")
2327            .to_string();
2328
2329        assert_table_eq(&expected_table, &table_ident, &simple_table_schema());
2330
2331        let _ = catalog.drop_table(&table_ident).await;
2332
2333        let table = catalog
2334            .register_table(&table_ident, metadata_location.clone())
2335            .await
2336            .unwrap();
2337
2338        assert_eq!(table.identifier(), expected_table.identifier());
2339        assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
2340    }
2341
2342    #[tokio::test]
2343    async fn test_update_table() {
2344        let warehouse_loc = temp_path();
2345        let catalog = new_sql_catalog(warehouse_loc).await;
2346
2347        // Create a test namespace and table
2348        let namespace_ident = NamespaceIdent::new("ns1".into());
2349        create_namespace(&catalog, &namespace_ident).await;
2350        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2351        create_table(&catalog, &table_ident).await;
2352
2353        let table = catalog.load_table(&table_ident).await.unwrap();
2354
2355        // Store the original metadata location for comparison
2356        let original_metadata_location = table.metadata_location().unwrap().to_string();
2357
2358        // Create a transaction to update the table
2359        let tx = Transaction::new(&table);
2360        let tx = tx
2361            .update_table_properties()
2362            .set("test_property".to_string(), "test_value".to_string())
2363            .apply(tx)
2364            .unwrap();
2365
2366        // Commit the transaction to the catalog
2367        let updated_table = tx.commit(&catalog).await.unwrap();
2368
2369        // Verify the update was successful
2370        assert_eq!(
2371            updated_table.metadata().properties().get("test_property"),
2372            Some(&"test_value".to_string())
2373        );
2374        // Verify the metadata location has been updated
2375        assert_ne!(
2376            updated_table.metadata_location().unwrap(),
2377            original_metadata_location.as_str()
2378        );
2379
2380        // Load the table again from the catalog to verify changes were persisted
2381        let reloaded = catalog.load_table(&table_ident).await.unwrap();
2382
2383        // Verify the reloaded table matches the updated table
2384        assert_eq!(
2385            reloaded.metadata().properties().get("test_property"),
2386            Some(&"test_value".to_string())
2387        );
2388        assert_eq!(
2389            reloaded.metadata_location(),
2390            updated_table.metadata_location()
2391        );
2392    }
2393}