1use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
25use iceberg::spec::{TableMetadata, TableMetadataBuilder};
26use iceberg::table::Table;
27use iceberg::{
28 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
29 Runtime, TableCommit, TableCreation, TableIdent,
30};
31use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
32use sqlx::{Any, AnyPool, Row, Transaction};
33
34use crate::error::{
35 from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
36};
37
/// Property key whose value becomes the catalog's database connection URI.
pub const SQL_CATALOG_PROP_URI: &str = "uri";
/// Property key whose value becomes the catalog's warehouse location.
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key whose value selects the placeholder bind style (parsed as a `SqlBindStyle`).
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";
44
// Table that stores one row per registered Iceberg table, and its columns.
static CATALOG_TABLE_NAME: &str = "iceberg_tables";
static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
static CATALOG_FIELD_TABLE_NAME: &str = "table_name";
static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
// Value written to the record-type column for table rows.
static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Table that stores namespace properties (one row per property), and its columns.
static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
static NAMESPACE_FIELD_NAME: &str = "namespace";
static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

// Namespace property consulted for the base location of newly created tables.
static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
60
61static MAX_CONNECTIONS: u32 = 10; static IDLE_TIMEOUT: u64 = 10; static TEST_BEFORE_ACQUIRE: bool = true; #[derive(Debug)]
67pub struct SqlCatalogBuilder {
68 config: SqlCatalogConfig,
69 storage_factory: Option<Arc<dyn StorageFactory>>,
70 runtime: Option<Runtime>,
71}
72
73impl Default for SqlCatalogBuilder {
74 fn default() -> Self {
75 Self {
76 config: SqlCatalogConfig {
77 uri: "".to_string(),
78 name: "".to_string(),
79 warehouse_location: "".to_string(),
80 sql_bind_style: SqlBindStyle::DollarNumeric,
81 props: HashMap::new(),
82 },
83 storage_factory: None,
84 runtime: None,
85 }
86 }
87}
88
89impl SqlCatalogBuilder {
90 pub fn uri(mut self, uri: impl Into<String>) -> Self {
95 self.config.uri = uri.into();
96 self
97 }
98
99 pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
104 self.config.warehouse_location = location.into();
105 self
106 }
107
108 pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
113 self.config.sql_bind_style = sql_bind_style;
114 self
115 }
116
117 pub fn props(mut self, props: HashMap<String, String>) -> Self {
122 for (k, v) in props {
123 self.config.props.insert(k, v);
124 }
125 self
126 }
127
128 pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
135 self.config.props.insert(key.into(), value.into());
136 self
137 }
138}
139
140impl CatalogBuilder for SqlCatalogBuilder {
141 type C = SqlCatalog;
142
143 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
144 self.storage_factory = Some(storage_factory);
145 self
146 }
147
148 fn with_runtime(mut self, runtime: Runtime) -> Self {
149 self.runtime = Some(runtime);
150 self
151 }
152
153 fn load(
154 mut self,
155 name: impl Into<String>,
156 props: HashMap<String, String>,
157 ) -> impl Future<Output = Result<Self::C>> + Send {
158 for (k, v) in props {
159 self.config.props.insert(k, v);
160 }
161
162 if let Some(uri) = self.config.props.remove(SQL_CATALOG_PROP_URI) {
163 self.config.uri = uri;
164 }
165 if let Some(warehouse_location) = self.config.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
166 self.config.warehouse_location = warehouse_location;
167 }
168
169 let name = name.into();
170
171 let mut valid_sql_bind_style = true;
172 if let Some(sql_bind_style) = self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
173 if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
174 self.config.sql_bind_style = sql_bind_style;
175 } else {
176 valid_sql_bind_style = false;
177 }
178 }
179
180 let valid_name = !name.trim().is_empty();
181
182 async move {
183 if !valid_name {
184 Err(Error::new(
185 ErrorKind::DataInvalid,
186 "Catalog name cannot be empty",
187 ))
188 } else if !valid_sql_bind_style {
189 Err(Error::new(
190 ErrorKind::DataInvalid,
191 format!(
192 "`{}` values are valid only if they're `{}` or `{}`",
193 SQL_CATALOG_PROP_BIND_STYLE,
194 SqlBindStyle::DollarNumeric,
195 SqlBindStyle::QMark
196 ),
197 ))
198 } else {
199 self.config.name = name;
200 let runtime = match self.runtime {
201 Some(rt) => rt,
202 None => Runtime::try_current()?,
203 };
204 SqlCatalog::new(self.config, self.storage_factory, runtime).await
205 }
206 }
207 }
208}
209
210#[derive(Debug)]
219struct SqlCatalogConfig {
220 uri: String,
221 name: String,
222 warehouse_location: String,
223 sql_bind_style: SqlBindStyle,
224 props: HashMap<String, String>,
225}
226
227#[derive(Debug)]
228pub struct SqlCatalog {
230 name: String,
231 connection: AnyPool,
232 warehouse_location: String,
233 fileio: FileIO,
234 sql_bind_style: SqlBindStyle,
235 runtime: Runtime,
236}
237
238#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
239pub enum SqlBindStyle {
241 DollarNumeric,
243 QMark,
245}
246
247impl SqlCatalog {
248 async fn new(
250 config: SqlCatalogConfig,
251 storage_factory: Option<Arc<dyn StorageFactory>>,
252 runtime: Runtime,
253 ) -> Result<Self> {
254 let factory = storage_factory.ok_or_else(|| {
255 Error::new(
256 ErrorKind::Unexpected,
257 "StorageFactory must be provided for SqlCatalog. Use `with_storage_factory` to configure it.",
258 )
259 })?;
260 let fileio = FileIOBuilder::new(factory)
263 .with_props(config.props.clone())
264 .build();
265
266 install_default_drivers();
267 let max_connections: u32 = config
268 .props
269 .get("pool.max-connections")
270 .map(|v| v.parse().unwrap())
271 .unwrap_or(MAX_CONNECTIONS);
272 let idle_timeout: u64 = config
273 .props
274 .get("pool.idle-timeout")
275 .map(|v| v.parse().unwrap())
276 .unwrap_or(IDLE_TIMEOUT);
277 let test_before_acquire: bool = config
278 .props
279 .get("pool.test-before-acquire")
280 .map(|v| v.parse().unwrap())
281 .unwrap_or(TEST_BEFORE_ACQUIRE);
282
283 let pool = AnyPoolOptions::new()
284 .max_connections(max_connections)
285 .idle_timeout(Duration::from_secs(idle_timeout))
286 .test_before_acquire(test_before_acquire)
287 .connect(&config.uri)
288 .await
289 .map_err(from_sqlx_error)?;
290
291 sqlx::query(&format!(
292 "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
293 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
294 {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
295 {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
296 {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
297 {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
298 {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
299 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
300 ))
301 .execute(&pool)
302 .await
303 .map_err(from_sqlx_error)?;
304
305 sqlx::query(&format!(
306 "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
307 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
308 {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
309 {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
310 {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
311 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
312 ))
313 .execute(&pool)
314 .await
315 .map_err(from_sqlx_error)?;
316
317 Ok(SqlCatalog {
318 name: config.name.to_owned(),
319 connection: pool,
320 warehouse_location: config.warehouse_location,
321 fileio,
322 sql_bind_style: config.sql_bind_style,
323 runtime,
324 })
325 }
326
327 fn replace_placeholders(&self, query: &str) -> String {
329 match self.sql_bind_style {
330 SqlBindStyle::DollarNumeric => {
331 let mut count = 1;
332 query
333 .chars()
334 .fold(String::with_capacity(query.len()), |mut acc, c| {
335 if c == '?' {
336 acc.push('$');
337 acc.push_str(&count.to_string());
338 count += 1;
339 } else {
340 acc.push(c);
341 }
342 acc
343 })
344 }
345 _ => query.to_owned(),
346 }
347 }
348
349 async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
351 let query_with_placeholders = self.replace_placeholders(query);
352
353 let mut sqlx_query = sqlx::query(&query_with_placeholders);
354 for arg in args {
355 sqlx_query = sqlx_query.bind(arg);
356 }
357
358 sqlx_query
359 .fetch_all(&self.connection)
360 .await
361 .map_err(from_sqlx_error)
362 }
363
364 async fn execute(
366 &self,
367 query: &str,
368 args: Vec<Option<&str>>,
369 transaction: Option<&mut Transaction<'_, Any>>,
370 ) -> Result<AnyQueryResult> {
371 let query_with_placeholders = self.replace_placeholders(query);
372
373 let mut sqlx_query = sqlx::query(&query_with_placeholders);
374 for arg in args {
375 sqlx_query = sqlx_query.bind(arg);
376 }
377
378 match transaction {
379 Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
380 None => {
381 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
382 let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
383 let _ = tx.commit().await.map_err(from_sqlx_error);
384 result
385 }
386 }
387 }
388}
389
390#[async_trait]
391impl Catalog for SqlCatalog {
392 async fn list_namespaces(
393 &self,
394 parent: Option<&NamespaceIdent>,
395 ) -> Result<Vec<NamespaceIdent>> {
396 let all_namespaces_stmt = format!(
398 "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
399 FROM {CATALOG_TABLE_NAME}
400 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
401 UNION
402 SELECT {NAMESPACE_FIELD_NAME}
403 FROM {NAMESPACE_TABLE_NAME}
404 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
405 );
406
407 let namespace_rows = self
408 .fetch_rows(&all_namespaces_stmt, vec![
409 Some(&self.name),
410 Some(&self.name),
411 ])
412 .await?;
413
414 let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
415
416 if let Some(parent) = parent {
417 if self.namespace_exists(parent).await? {
418 let parent_str = parent.join(".");
419
420 for row in namespace_rows.iter() {
421 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
422 if nsp != parent_str && nsp.starts_with(&parent_str) {
424 namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
425 }
426 }
427
428 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
429 } else {
430 no_such_namespace_err(parent)
431 }
432 } else {
433 for row in namespace_rows.iter() {
434 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
435 let mut levels = nsp.split(".").collect::<Vec<&str>>();
436 if !levels.is_empty() {
437 let first_level = levels.drain(..1).collect::<Vec<&str>>();
438 namespaces.insert(NamespaceIdent::from_strs(first_level)?);
439 }
440 }
441
442 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
443 }
444 }
445
446 async fn create_namespace(
447 &self,
448 namespace: &NamespaceIdent,
449 properties: HashMap<String, String>,
450 ) -> Result<Namespace> {
451 let exists = self.namespace_exists(namespace).await?;
452
453 if exists {
454 return Err(Error::new(
455 iceberg::ErrorKind::NamespaceAlreadyExists,
456 format!("Namespace {namespace:?} already exists"),
457 ));
458 }
459
460 let namespace_str = namespace.join(".");
461 let insert = format!(
462 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
463 VALUES (?, ?, ?, ?)");
464 if !properties.is_empty() {
465 let mut insert_properties = properties.clone();
466 insert_properties.insert("exists".to_string(), "true".to_string());
467
468 let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
469 let mut insert_stmt = insert.clone();
470 for (index, (key, value)) in insert_properties.iter().enumerate() {
471 query_args.extend_from_slice(&[
472 Some(self.name.as_str()),
473 Some(namespace_str.as_str()),
474 Some(key.as_str()),
475 Some(value.as_str()),
476 ]);
477 if index > 0 {
478 insert_stmt.push_str(", (?, ?, ?, ?)");
479 }
480 }
481
482 self.execute(&insert_stmt, query_args, None).await?;
483
484 Ok(Namespace::with_properties(
485 namespace.clone(),
486 insert_properties,
487 ))
488 } else {
489 self.execute(
491 &insert,
492 vec![
493 Some(&self.name),
494 Some(&namespace_str),
495 Some("exists"),
496 Some("true"),
497 ],
498 None,
499 )
500 .await?;
501 Ok(Namespace::with_properties(namespace.clone(), properties))
502 }
503 }
504
505 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
506 let exists = self.namespace_exists(namespace).await?;
507 if exists {
508 let namespace_props = self
509 .fetch_rows(
510 &format!(
511 "SELECT
512 {NAMESPACE_FIELD_NAME},
513 {NAMESPACE_FIELD_PROPERTY_KEY},
514 {NAMESPACE_FIELD_PROPERTY_VALUE}
515 FROM {NAMESPACE_TABLE_NAME}
516 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
517 AND {NAMESPACE_FIELD_NAME} = ?"
518 ),
519 vec![Some(&self.name), Some(&namespace.join("."))],
520 )
521 .await?;
522
523 let mut properties = HashMap::with_capacity(namespace_props.len());
524
525 for row in namespace_props {
526 let key = row
527 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
528 .map_err(from_sqlx_error)?;
529 let value = row
530 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
531 .map_err(from_sqlx_error)?;
532
533 properties.insert(key, value);
534 }
535
536 Ok(Namespace::with_properties(namespace.clone(), properties))
537 } else {
538 no_such_namespace_err(namespace)
539 }
540 }
541
542 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
543 let namespace_str = namespace.join(".");
544
545 let table_namespaces = self
546 .fetch_rows(
547 &format!(
548 "SELECT 1 FROM {CATALOG_TABLE_NAME}
549 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
550 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
551 LIMIT 1"
552 ),
553 vec![Some(&self.name), Some(&namespace_str)],
554 )
555 .await?;
556
557 if !table_namespaces.is_empty() {
558 Ok(true)
559 } else {
560 let namespaces = self
561 .fetch_rows(
562 &format!(
563 "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
564 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
565 AND {NAMESPACE_FIELD_NAME} = ?
566 LIMIT 1"
567 ),
568 vec![Some(&self.name), Some(&namespace_str)],
569 )
570 .await?;
571 if !namespaces.is_empty() {
572 Ok(true)
573 } else {
574 Ok(false)
575 }
576 }
577 }
578
579 async fn update_namespace(
580 &self,
581 namespace: &NamespaceIdent,
582 properties: HashMap<String, String>,
583 ) -> Result<()> {
584 let exists = self.namespace_exists(namespace).await?;
585 if exists {
586 let existing_properties = self.get_namespace(namespace).await?.properties().clone();
587 let namespace_str = namespace.join(".");
588
589 let mut updates = vec![];
590 let mut inserts = vec![];
591
592 for (key, value) in properties.iter() {
593 if existing_properties.contains_key(key) {
594 if existing_properties.get(key) != Some(value) {
595 updates.push((key, value));
596 }
597 } else {
598 inserts.push((key, value));
599 }
600 }
601
602 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
603 let update_stmt = format!(
604 "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
605 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
606 AND {NAMESPACE_FIELD_NAME} = ?
607 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
608 );
609
610 let insert_stmt = format!(
611 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
612 VALUES (?, ?, ?, ?)"
613 );
614
615 for (key, value) in updates {
616 self.execute(
617 &update_stmt,
618 vec![
619 Some(value),
620 Some(&self.name),
621 Some(&namespace_str),
622 Some(key),
623 ],
624 Some(&mut tx),
625 )
626 .await?;
627 }
628
629 for (key, value) in inserts {
630 self.execute(
631 &insert_stmt,
632 vec![
633 Some(&self.name),
634 Some(&namespace_str),
635 Some(key),
636 Some(value),
637 ],
638 Some(&mut tx),
639 )
640 .await?;
641 }
642
643 let _ = tx.commit().await.map_err(from_sqlx_error)?;
644
645 Ok(())
646 } else {
647 no_such_namespace_err(namespace)
648 }
649 }
650
651 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
652 let exists = self.namespace_exists(namespace).await?;
653 if exists {
654 let tables = self.list_tables(namespace).await?;
656 if !tables.is_empty() {
657 return Err(Error::new(
658 iceberg::ErrorKind::Unexpected,
659 format!(
660 "Namespace {:?} is not empty. {} tables exist.",
661 namespace,
662 tables.len()
663 ),
664 ));
665 }
666
667 self.execute(
668 &format!(
669 "DELETE FROM {NAMESPACE_TABLE_NAME}
670 WHERE {NAMESPACE_FIELD_NAME} = ?
671 AND {CATALOG_FIELD_CATALOG_NAME} = ?"
672 ),
673 vec![Some(&namespace.join(".")), Some(&self.name)],
674 None,
675 )
676 .await?;
677
678 Ok(())
679 } else {
680 no_such_namespace_err(namespace)
681 }
682 }
683
684 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
685 let exists = self.namespace_exists(namespace).await?;
686 if exists {
687 let rows = self
688 .fetch_rows(
689 &format!(
690 "SELECT {CATALOG_FIELD_TABLE_NAME},
691 {CATALOG_FIELD_TABLE_NAMESPACE}
692 FROM {CATALOG_TABLE_NAME}
693 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
694 AND {CATALOG_FIELD_CATALOG_NAME} = ?
695 AND (
696 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
697 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
698 )",
699 ),
700 vec![Some(&namespace.join(".")), Some(&self.name)],
701 )
702 .await?;
703
704 let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
705
706 for row in rows.iter() {
707 let tbl = row
708 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
709 .map_err(from_sqlx_error)?;
710 let ns_strs = row
711 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
712 .map_err(from_sqlx_error)?;
713 let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
714 tables.insert(TableIdent::new(ns, tbl));
715 }
716
717 Ok(tables.into_iter().collect::<Vec<TableIdent>>())
718 } else {
719 no_such_namespace_err(namespace)
720 }
721 }
722
723 async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
724 let namespace = identifier.namespace().join(".");
725 let table_name = identifier.name();
726 let table_counts = self
727 .fetch_rows(
728 &format!(
729 "SELECT 1
730 FROM {CATALOG_TABLE_NAME}
731 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
732 AND {CATALOG_FIELD_CATALOG_NAME} = ?
733 AND {CATALOG_FIELD_TABLE_NAME} = ?
734 AND (
735 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
736 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
737 )"
738 ),
739 vec![Some(&namespace), Some(&self.name), Some(table_name)],
740 )
741 .await?;
742
743 if !table_counts.is_empty() {
744 Ok(true)
745 } else {
746 Ok(false)
747 }
748 }
749
750 async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
751 if !self.table_exists(identifier).await? {
752 return no_such_table_err(identifier);
753 }
754
755 self.execute(
756 &format!(
757 "DELETE FROM {CATALOG_TABLE_NAME}
758 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
759 AND {CATALOG_FIELD_TABLE_NAME} = ?
760 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
761 AND (
762 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
763 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
764 )"
765 ),
766 vec![
767 Some(&self.name),
768 Some(identifier.name()),
769 Some(&identifier.namespace().join(".")),
770 ],
771 None,
772 )
773 .await?;
774
775 Ok(())
776 }
777
778 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
779 let table_info = self.load_table(table).await?;
780 self.drop_table(table).await?;
781 iceberg::drop_table_data(&table_info).await
782 }
783
784 async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
785 if !self.table_exists(identifier).await? {
786 return no_such_table_err(identifier);
787 }
788
789 let rows = self
790 .fetch_rows(
791 &format!(
792 "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
793 FROM {CATALOG_TABLE_NAME}
794 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
795 AND {CATALOG_FIELD_TABLE_NAME} = ?
796 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
797 AND (
798 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
799 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
800 )"
801 ),
802 vec![
803 Some(&self.name),
804 Some(identifier.name()),
805 Some(&identifier.namespace().join(".")),
806 ],
807 )
808 .await?;
809
810 if rows.is_empty() {
811 return no_such_table_err(identifier);
812 }
813
814 let row = &rows[0];
815 let tbl_metadata_location = row
816 .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
817 .map_err(from_sqlx_error)?;
818
819 let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
820
821 Ok(Table::builder()
822 .file_io(self.fileio.clone())
823 .identifier(identifier.clone())
824 .metadata_location(tbl_metadata_location)
825 .metadata(metadata)
826 .runtime(self.runtime.clone())
827 .build()?)
828 }
829
830 async fn create_table(
831 &self,
832 namespace: &NamespaceIdent,
833 creation: TableCreation,
834 ) -> Result<Table> {
835 if !self.namespace_exists(namespace).await? {
836 return no_such_namespace_err(namespace);
837 }
838
839 let tbl_name = creation.name.clone();
840 let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
841
842 if self.table_exists(&tbl_ident).await? {
843 return table_already_exists_err(&tbl_ident);
844 }
845
846 let (tbl_creation, location) = match creation.location.clone() {
847 Some(location) => (creation, location),
848 None => {
849 let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
852 let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
853 Some(location) => location.clone(),
854 None => {
855 format!(
856 "{}/{}",
857 self.warehouse_location.clone(),
858 namespace.join("/")
859 )
860 }
861 };
862
863 let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
864
865 (
866 TableCreation {
867 location: Some(tbl_location.clone()),
868 ..creation
869 },
870 tbl_location,
871 )
872 }
873 };
874
875 let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
876 .build()?
877 .metadata;
878 let tbl_metadata_location =
879 MetadataLocation::new_with_metadata(location.clone(), &tbl_metadata);
880
881 tbl_metadata
882 .write_to(&self.fileio, &tbl_metadata_location)
883 .await?;
884
885 let tbl_metadata_location_str = tbl_metadata_location.to_string();
886 self.execute(&format!(
887 "INSERT INTO {CATALOG_TABLE_NAME}
888 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
889 VALUES (?, ?, ?, ?, ?)
890 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location_str), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
891
892 Ok(Table::builder()
893 .file_io(self.fileio.clone())
894 .metadata_location(tbl_metadata_location_str)
895 .identifier(tbl_ident)
896 .metadata(tbl_metadata)
897 .runtime(self.runtime.clone())
898 .build()?)
899 }
900
901 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
902 if src == dest {
903 return Ok(());
904 }
905
906 if !self.table_exists(src).await? {
907 return no_such_table_err(src);
908 }
909
910 if !self.namespace_exists(dest.namespace()).await? {
911 return no_such_namespace_err(dest.namespace());
912 }
913
914 if self.table_exists(dest).await? {
915 return table_already_exists_err(dest);
916 }
917
918 self.execute(
919 &format!(
920 "UPDATE {CATALOG_TABLE_NAME}
921 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
922 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
923 AND {CATALOG_FIELD_TABLE_NAME} = ?
924 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
925 AND (
926 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
927 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
928 )"
929 ),
930 vec![
931 Some(dest.name()),
932 Some(&dest.namespace().join(".")),
933 Some(&self.name),
934 Some(src.name()),
935 Some(&src.namespace().join(".")),
936 ],
937 None,
938 )
939 .await?;
940
941 Ok(())
942 }
943
944 async fn register_table(
945 &self,
946 table_ident: &TableIdent,
947 metadata_location: String,
948 ) -> Result<Table> {
949 if self.table_exists(table_ident).await? {
950 return table_already_exists_err(table_ident);
951 }
952
953 let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
954
955 let namespace = table_ident.namespace();
956 let tbl_name = table_ident.name().to_string();
957
958 self.execute(&format!(
959 "INSERT INTO {CATALOG_TABLE_NAME}
960 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
961 VALUES (?, ?, ?, ?, ?)
962 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
963
964 Ok(Table::builder()
965 .identifier(table_ident.clone())
966 .metadata_location(metadata_location)
967 .metadata(metadata)
968 .file_io(self.fileio.clone())
969 .runtime(self.runtime.clone())
970 .build()?)
971 }
972
973 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
975 let table_ident = commit.identifier().clone();
976 let current_table = self.load_table(&table_ident).await?;
977 let current_metadata_location = current_table.metadata_location_result()?.to_string();
978
979 let staged_table = commit.apply(current_table)?;
980 let staged_metadata_location_str = staged_table.metadata_location_result()?;
981 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
982
983 staged_table
984 .metadata()
985 .write_to(staged_table.file_io(), &staged_metadata_location)
986 .await?;
987
988 let staged_metadata_location_str = staged_metadata_location.to_string();
989 let update_result = self
990 .execute(
991 &format!(
992 "UPDATE {CATALOG_TABLE_NAME}
993 SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
994 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
995 AND {CATALOG_FIELD_TABLE_NAME} = ?
996 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
997 AND (
998 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
999 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
1000 )
1001 AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
1002 ),
1003 vec![
1004 Some(&staged_metadata_location_str),
1005 Some(current_metadata_location.as_str()),
1006 Some(&self.name),
1007 Some(table_ident.name()),
1008 Some(&table_ident.namespace().join(".")),
1009 Some(current_metadata_location.as_str()),
1010 ],
1011 None,
1012 )
1013 .await?;
1014
1015 if update_result.rows_affected() == 0 {
1016 return Err(Error::new(
1017 ErrorKind::CatalogCommitConflicts,
1018 format!("Commit conflicted for table: {table_ident}"),
1019 )
1020 .with_retryable(true));
1021 }
1022
1023 Ok(staged_table)
1024 }
1025}
1026
1027#[cfg(test)]
1028mod tests {
1029 use std::collections::{HashMap, HashSet};
1030 use std::hash::Hash;
1031 use std::sync::Arc;
1032
1033 use iceberg::io::LocalFsStorageFactory;
1034 use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
1035 use iceberg::table::Table;
1036 use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
1037 use itertools::Itertools;
1038 use regex::Regex;
1039 use sqlx::migrate::MigrateDatabase;
1040 use tempfile::TempDir;
1041
1042 use crate::catalog::{
1043 NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
1044 SQL_CATALOG_PROP_WAREHOUSE,
1045 };
1046 use crate::{SqlBindStyle, SqlCatalogBuilder};
1047
1048 const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
1049
1050 fn temp_path() -> String {
1051 let temp_dir = TempDir::new().unwrap();
1052 temp_dir.path().to_str().unwrap().to_string()
1053 }
1054
1055 fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1056 HashSet::from_iter(vec)
1057 }
1058
1059 fn default_properties() -> HashMap<String, String> {
1060 HashMap::from([("exists".to_string(), "true".to_string())])
1061 }
1062
1063 async fn new_sql_catalog(
1065 warehouse_location: String,
1066 name: Option<impl ToString>,
1067 ) -> impl Catalog {
1068 let name = if let Some(name) = name {
1069 name.to_string()
1070 } else {
1071 "iceberg".to_string()
1072 };
1073 let sql_lite_uri = format!("sqlite:{}", temp_path());
1074 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1075
1076 let props = HashMap::from_iter([
1077 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1078 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1079 (
1080 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1081 SqlBindStyle::DollarNumeric.to_string(),
1082 ),
1083 ]);
1084 SqlCatalogBuilder::default()
1085 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1086 .load(&name, props)
1087 .await
1088 .unwrap()
1089 }
1090
1091 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1092 let _ = catalog
1093 .create_namespace(namespace_ident, HashMap::new())
1094 .await
1095 .unwrap();
1096 }
1097
1098 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1099 for namespace_ident in namespace_idents {
1100 let _ = create_namespace(catalog, namespace_ident).await;
1101 }
1102 }
1103
1104 fn simple_table_schema() -> Schema {
1105 Schema::builder()
1106 .with_fields(vec![
1107 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1108 ])
1109 .build()
1110 .unwrap()
1111 }
1112
1113 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1114 let _ = catalog
1115 .create_table(
1116 &table_ident.namespace,
1117 TableCreation::builder()
1118 .name(table_ident.name().into())
1119 .schema(simple_table_schema())
1120 .location(temp_path())
1121 .build(),
1122 )
1123 .await
1124 .unwrap();
1125 }
1126
1127 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1128 for table_ident in table_idents {
1129 create_table(catalog, table_ident).await;
1130 }
1131 }
1132
1133 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1134 assert_eq!(table.identifier(), expected_table_ident);
1135
1136 let metadata = table.metadata();
1137
1138 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1139
1140 let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1141 .with_spec_id(0)
1142 .build()
1143 .unwrap();
1144
1145 assert_eq!(
1146 metadata
1147 .partition_specs_iter()
1148 .map(|p| p.as_ref())
1149 .collect_vec(),
1150 vec![&expected_partition_spec]
1151 );
1152
1153 let expected_sorted_order = SortOrder::builder()
1154 .with_order_id(0)
1155 .with_fields(vec![])
1156 .build(expected_schema)
1157 .unwrap();
1158
1159 assert_eq!(
1160 metadata
1161 .sort_orders_iter()
1162 .map(|s| s.as_ref())
1163 .collect_vec(),
1164 vec![&expected_sorted_order]
1165 );
1166
1167 assert_eq!(metadata.properties(), &HashMap::new());
1168
1169 assert!(!table.readonly());
1170 }
1171
1172 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1173 let actual = table.metadata_location().unwrap().to_string();
1174 let regex = Regex::new(regex_str).unwrap();
1175 assert!(regex.is_match(&actual))
1176 }
1177
1178 #[tokio::test]
1179 async fn test_initialized() {
1180 let warehouse_loc = temp_path();
1181 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1182 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1184 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1185 }
1186
1187 #[tokio::test]
1190 async fn test_storage_props_propagate_to_file_io() {
1191 let sql_lite_uri = format!("sqlite:{}", temp_path());
1192 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1193 let warehouse_location = temp_path();
1194
1195 let catalog = SqlCatalogBuilder::default()
1196 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1197 .load(
1198 "iceberg",
1199 HashMap::from_iter([
1200 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1201 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1202 ("s3.region".to_string(), "us-east-1".to_string()),
1203 ("hf.token".to_string(), "hf_test_token".to_string()),
1204 ]),
1205 )
1206 .await
1207 .unwrap();
1208
1209 let props = catalog.fileio.config().props();
1210 assert_eq!(props.get("s3.region"), Some(&"us-east-1".to_string()));
1211 assert_eq!(props.get("hf.token"), Some(&"hf_test_token".to_string()));
1212 }
1213
1214 #[tokio::test]
1215 async fn test_builder_method() {
1216 let sql_lite_uri = format!("sqlite:{}", temp_path());
1217 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1218 let warehouse_location = temp_path();
1219
1220 let catalog = SqlCatalogBuilder::default()
1221 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1222 .uri(sql_lite_uri.to_string())
1223 .warehouse_location(warehouse_location.clone())
1224 .sql_bind_style(SqlBindStyle::QMark)
1225 .load("iceberg", HashMap::default())
1226 .await;
1227 assert!(catalog.is_ok());
1228
1229 let catalog = catalog.unwrap();
1230 assert!(catalog.warehouse_location == warehouse_location);
1231 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1232 }
1233
1234 #[tokio::test]
1237 async fn test_builder_props_non_existent_path_fails() {
1238 let sql_lite_uri = format!("sqlite:{}", temp_path());
1239 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1240 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1241 let warehouse_location = temp_path();
1242
1243 let catalog = SqlCatalogBuilder::default()
1244 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1245 .uri(sql_lite_uri)
1246 .warehouse_location(warehouse_location)
1247 .load(
1248 "iceberg",
1249 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1250 )
1251 .await;
1252 assert!(catalog.is_err());
1253 }
1254
1255 #[tokio::test]
1259 async fn test_builder_props_set_valid_uri() {
1260 let sql_lite_uri = format!("sqlite:{}", temp_path());
1261 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1262 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1263 let warehouse_location = temp_path();
1264
1265 let catalog = SqlCatalogBuilder::default()
1266 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1267 .uri(sql_lite_uri2)
1268 .warehouse_location(warehouse_location)
1269 .load(
1270 "iceberg",
1271 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1272 )
1273 .await;
1274 assert!(catalog.is_ok());
1275 }
1276
1277 #[tokio::test]
1279 async fn test_builder_props_take_precedence() {
1280 let sql_lite_uri = format!("sqlite:{}", temp_path());
1281 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1282 let warehouse_location = temp_path();
1283 let warehouse_location2 = temp_path();
1284
1285 let catalog = SqlCatalogBuilder::default()
1286 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1287 .warehouse_location(warehouse_location2)
1288 .sql_bind_style(SqlBindStyle::DollarNumeric)
1289 .load(
1290 "iceberg",
1291 HashMap::from_iter([
1292 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1293 (
1294 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1295 warehouse_location.clone(),
1296 ),
1297 (
1298 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1299 SqlBindStyle::QMark.to_string(),
1300 ),
1301 ]),
1302 )
1303 .await;
1304
1305 assert!(catalog.is_ok());
1306
1307 let catalog = catalog.unwrap();
1308 assert!(catalog.warehouse_location == warehouse_location);
1309 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1310 }
1311
1312 #[tokio::test]
1314 async fn test_builder_props_take_precedence_props() {
1315 let sql_lite_uri = format!("sqlite:{}", temp_path());
1316 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1317 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1318 let warehouse_location = temp_path();
1319 let warehouse_location2 = temp_path();
1320
1321 let props = HashMap::from_iter([
1322 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1323 (
1324 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1325 warehouse_location.clone(),
1326 ),
1327 (
1328 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1329 SqlBindStyle::QMark.to_string(),
1330 ),
1331 ]);
1332 let props2 = HashMap::from_iter([
1333 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1334 (
1335 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1336 warehouse_location2.clone(),
1337 ),
1338 (
1339 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1340 SqlBindStyle::DollarNumeric.to_string(),
1341 ),
1342 ]);
1343
1344 let catalog = SqlCatalogBuilder::default()
1345 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1346 .props(props2)
1347 .load("iceberg", props)
1348 .await;
1349
1350 assert!(catalog.is_ok());
1351
1352 let catalog = catalog.unwrap();
1353 assert!(catalog.warehouse_location == warehouse_location);
1354 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1355 }
1356
1357 #[tokio::test]
1359 async fn test_builder_props_take_precedence_prop() {
1360 let sql_lite_uri = format!("sqlite:{}", temp_path());
1361 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1362 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1363 let warehouse_location = temp_path();
1364 let warehouse_location2 = temp_path();
1365
1366 let props = HashMap::from_iter([
1367 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1368 (
1369 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1370 warehouse_location.clone(),
1371 ),
1372 (
1373 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1374 SqlBindStyle::QMark.to_string(),
1375 ),
1376 ]);
1377
1378 let catalog = SqlCatalogBuilder::default()
1379 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1380 .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1381 .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1382 .prop(
1383 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1384 SqlBindStyle::DollarNumeric.to_string(),
1385 )
1386 .load("iceberg", props)
1387 .await;
1388
1389 assert!(catalog.is_ok());
1390
1391 let catalog = catalog.unwrap();
1392 assert!(catalog.warehouse_location == warehouse_location);
1393 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1394 }
1395
1396 #[tokio::test]
1398 async fn test_builder_props_invalid_bind_style_fails() {
1399 let sql_lite_uri = format!("sqlite:{}", temp_path());
1400 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1401 let warehouse_location = temp_path();
1402
1403 let catalog = SqlCatalogBuilder::default()
1404 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1405 .load(
1406 "iceberg",
1407 HashMap::from_iter([
1408 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1409 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1410 (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1411 ]),
1412 )
1413 .await;
1414
1415 assert!(catalog.is_err());
1416 }
1417
1418 #[tokio::test]
1419 async fn test_list_namespaces_returns_empty_vector() {
1420 let warehouse_loc = temp_path();
1421 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1422
1423 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1424 }
1425
1426 #[tokio::test]
1427 async fn test_list_namespaces_returns_empty_different_name() {
1428 let warehouse_loc = temp_path();
1429 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1430 let namespace_ident_1 = NamespaceIdent::new("a".into());
1431 let namespace_ident_2 = NamespaceIdent::new("b".into());
1432 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1433 assert_eq!(
1434 to_set(catalog.list_namespaces(None).await.unwrap()),
1435 to_set(vec![namespace_ident_1, namespace_ident_2])
1436 );
1437
1438 let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await;
1439 assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
1440 }
1441
1442 #[tokio::test]
1443 async fn test_list_namespaces_returns_only_top_level_namespaces() {
1444 let warehouse_loc = temp_path();
1445 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1446 let namespace_ident_1 = NamespaceIdent::new("a".into());
1447 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1448 let namespace_ident_3 = NamespaceIdent::new("b".into());
1449 create_namespaces(&catalog, &vec![
1450 &namespace_ident_1,
1451 &namespace_ident_2,
1452 &namespace_ident_3,
1453 ])
1454 .await;
1455
1456 assert_eq!(
1457 to_set(catalog.list_namespaces(None).await.unwrap()),
1458 to_set(vec![namespace_ident_1, namespace_ident_3])
1459 );
1460 }
1461
1462 #[tokio::test]
1463 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1464 let warehouse_loc = temp_path();
1465 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1466 let namespace_ident_1 = NamespaceIdent::new("a".into());
1467 let namespace_ident_2 = NamespaceIdent::new("b".into());
1468 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1469
1470 assert_eq!(
1471 catalog
1472 .list_namespaces(Some(&namespace_ident_1))
1473 .await
1474 .unwrap(),
1475 vec![]
1476 );
1477 }
1478
1479 #[tokio::test]
1480 async fn test_list_namespaces_returns_namespace_under_parent() {
1481 let warehouse_loc = temp_path();
1482 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1483 let namespace_ident_1 = NamespaceIdent::new("a".into());
1484 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1485 let namespace_ident_3 = NamespaceIdent::new("c".into());
1486 create_namespaces(&catalog, &vec![
1487 &namespace_ident_1,
1488 &namespace_ident_2,
1489 &namespace_ident_3,
1490 ])
1491 .await;
1492
1493 assert_eq!(
1494 to_set(catalog.list_namespaces(None).await.unwrap()),
1495 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1496 );
1497
1498 assert_eq!(
1499 catalog
1500 .list_namespaces(Some(&namespace_ident_1))
1501 .await
1502 .unwrap(),
1503 vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1504 );
1505 }
1506
1507 #[tokio::test]
1508 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1509 let warehouse_loc = temp_path();
1510 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1511 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1512 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1513 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1514 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1515 let namespace_ident_5 = NamespaceIdent::new("b".into());
1516 create_namespaces(&catalog, &vec![
1517 &namespace_ident_1,
1518 &namespace_ident_2,
1519 &namespace_ident_3,
1520 &namespace_ident_4,
1521 &namespace_ident_5,
1522 ])
1523 .await;
1524
1525 assert_eq!(
1526 to_set(
1527 catalog
1528 .list_namespaces(Some(&namespace_ident_1))
1529 .await
1530 .unwrap()
1531 ),
1532 to_set(vec![
1533 NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1534 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1535 NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1536 ])
1537 );
1538 }
1539
1540 #[tokio::test]
1541 async fn test_namespace_exists_returns_false() {
1542 let warehouse_loc = temp_path();
1543 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1544 let namespace_ident = NamespaceIdent::new("a".into());
1545 create_namespace(&catalog, &namespace_ident).await;
1546
1547 assert!(
1548 !catalog
1549 .namespace_exists(&NamespaceIdent::new("b".into()))
1550 .await
1551 .unwrap()
1552 );
1553 }
1554
1555 #[tokio::test]
1556 async fn test_namespace_exists_returns_true() {
1557 let warehouse_loc = temp_path();
1558 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1559 let namespace_ident = NamespaceIdent::new("a".into());
1560 create_namespace(&catalog, &namespace_ident).await;
1561
1562 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1563 }
1564
1565 #[tokio::test]
1566 async fn test_create_namespace_with_properties() {
1567 let warehouse_loc = temp_path();
1568 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1569 let namespace_ident = NamespaceIdent::new("abc".into());
1570
1571 let mut properties = default_properties();
1572 properties.insert("k".into(), "v".into());
1573
1574 assert_eq!(
1575 catalog
1576 .create_namespace(&namespace_ident, properties.clone())
1577 .await
1578 .unwrap(),
1579 Namespace::with_properties(namespace_ident.clone(), properties.clone())
1580 );
1581
1582 assert_eq!(
1583 catalog.get_namespace(&namespace_ident).await.unwrap(),
1584 Namespace::with_properties(namespace_ident, properties)
1585 );
1586 }
1587
1588 #[tokio::test]
1589 async fn test_create_nested_namespace() {
1590 let warehouse_loc = temp_path();
1591 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1592 let parent_namespace_ident = NamespaceIdent::new("a".into());
1593 create_namespace(&catalog, &parent_namespace_ident).await;
1594
1595 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1596
1597 assert_eq!(
1598 catalog
1599 .create_namespace(&child_namespace_ident, HashMap::new())
1600 .await
1601 .unwrap(),
1602 Namespace::new(child_namespace_ident.clone())
1603 );
1604
1605 assert_eq!(
1606 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1607 Namespace::with_properties(child_namespace_ident, default_properties())
1608 );
1609 }
1610
1611 #[tokio::test]
1612 async fn test_create_deeply_nested_namespace() {
1613 let warehouse_loc = temp_path();
1614 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1615 let namespace_ident_a = NamespaceIdent::new("a".into());
1616 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1617 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1618
1619 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1620
1621 assert_eq!(
1622 catalog
1623 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1624 .await
1625 .unwrap(),
1626 Namespace::new(namespace_ident_a_b_c.clone())
1627 );
1628
1629 assert_eq!(
1630 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1631 Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1632 );
1633 }
1634
1635 #[tokio::test]
1636 async fn test_update_namespace_noop() {
1637 let warehouse_loc = temp_path();
1638 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1639 let namespace_ident = NamespaceIdent::new("a".into());
1640 create_namespace(&catalog, &namespace_ident).await;
1641
1642 catalog
1643 .update_namespace(&namespace_ident, HashMap::new())
1644 .await
1645 .unwrap();
1646
1647 assert_eq!(
1648 *catalog
1649 .get_namespace(&namespace_ident)
1650 .await
1651 .unwrap()
1652 .properties(),
1653 HashMap::from_iter([("exists".to_string(), "true".to_string())])
1654 )
1655 }
1656
1657 #[tokio::test]
1658 async fn test_update_nested_namespace() {
1659 let warehouse_loc = temp_path();
1660 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1661 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1662 create_namespace(&catalog, &namespace_ident).await;
1663
1664 let mut props = HashMap::from_iter([
1665 ("prop1".to_string(), "val1".to_string()),
1666 ("prop2".into(), "val2".into()),
1667 ]);
1668
1669 catalog
1670 .update_namespace(&namespace_ident, props.clone())
1671 .await
1672 .unwrap();
1673
1674 props.insert("exists".into(), "true".into());
1675
1676 assert_eq!(
1677 *catalog
1678 .get_namespace(&namespace_ident)
1679 .await
1680 .unwrap()
1681 .properties(),
1682 props
1683 )
1684 }
1685
1686 #[tokio::test]
1687 async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1688 let warehouse_loc = temp_path();
1689 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1690 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1691
1692 let props = HashMap::from_iter([
1693 ("prop1".to_string(), "val1".to_string()),
1694 ("prop2".into(), "val2".into()),
1695 ]);
1696
1697 let err = catalog
1698 .update_namespace(&namespace_ident, props)
1699 .await
1700 .unwrap_err();
1701
1702 assert_eq!(
1703 err.message(),
1704 format!("No such namespace: {namespace_ident:?}")
1705 );
1706 }
1707
1708 #[tokio::test]
1709 async fn test_drop_nested_namespace() {
1710 let warehouse_loc = temp_path();
1711 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1712 let namespace_ident_a = NamespaceIdent::new("a".into());
1713 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1714 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1715
1716 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1717
1718 assert!(
1719 !catalog
1720 .namespace_exists(&namespace_ident_a_b)
1721 .await
1722 .unwrap()
1723 );
1724
1725 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1726 }
1727
1728 #[tokio::test]
1729 async fn test_drop_deeply_nested_namespace() {
1730 let warehouse_loc = temp_path();
1731 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1732 let namespace_ident_a = NamespaceIdent::new("a".into());
1733 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1734 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1735 create_namespaces(&catalog, &vec![
1736 &namespace_ident_a,
1737 &namespace_ident_a_b,
1738 &namespace_ident_a_b_c,
1739 ])
1740 .await;
1741
1742 catalog
1743 .drop_namespace(&namespace_ident_a_b_c)
1744 .await
1745 .unwrap();
1746
1747 assert!(
1748 !catalog
1749 .namespace_exists(&namespace_ident_a_b_c)
1750 .await
1751 .unwrap()
1752 );
1753
1754 assert!(
1755 catalog
1756 .namespace_exists(&namespace_ident_a_b)
1757 .await
1758 .unwrap()
1759 );
1760
1761 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1762 }
1763
1764 #[tokio::test]
1765 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1766 let warehouse_loc = temp_path();
1767 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1768 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1769
1770 let non_existent_namespace_ident =
1771 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1772 assert_eq!(
1773 catalog
1774 .drop_namespace(&non_existent_namespace_ident)
1775 .await
1776 .unwrap_err()
1777 .to_string(),
1778 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1779 )
1780 }
1781
1782 #[tokio::test]
1783 async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1784 let warehouse_loc = temp_path();
1785 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1786 let namespace_ident_a = NamespaceIdent::new("a".into());
1787 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1788 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1789
1790 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1791
1792 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1793
1794 assert!(
1795 catalog
1796 .namespace_exists(&namespace_ident_a_b)
1797 .await
1798 .unwrap()
1799 );
1800 }
1801
1802 #[tokio::test]
1803 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1804 let warehouse_loc = temp_path();
1805 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1806
1807 let namespace_ident = NamespaceIdent::new("a".into());
1808 let mut namespace_properties = HashMap::new();
1809 let namespace_location = temp_path();
1810 namespace_properties.insert(
1811 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1812 namespace_location.to_string(),
1813 );
1814 catalog
1815 .create_namespace(&namespace_ident, namespace_properties)
1816 .await
1817 .unwrap();
1818
1819 let table_name = "tbl1";
1820 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1821 let expected_table_metadata_location_regex =
1822 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1823
1824 let table = catalog
1825 .create_table(
1826 &namespace_ident,
1827 TableCreation::builder()
1828 .name(table_name.into())
1829 .schema(simple_table_schema())
1830 .build(),
1832 )
1833 .await
1834 .unwrap();
1835 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1836 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1837
1838 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1839 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1840 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1841 }
1842
1843 #[tokio::test]
1844 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1845 {
1846 let warehouse_loc = temp_path();
1847 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1848
1849 let namespace_ident = NamespaceIdent::new("a".into());
1850 let mut namespace_properties = HashMap::new();
1851 let namespace_location = temp_path();
1852 namespace_properties.insert(
1853 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1854 namespace_location.to_string(),
1855 );
1856 catalog
1857 .create_namespace(&namespace_ident, namespace_properties)
1858 .await
1859 .unwrap();
1860
1861 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1862 let mut nested_namespace_properties = HashMap::new();
1863 let nested_namespace_location = temp_path();
1864 nested_namespace_properties.insert(
1865 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1866 nested_namespace_location.to_string(),
1867 );
1868 catalog
1869 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1870 .await
1871 .unwrap();
1872
1873 let table_name = "tbl1";
1874 let expected_table_ident =
1875 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1876 let expected_table_metadata_location_regex = format!(
1877 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1878 );
1879
1880 let table = catalog
1881 .create_table(
1882 &nested_namespace_ident,
1883 TableCreation::builder()
1884 .name(table_name.into())
1885 .schema(simple_table_schema())
1886 .build(),
1888 )
1889 .await
1890 .unwrap();
1891 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1892 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1893
1894 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1895 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1896 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1897 }
1898
1899 #[tokio::test]
1900 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1901 {
1902 let warehouse_loc = temp_path();
1903 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1904
1905 let namespace_ident = NamespaceIdent::new("a".into());
1906 let namespace_properties = HashMap::new();
1908 catalog
1909 .create_namespace(&namespace_ident, namespace_properties)
1910 .await
1911 .unwrap();
1912
1913 let table_name = "tbl1";
1914 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1915 let expected_table_metadata_location_regex =
1916 format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1917
1918 let table = catalog
1919 .create_table(
1920 &namespace_ident,
1921 TableCreation::builder()
1922 .name(table_name.into())
1923 .schema(simple_table_schema())
1924 .build(),
1926 )
1927 .await
1928 .unwrap();
1929 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1930 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1931
1932 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1933 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1934 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1935 }
1936
1937 #[tokio::test]
1938 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1939 {
1940 let warehouse_loc = temp_path();
1941 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1942
1943 let namespace_ident = NamespaceIdent::new("a".into());
1944 create_namespace(&catalog, &namespace_ident).await;
1945
1946 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1947 create_namespace(&catalog, &nested_namespace_ident).await;
1948
1949 let table_name = "tbl1";
1950 let expected_table_ident =
1951 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1952 let expected_table_metadata_location_regex =
1953 format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1954
1955 let table = catalog
1956 .create_table(
1957 &nested_namespace_ident,
1958 TableCreation::builder()
1959 .name(table_name.into())
1960 .schema(simple_table_schema())
1961 .build(),
1963 )
1964 .await
1965 .unwrap();
1966 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1967 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1968
1969 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1970 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1971 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1972 }
1973
1974 #[tokio::test]
1975 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1976 let warehouse_loc = temp_path();
1977 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1978 let namespace_ident = NamespaceIdent::new("a".into());
1979 create_namespace(&catalog, &namespace_ident).await;
1980 let table_name = "tbl1";
1981 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1982 create_table(&catalog, &table_ident).await;
1983
1984 let tmp_dir = TempDir::new().unwrap();
1985 let location = tmp_dir.path().to_str().unwrap().to_string();
1986
1987 assert_eq!(
1988 catalog
1989 .create_table(
1990 &namespace_ident,
1991 TableCreation::builder()
1992 .name(table_name.into())
1993 .schema(simple_table_schema())
1994 .location(location)
1995 .build()
1996 )
1997 .await
1998 .unwrap_err()
1999 .to_string(),
2000 format!(
2001 "TableAlreadyExists => Table {:?} already exists.",
2002 &table_ident
2003 )
2004 );
2005 }
2006
2007 #[tokio::test]
2008 async fn test_rename_table_src_table_is_same_as_dst_table() {
2009 let warehouse_loc = temp_path();
2010 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2011 let namespace_ident = NamespaceIdent::new("n1".into());
2012 create_namespace(&catalog, &namespace_ident).await;
2013 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
2014 create_table(&catalog, &table_ident).await;
2015
2016 catalog
2017 .rename_table(&table_ident, &table_ident)
2018 .await
2019 .unwrap();
2020
2021 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2022 table_ident
2023 ],);
2024 }
2025
2026 #[tokio::test]
2027 async fn test_rename_table_across_nested_namespaces() {
2028 let warehouse_loc = temp_path();
2029 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2030 let namespace_ident_a = NamespaceIdent::new("a".into());
2031 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2032 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2033 create_namespaces(&catalog, &vec![
2034 &namespace_ident_a,
2035 &namespace_ident_a_b,
2036 &namespace_ident_a_b_c,
2037 ])
2038 .await;
2039
2040 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2041 create_tables(&catalog, vec![&src_table_ident]).await;
2042
2043 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2044 catalog
2045 .rename_table(&src_table_ident, &dst_table_ident)
2046 .await
2047 .unwrap();
2048
2049 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2050
2051 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2052 }
2053
2054 #[tokio::test]
2055 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2056 let warehouse_loc = temp_path();
2057 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2058 let src_namespace_ident = NamespaceIdent::new("n1".into());
2059 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2060 create_namespace(&catalog, &src_namespace_ident).await;
2061 create_table(&catalog, &src_table_ident).await;
2062
2063 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2064 let dst_table_ident =
2065 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2066 assert_eq!(
2067 catalog
2068 .rename_table(&src_table_ident, &dst_table_ident)
2069 .await
2070 .unwrap_err()
2071 .to_string(),
2072 format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
2073 );
2074 }
2075}