1use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
25use iceberg::spec::{TableMetadata, TableMetadataBuilder};
26use iceberg::table::Table;
27use iceberg::{
28 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
29 TableCommit, TableCreation, TableIdent,
30};
31use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
32use sqlx::{Any, AnyPool, Row, Transaction};
33
34use crate::error::{
35 from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
36};
37
/// Catalog property key holding the database connection URI.
pub const SQL_CATALOG_PROP_URI: &str = "uri";
/// Catalog property key holding the warehouse location.
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Catalog property key selecting the placeholder bind style (parsed into [`SqlBindStyle`]).
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";

// Name and columns of the table that stores one row per registered table.
static CATALOG_TABLE_NAME: &str = "iceberg_tables";
static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
static CATALOG_FIELD_TABLE_NAME: &str = "table_name";
static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
// Record-type value written for table rows. The queries in this file also treat rows whose
// record type is NULL as tables.
static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Name and columns of the table that stores namespace properties (one row per property).
static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
static NAMESPACE_FIELD_NAME: &str = "namespace";
static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

// Namespace property consulted for the base location of tables created without an explicit
// location.
static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
60
/// Default connection-pool size, used when `pool.max-connections` is not set.
static MAX_CONNECTIONS: u32 = 10;
/// Default idle timeout in seconds, used when `pool.idle-timeout` is not set.
static IDLE_TIMEOUT: u64 = 10;
/// Default for `pool.test-before-acquire`.
static TEST_BEFORE_ACQUIRE: bool = true;

/// Builder for [`SqlCatalog`].
///
/// Collects the configuration and an optional storage factory; the catalog itself is
/// created by `load`.
#[derive(Debug)]
pub struct SqlCatalogBuilder {
    // Accumulated configuration, finalized in `load`.
    config: SqlCatalogConfig,
    // Storage backend for the catalog's `FileIO`; `SqlCatalog::new` fails when it is `None`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
71
72impl Default for SqlCatalogBuilder {
73 fn default() -> Self {
74 Self {
75 config: SqlCatalogConfig {
76 uri: "".to_string(),
77 name: "".to_string(),
78 warehouse_location: "".to_string(),
79 sql_bind_style: SqlBindStyle::DollarNumeric,
80 props: HashMap::new(),
81 },
82 storage_factory: None,
83 }
84 }
85}
86
87impl SqlCatalogBuilder {
88 pub fn uri(mut self, uri: impl Into<String>) -> Self {
93 self.config.uri = uri.into();
94 self
95 }
96
97 pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
102 self.config.warehouse_location = location.into();
103 self
104 }
105
106 pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
111 self.config.sql_bind_style = sql_bind_style;
112 self
113 }
114
115 pub fn props(mut self, props: HashMap<String, String>) -> Self {
120 for (k, v) in props {
121 self.config.props.insert(k, v);
122 }
123 self
124 }
125
126 pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
133 self.config.props.insert(key.into(), value.into());
134 self
135 }
136}
137
138impl CatalogBuilder for SqlCatalogBuilder {
139 type C = SqlCatalog;
140
141 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
142 self.storage_factory = Some(storage_factory);
143 self
144 }
145
146 fn load(
147 mut self,
148 name: impl Into<String>,
149 props: HashMap<String, String>,
150 ) -> impl Future<Output = Result<Self::C>> + Send {
151 for (k, v) in props {
152 self.config.props.insert(k, v);
153 }
154
155 if let Some(uri) = self.config.props.remove(SQL_CATALOG_PROP_URI) {
156 self.config.uri = uri;
157 }
158 if let Some(warehouse_location) = self.config.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
159 self.config.warehouse_location = warehouse_location;
160 }
161
162 let name = name.into();
163
164 let mut valid_sql_bind_style = true;
165 if let Some(sql_bind_style) = self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
166 if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
167 self.config.sql_bind_style = sql_bind_style;
168 } else {
169 valid_sql_bind_style = false;
170 }
171 }
172
173 let valid_name = !name.trim().is_empty();
174
175 async move {
176 if !valid_name {
177 Err(Error::new(
178 ErrorKind::DataInvalid,
179 "Catalog name cannot be empty",
180 ))
181 } else if !valid_sql_bind_style {
182 Err(Error::new(
183 ErrorKind::DataInvalid,
184 format!(
185 "`{}` values are valid only if they're `{}` or `{}`",
186 SQL_CATALOG_PROP_BIND_STYLE,
187 SqlBindStyle::DollarNumeric,
188 SqlBindStyle::QMark
189 ),
190 ))
191 } else {
192 self.config.name = name;
193 SqlCatalog::new(self.config, self.storage_factory).await
194 }
195 }
196 }
197}
198
/// Configuration gathered by [`SqlCatalogBuilder`] and consumed by `SqlCatalog::new`.
#[derive(Debug)]
struct SqlCatalogConfig {
    // Database connection URI handed to the connection pool.
    uri: String,
    // Catalog name; stored in the `catalog_name` column of every row.
    name: String,
    // Base location used when neither the table nor its namespace specifies one.
    warehouse_location: String,
    // Placeholder style used when rewriting `?` placeholders.
    sql_bind_style: SqlBindStyle,
    // Remaining properties; the `pool.*` entries are read when the pool is created.
    props: HashMap<String, String>,
}
215
/// Iceberg catalog that keeps table and namespace records in a SQL database.
#[derive(Debug)]
pub struct SqlCatalog {
    // Catalog name; every query filters on it via the `catalog_name` column.
    name: String,
    // Connection pool used for all statements.
    connection: AnyPool,
    // Base location for tables that have no explicit or namespace-level location.
    warehouse_location: String,
    // File IO used to read and write table metadata files.
    fileio: FileIO,
    // Placeholder style applied by `replace_placeholders`.
    sql_bind_style: SqlBindStyle,
}
225
/// Placeholder style used for bound parameters in generated SQL.
#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
pub enum SqlBindStyle {
    /// Numbered placeholders: each `?` is rewritten to `$1`, `$2`, ... in order.
    DollarNumeric,
    /// Question-mark placeholders: queries are sent unchanged.
    QMark,
}
234
235impl SqlCatalog {
236 async fn new(
238 config: SqlCatalogConfig,
239 storage_factory: Option<Arc<dyn StorageFactory>>,
240 ) -> Result<Self> {
241 let factory = storage_factory.ok_or_else(|| {
242 Error::new(
243 ErrorKind::Unexpected,
244 "StorageFactory must be provided for SqlCatalog. Use `with_storage_factory` to configure it.",
245 )
246 })?;
247 let fileio = FileIOBuilder::new(factory).build();
248
249 install_default_drivers();
250 let max_connections: u32 = config
251 .props
252 .get("pool.max-connections")
253 .map(|v| v.parse().unwrap())
254 .unwrap_or(MAX_CONNECTIONS);
255 let idle_timeout: u64 = config
256 .props
257 .get("pool.idle-timeout")
258 .map(|v| v.parse().unwrap())
259 .unwrap_or(IDLE_TIMEOUT);
260 let test_before_acquire: bool = config
261 .props
262 .get("pool.test-before-acquire")
263 .map(|v| v.parse().unwrap())
264 .unwrap_or(TEST_BEFORE_ACQUIRE);
265
266 let pool = AnyPoolOptions::new()
267 .max_connections(max_connections)
268 .idle_timeout(Duration::from_secs(idle_timeout))
269 .test_before_acquire(test_before_acquire)
270 .connect(&config.uri)
271 .await
272 .map_err(from_sqlx_error)?;
273
274 sqlx::query(&format!(
275 "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
276 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
277 {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
278 {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
279 {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
280 {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
281 {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
282 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
283 ))
284 .execute(&pool)
285 .await
286 .map_err(from_sqlx_error)?;
287
288 sqlx::query(&format!(
289 "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
290 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
291 {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
292 {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
293 {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
294 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
295 ))
296 .execute(&pool)
297 .await
298 .map_err(from_sqlx_error)?;
299
300 Ok(SqlCatalog {
301 name: config.name.to_owned(),
302 connection: pool,
303 warehouse_location: config.warehouse_location,
304 fileio,
305 sql_bind_style: config.sql_bind_style,
306 })
307 }
308
309 fn replace_placeholders(&self, query: &str) -> String {
311 match self.sql_bind_style {
312 SqlBindStyle::DollarNumeric => {
313 let mut count = 1;
314 query
315 .chars()
316 .fold(String::with_capacity(query.len()), |mut acc, c| {
317 if c == '?' {
318 acc.push('$');
319 acc.push_str(&count.to_string());
320 count += 1;
321 } else {
322 acc.push(c);
323 }
324 acc
325 })
326 }
327 _ => query.to_owned(),
328 }
329 }
330
331 async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
333 let query_with_placeholders = self.replace_placeholders(query);
334
335 let mut sqlx_query = sqlx::query(&query_with_placeholders);
336 for arg in args {
337 sqlx_query = sqlx_query.bind(arg);
338 }
339
340 sqlx_query
341 .fetch_all(&self.connection)
342 .await
343 .map_err(from_sqlx_error)
344 }
345
346 async fn execute(
348 &self,
349 query: &str,
350 args: Vec<Option<&str>>,
351 transaction: Option<&mut Transaction<'_, Any>>,
352 ) -> Result<AnyQueryResult> {
353 let query_with_placeholders = self.replace_placeholders(query);
354
355 let mut sqlx_query = sqlx::query(&query_with_placeholders);
356 for arg in args {
357 sqlx_query = sqlx_query.bind(arg);
358 }
359
360 match transaction {
361 Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
362 None => {
363 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
364 let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
365 let _ = tx.commit().await.map_err(from_sqlx_error);
366 result
367 }
368 }
369 }
370}
371
372#[async_trait]
373impl Catalog for SqlCatalog {
374 async fn list_namespaces(
375 &self,
376 parent: Option<&NamespaceIdent>,
377 ) -> Result<Vec<NamespaceIdent>> {
378 let all_namespaces_stmt = format!(
380 "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
381 FROM {CATALOG_TABLE_NAME}
382 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
383 UNION
384 SELECT {NAMESPACE_FIELD_NAME}
385 FROM {NAMESPACE_TABLE_NAME}
386 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
387 );
388
389 let namespace_rows = self
390 .fetch_rows(&all_namespaces_stmt, vec![
391 Some(&self.name),
392 Some(&self.name),
393 ])
394 .await?;
395
396 let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
397
398 if let Some(parent) = parent {
399 if self.namespace_exists(parent).await? {
400 let parent_str = parent.join(".");
401
402 for row in namespace_rows.iter() {
403 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
404 if nsp != parent_str && nsp.starts_with(&parent_str) {
406 namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
407 }
408 }
409
410 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
411 } else {
412 no_such_namespace_err(parent)
413 }
414 } else {
415 for row in namespace_rows.iter() {
416 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
417 let mut levels = nsp.split(".").collect::<Vec<&str>>();
418 if !levels.is_empty() {
419 let first_level = levels.drain(..1).collect::<Vec<&str>>();
420 namespaces.insert(NamespaceIdent::from_strs(first_level)?);
421 }
422 }
423
424 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
425 }
426 }
427
428 async fn create_namespace(
429 &self,
430 namespace: &NamespaceIdent,
431 properties: HashMap<String, String>,
432 ) -> Result<Namespace> {
433 let exists = self.namespace_exists(namespace).await?;
434
435 if exists {
436 return Err(Error::new(
437 iceberg::ErrorKind::NamespaceAlreadyExists,
438 format!("Namespace {namespace:?} already exists"),
439 ));
440 }
441
442 let namespace_str = namespace.join(".");
443 let insert = format!(
444 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
445 VALUES (?, ?, ?, ?)");
446 if !properties.is_empty() {
447 let mut insert_properties = properties.clone();
448 insert_properties.insert("exists".to_string(), "true".to_string());
449
450 let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
451 let mut insert_stmt = insert.clone();
452 for (index, (key, value)) in insert_properties.iter().enumerate() {
453 query_args.extend_from_slice(&[
454 Some(self.name.as_str()),
455 Some(namespace_str.as_str()),
456 Some(key.as_str()),
457 Some(value.as_str()),
458 ]);
459 if index > 0 {
460 insert_stmt.push_str(", (?, ?, ?, ?)");
461 }
462 }
463
464 self.execute(&insert_stmt, query_args, None).await?;
465
466 Ok(Namespace::with_properties(
467 namespace.clone(),
468 insert_properties,
469 ))
470 } else {
471 self.execute(
473 &insert,
474 vec![
475 Some(&self.name),
476 Some(&namespace_str),
477 Some("exists"),
478 Some("true"),
479 ],
480 None,
481 )
482 .await?;
483 Ok(Namespace::with_properties(namespace.clone(), properties))
484 }
485 }
486
487 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
488 let exists = self.namespace_exists(namespace).await?;
489 if exists {
490 let namespace_props = self
491 .fetch_rows(
492 &format!(
493 "SELECT
494 {NAMESPACE_FIELD_NAME},
495 {NAMESPACE_FIELD_PROPERTY_KEY},
496 {NAMESPACE_FIELD_PROPERTY_VALUE}
497 FROM {NAMESPACE_TABLE_NAME}
498 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
499 AND {NAMESPACE_FIELD_NAME} = ?"
500 ),
501 vec![Some(&self.name), Some(&namespace.join("."))],
502 )
503 .await?;
504
505 let mut properties = HashMap::with_capacity(namespace_props.len());
506
507 for row in namespace_props {
508 let key = row
509 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
510 .map_err(from_sqlx_error)?;
511 let value = row
512 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
513 .map_err(from_sqlx_error)?;
514
515 properties.insert(key, value);
516 }
517
518 Ok(Namespace::with_properties(namespace.clone(), properties))
519 } else {
520 no_such_namespace_err(namespace)
521 }
522 }
523
524 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
525 let namespace_str = namespace.join(".");
526
527 let table_namespaces = self
528 .fetch_rows(
529 &format!(
530 "SELECT 1 FROM {CATALOG_TABLE_NAME}
531 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
532 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
533 LIMIT 1"
534 ),
535 vec![Some(&self.name), Some(&namespace_str)],
536 )
537 .await?;
538
539 if !table_namespaces.is_empty() {
540 Ok(true)
541 } else {
542 let namespaces = self
543 .fetch_rows(
544 &format!(
545 "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
546 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
547 AND {NAMESPACE_FIELD_NAME} = ?
548 LIMIT 1"
549 ),
550 vec![Some(&self.name), Some(&namespace_str)],
551 )
552 .await?;
553 if !namespaces.is_empty() {
554 Ok(true)
555 } else {
556 Ok(false)
557 }
558 }
559 }
560
561 async fn update_namespace(
562 &self,
563 namespace: &NamespaceIdent,
564 properties: HashMap<String, String>,
565 ) -> Result<()> {
566 let exists = self.namespace_exists(namespace).await?;
567 if exists {
568 let existing_properties = self.get_namespace(namespace).await?.properties().clone();
569 let namespace_str = namespace.join(".");
570
571 let mut updates = vec![];
572 let mut inserts = vec![];
573
574 for (key, value) in properties.iter() {
575 if existing_properties.contains_key(key) {
576 if existing_properties.get(key) != Some(value) {
577 updates.push((key, value));
578 }
579 } else {
580 inserts.push((key, value));
581 }
582 }
583
584 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
585 let update_stmt = format!(
586 "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
587 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
588 AND {NAMESPACE_FIELD_NAME} = ?
589 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
590 );
591
592 let insert_stmt = format!(
593 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
594 VALUES (?, ?, ?, ?)"
595 );
596
597 for (key, value) in updates {
598 self.execute(
599 &update_stmt,
600 vec![
601 Some(value),
602 Some(&self.name),
603 Some(&namespace_str),
604 Some(key),
605 ],
606 Some(&mut tx),
607 )
608 .await?;
609 }
610
611 for (key, value) in inserts {
612 self.execute(
613 &insert_stmt,
614 vec![
615 Some(&self.name),
616 Some(&namespace_str),
617 Some(key),
618 Some(value),
619 ],
620 Some(&mut tx),
621 )
622 .await?;
623 }
624
625 let _ = tx.commit().await.map_err(from_sqlx_error)?;
626
627 Ok(())
628 } else {
629 no_such_namespace_err(namespace)
630 }
631 }
632
633 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
634 let exists = self.namespace_exists(namespace).await?;
635 if exists {
636 let tables = self.list_tables(namespace).await?;
638 if !tables.is_empty() {
639 return Err(Error::new(
640 iceberg::ErrorKind::Unexpected,
641 format!(
642 "Namespace {:?} is not empty. {} tables exist.",
643 namespace,
644 tables.len()
645 ),
646 ));
647 }
648
649 self.execute(
650 &format!(
651 "DELETE FROM {NAMESPACE_TABLE_NAME}
652 WHERE {NAMESPACE_FIELD_NAME} = ?
653 AND {CATALOG_FIELD_CATALOG_NAME} = ?"
654 ),
655 vec![Some(&namespace.join(".")), Some(&self.name)],
656 None,
657 )
658 .await?;
659
660 Ok(())
661 } else {
662 no_such_namespace_err(namespace)
663 }
664 }
665
666 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
667 let exists = self.namespace_exists(namespace).await?;
668 if exists {
669 let rows = self
670 .fetch_rows(
671 &format!(
672 "SELECT {CATALOG_FIELD_TABLE_NAME},
673 {CATALOG_FIELD_TABLE_NAMESPACE}
674 FROM {CATALOG_TABLE_NAME}
675 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
676 AND {CATALOG_FIELD_CATALOG_NAME} = ?
677 AND (
678 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
679 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
680 )",
681 ),
682 vec![Some(&namespace.join(".")), Some(&self.name)],
683 )
684 .await?;
685
686 let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
687
688 for row in rows.iter() {
689 let tbl = row
690 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
691 .map_err(from_sqlx_error)?;
692 let ns_strs = row
693 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
694 .map_err(from_sqlx_error)?;
695 let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
696 tables.insert(TableIdent::new(ns, tbl));
697 }
698
699 Ok(tables.into_iter().collect::<Vec<TableIdent>>())
700 } else {
701 no_such_namespace_err(namespace)
702 }
703 }
704
705 async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
706 let namespace = identifier.namespace().join(".");
707 let table_name = identifier.name();
708 let table_counts = self
709 .fetch_rows(
710 &format!(
711 "SELECT 1
712 FROM {CATALOG_TABLE_NAME}
713 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
714 AND {CATALOG_FIELD_CATALOG_NAME} = ?
715 AND {CATALOG_FIELD_TABLE_NAME} = ?
716 AND (
717 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
718 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
719 )"
720 ),
721 vec![Some(&namespace), Some(&self.name), Some(table_name)],
722 )
723 .await?;
724
725 if !table_counts.is_empty() {
726 Ok(true)
727 } else {
728 Ok(false)
729 }
730 }
731
732 async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
733 if !self.table_exists(identifier).await? {
734 return no_such_table_err(identifier);
735 }
736
737 self.execute(
738 &format!(
739 "DELETE FROM {CATALOG_TABLE_NAME}
740 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
741 AND {CATALOG_FIELD_TABLE_NAME} = ?
742 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
743 AND (
744 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
745 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
746 )"
747 ),
748 vec![
749 Some(&self.name),
750 Some(identifier.name()),
751 Some(&identifier.namespace().join(".")),
752 ],
753 None,
754 )
755 .await?;
756
757 Ok(())
758 }
759
760 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
761 let table_info = self.load_table(table).await?;
762 self.drop_table(table).await?;
763 iceberg::drop_table_data(
764 table_info.file_io(),
765 table_info.metadata(),
766 table_info.metadata_location(),
767 )
768 .await
769 }
770
771 async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
772 if !self.table_exists(identifier).await? {
773 return no_such_table_err(identifier);
774 }
775
776 let rows = self
777 .fetch_rows(
778 &format!(
779 "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
780 FROM {CATALOG_TABLE_NAME}
781 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
782 AND {CATALOG_FIELD_TABLE_NAME} = ?
783 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
784 AND (
785 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
786 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
787 )"
788 ),
789 vec![
790 Some(&self.name),
791 Some(identifier.name()),
792 Some(&identifier.namespace().join(".")),
793 ],
794 )
795 .await?;
796
797 if rows.is_empty() {
798 return no_such_table_err(identifier);
799 }
800
801 let row = &rows[0];
802 let tbl_metadata_location = row
803 .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
804 .map_err(from_sqlx_error)?;
805
806 let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
807
808 Ok(Table::builder()
809 .file_io(self.fileio.clone())
810 .identifier(identifier.clone())
811 .metadata_location(tbl_metadata_location)
812 .metadata(metadata)
813 .build()?)
814 }
815
816 async fn create_table(
817 &self,
818 namespace: &NamespaceIdent,
819 creation: TableCreation,
820 ) -> Result<Table> {
821 if !self.namespace_exists(namespace).await? {
822 return no_such_namespace_err(namespace);
823 }
824
825 let tbl_name = creation.name.clone();
826 let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
827
828 if self.table_exists(&tbl_ident).await? {
829 return table_already_exists_err(&tbl_ident);
830 }
831
832 let (tbl_creation, location) = match creation.location.clone() {
833 Some(location) => (creation, location),
834 None => {
835 let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
838 let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
839 Some(location) => location.clone(),
840 None => {
841 format!(
842 "{}/{}",
843 self.warehouse_location.clone(),
844 namespace.join("/")
845 )
846 }
847 };
848
849 let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
850
851 (
852 TableCreation {
853 location: Some(tbl_location.clone()),
854 ..creation
855 },
856 tbl_location,
857 )
858 }
859 };
860
861 let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
862 .build()?
863 .metadata;
864 let tbl_metadata_location =
865 MetadataLocation::new_with_metadata(location.clone(), &tbl_metadata);
866
867 tbl_metadata
868 .write_to(&self.fileio, &tbl_metadata_location)
869 .await?;
870
871 let tbl_metadata_location_str = tbl_metadata_location.to_string();
872 self.execute(&format!(
873 "INSERT INTO {CATALOG_TABLE_NAME}
874 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
875 VALUES (?, ?, ?, ?, ?)
876 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location_str), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
877
878 Ok(Table::builder()
879 .file_io(self.fileio.clone())
880 .metadata_location(tbl_metadata_location_str)
881 .identifier(tbl_ident)
882 .metadata(tbl_metadata)
883 .build()?)
884 }
885
886 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
887 if src == dest {
888 return Ok(());
889 }
890
891 if !self.table_exists(src).await? {
892 return no_such_table_err(src);
893 }
894
895 if !self.namespace_exists(dest.namespace()).await? {
896 return no_such_namespace_err(dest.namespace());
897 }
898
899 if self.table_exists(dest).await? {
900 return table_already_exists_err(dest);
901 }
902
903 self.execute(
904 &format!(
905 "UPDATE {CATALOG_TABLE_NAME}
906 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
907 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
908 AND {CATALOG_FIELD_TABLE_NAME} = ?
909 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
910 AND (
911 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
912 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
913 )"
914 ),
915 vec![
916 Some(dest.name()),
917 Some(&dest.namespace().join(".")),
918 Some(&self.name),
919 Some(src.name()),
920 Some(&src.namespace().join(".")),
921 ],
922 None,
923 )
924 .await?;
925
926 Ok(())
927 }
928
929 async fn register_table(
930 &self,
931 table_ident: &TableIdent,
932 metadata_location: String,
933 ) -> Result<Table> {
934 if self.table_exists(table_ident).await? {
935 return table_already_exists_err(table_ident);
936 }
937
938 let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
939
940 let namespace = table_ident.namespace();
941 let tbl_name = table_ident.name().to_string();
942
943 self.execute(&format!(
944 "INSERT INTO {CATALOG_TABLE_NAME}
945 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
946 VALUES (?, ?, ?, ?, ?)
947 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
948
949 Ok(Table::builder()
950 .identifier(table_ident.clone())
951 .metadata_location(metadata_location)
952 .metadata(metadata)
953 .file_io(self.fileio.clone())
954 .build()?)
955 }
956
957 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
959 let table_ident = commit.identifier().clone();
960 let current_table = self.load_table(&table_ident).await?;
961 let current_metadata_location = current_table.metadata_location_result()?.to_string();
962
963 let staged_table = commit.apply(current_table)?;
964 let staged_metadata_location_str = staged_table.metadata_location_result()?;
965 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
966
967 staged_table
968 .metadata()
969 .write_to(staged_table.file_io(), &staged_metadata_location)
970 .await?;
971
972 let staged_metadata_location_str = staged_metadata_location.to_string();
973 let update_result = self
974 .execute(
975 &format!(
976 "UPDATE {CATALOG_TABLE_NAME}
977 SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
978 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
979 AND {CATALOG_FIELD_TABLE_NAME} = ?
980 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
981 AND (
982 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
983 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
984 )
985 AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
986 ),
987 vec![
988 Some(&staged_metadata_location_str),
989 Some(current_metadata_location.as_str()),
990 Some(&self.name),
991 Some(table_ident.name()),
992 Some(&table_ident.namespace().join(".")),
993 Some(current_metadata_location.as_str()),
994 ],
995 None,
996 )
997 .await?;
998
999 if update_result.rows_affected() == 0 {
1000 return Err(Error::new(
1001 ErrorKind::CatalogCommitConflicts,
1002 format!("Commit conflicted for table: {table_ident}"),
1003 )
1004 .with_retryable(true));
1005 }
1006
1007 Ok(staged_table)
1008 }
1009}
1010
1011#[cfg(test)]
1012mod tests {
1013 use std::collections::{HashMap, HashSet};
1014 use std::hash::Hash;
1015 use std::sync::Arc;
1016
1017 use iceberg::io::LocalFsStorageFactory;
1018 use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
1019 use iceberg::table::Table;
1020 use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
1021 use itertools::Itertools;
1022 use regex::Regex;
1023 use sqlx::migrate::MigrateDatabase;
1024 use tempfile::TempDir;
1025
1026 use crate::catalog::{
1027 NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
1028 SQL_CATALOG_PROP_WAREHOUSE,
1029 };
1030 use crate::{SqlBindStyle, SqlCatalogBuilder};
1031
    /// Regex fragment matching a lowercase, hyphenated hexadecimal UUID (8-4-4-4-12).
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
1033
1034 fn temp_path() -> String {
1035 let temp_dir = TempDir::new().unwrap();
1036 temp_dir.path().to_str().unwrap().to_string()
1037 }
1038
1039 fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1040 HashSet::from_iter(vec)
1041 }
1042
1043 fn default_properties() -> HashMap<String, String> {
1044 HashMap::from([("exists".to_string(), "true".to_string())])
1045 }
1046
1047 async fn new_sql_catalog(
1049 warehouse_location: String,
1050 name: Option<impl ToString>,
1051 ) -> impl Catalog {
1052 let name = if let Some(name) = name {
1053 name.to_string()
1054 } else {
1055 "iceberg".to_string()
1056 };
1057 let sql_lite_uri = format!("sqlite:{}", temp_path());
1058 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1059
1060 let props = HashMap::from_iter([
1061 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1062 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1063 (
1064 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1065 SqlBindStyle::DollarNumeric.to_string(),
1066 ),
1067 ]);
1068 SqlCatalogBuilder::default()
1069 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1070 .load(&name, props)
1071 .await
1072 .unwrap()
1073 }
1074
1075 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1076 let _ = catalog
1077 .create_namespace(namespace_ident, HashMap::new())
1078 .await
1079 .unwrap();
1080 }
1081
1082 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1083 for namespace_ident in namespace_idents {
1084 let _ = create_namespace(catalog, namespace_ident).await;
1085 }
1086 }
1087
1088 fn simple_table_schema() -> Schema {
1089 Schema::builder()
1090 .with_fields(vec![
1091 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1092 ])
1093 .build()
1094 .unwrap()
1095 }
1096
1097 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1098 let _ = catalog
1099 .create_table(
1100 &table_ident.namespace,
1101 TableCreation::builder()
1102 .name(table_ident.name().into())
1103 .schema(simple_table_schema())
1104 .location(temp_path())
1105 .build(),
1106 )
1107 .await
1108 .unwrap();
1109 }
1110
1111 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1112 for table_ident in table_idents {
1113 create_table(catalog, table_ident).await;
1114 }
1115 }
1116
1117 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1118 assert_eq!(table.identifier(), expected_table_ident);
1119
1120 let metadata = table.metadata();
1121
1122 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1123
1124 let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1125 .with_spec_id(0)
1126 .build()
1127 .unwrap();
1128
1129 assert_eq!(
1130 metadata
1131 .partition_specs_iter()
1132 .map(|p| p.as_ref())
1133 .collect_vec(),
1134 vec![&expected_partition_spec]
1135 );
1136
1137 let expected_sorted_order = SortOrder::builder()
1138 .with_order_id(0)
1139 .with_fields(vec![])
1140 .build(expected_schema)
1141 .unwrap();
1142
1143 assert_eq!(
1144 metadata
1145 .sort_orders_iter()
1146 .map(|s| s.as_ref())
1147 .collect_vec(),
1148 vec![&expected_sorted_order]
1149 );
1150
1151 assert_eq!(metadata.properties(), &HashMap::new());
1152
1153 assert!(!table.readonly());
1154 }
1155
1156 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1157 let actual = table.metadata_location().unwrap().to_string();
1158 let regex = Regex::new(regex_str).unwrap();
1159 assert!(regex.is_match(&actual))
1160 }
1161
1162 #[tokio::test]
1163 async fn test_initialized() {
1164 let warehouse_loc = temp_path();
1165 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1166 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1168 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1169 }
1170
1171 #[tokio::test]
1172 async fn test_builder_method() {
1173 let sql_lite_uri = format!("sqlite:{}", temp_path());
1174 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1175 let warehouse_location = temp_path();
1176
1177 let catalog = SqlCatalogBuilder::default()
1178 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1179 .uri(sql_lite_uri.to_string())
1180 .warehouse_location(warehouse_location.clone())
1181 .sql_bind_style(SqlBindStyle::QMark)
1182 .load("iceberg", HashMap::default())
1183 .await;
1184 assert!(catalog.is_ok());
1185
1186 let catalog = catalog.unwrap();
1187 assert!(catalog.warehouse_location == warehouse_location);
1188 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1189 }
1190
1191 #[tokio::test]
1194 async fn test_builder_props_non_existent_path_fails() {
1195 let sql_lite_uri = format!("sqlite:{}", temp_path());
1196 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1197 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1198 let warehouse_location = temp_path();
1199
1200 let catalog = SqlCatalogBuilder::default()
1201 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1202 .uri(sql_lite_uri)
1203 .warehouse_location(warehouse_location)
1204 .load(
1205 "iceberg",
1206 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1207 )
1208 .await;
1209 assert!(catalog.is_err());
1210 }
1211
1212 #[tokio::test]
1216 async fn test_builder_props_set_valid_uri() {
1217 let sql_lite_uri = format!("sqlite:{}", temp_path());
1218 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1219 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1220 let warehouse_location = temp_path();
1221
1222 let catalog = SqlCatalogBuilder::default()
1223 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1224 .uri(sql_lite_uri2)
1225 .warehouse_location(warehouse_location)
1226 .load(
1227 "iceberg",
1228 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1229 )
1230 .await;
1231 assert!(catalog.is_ok());
1232 }
1233
1234 #[tokio::test]
1236 async fn test_builder_props_take_precedence() {
1237 let sql_lite_uri = format!("sqlite:{}", temp_path());
1238 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1239 let warehouse_location = temp_path();
1240 let warehouse_location2 = temp_path();
1241
1242 let catalog = SqlCatalogBuilder::default()
1243 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1244 .warehouse_location(warehouse_location2)
1245 .sql_bind_style(SqlBindStyle::DollarNumeric)
1246 .load(
1247 "iceberg",
1248 HashMap::from_iter([
1249 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1250 (
1251 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1252 warehouse_location.clone(),
1253 ),
1254 (
1255 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1256 SqlBindStyle::QMark.to_string(),
1257 ),
1258 ]),
1259 )
1260 .await;
1261
1262 assert!(catalog.is_ok());
1263
1264 let catalog = catalog.unwrap();
1265 assert!(catalog.warehouse_location == warehouse_location);
1266 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1267 }
1268
1269 #[tokio::test]
1271 async fn test_builder_props_take_precedence_props() {
1272 let sql_lite_uri = format!("sqlite:{}", temp_path());
1273 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1274 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1275 let warehouse_location = temp_path();
1276 let warehouse_location2 = temp_path();
1277
1278 let props = HashMap::from_iter([
1279 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1280 (
1281 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1282 warehouse_location.clone(),
1283 ),
1284 (
1285 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1286 SqlBindStyle::QMark.to_string(),
1287 ),
1288 ]);
1289 let props2 = HashMap::from_iter([
1290 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1291 (
1292 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1293 warehouse_location2.clone(),
1294 ),
1295 (
1296 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1297 SqlBindStyle::DollarNumeric.to_string(),
1298 ),
1299 ]);
1300
1301 let catalog = SqlCatalogBuilder::default()
1302 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1303 .props(props2)
1304 .load("iceberg", props)
1305 .await;
1306
1307 assert!(catalog.is_ok());
1308
1309 let catalog = catalog.unwrap();
1310 assert!(catalog.warehouse_location == warehouse_location);
1311 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1312 }
1313
1314 #[tokio::test]
1316 async fn test_builder_props_take_precedence_prop() {
1317 let sql_lite_uri = format!("sqlite:{}", temp_path());
1318 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1319 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1320 let warehouse_location = temp_path();
1321 let warehouse_location2 = temp_path();
1322
1323 let props = HashMap::from_iter([
1324 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1325 (
1326 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1327 warehouse_location.clone(),
1328 ),
1329 (
1330 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1331 SqlBindStyle::QMark.to_string(),
1332 ),
1333 ]);
1334
1335 let catalog = SqlCatalogBuilder::default()
1336 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1337 .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1338 .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1339 .prop(
1340 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1341 SqlBindStyle::DollarNumeric.to_string(),
1342 )
1343 .load("iceberg", props)
1344 .await;
1345
1346 assert!(catalog.is_ok());
1347
1348 let catalog = catalog.unwrap();
1349 assert!(catalog.warehouse_location == warehouse_location);
1350 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1351 }
1352
1353 #[tokio::test]
1355 async fn test_builder_props_invalid_bind_style_fails() {
1356 let sql_lite_uri = format!("sqlite:{}", temp_path());
1357 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1358 let warehouse_location = temp_path();
1359
1360 let catalog = SqlCatalogBuilder::default()
1361 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1362 .load(
1363 "iceberg",
1364 HashMap::from_iter([
1365 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1366 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1367 (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1368 ]),
1369 )
1370 .await;
1371
1372 assert!(catalog.is_err());
1373 }
1374
1375 #[tokio::test]
1376 async fn test_list_namespaces_returns_empty_vector() {
1377 let warehouse_loc = temp_path();
1378 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1379
1380 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1381 }
1382
1383 #[tokio::test]
1384 async fn test_list_namespaces_returns_empty_different_name() {
1385 let warehouse_loc = temp_path();
1386 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1387 let namespace_ident_1 = NamespaceIdent::new("a".into());
1388 let namespace_ident_2 = NamespaceIdent::new("b".into());
1389 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1390 assert_eq!(
1391 to_set(catalog.list_namespaces(None).await.unwrap()),
1392 to_set(vec![namespace_ident_1, namespace_ident_2])
1393 );
1394
1395 let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await;
1396 assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
1397 }
1398
1399 #[tokio::test]
1400 async fn test_list_namespaces_returns_only_top_level_namespaces() {
1401 let warehouse_loc = temp_path();
1402 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1403 let namespace_ident_1 = NamespaceIdent::new("a".into());
1404 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1405 let namespace_ident_3 = NamespaceIdent::new("b".into());
1406 create_namespaces(&catalog, &vec![
1407 &namespace_ident_1,
1408 &namespace_ident_2,
1409 &namespace_ident_3,
1410 ])
1411 .await;
1412
1413 assert_eq!(
1414 to_set(catalog.list_namespaces(None).await.unwrap()),
1415 to_set(vec![namespace_ident_1, namespace_ident_3])
1416 );
1417 }
1418
1419 #[tokio::test]
1420 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1421 let warehouse_loc = temp_path();
1422 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1423 let namespace_ident_1 = NamespaceIdent::new("a".into());
1424 let namespace_ident_2 = NamespaceIdent::new("b".into());
1425 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1426
1427 assert_eq!(
1428 catalog
1429 .list_namespaces(Some(&namespace_ident_1))
1430 .await
1431 .unwrap(),
1432 vec![]
1433 );
1434 }
1435
1436 #[tokio::test]
1437 async fn test_list_namespaces_returns_namespace_under_parent() {
1438 let warehouse_loc = temp_path();
1439 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1440 let namespace_ident_1 = NamespaceIdent::new("a".into());
1441 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1442 let namespace_ident_3 = NamespaceIdent::new("c".into());
1443 create_namespaces(&catalog, &vec![
1444 &namespace_ident_1,
1445 &namespace_ident_2,
1446 &namespace_ident_3,
1447 ])
1448 .await;
1449
1450 assert_eq!(
1451 to_set(catalog.list_namespaces(None).await.unwrap()),
1452 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1453 );
1454
1455 assert_eq!(
1456 catalog
1457 .list_namespaces(Some(&namespace_ident_1))
1458 .await
1459 .unwrap(),
1460 vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1461 );
1462 }
1463
1464 #[tokio::test]
1465 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1466 let warehouse_loc = temp_path();
1467 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1468 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1469 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1470 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1471 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1472 let namespace_ident_5 = NamespaceIdent::new("b".into());
1473 create_namespaces(&catalog, &vec![
1474 &namespace_ident_1,
1475 &namespace_ident_2,
1476 &namespace_ident_3,
1477 &namespace_ident_4,
1478 &namespace_ident_5,
1479 ])
1480 .await;
1481
1482 assert_eq!(
1483 to_set(
1484 catalog
1485 .list_namespaces(Some(&namespace_ident_1))
1486 .await
1487 .unwrap()
1488 ),
1489 to_set(vec![
1490 NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1491 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1492 NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1493 ])
1494 );
1495 }
1496
1497 #[tokio::test]
1498 async fn test_namespace_exists_returns_false() {
1499 let warehouse_loc = temp_path();
1500 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1501 let namespace_ident = NamespaceIdent::new("a".into());
1502 create_namespace(&catalog, &namespace_ident).await;
1503
1504 assert!(
1505 !catalog
1506 .namespace_exists(&NamespaceIdent::new("b".into()))
1507 .await
1508 .unwrap()
1509 );
1510 }
1511
1512 #[tokio::test]
1513 async fn test_namespace_exists_returns_true() {
1514 let warehouse_loc = temp_path();
1515 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1516 let namespace_ident = NamespaceIdent::new("a".into());
1517 create_namespace(&catalog, &namespace_ident).await;
1518
1519 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1520 }
1521
1522 #[tokio::test]
1523 async fn test_create_namespace_with_properties() {
1524 let warehouse_loc = temp_path();
1525 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1526 let namespace_ident = NamespaceIdent::new("abc".into());
1527
1528 let mut properties = default_properties();
1529 properties.insert("k".into(), "v".into());
1530
1531 assert_eq!(
1532 catalog
1533 .create_namespace(&namespace_ident, properties.clone())
1534 .await
1535 .unwrap(),
1536 Namespace::with_properties(namespace_ident.clone(), properties.clone())
1537 );
1538
1539 assert_eq!(
1540 catalog.get_namespace(&namespace_ident).await.unwrap(),
1541 Namespace::with_properties(namespace_ident, properties)
1542 );
1543 }
1544
1545 #[tokio::test]
1546 async fn test_create_nested_namespace() {
1547 let warehouse_loc = temp_path();
1548 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1549 let parent_namespace_ident = NamespaceIdent::new("a".into());
1550 create_namespace(&catalog, &parent_namespace_ident).await;
1551
1552 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1553
1554 assert_eq!(
1555 catalog
1556 .create_namespace(&child_namespace_ident, HashMap::new())
1557 .await
1558 .unwrap(),
1559 Namespace::new(child_namespace_ident.clone())
1560 );
1561
1562 assert_eq!(
1563 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1564 Namespace::with_properties(child_namespace_ident, default_properties())
1565 );
1566 }
1567
1568 #[tokio::test]
1569 async fn test_create_deeply_nested_namespace() {
1570 let warehouse_loc = temp_path();
1571 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1572 let namespace_ident_a = NamespaceIdent::new("a".into());
1573 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1574 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1575
1576 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1577
1578 assert_eq!(
1579 catalog
1580 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1581 .await
1582 .unwrap(),
1583 Namespace::new(namespace_ident_a_b_c.clone())
1584 );
1585
1586 assert_eq!(
1587 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1588 Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1589 );
1590 }
1591
1592 #[tokio::test]
1593 async fn test_update_namespace_noop() {
1594 let warehouse_loc = temp_path();
1595 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1596 let namespace_ident = NamespaceIdent::new("a".into());
1597 create_namespace(&catalog, &namespace_ident).await;
1598
1599 catalog
1600 .update_namespace(&namespace_ident, HashMap::new())
1601 .await
1602 .unwrap();
1603
1604 assert_eq!(
1605 *catalog
1606 .get_namespace(&namespace_ident)
1607 .await
1608 .unwrap()
1609 .properties(),
1610 HashMap::from_iter([("exists".to_string(), "true".to_string())])
1611 )
1612 }
1613
1614 #[tokio::test]
1615 async fn test_update_nested_namespace() {
1616 let warehouse_loc = temp_path();
1617 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1618 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1619 create_namespace(&catalog, &namespace_ident).await;
1620
1621 let mut props = HashMap::from_iter([
1622 ("prop1".to_string(), "val1".to_string()),
1623 ("prop2".into(), "val2".into()),
1624 ]);
1625
1626 catalog
1627 .update_namespace(&namespace_ident, props.clone())
1628 .await
1629 .unwrap();
1630
1631 props.insert("exists".into(), "true".into());
1632
1633 assert_eq!(
1634 *catalog
1635 .get_namespace(&namespace_ident)
1636 .await
1637 .unwrap()
1638 .properties(),
1639 props
1640 )
1641 }
1642
1643 #[tokio::test]
1644 async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1645 let warehouse_loc = temp_path();
1646 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1647 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1648
1649 let props = HashMap::from_iter([
1650 ("prop1".to_string(), "val1".to_string()),
1651 ("prop2".into(), "val2".into()),
1652 ]);
1653
1654 let err = catalog
1655 .update_namespace(&namespace_ident, props)
1656 .await
1657 .unwrap_err();
1658
1659 assert_eq!(
1660 err.message(),
1661 format!("No such namespace: {namespace_ident:?}")
1662 );
1663 }
1664
1665 #[tokio::test]
1666 async fn test_drop_nested_namespace() {
1667 let warehouse_loc = temp_path();
1668 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1669 let namespace_ident_a = NamespaceIdent::new("a".into());
1670 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1671 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1672
1673 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1674
1675 assert!(
1676 !catalog
1677 .namespace_exists(&namespace_ident_a_b)
1678 .await
1679 .unwrap()
1680 );
1681
1682 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1683 }
1684
1685 #[tokio::test]
1686 async fn test_drop_deeply_nested_namespace() {
1687 let warehouse_loc = temp_path();
1688 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1689 let namespace_ident_a = NamespaceIdent::new("a".into());
1690 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1691 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1692 create_namespaces(&catalog, &vec![
1693 &namespace_ident_a,
1694 &namespace_ident_a_b,
1695 &namespace_ident_a_b_c,
1696 ])
1697 .await;
1698
1699 catalog
1700 .drop_namespace(&namespace_ident_a_b_c)
1701 .await
1702 .unwrap();
1703
1704 assert!(
1705 !catalog
1706 .namespace_exists(&namespace_ident_a_b_c)
1707 .await
1708 .unwrap()
1709 );
1710
1711 assert!(
1712 catalog
1713 .namespace_exists(&namespace_ident_a_b)
1714 .await
1715 .unwrap()
1716 );
1717
1718 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1719 }
1720
1721 #[tokio::test]
1722 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1723 let warehouse_loc = temp_path();
1724 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1725 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1726
1727 let non_existent_namespace_ident =
1728 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1729 assert_eq!(
1730 catalog
1731 .drop_namespace(&non_existent_namespace_ident)
1732 .await
1733 .unwrap_err()
1734 .to_string(),
1735 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1736 )
1737 }
1738
1739 #[tokio::test]
1740 async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1741 let warehouse_loc = temp_path();
1742 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1743 let namespace_ident_a = NamespaceIdent::new("a".into());
1744 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1745 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1746
1747 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1748
1749 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1750
1751 assert!(
1752 catalog
1753 .namespace_exists(&namespace_ident_a_b)
1754 .await
1755 .unwrap()
1756 );
1757 }
1758
1759 #[tokio::test]
1760 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1761 let warehouse_loc = temp_path();
1762 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1763
1764 let namespace_ident = NamespaceIdent::new("a".into());
1765 let mut namespace_properties = HashMap::new();
1766 let namespace_location = temp_path();
1767 namespace_properties.insert(
1768 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1769 namespace_location.to_string(),
1770 );
1771 catalog
1772 .create_namespace(&namespace_ident, namespace_properties)
1773 .await
1774 .unwrap();
1775
1776 let table_name = "tbl1";
1777 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1778 let expected_table_metadata_location_regex =
1779 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1780
1781 let table = catalog
1782 .create_table(
1783 &namespace_ident,
1784 TableCreation::builder()
1785 .name(table_name.into())
1786 .schema(simple_table_schema())
1787 .build(),
1789 )
1790 .await
1791 .unwrap();
1792 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1793 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1794
1795 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1796 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1797 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1798 }
1799
1800 #[tokio::test]
1801 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1802 {
1803 let warehouse_loc = temp_path();
1804 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1805
1806 let namespace_ident = NamespaceIdent::new("a".into());
1807 let mut namespace_properties = HashMap::new();
1808 let namespace_location = temp_path();
1809 namespace_properties.insert(
1810 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1811 namespace_location.to_string(),
1812 );
1813 catalog
1814 .create_namespace(&namespace_ident, namespace_properties)
1815 .await
1816 .unwrap();
1817
1818 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1819 let mut nested_namespace_properties = HashMap::new();
1820 let nested_namespace_location = temp_path();
1821 nested_namespace_properties.insert(
1822 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1823 nested_namespace_location.to_string(),
1824 );
1825 catalog
1826 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1827 .await
1828 .unwrap();
1829
1830 let table_name = "tbl1";
1831 let expected_table_ident =
1832 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1833 let expected_table_metadata_location_regex = format!(
1834 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1835 );
1836
1837 let table = catalog
1838 .create_table(
1839 &nested_namespace_ident,
1840 TableCreation::builder()
1841 .name(table_name.into())
1842 .schema(simple_table_schema())
1843 .build(),
1845 )
1846 .await
1847 .unwrap();
1848 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1849 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1850
1851 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1852 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1853 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1854 }
1855
1856 #[tokio::test]
1857 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1858 {
1859 let warehouse_loc = temp_path();
1860 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1861
1862 let namespace_ident = NamespaceIdent::new("a".into());
1863 let namespace_properties = HashMap::new();
1865 catalog
1866 .create_namespace(&namespace_ident, namespace_properties)
1867 .await
1868 .unwrap();
1869
1870 let table_name = "tbl1";
1871 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1872 let expected_table_metadata_location_regex =
1873 format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1874
1875 let table = catalog
1876 .create_table(
1877 &namespace_ident,
1878 TableCreation::builder()
1879 .name(table_name.into())
1880 .schema(simple_table_schema())
1881 .build(),
1883 )
1884 .await
1885 .unwrap();
1886 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1887 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1888
1889 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1890 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1891 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1892 }
1893
1894 #[tokio::test]
1895 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1896 {
1897 let warehouse_loc = temp_path();
1898 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1899
1900 let namespace_ident = NamespaceIdent::new("a".into());
1901 create_namespace(&catalog, &namespace_ident).await;
1902
1903 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1904 create_namespace(&catalog, &nested_namespace_ident).await;
1905
1906 let table_name = "tbl1";
1907 let expected_table_ident =
1908 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1909 let expected_table_metadata_location_regex =
1910 format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1911
1912 let table = catalog
1913 .create_table(
1914 &nested_namespace_ident,
1915 TableCreation::builder()
1916 .name(table_name.into())
1917 .schema(simple_table_schema())
1918 .build(),
1920 )
1921 .await
1922 .unwrap();
1923 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1924 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1925
1926 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1927 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1928 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1929 }
1930
1931 #[tokio::test]
1932 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1933 let warehouse_loc = temp_path();
1934 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1935 let namespace_ident = NamespaceIdent::new("a".into());
1936 create_namespace(&catalog, &namespace_ident).await;
1937 let table_name = "tbl1";
1938 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1939 create_table(&catalog, &table_ident).await;
1940
1941 let tmp_dir = TempDir::new().unwrap();
1942 let location = tmp_dir.path().to_str().unwrap().to_string();
1943
1944 assert_eq!(
1945 catalog
1946 .create_table(
1947 &namespace_ident,
1948 TableCreation::builder()
1949 .name(table_name.into())
1950 .schema(simple_table_schema())
1951 .location(location)
1952 .build()
1953 )
1954 .await
1955 .unwrap_err()
1956 .to_string(),
1957 format!(
1958 "TableAlreadyExists => Table {:?} already exists.",
1959 &table_ident
1960 )
1961 );
1962 }
1963
1964 #[tokio::test]
1965 async fn test_rename_table_src_table_is_same_as_dst_table() {
1966 let warehouse_loc = temp_path();
1967 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1968 let namespace_ident = NamespaceIdent::new("n1".into());
1969 create_namespace(&catalog, &namespace_ident).await;
1970 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1971 create_table(&catalog, &table_ident).await;
1972
1973 catalog
1974 .rename_table(&table_ident, &table_ident)
1975 .await
1976 .unwrap();
1977
1978 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1979 table_ident
1980 ],);
1981 }
1982
1983 #[tokio::test]
1984 async fn test_rename_table_across_nested_namespaces() {
1985 let warehouse_loc = temp_path();
1986 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1987 let namespace_ident_a = NamespaceIdent::new("a".into());
1988 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1989 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1990 create_namespaces(&catalog, &vec![
1991 &namespace_ident_a,
1992 &namespace_ident_a_b,
1993 &namespace_ident_a_b_c,
1994 ])
1995 .await;
1996
1997 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1998 create_tables(&catalog, vec![&src_table_ident]).await;
1999
2000 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2001 catalog
2002 .rename_table(&src_table_ident, &dst_table_ident)
2003 .await
2004 .unwrap();
2005
2006 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2007
2008 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2009 }
2010
2011 #[tokio::test]
2012 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2013 let warehouse_loc = temp_path();
2014 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2015 let src_namespace_ident = NamespaceIdent::new("n1".into());
2016 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2017 create_namespace(&catalog, &src_namespace_ident).await;
2018 create_table(&catalog, &src_table_ident).await;
2019
2020 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2021 let dst_table_ident =
2022 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2023 assert_eq!(
2024 catalog
2025 .rename_table(&src_table_ident, &dst_table_ident)
2026 .await
2027 .unwrap_err()
2028 .to_string(),
2029 format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
2030 );
2031 }
2032}