iceberg_catalog_sql/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::time::Duration;
21
22use async_trait::async_trait;
23use iceberg::io::FileIO;
24use iceberg::spec::{TableMetadata, TableMetadataBuilder};
25use iceberg::table::Table;
26use iceberg::{
27    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
28    TableCommit, TableCreation, TableIdent,
29};
30use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
31use sqlx::{Any, AnyPool, Row, Transaction};
32
33use crate::error::{
34    from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
35};
36
/// Property key holding the catalog database URI.
pub const SQL_CATALOG_PROP_URI: &str = "uri";
/// Property key holding the catalog warehouse location.
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key holding the SQL bind style.
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";

// Name and columns of the table that records Iceberg tables.
const CATALOG_TABLE_NAME: &str = "iceberg_tables";
const CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
const CATALOG_FIELD_TABLE_NAME: &str = "table_name";
const CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
const CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
const CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
const CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
/// Value stored in the record-type column for table rows.
const CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Name and columns of the table that records namespace properties.
const NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
const NAMESPACE_FIELD_NAME: &str = "namespace";
const NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
const NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

/// Namespace property consulted for a namespace-specific storage location.
const NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";

/// Default the SQL pool to 10 connections if not provided.
const MAX_CONNECTIONS: u32 = 10;
/// Default the maximum idle timeout per connection to 10s before it is closed.
const IDLE_TIMEOUT: u64 = 10;
/// Default the health-check of each connection to enabled prior to returning.
const TEST_BEFORE_ACQUIRE: bool = true;
63
64/// Builder for [`SqlCatalog`]
65#[derive(Debug)]
66pub struct SqlCatalogBuilder(SqlCatalogConfig);
67
68impl Default for SqlCatalogBuilder {
69    fn default() -> Self {
70        Self(SqlCatalogConfig {
71            uri: "".to_string(),
72            name: "".to_string(),
73            warehouse_location: "".to_string(),
74            sql_bind_style: SqlBindStyle::DollarNumeric,
75            props: HashMap::new(),
76        })
77    }
78}
79
80impl SqlCatalogBuilder {
81    /// Configure the database URI
82    ///
83    /// If `SQL_CATALOG_PROP_URI` has a value set in `props` during `SqlCatalogBuilder::load`,
84    /// that value takes precedence, and the value specified by this method will not be used.
85    pub fn uri(mut self, uri: impl Into<String>) -> Self {
86        self.0.uri = uri.into();
87        self
88    }
89
90    /// Configure the warehouse location
91    ///
92    /// If `SQL_CATALOG_PROP_WAREHOUSE` has a value set in `props` during `SqlCatalogBuilder::load`,
93    /// that value takes precedence, and the value specified by this method will not be used.
94    pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
95        self.0.warehouse_location = location.into();
96        self
97    }
98
99    /// Configure the bound SQL Statement
100    ///
101    /// If `SQL_CATALOG_PROP_BIND_STYLE` has a value set in `props` during `SqlCatalogBuilder::load`,
102    /// that value takes precedence, and the value specified by this method will not be used.
103    pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
104        self.0.sql_bind_style = sql_bind_style;
105        self
106    }
107
108    /// Configure the any properties
109    ///
110    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
111    /// those values will take precedence.
112    pub fn props(mut self, props: HashMap<String, String>) -> Self {
113        for (k, v) in props {
114            self.0.props.insert(k, v);
115        }
116        self
117    }
118
119    /// Set a new property on the property to be configured.
120    /// When multiple methods are executed with the same key,
121    /// the later-set value takes precedence.
122    ///
123    /// If the same key has values set in `props` during `SqlCatalogBuilder::load`,
124    /// those values will take precedence.
125    pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
126        self.0.props.insert(key.into(), value.into());
127        self
128    }
129}
130
131impl CatalogBuilder for SqlCatalogBuilder {
132    type C = SqlCatalog;
133
134    fn load(
135        mut self,
136        name: impl Into<String>,
137        props: HashMap<String, String>,
138    ) -> impl Future<Output = Result<Self::C>> + Send {
139        for (k, v) in props {
140            self.0.props.insert(k, v);
141        }
142
143        if let Some(uri) = self.0.props.remove(SQL_CATALOG_PROP_URI) {
144            self.0.uri = uri;
145        }
146        if let Some(warehouse_location) = self.0.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
147            self.0.warehouse_location = warehouse_location;
148        }
149
150        let name = name.into();
151
152        let mut valid_sql_bind_style = true;
153        if let Some(sql_bind_style) = self.0.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
154            if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
155                self.0.sql_bind_style = sql_bind_style;
156            } else {
157                valid_sql_bind_style = false;
158            }
159        }
160
161        let valid_name = !name.trim().is_empty();
162
163        async move {
164            if !valid_name {
165                Err(Error::new(
166                    ErrorKind::DataInvalid,
167                    "Catalog name cannot be empty",
168                ))
169            } else if !valid_sql_bind_style {
170                Err(Error::new(
171                    ErrorKind::DataInvalid,
172                    format!(
173                        "`{}` values are valid only if they're `{}` or `{}`",
174                        SQL_CATALOG_PROP_BIND_STYLE,
175                        SqlBindStyle::DollarNumeric,
176                        SqlBindStyle::QMark
177                    ),
178                ))
179            } else {
180                self.0.name = name;
181                SqlCatalog::new(self.0).await
182            }
183        }
184    }
185}
186
187/// A struct representing the SQL catalog configuration.
188///
189/// This struct contains various parameters that are used to configure a SQL catalog,
190/// such as the database URI, warehouse location, and file I/O settings.
191/// You are required to provide a `SqlBindStyle`, which determines how SQL statements will be bound to values in the catalog.
192/// The options available for this parameter include:
193/// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases.
194/// - `SqlBindStyle::QuestionMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases.
195#[derive(Debug)]
196struct SqlCatalogConfig {
197    uri: String,
198    name: String,
199    warehouse_location: String,
200    sql_bind_style: SqlBindStyle,
201    props: HashMap<String, String>,
202}
203
204#[derive(Debug)]
205/// Sql catalog implementation.
206pub struct SqlCatalog {
207    name: String,
208    connection: AnyPool,
209    warehouse_location: String,
210    fileio: FileIO,
211    sql_bind_style: SqlBindStyle,
212}
213
214#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
215/// Set the SQL parameter bind style to either $1..$N (Postgres style) or ? (SQLite/MySQL/MariaDB)
216pub enum SqlBindStyle {
217    /// DollarNumeric uses parameters of the form `$1..$N``, which is the Postgres style
218    DollarNumeric,
219    /// QMark uses parameters of the form `?` which is the style for other dialects (SQLite/MySQL/MariaDB)
220    QMark,
221}
222
223impl SqlCatalog {
224    /// Create new sql catalog instance
225    async fn new(config: SqlCatalogConfig) -> Result<Self> {
226        let fileio = FileIO::from_path(&config.warehouse_location)?.build()?;
227        install_default_drivers();
228        let max_connections: u32 = config
229            .props
230            .get("pool.max-connections")
231            .map(|v| v.parse().unwrap())
232            .unwrap_or(MAX_CONNECTIONS);
233        let idle_timeout: u64 = config
234            .props
235            .get("pool.idle-timeout")
236            .map(|v| v.parse().unwrap())
237            .unwrap_or(IDLE_TIMEOUT);
238        let test_before_acquire: bool = config
239            .props
240            .get("pool.test-before-acquire")
241            .map(|v| v.parse().unwrap())
242            .unwrap_or(TEST_BEFORE_ACQUIRE);
243
244        let pool = AnyPoolOptions::new()
245            .max_connections(max_connections)
246            .idle_timeout(Duration::from_secs(idle_timeout))
247            .test_before_acquire(test_before_acquire)
248            .connect(&config.uri)
249            .await
250            .map_err(from_sqlx_error)?;
251
252        sqlx::query(&format!(
253            "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
254                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
255                {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
256                {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
257                {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
258                {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
259                {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
260                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
261        ))
262        .execute(&pool)
263        .await
264        .map_err(from_sqlx_error)?;
265
266        sqlx::query(&format!(
267            "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
268                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
269                {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
270                {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
271                {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
272                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
273        ))
274        .execute(&pool)
275        .await
276        .map_err(from_sqlx_error)?;
277
278        Ok(SqlCatalog {
279            name: config.name.to_owned(),
280            connection: pool,
281            warehouse_location: config.warehouse_location,
282            fileio,
283            sql_bind_style: config.sql_bind_style,
284        })
285    }
286
287    /// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
288    fn replace_placeholders(&self, query: &str) -> String {
289        match self.sql_bind_style {
290            SqlBindStyle::DollarNumeric => {
291                let mut count = 1;
292                query
293                    .chars()
294                    .fold(String::with_capacity(query.len()), |mut acc, c| {
295                        if c == '?' {
296                            acc.push('$');
297                            acc.push_str(&count.to_string());
298                            count += 1;
299                        } else {
300                            acc.push(c);
301                        }
302                        acc
303                    })
304            }
305            _ => query.to_owned(),
306        }
307    }
308
309    /// Fetch a vec of AnyRows from a given query
310    async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
311        let query_with_placeholders = self.replace_placeholders(query);
312
313        let mut sqlx_query = sqlx::query(&query_with_placeholders);
314        for arg in args {
315            sqlx_query = sqlx_query.bind(arg);
316        }
317
318        sqlx_query
319            .fetch_all(&self.connection)
320            .await
321            .map_err(from_sqlx_error)
322    }
323
324    /// Execute statements in a transaction, provided or not
325    async fn execute(
326        &self,
327        query: &str,
328        args: Vec<Option<&str>>,
329        transaction: Option<&mut Transaction<'_, Any>>,
330    ) -> Result<AnyQueryResult> {
331        let query_with_placeholders = self.replace_placeholders(query);
332
333        let mut sqlx_query = sqlx::query(&query_with_placeholders);
334        for arg in args {
335            sqlx_query = sqlx_query.bind(arg);
336        }
337
338        match transaction {
339            Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
340            None => {
341                let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
342                let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
343                let _ = tx.commit().await.map_err(from_sqlx_error);
344                result
345            }
346        }
347    }
348}
349
350#[async_trait]
351impl Catalog for SqlCatalog {
352    async fn list_namespaces(
353        &self,
354        parent: Option<&NamespaceIdent>,
355    ) -> Result<Vec<NamespaceIdent>> {
356        // UNION will remove duplicates.
357        let all_namespaces_stmt = format!(
358            "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
359             FROM {CATALOG_TABLE_NAME}
360             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
361             UNION
362             SELECT {NAMESPACE_FIELD_NAME}
363             FROM {NAMESPACE_TABLE_NAME}
364             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
365        );
366
367        let namespace_rows = self
368            .fetch_rows(&all_namespaces_stmt, vec![
369                Some(&self.name),
370                Some(&self.name),
371            ])
372            .await?;
373
374        let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
375
376        if let Some(parent) = parent {
377            if self.namespace_exists(parent).await? {
378                let parent_str = parent.join(".");
379
380                for row in namespace_rows.iter() {
381                    let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
382                    // if parent = a, then we only want to see a.b, a.c returned.
383                    if nsp != parent_str && nsp.starts_with(&parent_str) {
384                        namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
385                    }
386                }
387
388                Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
389            } else {
390                no_such_namespace_err(parent)
391            }
392        } else {
393            for row in namespace_rows.iter() {
394                let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
395                let mut levels = nsp.split(".").collect::<Vec<&str>>();
396                if !levels.is_empty() {
397                    let first_level = levels.drain(..1).collect::<Vec<&str>>();
398                    namespaces.insert(NamespaceIdent::from_strs(first_level)?);
399                }
400            }
401
402            Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
403        }
404    }
405
406    async fn create_namespace(
407        &self,
408        namespace: &NamespaceIdent,
409        properties: HashMap<String, String>,
410    ) -> Result<Namespace> {
411        let exists = self.namespace_exists(namespace).await?;
412
413        if exists {
414            return Err(Error::new(
415                iceberg::ErrorKind::Unexpected,
416                format!("Namespace {namespace:?} already exists"),
417            ));
418        }
419
420        let namespace_str = namespace.join(".");
421        let insert = format!(
422            "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
423             VALUES (?, ?, ?, ?)");
424        if !properties.is_empty() {
425            let mut insert_properties = properties.clone();
426            insert_properties.insert("exists".to_string(), "true".to_string());
427
428            let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
429            let mut insert_stmt = insert.clone();
430            for (index, (key, value)) in insert_properties.iter().enumerate() {
431                query_args.extend_from_slice(&[
432                    Some(self.name.as_str()),
433                    Some(namespace_str.as_str()),
434                    Some(key.as_str()),
435                    Some(value.as_str()),
436                ]);
437                if index > 0 {
438                    insert_stmt.push_str(", (?, ?, ?, ?)");
439                }
440            }
441
442            self.execute(&insert_stmt, query_args, None).await?;
443
444            Ok(Namespace::with_properties(
445                namespace.clone(),
446                insert_properties,
447            ))
448        } else {
449            // set a default property of exists = true
450            self.execute(
451                &insert,
452                vec![
453                    Some(&self.name),
454                    Some(&namespace_str),
455                    Some("exists"),
456                    Some("true"),
457                ],
458                None,
459            )
460            .await?;
461            Ok(Namespace::with_properties(namespace.clone(), properties))
462        }
463    }
464
465    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
466        let exists = self.namespace_exists(namespace).await?;
467        if exists {
468            let namespace_props = self
469                .fetch_rows(
470                    &format!(
471                        "SELECT
472                            {NAMESPACE_FIELD_NAME},
473                            {NAMESPACE_FIELD_PROPERTY_KEY},
474                            {NAMESPACE_FIELD_PROPERTY_VALUE}
475                            FROM {NAMESPACE_TABLE_NAME}
476                            WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
477                            AND {NAMESPACE_FIELD_NAME} = ?"
478                    ),
479                    vec![Some(&self.name), Some(&namespace.join("."))],
480                )
481                .await?;
482
483            let mut properties = HashMap::with_capacity(namespace_props.len());
484
485            for row in namespace_props {
486                let key = row
487                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
488                    .map_err(from_sqlx_error)?;
489                let value = row
490                    .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
491                    .map_err(from_sqlx_error)?;
492
493                properties.insert(key, value);
494            }
495
496            Ok(Namespace::with_properties(namespace.clone(), properties))
497        } else {
498            no_such_namespace_err(namespace)
499        }
500    }
501
502    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
503        let namespace_str = namespace.join(".");
504
505        let table_namespaces = self
506            .fetch_rows(
507                &format!(
508                    "SELECT 1 FROM {CATALOG_TABLE_NAME}
509                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
510                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
511                     LIMIT 1"
512                ),
513                vec![Some(&self.name), Some(&namespace_str)],
514            )
515            .await?;
516
517        if !table_namespaces.is_empty() {
518            Ok(true)
519        } else {
520            let namespaces = self
521                .fetch_rows(
522                    &format!(
523                        "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
524                         WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
525                          AND {NAMESPACE_FIELD_NAME} = ?
526                         LIMIT 1"
527                    ),
528                    vec![Some(&self.name), Some(&namespace_str)],
529                )
530                .await?;
531            if !namespaces.is_empty() {
532                Ok(true)
533            } else {
534                Ok(false)
535            }
536        }
537    }
538
539    async fn update_namespace(
540        &self,
541        namespace: &NamespaceIdent,
542        properties: HashMap<String, String>,
543    ) -> Result<()> {
544        let exists = self.namespace_exists(namespace).await?;
545        if exists {
546            let existing_properties = self.get_namespace(namespace).await?.properties().clone();
547            let namespace_str = namespace.join(".");
548
549            let mut updates = vec![];
550            let mut inserts = vec![];
551
552            for (key, value) in properties.iter() {
553                if existing_properties.contains_key(key) {
554                    if existing_properties.get(key) != Some(value) {
555                        updates.push((key, value));
556                    }
557                } else {
558                    inserts.push((key, value));
559                }
560            }
561
562            let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
563            let update_stmt = format!(
564                "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
565                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ? 
566                 AND {NAMESPACE_FIELD_NAME} = ?
567                 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
568            );
569
570            let insert_stmt = format!(
571                "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
572                 VALUES (?, ?, ?, ?)"
573            );
574
575            for (key, value) in updates {
576                self.execute(
577                    &update_stmt,
578                    vec![
579                        Some(value),
580                        Some(&self.name),
581                        Some(&namespace_str),
582                        Some(key),
583                    ],
584                    Some(&mut tx),
585                )
586                .await?;
587            }
588
589            for (key, value) in inserts {
590                self.execute(
591                    &insert_stmt,
592                    vec![
593                        Some(&self.name),
594                        Some(&namespace_str),
595                        Some(key),
596                        Some(value),
597                    ],
598                    Some(&mut tx),
599                )
600                .await?;
601            }
602
603            let _ = tx.commit().await.map_err(from_sqlx_error)?;
604
605            Ok(())
606        } else {
607            no_such_namespace_err(namespace)
608        }
609    }
610
611    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
612        let exists = self.namespace_exists(namespace).await?;
613        if exists {
614            // if there are tables in the namespace, don't allow drop.
615            let tables = self.list_tables(namespace).await?;
616            if !tables.is_empty() {
617                return Err(Error::new(
618                    iceberg::ErrorKind::Unexpected,
619                    format!(
620                        "Namespace {:?} is not empty. {} tables exist.",
621                        namespace,
622                        tables.len()
623                    ),
624                ));
625            }
626
627            self.execute(
628                &format!(
629                    "DELETE FROM {NAMESPACE_TABLE_NAME}
630                     WHERE {NAMESPACE_FIELD_NAME} = ?
631                      AND {CATALOG_FIELD_CATALOG_NAME} = ?"
632                ),
633                vec![Some(&namespace.join(".")), Some(&self.name)],
634                None,
635            )
636            .await?;
637
638            Ok(())
639        } else {
640            no_such_namespace_err(namespace)
641        }
642    }
643
644    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
645        let exists = self.namespace_exists(namespace).await?;
646        if exists {
647            let rows = self
648                .fetch_rows(
649                    &format!(
650                        "SELECT {CATALOG_FIELD_TABLE_NAME},
651                                {CATALOG_FIELD_TABLE_NAMESPACE}
652                         FROM {CATALOG_TABLE_NAME}
653                         WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
654                          AND {CATALOG_FIELD_CATALOG_NAME} = ?
655                          AND (
656                                {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
657                                OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
658                          )",
659                    ),
660                    vec![Some(&namespace.join(".")), Some(&self.name)],
661                )
662                .await?;
663
664            let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
665
666            for row in rows.iter() {
667                let tbl = row
668                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
669                    .map_err(from_sqlx_error)?;
670                let ns_strs = row
671                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
672                    .map_err(from_sqlx_error)?;
673                let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
674                tables.insert(TableIdent::new(ns, tbl));
675            }
676
677            Ok(tables.into_iter().collect::<Vec<TableIdent>>())
678        } else {
679            no_such_namespace_err(namespace)
680        }
681    }
682
683    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
684        let namespace = identifier.namespace().join(".");
685        let table_name = identifier.name();
686        let table_counts = self
687            .fetch_rows(
688                &format!(
689                    "SELECT 1
690                     FROM {CATALOG_TABLE_NAME}
691                     WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
692                      AND {CATALOG_FIELD_CATALOG_NAME} = ?
693                      AND {CATALOG_FIELD_TABLE_NAME} = ?
694                      AND (
695                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
696                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
697                      )"
698                ),
699                vec![Some(&namespace), Some(&self.name), Some(table_name)],
700            )
701            .await?;
702
703        if !table_counts.is_empty() {
704            Ok(true)
705        } else {
706            Ok(false)
707        }
708    }
709
710    async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
711        if !self.table_exists(identifier).await? {
712            return no_such_table_err(identifier);
713        }
714
715        self.execute(
716            &format!(
717                "DELETE FROM {CATALOG_TABLE_NAME}
718                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
719                  AND {CATALOG_FIELD_TABLE_NAME} = ?
720                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
721                  AND (
722                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
723                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
724                  )"
725            ),
726            vec![
727                Some(&self.name),
728                Some(identifier.name()),
729                Some(&identifier.namespace().join(".")),
730            ],
731            None,
732        )
733        .await?;
734
735        Ok(())
736    }
737
738    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
739        if !self.table_exists(identifier).await? {
740            return no_such_table_err(identifier);
741        }
742
743        let rows = self
744            .fetch_rows(
745                &format!(
746                    "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
747                     FROM {CATALOG_TABLE_NAME}
748                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
749                      AND {CATALOG_FIELD_TABLE_NAME} = ?
750                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
751                      AND (
752                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
753                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
754                      )"
755                ),
756                vec![
757                    Some(&self.name),
758                    Some(identifier.name()),
759                    Some(&identifier.namespace().join(".")),
760                ],
761            )
762            .await?;
763
764        if rows.is_empty() {
765            return no_such_table_err(identifier);
766        }
767
768        let row = &rows[0];
769        let tbl_metadata_location = row
770            .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
771            .map_err(from_sqlx_error)?;
772
773        let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
774
775        Ok(Table::builder()
776            .file_io(self.fileio.clone())
777            .identifier(identifier.clone())
778            .metadata_location(tbl_metadata_location)
779            .metadata(metadata)
780            .build()?)
781    }
782
783    async fn create_table(
784        &self,
785        namespace: &NamespaceIdent,
786        creation: TableCreation,
787    ) -> Result<Table> {
788        if !self.namespace_exists(namespace).await? {
789            return no_such_namespace_err(namespace);
790        }
791
792        let tbl_name = creation.name.clone();
793        let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
794
795        if self.table_exists(&tbl_ident).await? {
796            return table_already_exists_err(&tbl_ident);
797        }
798
799        let (tbl_creation, location) = match creation.location.clone() {
800            Some(location) => (creation, location),
801            None => {
802                // fall back to namespace-specific location
803                // and then to warehouse location
804                let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
805                let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
806                    Some(location) => location.clone(),
807                    None => {
808                        format!(
809                            "{}/{}",
810                            self.warehouse_location.clone(),
811                            namespace.join("/")
812                        )
813                    }
814                };
815
816                let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
817
818                (
819                    TableCreation {
820                        location: Some(tbl_location.clone()),
821                        ..creation
822                    },
823                    tbl_location,
824                )
825            }
826        };
827
828        let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
829            .build()?
830            .metadata;
831        let tbl_metadata_location =
832            MetadataLocation::new_with_table_location(location.clone()).to_string();
833
834        tbl_metadata
835            .write_to(&self.fileio, &tbl_metadata_location)
836            .await?;
837
838        self.execute(&format!(
839            "INSERT INTO {CATALOG_TABLE_NAME}
840             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
841             VALUES (?, ?, ?, ?, ?)
842            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
843
844        Ok(Table::builder()
845            .file_io(self.fileio.clone())
846            .metadata_location(tbl_metadata_location)
847            .identifier(tbl_ident)
848            .metadata(tbl_metadata)
849            .build()?)
850    }
851
852    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
853        if src == dest {
854            return Ok(());
855        }
856
857        if !self.table_exists(src).await? {
858            return no_such_table_err(src);
859        }
860
861        if !self.namespace_exists(dest.namespace()).await? {
862            return no_such_namespace_err(dest.namespace());
863        }
864
865        if self.table_exists(dest).await? {
866            return table_already_exists_err(dest);
867        }
868
869        self.execute(
870            &format!(
871                "UPDATE {CATALOG_TABLE_NAME}
872                 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
873                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
874                  AND {CATALOG_FIELD_TABLE_NAME} = ?
875                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
876                  AND (
877                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
878                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
879                )"
880            ),
881            vec![
882                Some(dest.name()),
883                Some(&dest.namespace().join(".")),
884                Some(&self.name),
885                Some(src.name()),
886                Some(&src.namespace().join(".")),
887            ],
888            None,
889        )
890        .await?;
891
892        Ok(())
893    }
894
895    async fn register_table(
896        &self,
897        table_ident: &TableIdent,
898        metadata_location: String,
899    ) -> Result<Table> {
900        if self.table_exists(table_ident).await? {
901            return table_already_exists_err(table_ident);
902        }
903
904        let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
905
906        let namespace = table_ident.namespace();
907        let tbl_name = table_ident.name().to_string();
908
909        self.execute(&format!(
910            "INSERT INTO {CATALOG_TABLE_NAME}
911             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
912             VALUES (?, ?, ?, ?, ?)
913            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
914
915        Ok(Table::builder()
916            .identifier(table_ident.clone())
917            .metadata_location(metadata_location)
918            .metadata(metadata)
919            .file_io(self.fileio.clone())
920            .build()?)
921    }
922
923    /// Updates an existing table within the SQL catalog.
924    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
925        let table_ident = commit.identifier().clone();
926        let current_table = self.load_table(&table_ident).await?;
927        let current_metadata_location = current_table.metadata_location_result()?.to_string();
928
929        let staged_table = commit.apply(current_table)?;
930        let staged_metadata_location = staged_table.metadata_location_result()?;
931
932        staged_table
933            .metadata()
934            .write_to(staged_table.file_io(), &staged_metadata_location)
935            .await?;
936
937        let update_result = self
938            .execute(
939                &format!(
940                    "UPDATE {CATALOG_TABLE_NAME}
941                     SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
942                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
943                      AND {CATALOG_FIELD_TABLE_NAME} = ?
944                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
945                      AND (
946                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
947                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
948                      )
949                      AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
950                ),
951                vec![
952                    Some(staged_metadata_location),
953                    Some(current_metadata_location.as_str()),
954                    Some(&self.name),
955                    Some(table_ident.name()),
956                    Some(&table_ident.namespace().join(".")),
957                    Some(current_metadata_location.as_str()),
958                ],
959                None,
960            )
961            .await?;
962
963        if update_result.rows_affected() == 0 {
964            return Err(Error::new(
965                ErrorKind::CatalogCommitConflicts,
966                format!("Commit conflicted for table: {table_ident}"),
967            )
968            .with_retryable(true));
969        }
970
971        Ok(staged_table)
972    }
973}
974
975#[cfg(test)]
976mod tests {
977    use std::collections::{HashMap, HashSet};
978    use std::hash::Hash;
979
980    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
981    use iceberg::table::Table;
982    use iceberg::transaction::{ApplyTransactionAction, Transaction};
983    use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
984    use itertools::Itertools;
985    use regex::Regex;
986    use sqlx::migrate::MigrateDatabase;
987    use tempfile::TempDir;
988
989    use crate::catalog::{
990        NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
991        SQL_CATALOG_PROP_WAREHOUSE,
992    };
993    use crate::{SqlBindStyle, SqlCatalogBuilder};
994
    /// Regex fragment matching a lowercase hexadecimal UUID in the hyphenated
    /// 8-4-4-4-12 layout.
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
996
997    fn temp_path() -> String {
998        let temp_dir = TempDir::new().unwrap();
999        temp_dir.path().to_str().unwrap().to_string()
1000    }
1001
1002    fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1003        HashSet::from_iter(vec)
1004    }
1005
1006    fn default_properties() -> HashMap<String, String> {
1007        HashMap::from([("exists".to_string(), "true".to_string())])
1008    }
1009
1010    /// Create a new SQLite catalog for testing. If name is not specified it defaults to "iceberg".
1011    async fn new_sql_catalog(
1012        warehouse_location: String,
1013        name: Option<impl ToString>,
1014    ) -> impl Catalog {
1015        let name = if let Some(name) = name {
1016            name.to_string()
1017        } else {
1018            "iceberg".to_string()
1019        };
1020        let sql_lite_uri = format!("sqlite:{}", temp_path());
1021        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1022
1023        let props = HashMap::from_iter([
1024            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1025            (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1026            (
1027                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1028                SqlBindStyle::DollarNumeric.to_string(),
1029            ),
1030        ]);
1031        SqlCatalogBuilder::default()
1032            .load(&name, props)
1033            .await
1034            .unwrap()
1035    }
1036
1037    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1038        let _ = catalog
1039            .create_namespace(namespace_ident, HashMap::new())
1040            .await
1041            .unwrap();
1042    }
1043
1044    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1045        for namespace_ident in namespace_idents {
1046            let _ = create_namespace(catalog, namespace_ident).await;
1047        }
1048    }
1049
1050    fn simple_table_schema() -> Schema {
1051        Schema::builder()
1052            .with_fields(vec![
1053                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1054            ])
1055            .build()
1056            .unwrap()
1057    }
1058
1059    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1060        let _ = catalog
1061            .create_table(
1062                &table_ident.namespace,
1063                TableCreation::builder()
1064                    .name(table_ident.name().into())
1065                    .schema(simple_table_schema())
1066                    .location(temp_path())
1067                    .build(),
1068            )
1069            .await
1070            .unwrap();
1071    }
1072
1073    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1074        for table_ident in table_idents {
1075            create_table(catalog, table_ident).await;
1076        }
1077    }
1078
1079    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1080        assert_eq!(table.identifier(), expected_table_ident);
1081
1082        let metadata = table.metadata();
1083
1084        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1085
1086        let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1087            .with_spec_id(0)
1088            .build()
1089            .unwrap();
1090
1091        assert_eq!(
1092            metadata
1093                .partition_specs_iter()
1094                .map(|p| p.as_ref())
1095                .collect_vec(),
1096            vec![&expected_partition_spec]
1097        );
1098
1099        let expected_sorted_order = SortOrder::builder()
1100            .with_order_id(0)
1101            .with_fields(vec![])
1102            .build(expected_schema)
1103            .unwrap();
1104
1105        assert_eq!(
1106            metadata
1107                .sort_orders_iter()
1108                .map(|s| s.as_ref())
1109                .collect_vec(),
1110            vec![&expected_sorted_order]
1111        );
1112
1113        assert_eq!(metadata.properties(), &HashMap::new());
1114
1115        assert!(!table.readonly());
1116    }
1117
1118    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1119        let actual = table.metadata_location().unwrap().to_string();
1120        let regex = Regex::new(regex_str).unwrap();
1121        assert!(regex.is_match(&actual))
1122    }
1123
1124    #[tokio::test]
1125    async fn test_initialized() {
1126        let warehouse_loc = temp_path();
1127        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1128        // catalog instantiation should not fail even if tables exist
1129        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1130        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1131    }
1132
1133    #[tokio::test]
1134    async fn test_builder_method() {
1135        let sql_lite_uri = format!("sqlite:{}", temp_path());
1136        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1137        let warehouse_location = temp_path();
1138
1139        let catalog = SqlCatalogBuilder::default()
1140            .uri(sql_lite_uri.to_string())
1141            .warehouse_location(warehouse_location.clone())
1142            .sql_bind_style(SqlBindStyle::QMark)
1143            .load("iceberg", HashMap::default())
1144            .await;
1145        assert!(catalog.is_ok());
1146
1147        let catalog = catalog.unwrap();
1148        assert!(catalog.warehouse_location == warehouse_location);
1149        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1150    }
1151
1152    /// Overwriting an sqlite database with a non-existent path causes
1153    /// catalog generation to fail
1154    #[tokio::test]
1155    async fn test_builder_props_non_existent_path_fails() {
1156        let sql_lite_uri = format!("sqlite:{}", temp_path());
1157        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1158        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1159        let warehouse_location = temp_path();
1160
1161        let catalog = SqlCatalogBuilder::default()
1162            .uri(sql_lite_uri)
1163            .warehouse_location(warehouse_location)
1164            .load(
1165                "iceberg",
1166                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1167            )
1168            .await;
1169        assert!(catalog.is_err());
1170    }
1171
1172    /// Even when an invalid URI is specified in a builder method,
1173    /// it can be successfully overridden with a valid URI in props
1174    /// for catalog generation to succeed.
1175    #[tokio::test]
1176    async fn test_builder_props_set_valid_uri() {
1177        let sql_lite_uri = format!("sqlite:{}", temp_path());
1178        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1179        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1180        let warehouse_location = temp_path();
1181
1182        let catalog = SqlCatalogBuilder::default()
1183            .uri(sql_lite_uri2)
1184            .warehouse_location(warehouse_location)
1185            .load(
1186                "iceberg",
1187                HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1188            )
1189            .await;
1190        assert!(catalog.is_ok());
1191    }
1192
1193    /// values assigned via props take precedence
1194    #[tokio::test]
1195    async fn test_builder_props_take_precedence() {
1196        let sql_lite_uri = format!("sqlite:{}", temp_path());
1197        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1198        let warehouse_location = temp_path();
1199        let warehouse_location2 = temp_path();
1200
1201        let catalog = SqlCatalogBuilder::default()
1202            .warehouse_location(warehouse_location2)
1203            .sql_bind_style(SqlBindStyle::DollarNumeric)
1204            .load(
1205                "iceberg",
1206                HashMap::from_iter([
1207                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1208                    (
1209                        SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1210                        warehouse_location.clone(),
1211                    ),
1212                    (
1213                        SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1214                        SqlBindStyle::QMark.to_string(),
1215                    ),
1216                ]),
1217            )
1218            .await;
1219
1220        assert!(catalog.is_ok());
1221
1222        let catalog = catalog.unwrap();
1223        assert!(catalog.warehouse_location == warehouse_location);
1224        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1225    }
1226
1227    /// values assigned via props take precedence
1228    #[tokio::test]
1229    async fn test_builder_props_take_precedence_props() {
1230        let sql_lite_uri = format!("sqlite:{}", temp_path());
1231        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1232        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1233        let warehouse_location = temp_path();
1234        let warehouse_location2 = temp_path();
1235
1236        let props = HashMap::from_iter([
1237            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1238            (
1239                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1240                warehouse_location.clone(),
1241            ),
1242            (
1243                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1244                SqlBindStyle::QMark.to_string(),
1245            ),
1246        ]);
1247        let props2 = HashMap::from_iter([
1248            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1249            (
1250                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1251                warehouse_location2.clone(),
1252            ),
1253            (
1254                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1255                SqlBindStyle::DollarNumeric.to_string(),
1256            ),
1257        ]);
1258
1259        let catalog = SqlCatalogBuilder::default()
1260            .props(props2)
1261            .load("iceberg", props)
1262            .await;
1263
1264        assert!(catalog.is_ok());
1265
1266        let catalog = catalog.unwrap();
1267        assert!(catalog.warehouse_location == warehouse_location);
1268        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1269    }
1270
1271    /// values assigned via props take precedence
1272    #[tokio::test]
1273    async fn test_builder_props_take_precedence_prop() {
1274        let sql_lite_uri = format!("sqlite:{}", temp_path());
1275        let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1276        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1277        let warehouse_location = temp_path();
1278        let warehouse_location2 = temp_path();
1279
1280        let props = HashMap::from_iter([
1281            (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1282            (
1283                SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1284                warehouse_location.clone(),
1285            ),
1286            (
1287                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1288                SqlBindStyle::QMark.to_string(),
1289            ),
1290        ]);
1291
1292        let catalog = SqlCatalogBuilder::default()
1293            .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1294            .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1295            .prop(
1296                SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1297                SqlBindStyle::DollarNumeric.to_string(),
1298            )
1299            .load("iceberg", props)
1300            .await;
1301
1302        assert!(catalog.is_ok());
1303
1304        let catalog = catalog.unwrap();
1305        assert!(catalog.warehouse_location == warehouse_location);
1306        assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1307    }
1308
1309    /// invalid value for `SqlBindStyle` causes catalog creation to fail
1310    #[tokio::test]
1311    async fn test_builder_props_invalid_bind_style_fails() {
1312        let sql_lite_uri = format!("sqlite:{}", temp_path());
1313        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1314        let warehouse_location = temp_path();
1315
1316        let catalog = SqlCatalogBuilder::default()
1317            .load(
1318                "iceberg",
1319                HashMap::from_iter([
1320                    (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1321                    (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1322                    (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1323                ]),
1324            )
1325            .await;
1326
1327        assert!(catalog.is_err());
1328    }
1329
1330    #[tokio::test]
1331    async fn test_list_namespaces_returns_empty_vector() {
1332        let warehouse_loc = temp_path();
1333        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1334
1335        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1336    }
1337
1338    #[tokio::test]
1339    async fn test_list_namespaces_returns_empty_different_name() {
1340        let warehouse_loc = temp_path();
1341        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1342        let namespace_ident_1 = NamespaceIdent::new("a".into());
1343        let namespace_ident_2 = NamespaceIdent::new("b".into());
1344        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1345        assert_eq!(
1346            to_set(catalog.list_namespaces(None).await.unwrap()),
1347            to_set(vec![namespace_ident_1, namespace_ident_2])
1348        );
1349
1350        let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await;
1351        assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
1352    }
1353
1354    #[tokio::test]
1355    async fn test_list_namespaces_returns_multiple_namespaces() {
1356        let warehouse_loc = temp_path();
1357        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1358        let namespace_ident_1 = NamespaceIdent::new("a".into());
1359        let namespace_ident_2 = NamespaceIdent::new("b".into());
1360        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1361
1362        assert_eq!(
1363            to_set(catalog.list_namespaces(None).await.unwrap()),
1364            to_set(vec![namespace_ident_1, namespace_ident_2])
1365        );
1366    }
1367
1368    #[tokio::test]
1369    async fn test_list_namespaces_returns_only_top_level_namespaces() {
1370        let warehouse_loc = temp_path();
1371        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1372        let namespace_ident_1 = NamespaceIdent::new("a".into());
1373        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1374        let namespace_ident_3 = NamespaceIdent::new("b".into());
1375        create_namespaces(&catalog, &vec![
1376            &namespace_ident_1,
1377            &namespace_ident_2,
1378            &namespace_ident_3,
1379        ])
1380        .await;
1381
1382        assert_eq!(
1383            to_set(catalog.list_namespaces(None).await.unwrap()),
1384            to_set(vec![namespace_ident_1, namespace_ident_3])
1385        );
1386    }
1387
1388    #[tokio::test]
1389    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1390        let warehouse_loc = temp_path();
1391        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1392        let namespace_ident_1 = NamespaceIdent::new("a".into());
1393        let namespace_ident_2 = NamespaceIdent::new("b".into());
1394        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1395
1396        assert_eq!(
1397            catalog
1398                .list_namespaces(Some(&namespace_ident_1))
1399                .await
1400                .unwrap(),
1401            vec![]
1402        );
1403    }
1404
1405    #[tokio::test]
1406    async fn test_list_namespaces_returns_namespace_under_parent() {
1407        let warehouse_loc = temp_path();
1408        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1409        let namespace_ident_1 = NamespaceIdent::new("a".into());
1410        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1411        let namespace_ident_3 = NamespaceIdent::new("c".into());
1412        create_namespaces(&catalog, &vec![
1413            &namespace_ident_1,
1414            &namespace_ident_2,
1415            &namespace_ident_3,
1416        ])
1417        .await;
1418
1419        assert_eq!(
1420            to_set(catalog.list_namespaces(None).await.unwrap()),
1421            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1422        );
1423
1424        assert_eq!(
1425            catalog
1426                .list_namespaces(Some(&namespace_ident_1))
1427                .await
1428                .unwrap(),
1429            vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1430        );
1431    }
1432
1433    #[tokio::test]
1434    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1435        let warehouse_loc = temp_path();
1436        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1437        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1438        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1439        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1440        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1441        let namespace_ident_5 = NamespaceIdent::new("b".into());
1442        create_namespaces(&catalog, &vec![
1443            &namespace_ident_1,
1444            &namespace_ident_2,
1445            &namespace_ident_3,
1446            &namespace_ident_4,
1447            &namespace_ident_5,
1448        ])
1449        .await;
1450
1451        assert_eq!(
1452            to_set(
1453                catalog
1454                    .list_namespaces(Some(&namespace_ident_1))
1455                    .await
1456                    .unwrap()
1457            ),
1458            to_set(vec![
1459                NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1460                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1461                NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1462            ])
1463        );
1464    }
1465
1466    #[tokio::test]
1467    async fn test_namespace_exists_returns_false() {
1468        let warehouse_loc = temp_path();
1469        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1470        let namespace_ident = NamespaceIdent::new("a".into());
1471        create_namespace(&catalog, &namespace_ident).await;
1472
1473        assert!(
1474            !catalog
1475                .namespace_exists(&NamespaceIdent::new("b".into()))
1476                .await
1477                .unwrap()
1478        );
1479    }
1480
1481    #[tokio::test]
1482    async fn test_namespace_exists_returns_true() {
1483        let warehouse_loc = temp_path();
1484        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1485        let namespace_ident = NamespaceIdent::new("a".into());
1486        create_namespace(&catalog, &namespace_ident).await;
1487
1488        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1489    }
1490
1491    #[tokio::test]
1492    async fn test_create_namespace_with_properties() {
1493        let warehouse_loc = temp_path();
1494        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1495        let namespace_ident = NamespaceIdent::new("abc".into());
1496
1497        let mut properties = default_properties();
1498        properties.insert("k".into(), "v".into());
1499
1500        assert_eq!(
1501            catalog
1502                .create_namespace(&namespace_ident, properties.clone())
1503                .await
1504                .unwrap(),
1505            Namespace::with_properties(namespace_ident.clone(), properties.clone())
1506        );
1507
1508        assert_eq!(
1509            catalog.get_namespace(&namespace_ident).await.unwrap(),
1510            Namespace::with_properties(namespace_ident, properties)
1511        );
1512    }
1513
1514    #[tokio::test]
1515    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
1516        let warehouse_loc = temp_path();
1517        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1518        let namespace_ident = NamespaceIdent::new("a".into());
1519        create_namespace(&catalog, &namespace_ident).await;
1520
1521        assert_eq!(
1522            catalog
1523                .create_namespace(&namespace_ident, HashMap::new())
1524                .await
1525                .unwrap_err()
1526                .to_string(),
1527            format!(
1528                "Unexpected => Namespace {:?} already exists",
1529                &namespace_ident
1530            )
1531        );
1532
1533        assert_eq!(
1534            catalog.get_namespace(&namespace_ident).await.unwrap(),
1535            Namespace::with_properties(namespace_ident, default_properties())
1536        );
1537    }
1538
1539    #[tokio::test]
1540    async fn test_create_nested_namespace() {
1541        let warehouse_loc = temp_path();
1542        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1543        let parent_namespace_ident = NamespaceIdent::new("a".into());
1544        create_namespace(&catalog, &parent_namespace_ident).await;
1545
1546        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1547
1548        assert_eq!(
1549            catalog
1550                .create_namespace(&child_namespace_ident, HashMap::new())
1551                .await
1552                .unwrap(),
1553            Namespace::new(child_namespace_ident.clone())
1554        );
1555
1556        assert_eq!(
1557            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1558            Namespace::with_properties(child_namespace_ident, default_properties())
1559        );
1560    }
1561
1562    #[tokio::test]
1563    async fn test_create_deeply_nested_namespace() {
1564        let warehouse_loc = temp_path();
1565        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1566        let namespace_ident_a = NamespaceIdent::new("a".into());
1567        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1568        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1569
1570        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1571
1572        assert_eq!(
1573            catalog
1574                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1575                .await
1576                .unwrap(),
1577            Namespace::new(namespace_ident_a_b_c.clone())
1578        );
1579
1580        assert_eq!(
1581            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1582            Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1583        );
1584    }
1585
1586    #[tokio::test]
1587    async fn test_update_namespace_noop() {
1588        let warehouse_loc = temp_path();
1589        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1590        let namespace_ident = NamespaceIdent::new("a".into());
1591        create_namespace(&catalog, &namespace_ident).await;
1592
1593        catalog
1594            .update_namespace(&namespace_ident, HashMap::new())
1595            .await
1596            .unwrap();
1597
1598        assert_eq!(
1599            *catalog
1600                .get_namespace(&namespace_ident)
1601                .await
1602                .unwrap()
1603                .properties(),
1604            HashMap::from_iter([("exists".to_string(), "true".to_string())])
1605        )
1606    }
1607
1608    #[tokio::test]
1609    async fn test_update_namespace() {
1610        let warehouse_loc = temp_path();
1611        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1612        let namespace_ident = NamespaceIdent::new("a".into());
1613        create_namespace(&catalog, &namespace_ident).await;
1614
1615        let mut props = HashMap::from_iter([
1616            ("prop1".to_string(), "val1".to_string()),
1617            ("prop2".into(), "val2".into()),
1618        ]);
1619
1620        catalog
1621            .update_namespace(&namespace_ident, props.clone())
1622            .await
1623            .unwrap();
1624
1625        props.insert("exists".into(), "true".into());
1626
1627        assert_eq!(
1628            *catalog
1629                .get_namespace(&namespace_ident)
1630                .await
1631                .unwrap()
1632                .properties(),
1633            props
1634        )
1635    }
1636
1637    #[tokio::test]
1638    async fn test_update_nested_namespace() {
1639        let warehouse_loc = temp_path();
1640        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1641        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1642        create_namespace(&catalog, &namespace_ident).await;
1643
1644        let mut props = HashMap::from_iter([
1645            ("prop1".to_string(), "val1".to_string()),
1646            ("prop2".into(), "val2".into()),
1647        ]);
1648
1649        catalog
1650            .update_namespace(&namespace_ident, props.clone())
1651            .await
1652            .unwrap();
1653
1654        props.insert("exists".into(), "true".into());
1655
1656        assert_eq!(
1657            *catalog
1658                .get_namespace(&namespace_ident)
1659                .await
1660                .unwrap()
1661                .properties(),
1662            props
1663        )
1664    }
1665
1666    #[tokio::test]
1667    async fn test_update_namespace_errors_if_namespace_doesnt_exist() {
1668        let warehouse_loc = temp_path();
1669        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1670        let namespace_ident = NamespaceIdent::new("a".into());
1671
1672        let props = HashMap::from_iter([
1673            ("prop1".to_string(), "val1".to_string()),
1674            ("prop2".into(), "val2".into()),
1675        ]);
1676
1677        let err = catalog
1678            .update_namespace(&namespace_ident, props)
1679            .await
1680            .unwrap_err();
1681
1682        assert_eq!(
1683            err.message(),
1684            format!("No such namespace: {namespace_ident:?}")
1685        );
1686    }
1687
1688    #[tokio::test]
1689    async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1690        let warehouse_loc = temp_path();
1691        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1692        let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1693
1694        let props = HashMap::from_iter([
1695            ("prop1".to_string(), "val1".to_string()),
1696            ("prop2".into(), "val2".into()),
1697        ]);
1698
1699        let err = catalog
1700            .update_namespace(&namespace_ident, props)
1701            .await
1702            .unwrap_err();
1703
1704        assert_eq!(
1705            err.message(),
1706            format!("No such namespace: {namespace_ident:?}")
1707        );
1708    }
1709
1710    #[tokio::test]
1711    async fn test_drop_namespace() {
1712        let warehouse_loc = temp_path();
1713        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1714        let namespace_ident = NamespaceIdent::new("abc".into());
1715        create_namespace(&catalog, &namespace_ident).await;
1716
1717        catalog.drop_namespace(&namespace_ident).await.unwrap();
1718
1719        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1720    }
1721
1722    #[tokio::test]
1723    async fn test_drop_nested_namespace() {
1724        let warehouse_loc = temp_path();
1725        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1726        let namespace_ident_a = NamespaceIdent::new("a".into());
1727        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1728        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1729
1730        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1731
1732        assert!(
1733            !catalog
1734                .namespace_exists(&namespace_ident_a_b)
1735                .await
1736                .unwrap()
1737        );
1738
1739        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1740    }
1741
1742    #[tokio::test]
1743    async fn test_drop_deeply_nested_namespace() {
1744        let warehouse_loc = temp_path();
1745        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1746        let namespace_ident_a = NamespaceIdent::new("a".into());
1747        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1748        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1749        create_namespaces(&catalog, &vec![
1750            &namespace_ident_a,
1751            &namespace_ident_a_b,
1752            &namespace_ident_a_b_c,
1753        ])
1754        .await;
1755
1756        catalog
1757            .drop_namespace(&namespace_ident_a_b_c)
1758            .await
1759            .unwrap();
1760
1761        assert!(
1762            !catalog
1763                .namespace_exists(&namespace_ident_a_b_c)
1764                .await
1765                .unwrap()
1766        );
1767
1768        assert!(
1769            catalog
1770                .namespace_exists(&namespace_ident_a_b)
1771                .await
1772                .unwrap()
1773        );
1774
1775        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1776    }
1777
1778    #[tokio::test]
1779    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1780        let warehouse_loc = temp_path();
1781        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1782
1783        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1784        assert_eq!(
1785            catalog
1786                .drop_namespace(&non_existent_namespace_ident)
1787                .await
1788                .unwrap_err()
1789                .to_string(),
1790            format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1791        )
1792    }
1793
1794    #[tokio::test]
1795    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1796        let warehouse_loc = temp_path();
1797        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1798        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1799
1800        let non_existent_namespace_ident =
1801            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1802        assert_eq!(
1803            catalog
1804                .drop_namespace(&non_existent_namespace_ident)
1805                .await
1806                .unwrap_err()
1807                .to_string(),
1808            format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1809        )
1810    }
1811
1812    #[tokio::test]
1813    async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1814        let warehouse_loc = temp_path();
1815        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1816        let namespace_ident_a = NamespaceIdent::new("a".into());
1817        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1818        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1819
1820        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1821
1822        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1823
1824        assert!(
1825            catalog
1826                .namespace_exists(&namespace_ident_a_b)
1827                .await
1828                .unwrap()
1829        );
1830    }
1831
1832    #[tokio::test]
1833    async fn test_list_tables_returns_empty_vector() {
1834        let warehouse_loc = temp_path();
1835        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1836        let namespace_ident = NamespaceIdent::new("a".into());
1837        create_namespace(&catalog, &namespace_ident).await;
1838
1839        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1840    }
1841
1842    #[tokio::test]
1843    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1844        let warehouse_loc = temp_path();
1845        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1846
1847        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1848
1849        assert_eq!(
1850            catalog
1851                .list_tables(&non_existent_namespace_ident)
1852                .await
1853                .unwrap_err()
1854                .to_string(),
1855            format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}"),
1856        );
1857    }
1858
1859    #[tokio::test]
1860    async fn test_create_table_with_location() {
1861        let warehouse_loc = temp_path();
1862        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1863        let namespace_ident = NamespaceIdent::new("a".into());
1864        create_namespace(&catalog, &namespace_ident).await;
1865
1866        let table_name = "abc";
1867        let location = warehouse_loc.clone();
1868        let table_creation = TableCreation::builder()
1869            .name(table_name.into())
1870            .location(location.clone())
1871            .schema(simple_table_schema())
1872            .build();
1873
1874        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1875
1876        assert_table_eq(
1877            &catalog
1878                .create_table(&namespace_ident, table_creation)
1879                .await
1880                .unwrap(),
1881            &expected_table_ident,
1882            &simple_table_schema(),
1883        );
1884
1885        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1886
1887        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1888
1889        assert!(
1890            table
1891                .metadata_location()
1892                .unwrap()
1893                .to_string()
1894                .starts_with(&location)
1895        )
1896    }
1897
1898    #[tokio::test]
1899    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1900        let warehouse_loc = temp_path();
1901        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1902
1903        let namespace_ident = NamespaceIdent::new("a".into());
1904        let mut namespace_properties = HashMap::new();
1905        let namespace_location = temp_path();
1906        namespace_properties.insert(
1907            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1908            namespace_location.to_string(),
1909        );
1910        catalog
1911            .create_namespace(&namespace_ident, namespace_properties)
1912            .await
1913            .unwrap();
1914
1915        let table_name = "tbl1";
1916        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1917        let expected_table_metadata_location_regex =
1918            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1919
1920        let table = catalog
1921            .create_table(
1922                &namespace_ident,
1923                TableCreation::builder()
1924                    .name(table_name.into())
1925                    .schema(simple_table_schema())
1926                    // no location specified for table
1927                    .build(),
1928            )
1929            .await
1930            .unwrap();
1931        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1932        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1933
1934        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1935        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1936        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1937    }
1938
1939    #[tokio::test]
1940    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1941     {
1942        let warehouse_loc = temp_path();
1943        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1944
1945        let namespace_ident = NamespaceIdent::new("a".into());
1946        let mut namespace_properties = HashMap::new();
1947        let namespace_location = temp_path();
1948        namespace_properties.insert(
1949            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1950            namespace_location.to_string(),
1951        );
1952        catalog
1953            .create_namespace(&namespace_ident, namespace_properties)
1954            .await
1955            .unwrap();
1956
1957        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1958        let mut nested_namespace_properties = HashMap::new();
1959        let nested_namespace_location = temp_path();
1960        nested_namespace_properties.insert(
1961            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1962            nested_namespace_location.to_string(),
1963        );
1964        catalog
1965            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1966            .await
1967            .unwrap();
1968
1969        let table_name = "tbl1";
1970        let expected_table_ident =
1971            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1972        let expected_table_metadata_location_regex = format!(
1973            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1974        );
1975
1976        let table = catalog
1977            .create_table(
1978                &nested_namespace_ident,
1979                TableCreation::builder()
1980                    .name(table_name.into())
1981                    .schema(simple_table_schema())
1982                    // no location specified for table
1983                    .build(),
1984            )
1985            .await
1986            .unwrap();
1987        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1988        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1989
1990        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1991        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1992        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1993    }
1994
1995    #[tokio::test]
1996    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1997     {
1998        let warehouse_loc = temp_path();
1999        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2000
2001        let namespace_ident = NamespaceIdent::new("a".into());
2002        // note: no location specified in namespace_properties
2003        let namespace_properties = HashMap::new();
2004        catalog
2005            .create_namespace(&namespace_ident, namespace_properties)
2006            .await
2007            .unwrap();
2008
2009        let table_name = "tbl1";
2010        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2011        let expected_table_metadata_location_regex =
2012            format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
2013
2014        let table = catalog
2015            .create_table(
2016                &namespace_ident,
2017                TableCreation::builder()
2018                    .name(table_name.into())
2019                    .schema(simple_table_schema())
2020                    // no location specified for table
2021                    .build(),
2022            )
2023            .await
2024            .unwrap();
2025        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2026        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2027
2028        let table = catalog.load_table(&expected_table_ident).await.unwrap();
2029        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2030        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2031    }
2032
2033    #[tokio::test]
2034    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
2035     {
2036        let warehouse_loc = temp_path();
2037        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2038
2039        let namespace_ident = NamespaceIdent::new("a".into());
2040        create_namespace(&catalog, &namespace_ident).await;
2041
2042        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2043        create_namespace(&catalog, &nested_namespace_ident).await;
2044
2045        let table_name = "tbl1";
2046        let expected_table_ident =
2047            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
2048        let expected_table_metadata_location_regex =
2049            format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
2050
2051        let table = catalog
2052            .create_table(
2053                &nested_namespace_ident,
2054                TableCreation::builder()
2055                    .name(table_name.into())
2056                    .schema(simple_table_schema())
2057                    // no location specified for table
2058                    .build(),
2059            )
2060            .await
2061            .unwrap();
2062        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2063        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2064
2065        let table = catalog.load_table(&expected_table_ident).await.unwrap();
2066        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2067        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2068    }
2069
2070    #[tokio::test]
2071    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
2072        let warehouse_loc = temp_path();
2073        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2074        let namespace_ident = NamespaceIdent::new("a".into());
2075        create_namespace(&catalog, &namespace_ident).await;
2076        let table_name = "tbl1";
2077        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2078        create_table(&catalog, &table_ident).await;
2079
2080        let tmp_dir = TempDir::new().unwrap();
2081        let location = tmp_dir.path().to_str().unwrap().to_string();
2082
2083        assert_eq!(
2084            catalog
2085                .create_table(
2086                    &namespace_ident,
2087                    TableCreation::builder()
2088                        .name(table_name.into())
2089                        .schema(simple_table_schema())
2090                        .location(location)
2091                        .build()
2092                )
2093                .await
2094                .unwrap_err()
2095                .to_string(),
2096            format!("Unexpected => Table {:?} already exists.", &table_ident)
2097        );
2098    }
2099
2100    #[tokio::test]
2101    async fn test_rename_table_in_same_namespace() {
2102        let warehouse_loc = temp_path();
2103        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2104        let namespace_ident = NamespaceIdent::new("n1".into());
2105        create_namespace(&catalog, &namespace_ident).await;
2106        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2107        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2108        create_table(&catalog, &src_table_ident).await;
2109
2110        catalog
2111            .rename_table(&src_table_ident, &dst_table_ident)
2112            .await
2113            .unwrap();
2114
2115        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2116            dst_table_ident
2117        ],);
2118    }
2119
2120    #[tokio::test]
2121    async fn test_rename_table_across_namespaces() {
2122        let warehouse_loc = temp_path();
2123        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2124        let src_namespace_ident = NamespaceIdent::new("a".into());
2125        let dst_namespace_ident = NamespaceIdent::new("b".into());
2126        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
2127        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2128        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
2129        create_table(&catalog, &src_table_ident).await;
2130
2131        catalog
2132            .rename_table(&src_table_ident, &dst_table_ident)
2133            .await
2134            .unwrap();
2135
2136        assert_eq!(
2137            catalog.list_tables(&src_namespace_ident).await.unwrap(),
2138            vec![],
2139        );
2140
2141        assert_eq!(
2142            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
2143            vec![dst_table_ident],
2144        );
2145    }
2146
2147    #[tokio::test]
2148    async fn test_rename_table_src_table_is_same_as_dst_table() {
2149        let warehouse_loc = temp_path();
2150        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2151        let namespace_ident = NamespaceIdent::new("n1".into());
2152        create_namespace(&catalog, &namespace_ident).await;
2153        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
2154        create_table(&catalog, &table_ident).await;
2155
2156        catalog
2157            .rename_table(&table_ident, &table_ident)
2158            .await
2159            .unwrap();
2160
2161        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2162            table_ident
2163        ],);
2164    }
2165
2166    #[tokio::test]
2167    async fn test_rename_table_across_nested_namespaces() {
2168        let warehouse_loc = temp_path();
2169        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2170        let namespace_ident_a = NamespaceIdent::new("a".into());
2171        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2172        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2173        create_namespaces(&catalog, &vec![
2174            &namespace_ident_a,
2175            &namespace_ident_a_b,
2176            &namespace_ident_a_b_c,
2177        ])
2178        .await;
2179
2180        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2181        create_tables(&catalog, vec![&src_table_ident]).await;
2182
2183        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2184        catalog
2185            .rename_table(&src_table_ident, &dst_table_ident)
2186            .await
2187            .unwrap();
2188
2189        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2190
2191        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2192    }
2193
2194    #[tokio::test]
2195    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2196        let warehouse_loc = temp_path();
2197        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2198        let src_namespace_ident = NamespaceIdent::new("n1".into());
2199        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2200        create_namespace(&catalog, &src_namespace_ident).await;
2201        create_table(&catalog, &src_table_ident).await;
2202
2203        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2204        let dst_table_ident =
2205            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2206        assert_eq!(
2207            catalog
2208                .rename_table(&src_table_ident, &dst_table_ident)
2209                .await
2210                .unwrap_err()
2211                .to_string(),
2212            format!("Unexpected => No such namespace: {non_existent_dst_namespace_ident:?}"),
2213        );
2214    }
2215
2216    #[tokio::test]
2217    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
2218        let warehouse_loc = temp_path();
2219        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2220        let namespace_ident = NamespaceIdent::new("n1".into());
2221        create_namespace(&catalog, &namespace_ident).await;
2222        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2223        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2224
2225        assert_eq!(
2226            catalog
2227                .rename_table(&src_table_ident, &dst_table_ident)
2228                .await
2229                .unwrap_err()
2230                .to_string(),
2231            format!("Unexpected => No such table: {src_table_ident:?}"),
2232        );
2233    }
2234
2235    #[tokio::test]
2236    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
2237        let warehouse_loc = temp_path();
2238        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2239        let namespace_ident = NamespaceIdent::new("n1".into());
2240        create_namespace(&catalog, &namespace_ident).await;
2241        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2242        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2243        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
2244
2245        assert_eq!(
2246            catalog
2247                .rename_table(&src_table_ident, &dst_table_ident)
2248                .await
2249                .unwrap_err()
2250                .to_string(),
2251            format!("Unexpected => Table {:?} already exists.", &dst_table_ident),
2252        );
2253    }
2254
2255    #[tokio::test]
2256    async fn test_drop_table_throws_error_if_table_not_exist() {
2257        let warehouse_loc = temp_path();
2258        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2259        let namespace_ident = NamespaceIdent::new("a".into());
2260        let table_name = "tbl1";
2261        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2262        create_namespace(&catalog, &namespace_ident).await;
2263
2264        let err = catalog
2265            .drop_table(&table_ident)
2266            .await
2267            .unwrap_err()
2268            .to_string();
2269        assert_eq!(
2270            err,
2271            "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2272        );
2273    }
2274
2275    #[tokio::test]
2276    async fn test_drop_table() {
2277        let warehouse_loc = temp_path();
2278        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2279        let namespace_ident = NamespaceIdent::new("a".into());
2280        let table_name = "tbl1";
2281        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2282        create_namespace(&catalog, &namespace_ident).await;
2283
2284        let location = warehouse_loc.clone();
2285        let table_creation = TableCreation::builder()
2286            .name(table_name.into())
2287            .location(location.clone())
2288            .schema(simple_table_schema())
2289            .build();
2290
2291        catalog
2292            .create_table(&namespace_ident, table_creation)
2293            .await
2294            .unwrap();
2295
2296        let table = catalog.load_table(&table_ident).await.unwrap();
2297        assert_table_eq(&table, &table_ident, &simple_table_schema());
2298
2299        catalog.drop_table(&table_ident).await.unwrap();
2300        let err = catalog
2301            .load_table(&table_ident)
2302            .await
2303            .unwrap_err()
2304            .to_string();
2305        assert_eq!(
2306            err,
2307            "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2308        );
2309    }
2310
2311    #[tokio::test]
2312    async fn test_register_table_throws_error_if_table_with_same_name_already_exists() {
2313        let warehouse_loc = temp_path();
2314        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2315        let namespace_ident = NamespaceIdent::new("a".into());
2316        create_namespace(&catalog, &namespace_ident).await;
2317        let table_name = "tbl1";
2318        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2319        create_table(&catalog, &table_ident).await;
2320
2321        assert_eq!(
2322            catalog
2323                .register_table(&table_ident, warehouse_loc)
2324                .await
2325                .unwrap_err()
2326                .to_string(),
2327            format!("Unexpected => Table {:?} already exists.", &table_ident)
2328        );
2329    }
2330
2331    #[tokio::test]
2332    async fn test_register_table() {
2333        let warehouse_loc = temp_path();
2334        let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2335        let namespace_ident = NamespaceIdent::new("a".into());
2336        create_namespace(&catalog, &namespace_ident).await;
2337
2338        let table_name = "abc";
2339        let location = warehouse_loc.clone();
2340        let table_creation = TableCreation::builder()
2341            .name(table_name.into())
2342            .location(location.clone())
2343            .schema(simple_table_schema())
2344            .build();
2345
2346        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2347        let expected_table = catalog
2348            .create_table(&namespace_ident, table_creation)
2349            .await
2350            .unwrap();
2351
2352        let metadata_location = expected_table
2353            .metadata_location()
2354            .expect("Expected metadata location to be set")
2355            .to_string();
2356
2357        assert_table_eq(&expected_table, &table_ident, &simple_table_schema());
2358
2359        let _ = catalog.drop_table(&table_ident).await;
2360
2361        let table = catalog
2362            .register_table(&table_ident, metadata_location.clone())
2363            .await
2364            .unwrap();
2365
2366        assert_eq!(table.identifier(), expected_table.identifier());
2367        assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
2368    }
2369
2370    #[tokio::test]
2371    async fn test_update_table() {
2372        let warehouse_loc = temp_path();
2373        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2374
2375        // Create a test namespace and table
2376        let namespace_ident = NamespaceIdent::new("ns1".into());
2377        create_namespace(&catalog, &namespace_ident).await;
2378        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2379        create_table(&catalog, &table_ident).await;
2380
2381        let table = catalog.load_table(&table_ident).await.unwrap();
2382
2383        // Store the original metadata location for comparison
2384        let original_metadata_location = table.metadata_location().unwrap().to_string();
2385
2386        // Create a transaction to update the table
2387        let tx = Transaction::new(&table);
2388        let tx = tx
2389            .update_table_properties()
2390            .set("test_property".to_string(), "test_value".to_string())
2391            .apply(tx)
2392            .unwrap();
2393
2394        // Commit the transaction to the catalog
2395        let updated_table = tx.commit(&catalog).await.unwrap();
2396
2397        // Verify the update was successful
2398        assert_eq!(
2399            updated_table.metadata().properties().get("test_property"),
2400            Some(&"test_value".to_string())
2401        );
2402        // Verify the metadata location has been updated
2403        assert_ne!(
2404            updated_table.metadata_location().unwrap(),
2405            original_metadata_location.as_str()
2406        );
2407
2408        // Load the table again from the catalog to verify changes were persisted
2409        let reloaded = catalog.load_table(&table_ident).await.unwrap();
2410
2411        // Verify the reloaded table matches the updated table
2412        assert_eq!(
2413            reloaded.metadata().properties().get("test_property"),
2414            Some(&"test_value".to_string())
2415        );
2416        assert_eq!(
2417            reloaded.metadata_location(),
2418            updated_table.metadata_location()
2419        );
2420    }
2421}