1use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
25use iceberg::spec::{TableMetadata, TableMetadataBuilder};
26use iceberg::table::Table;
27use iceberg::{
28 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
29 Runtime, TableCommit, TableCreation, TableIdent,
30};
31use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
32use sqlx::{Any, AnyPool, Row, Transaction};
33
34use crate::error::{
35 from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
36};
37
/// Catalog property key holding the SQL database connection URI.
pub const SQL_CATALOG_PROP_URI: &str = "uri";
/// Catalog property key holding the warehouse location.
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Catalog property key selecting the placeholder bind style; the value is
/// parsed into a [`SqlBindStyle`].
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";
44
// Name and columns of the SQL table that stores one row per catalog table.
static CATALOG_TABLE_NAME: &str = "iceberg_tables";
static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
static CATALOG_FIELD_TABLE_NAME: &str = "table_name";
static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
// Value written to the record-type column for table rows; queries also accept NULL there.
static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Name and columns of the SQL table that stores namespace properties
// (one row per namespace/property-key pair).
static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
static NAMESPACE_FIELD_NAME: &str = "namespace";
static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

// Namespace property consulted for a base location when a table is created
// without an explicit location.
static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
60
/// Default pool size used when the `pool.max-connections` property is absent.
static MAX_CONNECTIONS: u32 = 10;
/// Default idle timeout, in seconds, used when `pool.idle-timeout` is absent.
static IDLE_TIMEOUT: u64 = 10;
/// Default for `pool.test-before-acquire` when the property is absent.
static TEST_BEFORE_ACQUIRE: bool = true;

/// Builder for [`SqlCatalog`].
///
/// Collects the catalog configuration plus an optional storage factory and
/// runtime; the catalog itself is created by `load`.
#[derive(Debug)]
pub struct SqlCatalogBuilder {
    /// Configuration accumulated through the builder methods and `load` props.
    config: SqlCatalogConfig,
    /// Storage factory used to build the catalog's `FileIO`; required at load time.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Explicit runtime; when `None`, `load` falls back to `Runtime::try_current()`.
    runtime: Option<Runtime>,
}
72
73impl Default for SqlCatalogBuilder {
74 fn default() -> Self {
75 Self {
76 config: SqlCatalogConfig {
77 uri: "".to_string(),
78 name: "".to_string(),
79 warehouse_location: "".to_string(),
80 sql_bind_style: SqlBindStyle::DollarNumeric,
81 props: HashMap::new(),
82 },
83 storage_factory: None,
84 runtime: None,
85 }
86 }
87}
88
89impl SqlCatalogBuilder {
90 pub fn uri(mut self, uri: impl Into<String>) -> Self {
95 self.config.uri = uri.into();
96 self
97 }
98
99 pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
104 self.config.warehouse_location = location.into();
105 self
106 }
107
108 pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
113 self.config.sql_bind_style = sql_bind_style;
114 self
115 }
116
117 pub fn props(mut self, props: HashMap<String, String>) -> Self {
122 for (k, v) in props {
123 self.config.props.insert(k, v);
124 }
125 self
126 }
127
128 pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
135 self.config.props.insert(key.into(), value.into());
136 self
137 }
138}
139
140impl CatalogBuilder for SqlCatalogBuilder {
141 type C = SqlCatalog;
142
143 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
144 self.storage_factory = Some(storage_factory);
145 self
146 }
147
148 fn with_runtime(mut self, runtime: Runtime) -> Self {
149 self.runtime = Some(runtime);
150 self
151 }
152
153 fn load(
154 mut self,
155 name: impl Into<String>,
156 props: HashMap<String, String>,
157 ) -> impl Future<Output = Result<Self::C>> + Send {
158 for (k, v) in props {
159 self.config.props.insert(k, v);
160 }
161
162 if let Some(uri) = self.config.props.remove(SQL_CATALOG_PROP_URI) {
163 self.config.uri = uri;
164 }
165 if let Some(warehouse_location) = self.config.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
166 self.config.warehouse_location = warehouse_location;
167 }
168
169 let name = name.into();
170
171 let mut valid_sql_bind_style = true;
172 if let Some(sql_bind_style) = self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
173 if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
174 self.config.sql_bind_style = sql_bind_style;
175 } else {
176 valid_sql_bind_style = false;
177 }
178 }
179
180 let valid_name = !name.trim().is_empty();
181
182 async move {
183 if !valid_name {
184 Err(Error::new(
185 ErrorKind::DataInvalid,
186 "Catalog name cannot be empty",
187 ))
188 } else if !valid_sql_bind_style {
189 Err(Error::new(
190 ErrorKind::DataInvalid,
191 format!(
192 "`{}` values are valid only if they're `{}` or `{}`",
193 SQL_CATALOG_PROP_BIND_STYLE,
194 SqlBindStyle::DollarNumeric,
195 SqlBindStyle::QMark
196 ),
197 ))
198 } else {
199 self.config.name = name;
200 let runtime = match self.runtime {
201 Some(rt) => rt,
202 None => Runtime::try_current()?,
203 };
204 SqlCatalog::new(self.config, self.storage_factory, runtime).await
205 }
206 }
207 }
208}
209
/// Configuration gathered by [`SqlCatalogBuilder`] and consumed when the
/// catalog is created.
#[derive(Debug)]
struct SqlCatalogConfig {
    /// Database connection URI passed to the connection pool.
    uri: String,
    /// Catalog name; used as the `catalog_name` column value in every query.
    name: String,
    /// Base location used to derive table locations when none is given.
    warehouse_location: String,
    /// Placeholder style applied to SQL statements before execution.
    sql_bind_style: SqlBindStyle,
    /// Remaining properties; the `pool.*` keys are read when the catalog is created.
    props: HashMap<String, String>,
}
226
/// Catalog implementation that keeps table and namespace records in a SQL
/// database accessed through an `sqlx` connection pool.
#[derive(Debug)]
pub struct SqlCatalog {
    /// Catalog name; scopes every row read or written by this instance.
    name: String,
    /// Connection pool used for all statements.
    connection: AnyPool,
    /// Base location used to derive table locations when none is given.
    warehouse_location: String,
    /// File IO used to read and write table metadata files.
    fileio: FileIO,
    /// Placeholder style applied to statements before execution.
    sql_bind_style: SqlBindStyle,
    /// Runtime handed to every `Table` built by this catalog.
    runtime: Runtime,
}
237
/// Placeholder style expected by the target database.
///
/// Statements in this file are written with `?` placeholders and rewritten
/// according to this style before execution.
#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
pub enum SqlBindStyle {
    /// Numbered placeholders: each `?` is rewritten to `$1`, `$2`, ...
    DollarNumeric,
    /// Question-mark placeholders: statements are sent unchanged.
    QMark,
}
246
247impl SqlCatalog {
248 async fn new(
250 config: SqlCatalogConfig,
251 storage_factory: Option<Arc<dyn StorageFactory>>,
252 runtime: Runtime,
253 ) -> Result<Self> {
254 let factory = storage_factory.ok_or_else(|| {
255 Error::new(
256 ErrorKind::Unexpected,
257 "StorageFactory must be provided for SqlCatalog. Use `with_storage_factory` to configure it.",
258 )
259 })?;
260 let fileio = FileIOBuilder::new(factory).build();
261
262 install_default_drivers();
263 let max_connections: u32 = config
264 .props
265 .get("pool.max-connections")
266 .map(|v| v.parse().unwrap())
267 .unwrap_or(MAX_CONNECTIONS);
268 let idle_timeout: u64 = config
269 .props
270 .get("pool.idle-timeout")
271 .map(|v| v.parse().unwrap())
272 .unwrap_or(IDLE_TIMEOUT);
273 let test_before_acquire: bool = config
274 .props
275 .get("pool.test-before-acquire")
276 .map(|v| v.parse().unwrap())
277 .unwrap_or(TEST_BEFORE_ACQUIRE);
278
279 let pool = AnyPoolOptions::new()
280 .max_connections(max_connections)
281 .idle_timeout(Duration::from_secs(idle_timeout))
282 .test_before_acquire(test_before_acquire)
283 .connect(&config.uri)
284 .await
285 .map_err(from_sqlx_error)?;
286
287 sqlx::query(&format!(
288 "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
289 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
290 {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
291 {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
292 {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
293 {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
294 {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
295 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
296 ))
297 .execute(&pool)
298 .await
299 .map_err(from_sqlx_error)?;
300
301 sqlx::query(&format!(
302 "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
303 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
304 {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
305 {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
306 {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
307 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
308 ))
309 .execute(&pool)
310 .await
311 .map_err(from_sqlx_error)?;
312
313 Ok(SqlCatalog {
314 name: config.name.to_owned(),
315 connection: pool,
316 warehouse_location: config.warehouse_location,
317 fileio,
318 sql_bind_style: config.sql_bind_style,
319 runtime,
320 })
321 }
322
323 fn replace_placeholders(&self, query: &str) -> String {
325 match self.sql_bind_style {
326 SqlBindStyle::DollarNumeric => {
327 let mut count = 1;
328 query
329 .chars()
330 .fold(String::with_capacity(query.len()), |mut acc, c| {
331 if c == '?' {
332 acc.push('$');
333 acc.push_str(&count.to_string());
334 count += 1;
335 } else {
336 acc.push(c);
337 }
338 acc
339 })
340 }
341 _ => query.to_owned(),
342 }
343 }
344
345 async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
347 let query_with_placeholders = self.replace_placeholders(query);
348
349 let mut sqlx_query = sqlx::query(&query_with_placeholders);
350 for arg in args {
351 sqlx_query = sqlx_query.bind(arg);
352 }
353
354 sqlx_query
355 .fetch_all(&self.connection)
356 .await
357 .map_err(from_sqlx_error)
358 }
359
360 async fn execute(
362 &self,
363 query: &str,
364 args: Vec<Option<&str>>,
365 transaction: Option<&mut Transaction<'_, Any>>,
366 ) -> Result<AnyQueryResult> {
367 let query_with_placeholders = self.replace_placeholders(query);
368
369 let mut sqlx_query = sqlx::query(&query_with_placeholders);
370 for arg in args {
371 sqlx_query = sqlx_query.bind(arg);
372 }
373
374 match transaction {
375 Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
376 None => {
377 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
378 let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
379 let _ = tx.commit().await.map_err(from_sqlx_error);
380 result
381 }
382 }
383 }
384}
385
386#[async_trait]
387impl Catalog for SqlCatalog {
388 async fn list_namespaces(
389 &self,
390 parent: Option<&NamespaceIdent>,
391 ) -> Result<Vec<NamespaceIdent>> {
392 let all_namespaces_stmt = format!(
394 "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
395 FROM {CATALOG_TABLE_NAME}
396 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
397 UNION
398 SELECT {NAMESPACE_FIELD_NAME}
399 FROM {NAMESPACE_TABLE_NAME}
400 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
401 );
402
403 let namespace_rows = self
404 .fetch_rows(&all_namespaces_stmt, vec![
405 Some(&self.name),
406 Some(&self.name),
407 ])
408 .await?;
409
410 let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
411
412 if let Some(parent) = parent {
413 if self.namespace_exists(parent).await? {
414 let parent_str = parent.join(".");
415
416 for row in namespace_rows.iter() {
417 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
418 if nsp != parent_str && nsp.starts_with(&parent_str) {
420 namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
421 }
422 }
423
424 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
425 } else {
426 no_such_namespace_err(parent)
427 }
428 } else {
429 for row in namespace_rows.iter() {
430 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
431 let mut levels = nsp.split(".").collect::<Vec<&str>>();
432 if !levels.is_empty() {
433 let first_level = levels.drain(..1).collect::<Vec<&str>>();
434 namespaces.insert(NamespaceIdent::from_strs(first_level)?);
435 }
436 }
437
438 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
439 }
440 }
441
442 async fn create_namespace(
443 &self,
444 namespace: &NamespaceIdent,
445 properties: HashMap<String, String>,
446 ) -> Result<Namespace> {
447 let exists = self.namespace_exists(namespace).await?;
448
449 if exists {
450 return Err(Error::new(
451 iceberg::ErrorKind::NamespaceAlreadyExists,
452 format!("Namespace {namespace:?} already exists"),
453 ));
454 }
455
456 let namespace_str = namespace.join(".");
457 let insert = format!(
458 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
459 VALUES (?, ?, ?, ?)");
460 if !properties.is_empty() {
461 let mut insert_properties = properties.clone();
462 insert_properties.insert("exists".to_string(), "true".to_string());
463
464 let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
465 let mut insert_stmt = insert.clone();
466 for (index, (key, value)) in insert_properties.iter().enumerate() {
467 query_args.extend_from_slice(&[
468 Some(self.name.as_str()),
469 Some(namespace_str.as_str()),
470 Some(key.as_str()),
471 Some(value.as_str()),
472 ]);
473 if index > 0 {
474 insert_stmt.push_str(", (?, ?, ?, ?)");
475 }
476 }
477
478 self.execute(&insert_stmt, query_args, None).await?;
479
480 Ok(Namespace::with_properties(
481 namespace.clone(),
482 insert_properties,
483 ))
484 } else {
485 self.execute(
487 &insert,
488 vec![
489 Some(&self.name),
490 Some(&namespace_str),
491 Some("exists"),
492 Some("true"),
493 ],
494 None,
495 )
496 .await?;
497 Ok(Namespace::with_properties(namespace.clone(), properties))
498 }
499 }
500
501 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
502 let exists = self.namespace_exists(namespace).await?;
503 if exists {
504 let namespace_props = self
505 .fetch_rows(
506 &format!(
507 "SELECT
508 {NAMESPACE_FIELD_NAME},
509 {NAMESPACE_FIELD_PROPERTY_KEY},
510 {NAMESPACE_FIELD_PROPERTY_VALUE}
511 FROM {NAMESPACE_TABLE_NAME}
512 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
513 AND {NAMESPACE_FIELD_NAME} = ?"
514 ),
515 vec![Some(&self.name), Some(&namespace.join("."))],
516 )
517 .await?;
518
519 let mut properties = HashMap::with_capacity(namespace_props.len());
520
521 for row in namespace_props {
522 let key = row
523 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
524 .map_err(from_sqlx_error)?;
525 let value = row
526 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
527 .map_err(from_sqlx_error)?;
528
529 properties.insert(key, value);
530 }
531
532 Ok(Namespace::with_properties(namespace.clone(), properties))
533 } else {
534 no_such_namespace_err(namespace)
535 }
536 }
537
538 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
539 let namespace_str = namespace.join(".");
540
541 let table_namespaces = self
542 .fetch_rows(
543 &format!(
544 "SELECT 1 FROM {CATALOG_TABLE_NAME}
545 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
546 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
547 LIMIT 1"
548 ),
549 vec![Some(&self.name), Some(&namespace_str)],
550 )
551 .await?;
552
553 if !table_namespaces.is_empty() {
554 Ok(true)
555 } else {
556 let namespaces = self
557 .fetch_rows(
558 &format!(
559 "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
560 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
561 AND {NAMESPACE_FIELD_NAME} = ?
562 LIMIT 1"
563 ),
564 vec![Some(&self.name), Some(&namespace_str)],
565 )
566 .await?;
567 if !namespaces.is_empty() {
568 Ok(true)
569 } else {
570 Ok(false)
571 }
572 }
573 }
574
575 async fn update_namespace(
576 &self,
577 namespace: &NamespaceIdent,
578 properties: HashMap<String, String>,
579 ) -> Result<()> {
580 let exists = self.namespace_exists(namespace).await?;
581 if exists {
582 let existing_properties = self.get_namespace(namespace).await?.properties().clone();
583 let namespace_str = namespace.join(".");
584
585 let mut updates = vec![];
586 let mut inserts = vec![];
587
588 for (key, value) in properties.iter() {
589 if existing_properties.contains_key(key) {
590 if existing_properties.get(key) != Some(value) {
591 updates.push((key, value));
592 }
593 } else {
594 inserts.push((key, value));
595 }
596 }
597
598 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
599 let update_stmt = format!(
600 "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
601 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
602 AND {NAMESPACE_FIELD_NAME} = ?
603 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
604 );
605
606 let insert_stmt = format!(
607 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
608 VALUES (?, ?, ?, ?)"
609 );
610
611 for (key, value) in updates {
612 self.execute(
613 &update_stmt,
614 vec![
615 Some(value),
616 Some(&self.name),
617 Some(&namespace_str),
618 Some(key),
619 ],
620 Some(&mut tx),
621 )
622 .await?;
623 }
624
625 for (key, value) in inserts {
626 self.execute(
627 &insert_stmt,
628 vec![
629 Some(&self.name),
630 Some(&namespace_str),
631 Some(key),
632 Some(value),
633 ],
634 Some(&mut tx),
635 )
636 .await?;
637 }
638
639 let _ = tx.commit().await.map_err(from_sqlx_error)?;
640
641 Ok(())
642 } else {
643 no_such_namespace_err(namespace)
644 }
645 }
646
647 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
648 let exists = self.namespace_exists(namespace).await?;
649 if exists {
650 let tables = self.list_tables(namespace).await?;
652 if !tables.is_empty() {
653 return Err(Error::new(
654 iceberg::ErrorKind::Unexpected,
655 format!(
656 "Namespace {:?} is not empty. {} tables exist.",
657 namespace,
658 tables.len()
659 ),
660 ));
661 }
662
663 self.execute(
664 &format!(
665 "DELETE FROM {NAMESPACE_TABLE_NAME}
666 WHERE {NAMESPACE_FIELD_NAME} = ?
667 AND {CATALOG_FIELD_CATALOG_NAME} = ?"
668 ),
669 vec![Some(&namespace.join(".")), Some(&self.name)],
670 None,
671 )
672 .await?;
673
674 Ok(())
675 } else {
676 no_such_namespace_err(namespace)
677 }
678 }
679
680 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
681 let exists = self.namespace_exists(namespace).await?;
682 if exists {
683 let rows = self
684 .fetch_rows(
685 &format!(
686 "SELECT {CATALOG_FIELD_TABLE_NAME},
687 {CATALOG_FIELD_TABLE_NAMESPACE}
688 FROM {CATALOG_TABLE_NAME}
689 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
690 AND {CATALOG_FIELD_CATALOG_NAME} = ?
691 AND (
692 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
693 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
694 )",
695 ),
696 vec![Some(&namespace.join(".")), Some(&self.name)],
697 )
698 .await?;
699
700 let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
701
702 for row in rows.iter() {
703 let tbl = row
704 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
705 .map_err(from_sqlx_error)?;
706 let ns_strs = row
707 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
708 .map_err(from_sqlx_error)?;
709 let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
710 tables.insert(TableIdent::new(ns, tbl));
711 }
712
713 Ok(tables.into_iter().collect::<Vec<TableIdent>>())
714 } else {
715 no_such_namespace_err(namespace)
716 }
717 }
718
719 async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
720 let namespace = identifier.namespace().join(".");
721 let table_name = identifier.name();
722 let table_counts = self
723 .fetch_rows(
724 &format!(
725 "SELECT 1
726 FROM {CATALOG_TABLE_NAME}
727 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
728 AND {CATALOG_FIELD_CATALOG_NAME} = ?
729 AND {CATALOG_FIELD_TABLE_NAME} = ?
730 AND (
731 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
732 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
733 )"
734 ),
735 vec![Some(&namespace), Some(&self.name), Some(table_name)],
736 )
737 .await?;
738
739 if !table_counts.is_empty() {
740 Ok(true)
741 } else {
742 Ok(false)
743 }
744 }
745
746 async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
747 if !self.table_exists(identifier).await? {
748 return no_such_table_err(identifier);
749 }
750
751 self.execute(
752 &format!(
753 "DELETE FROM {CATALOG_TABLE_NAME}
754 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
755 AND {CATALOG_FIELD_TABLE_NAME} = ?
756 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
757 AND (
758 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
759 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
760 )"
761 ),
762 vec![
763 Some(&self.name),
764 Some(identifier.name()),
765 Some(&identifier.namespace().join(".")),
766 ],
767 None,
768 )
769 .await?;
770
771 Ok(())
772 }
773
774 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
775 let table_info = self.load_table(table).await?;
776 self.drop_table(table).await?;
777 iceberg::drop_table_data(
778 table_info.file_io(),
779 table_info.metadata(),
780 table_info.metadata_location(),
781 )
782 .await
783 }
784
785 async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
786 if !self.table_exists(identifier).await? {
787 return no_such_table_err(identifier);
788 }
789
790 let rows = self
791 .fetch_rows(
792 &format!(
793 "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
794 FROM {CATALOG_TABLE_NAME}
795 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
796 AND {CATALOG_FIELD_TABLE_NAME} = ?
797 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
798 AND (
799 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
800 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
801 )"
802 ),
803 vec![
804 Some(&self.name),
805 Some(identifier.name()),
806 Some(&identifier.namespace().join(".")),
807 ],
808 )
809 .await?;
810
811 if rows.is_empty() {
812 return no_such_table_err(identifier);
813 }
814
815 let row = &rows[0];
816 let tbl_metadata_location = row
817 .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
818 .map_err(from_sqlx_error)?;
819
820 let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
821
822 Ok(Table::builder()
823 .file_io(self.fileio.clone())
824 .identifier(identifier.clone())
825 .metadata_location(tbl_metadata_location)
826 .metadata(metadata)
827 .runtime(self.runtime.clone())
828 .build()?)
829 }
830
831 async fn create_table(
832 &self,
833 namespace: &NamespaceIdent,
834 creation: TableCreation,
835 ) -> Result<Table> {
836 if !self.namespace_exists(namespace).await? {
837 return no_such_namespace_err(namespace);
838 }
839
840 let tbl_name = creation.name.clone();
841 let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
842
843 if self.table_exists(&tbl_ident).await? {
844 return table_already_exists_err(&tbl_ident);
845 }
846
847 let (tbl_creation, location) = match creation.location.clone() {
848 Some(location) => (creation, location),
849 None => {
850 let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
853 let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
854 Some(location) => location.clone(),
855 None => {
856 format!(
857 "{}/{}",
858 self.warehouse_location.clone(),
859 namespace.join("/")
860 )
861 }
862 };
863
864 let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
865
866 (
867 TableCreation {
868 location: Some(tbl_location.clone()),
869 ..creation
870 },
871 tbl_location,
872 )
873 }
874 };
875
876 let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
877 .build()?
878 .metadata;
879 let tbl_metadata_location =
880 MetadataLocation::new_with_metadata(location.clone(), &tbl_metadata);
881
882 tbl_metadata
883 .write_to(&self.fileio, &tbl_metadata_location)
884 .await?;
885
886 let tbl_metadata_location_str = tbl_metadata_location.to_string();
887 self.execute(&format!(
888 "INSERT INTO {CATALOG_TABLE_NAME}
889 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
890 VALUES (?, ?, ?, ?, ?)
891 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location_str), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
892
893 Ok(Table::builder()
894 .file_io(self.fileio.clone())
895 .metadata_location(tbl_metadata_location_str)
896 .identifier(tbl_ident)
897 .metadata(tbl_metadata)
898 .runtime(self.runtime.clone())
899 .build()?)
900 }
901
902 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
903 if src == dest {
904 return Ok(());
905 }
906
907 if !self.table_exists(src).await? {
908 return no_such_table_err(src);
909 }
910
911 if !self.namespace_exists(dest.namespace()).await? {
912 return no_such_namespace_err(dest.namespace());
913 }
914
915 if self.table_exists(dest).await? {
916 return table_already_exists_err(dest);
917 }
918
919 self.execute(
920 &format!(
921 "UPDATE {CATALOG_TABLE_NAME}
922 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
923 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
924 AND {CATALOG_FIELD_TABLE_NAME} = ?
925 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
926 AND (
927 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
928 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
929 )"
930 ),
931 vec![
932 Some(dest.name()),
933 Some(&dest.namespace().join(".")),
934 Some(&self.name),
935 Some(src.name()),
936 Some(&src.namespace().join(".")),
937 ],
938 None,
939 )
940 .await?;
941
942 Ok(())
943 }
944
945 async fn register_table(
946 &self,
947 table_ident: &TableIdent,
948 metadata_location: String,
949 ) -> Result<Table> {
950 if self.table_exists(table_ident).await? {
951 return table_already_exists_err(table_ident);
952 }
953
954 let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
955
956 let namespace = table_ident.namespace();
957 let tbl_name = table_ident.name().to_string();
958
959 self.execute(&format!(
960 "INSERT INTO {CATALOG_TABLE_NAME}
961 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
962 VALUES (?, ?, ?, ?, ?)
963 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
964
965 Ok(Table::builder()
966 .identifier(table_ident.clone())
967 .metadata_location(metadata_location)
968 .metadata(metadata)
969 .file_io(self.fileio.clone())
970 .runtime(self.runtime.clone())
971 .build()?)
972 }
973
    /// Applies `commit` to its table and publishes the new metadata location.
    ///
    /// The staged metadata file is written first; the catalog row is then
    /// switched to the new location only if it still points at the location
    /// that was loaded at the start of this call.
    ///
    /// # Errors
    ///
    /// Returns a retryable `CatalogCommitConflicts` error when no row was
    /// updated, plus any error raised while loading the table, applying the
    /// commit, writing the metadata or running the update.
    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
        let table_ident = commit.identifier().clone();
        let current_table = self.load_table(&table_ident).await?;
        // Remember the location we started from; it is the expected value in
        // the conditional update below.
        let current_metadata_location = current_table.metadata_location_result()?.to_string();

        let staged_table = commit.apply(current_table)?;
        let staged_metadata_location_str = staged_table.metadata_location_result()?;
        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;

        // Write the new metadata file before the catalog points at it.
        staged_table
            .metadata()
            .write_to(staged_table.file_io(), &staged_metadata_location)
            .await?;

        let staged_metadata_location_str = staged_metadata_location.to_string();
        // The trailing `metadata_location = ?` predicate makes the update
        // conditional on the row still holding the location loaded above.
        let update_result = self
            .execute(
                &format!(
                    "UPDATE {CATALOG_TABLE_NAME}
                     SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
                     AND {CATALOG_FIELD_TABLE_NAME} = ?
                     AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
                     AND (
                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
                     )
                     AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
                ),
                vec![
                    Some(&staged_metadata_location_str),
                    Some(current_metadata_location.as_str()),
                    Some(&self.name),
                    Some(table_ident.name()),
                    Some(&table_ident.namespace().join(".")),
                    Some(current_metadata_location.as_str()),
                ],
                None,
            )
            .await?;

        // Zero affected rows means the row no longer matched, so the commit
        // is reported as a retryable conflict.
        if update_result.rows_affected() == 0 {
            return Err(Error::new(
                ErrorKind::CatalogCommitConflicts,
                format!("Commit conflicted for table: {table_ident}"),
            )
            .with_retryable(true));
        }

        Ok(staged_table)
    }
1026}
1027
1028#[cfg(test)]
1029mod tests {
1030 use std::collections::{HashMap, HashSet};
1031 use std::hash::Hash;
1032 use std::sync::Arc;
1033
1034 use iceberg::io::LocalFsStorageFactory;
1035 use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
1036 use iceberg::table::Table;
1037 use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
1038 use itertools::Itertools;
1039 use regex::Regex;
1040 use sqlx::migrate::MigrateDatabase;
1041 use tempfile::TempDir;
1042
1043 use crate::catalog::{
1044 NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
1045 SQL_CATALOG_PROP_WAREHOUSE,
1046 };
1047 use crate::{SqlBindStyle, SqlCatalogBuilder};
1048
    /// Regex fragment matching a lowercase hyphenated UUID (8-4-4-4-12 hex digits).
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
1050
    /// Returns the path of a freshly created temporary directory as a `String`.
    ///
    /// NOTE(review): `temp_dir` is dropped when this function returns, which
    /// presumably removes the directory (see the `tempfile::TempDir` docs), so
    /// the result is a unique path rather than an existing directory. Callers
    /// that use it as a SQLite database path appear to rely on that — confirm
    /// before changing.
    fn temp_path() -> String {
        let temp_dir = TempDir::new().unwrap();
        temp_dir.path().to_str().unwrap().to_string()
    }
1055
1056 fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1057 HashSet::from_iter(vec)
1058 }
1059
1060 fn default_properties() -> HashMap<String, String> {
1061 HashMap::from([("exists".to_string(), "true".to_string())])
1062 }
1063
1064 async fn new_sql_catalog(
1066 warehouse_location: String,
1067 name: Option<impl ToString>,
1068 ) -> impl Catalog {
1069 let name = if let Some(name) = name {
1070 name.to_string()
1071 } else {
1072 "iceberg".to_string()
1073 };
1074 let sql_lite_uri = format!("sqlite:{}", temp_path());
1075 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1076
1077 let props = HashMap::from_iter([
1078 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1079 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1080 (
1081 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1082 SqlBindStyle::DollarNumeric.to_string(),
1083 ),
1084 ]);
1085 SqlCatalogBuilder::default()
1086 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1087 .load(&name, props)
1088 .await
1089 .unwrap()
1090 }
1091
1092 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1093 let _ = catalog
1094 .create_namespace(namespace_ident, HashMap::new())
1095 .await
1096 .unwrap();
1097 }
1098
1099 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1100 for namespace_ident in namespace_idents {
1101 let _ = create_namespace(catalog, namespace_ident).await;
1102 }
1103 }
1104
1105 fn simple_table_schema() -> Schema {
1106 Schema::builder()
1107 .with_fields(vec![
1108 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1109 ])
1110 .build()
1111 .unwrap()
1112 }
1113
1114 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1115 let _ = catalog
1116 .create_table(
1117 &table_ident.namespace,
1118 TableCreation::builder()
1119 .name(table_ident.name().into())
1120 .schema(simple_table_schema())
1121 .location(temp_path())
1122 .build(),
1123 )
1124 .await
1125 .unwrap();
1126 }
1127
1128 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1129 for table_ident in table_idents {
1130 create_table(catalog, table_ident).await;
1131 }
1132 }
1133
    /// Asserts that `table` looks like a freshly created table: it has the
    /// expected identifier and current schema, exactly one partition spec
    /// (id 0, built from the schema without fields), exactly one empty sort
    /// order (id 0), no properties, and is not read-only.
    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
        assert_eq!(table.identifier(), expected_table_ident);

        let metadata = table.metadata();

        assert_eq!(metadata.current_schema().as_ref(), expected_schema);

        // Expected partition spec: spec id 0 with no partition fields added.
        let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        assert_eq!(
            metadata
                .partition_specs_iter()
                .map(|p| p.as_ref())
                .collect_vec(),
            vec![&expected_partition_spec]
        );

        // Expected sort order: order id 0 with no sort fields.
        let expected_sorted_order = SortOrder::builder()
            .with_order_id(0)
            .with_fields(vec![])
            .build(expected_schema)
            .unwrap();

        assert_eq!(
            metadata
                .sort_orders_iter()
                .map(|s| s.as_ref())
                .collect_vec(),
            vec![&expected_sorted_order]
        );

        assert_eq!(metadata.properties(), &HashMap::new());

        assert!(!table.readonly());
    }
1172
1173 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1174 let actual = table.metadata_location().unwrap().to_string();
1175 let regex = Regex::new(regex_str).unwrap();
1176 assert!(regex.is_match(&actual))
1177 }
1178
    /// Building a catalog with the same name and warehouse location several
    /// times in a row must succeed. Each call goes through `new_sql_catalog`,
    /// which creates its own SQLite database.
    #[tokio::test]
    async fn test_initialized() {
        let warehouse_loc = temp_path();
        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
        new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
    }
1187
1188 #[tokio::test]
1189 async fn test_builder_method() {
1190 let sql_lite_uri = format!("sqlite:{}", temp_path());
1191 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1192 let warehouse_location = temp_path();
1193
1194 let catalog = SqlCatalogBuilder::default()
1195 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1196 .uri(sql_lite_uri.to_string())
1197 .warehouse_location(warehouse_location.clone())
1198 .sql_bind_style(SqlBindStyle::QMark)
1199 .load("iceberg", HashMap::default())
1200 .await;
1201 assert!(catalog.is_ok());
1202
1203 let catalog = catalog.unwrap();
1204 assert!(catalog.warehouse_location == warehouse_location);
1205 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1206 }
1207
1208 #[tokio::test]
1211 async fn test_builder_props_non_existent_path_fails() {
1212 let sql_lite_uri = format!("sqlite:{}", temp_path());
1213 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1214 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1215 let warehouse_location = temp_path();
1216
1217 let catalog = SqlCatalogBuilder::default()
1218 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1219 .uri(sql_lite_uri)
1220 .warehouse_location(warehouse_location)
1221 .load(
1222 "iceberg",
1223 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1224 )
1225 .await;
1226 assert!(catalog.is_err());
1227 }
1228
1229 #[tokio::test]
1233 async fn test_builder_props_set_valid_uri() {
1234 let sql_lite_uri = format!("sqlite:{}", temp_path());
1235 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1236 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1237 let warehouse_location = temp_path();
1238
1239 let catalog = SqlCatalogBuilder::default()
1240 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1241 .uri(sql_lite_uri2)
1242 .warehouse_location(warehouse_location)
1243 .load(
1244 "iceberg",
1245 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1246 )
1247 .await;
1248 assert!(catalog.is_ok());
1249 }
1250
1251 #[tokio::test]
1253 async fn test_builder_props_take_precedence() {
1254 let sql_lite_uri = format!("sqlite:{}", temp_path());
1255 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1256 let warehouse_location = temp_path();
1257 let warehouse_location2 = temp_path();
1258
1259 let catalog = SqlCatalogBuilder::default()
1260 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1261 .warehouse_location(warehouse_location2)
1262 .sql_bind_style(SqlBindStyle::DollarNumeric)
1263 .load(
1264 "iceberg",
1265 HashMap::from_iter([
1266 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1267 (
1268 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1269 warehouse_location.clone(),
1270 ),
1271 (
1272 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1273 SqlBindStyle::QMark.to_string(),
1274 ),
1275 ]),
1276 )
1277 .await;
1278
1279 assert!(catalog.is_ok());
1280
1281 let catalog = catalog.unwrap();
1282 assert!(catalog.warehouse_location == warehouse_location);
1283 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1284 }
1285
1286 #[tokio::test]
1288 async fn test_builder_props_take_precedence_props() {
1289 let sql_lite_uri = format!("sqlite:{}", temp_path());
1290 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1291 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1292 let warehouse_location = temp_path();
1293 let warehouse_location2 = temp_path();
1294
1295 let props = HashMap::from_iter([
1296 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1297 (
1298 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1299 warehouse_location.clone(),
1300 ),
1301 (
1302 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1303 SqlBindStyle::QMark.to_string(),
1304 ),
1305 ]);
1306 let props2 = HashMap::from_iter([
1307 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1308 (
1309 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1310 warehouse_location2.clone(),
1311 ),
1312 (
1313 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1314 SqlBindStyle::DollarNumeric.to_string(),
1315 ),
1316 ]);
1317
1318 let catalog = SqlCatalogBuilder::default()
1319 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1320 .props(props2)
1321 .load("iceberg", props)
1322 .await;
1323
1324 assert!(catalog.is_ok());
1325
1326 let catalog = catalog.unwrap();
1327 assert!(catalog.warehouse_location == warehouse_location);
1328 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1329 }
1330
1331 #[tokio::test]
1333 async fn test_builder_props_take_precedence_prop() {
1334 let sql_lite_uri = format!("sqlite:{}", temp_path());
1335 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1336 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1337 let warehouse_location = temp_path();
1338 let warehouse_location2 = temp_path();
1339
1340 let props = HashMap::from_iter([
1341 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1342 (
1343 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1344 warehouse_location.clone(),
1345 ),
1346 (
1347 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1348 SqlBindStyle::QMark.to_string(),
1349 ),
1350 ]);
1351
1352 let catalog = SqlCatalogBuilder::default()
1353 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1354 .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1355 .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1356 .prop(
1357 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1358 SqlBindStyle::DollarNumeric.to_string(),
1359 )
1360 .load("iceberg", props)
1361 .await;
1362
1363 assert!(catalog.is_ok());
1364
1365 let catalog = catalog.unwrap();
1366 assert!(catalog.warehouse_location == warehouse_location);
1367 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1368 }
1369
1370 #[tokio::test]
1372 async fn test_builder_props_invalid_bind_style_fails() {
1373 let sql_lite_uri = format!("sqlite:{}", temp_path());
1374 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1375 let warehouse_location = temp_path();
1376
1377 let catalog = SqlCatalogBuilder::default()
1378 .with_storage_factory(Arc::new(LocalFsStorageFactory))
1379 .load(
1380 "iceberg",
1381 HashMap::from_iter([
1382 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1383 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1384 (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1385 ]),
1386 )
1387 .await;
1388
1389 assert!(catalog.is_err());
1390 }
1391
1392 #[tokio::test]
1393 async fn test_list_namespaces_returns_empty_vector() {
1394 let warehouse_loc = temp_path();
1395 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1396
1397 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1398 }
1399
1400 #[tokio::test]
1401 async fn test_list_namespaces_returns_empty_different_name() {
1402 let warehouse_loc = temp_path();
1403 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1404 let namespace_ident_1 = NamespaceIdent::new("a".into());
1405 let namespace_ident_2 = NamespaceIdent::new("b".into());
1406 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1407 assert_eq!(
1408 to_set(catalog.list_namespaces(None).await.unwrap()),
1409 to_set(vec![namespace_ident_1, namespace_ident_2])
1410 );
1411
1412 let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await;
1413 assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
1414 }
1415
1416 #[tokio::test]
1417 async fn test_list_namespaces_returns_only_top_level_namespaces() {
1418 let warehouse_loc = temp_path();
1419 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1420 let namespace_ident_1 = NamespaceIdent::new("a".into());
1421 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1422 let namespace_ident_3 = NamespaceIdent::new("b".into());
1423 create_namespaces(&catalog, &vec![
1424 &namespace_ident_1,
1425 &namespace_ident_2,
1426 &namespace_ident_3,
1427 ])
1428 .await;
1429
1430 assert_eq!(
1431 to_set(catalog.list_namespaces(None).await.unwrap()),
1432 to_set(vec![namespace_ident_1, namespace_ident_3])
1433 );
1434 }
1435
1436 #[tokio::test]
1437 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1438 let warehouse_loc = temp_path();
1439 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1440 let namespace_ident_1 = NamespaceIdent::new("a".into());
1441 let namespace_ident_2 = NamespaceIdent::new("b".into());
1442 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1443
1444 assert_eq!(
1445 catalog
1446 .list_namespaces(Some(&namespace_ident_1))
1447 .await
1448 .unwrap(),
1449 vec![]
1450 );
1451 }
1452
1453 #[tokio::test]
1454 async fn test_list_namespaces_returns_namespace_under_parent() {
1455 let warehouse_loc = temp_path();
1456 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1457 let namespace_ident_1 = NamespaceIdent::new("a".into());
1458 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1459 let namespace_ident_3 = NamespaceIdent::new("c".into());
1460 create_namespaces(&catalog, &vec![
1461 &namespace_ident_1,
1462 &namespace_ident_2,
1463 &namespace_ident_3,
1464 ])
1465 .await;
1466
1467 assert_eq!(
1468 to_set(catalog.list_namespaces(None).await.unwrap()),
1469 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1470 );
1471
1472 assert_eq!(
1473 catalog
1474 .list_namespaces(Some(&namespace_ident_1))
1475 .await
1476 .unwrap(),
1477 vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1478 );
1479 }
1480
1481 #[tokio::test]
1482 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1483 let warehouse_loc = temp_path();
1484 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1485 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1486 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1487 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1488 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1489 let namespace_ident_5 = NamespaceIdent::new("b".into());
1490 create_namespaces(&catalog, &vec![
1491 &namespace_ident_1,
1492 &namespace_ident_2,
1493 &namespace_ident_3,
1494 &namespace_ident_4,
1495 &namespace_ident_5,
1496 ])
1497 .await;
1498
1499 assert_eq!(
1500 to_set(
1501 catalog
1502 .list_namespaces(Some(&namespace_ident_1))
1503 .await
1504 .unwrap()
1505 ),
1506 to_set(vec![
1507 NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1508 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1509 NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1510 ])
1511 );
1512 }
1513
1514 #[tokio::test]
1515 async fn test_namespace_exists_returns_false() {
1516 let warehouse_loc = temp_path();
1517 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1518 let namespace_ident = NamespaceIdent::new("a".into());
1519 create_namespace(&catalog, &namespace_ident).await;
1520
1521 assert!(
1522 !catalog
1523 .namespace_exists(&NamespaceIdent::new("b".into()))
1524 .await
1525 .unwrap()
1526 );
1527 }
1528
1529 #[tokio::test]
1530 async fn test_namespace_exists_returns_true() {
1531 let warehouse_loc = temp_path();
1532 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1533 let namespace_ident = NamespaceIdent::new("a".into());
1534 create_namespace(&catalog, &namespace_ident).await;
1535
1536 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1537 }
1538
1539 #[tokio::test]
1540 async fn test_create_namespace_with_properties() {
1541 let warehouse_loc = temp_path();
1542 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1543 let namespace_ident = NamespaceIdent::new("abc".into());
1544
1545 let mut properties = default_properties();
1546 properties.insert("k".into(), "v".into());
1547
1548 assert_eq!(
1549 catalog
1550 .create_namespace(&namespace_ident, properties.clone())
1551 .await
1552 .unwrap(),
1553 Namespace::with_properties(namespace_ident.clone(), properties.clone())
1554 );
1555
1556 assert_eq!(
1557 catalog.get_namespace(&namespace_ident).await.unwrap(),
1558 Namespace::with_properties(namespace_ident, properties)
1559 );
1560 }
1561
1562 #[tokio::test]
1563 async fn test_create_nested_namespace() {
1564 let warehouse_loc = temp_path();
1565 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1566 let parent_namespace_ident = NamespaceIdent::new("a".into());
1567 create_namespace(&catalog, &parent_namespace_ident).await;
1568
1569 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1570
1571 assert_eq!(
1572 catalog
1573 .create_namespace(&child_namespace_ident, HashMap::new())
1574 .await
1575 .unwrap(),
1576 Namespace::new(child_namespace_ident.clone())
1577 );
1578
1579 assert_eq!(
1580 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1581 Namespace::with_properties(child_namespace_ident, default_properties())
1582 );
1583 }
1584
1585 #[tokio::test]
1586 async fn test_create_deeply_nested_namespace() {
1587 let warehouse_loc = temp_path();
1588 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1589 let namespace_ident_a = NamespaceIdent::new("a".into());
1590 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1591 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1592
1593 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1594
1595 assert_eq!(
1596 catalog
1597 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1598 .await
1599 .unwrap(),
1600 Namespace::new(namespace_ident_a_b_c.clone())
1601 );
1602
1603 assert_eq!(
1604 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1605 Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1606 );
1607 }
1608
1609 #[tokio::test]
1610 async fn test_update_namespace_noop() {
1611 let warehouse_loc = temp_path();
1612 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1613 let namespace_ident = NamespaceIdent::new("a".into());
1614 create_namespace(&catalog, &namespace_ident).await;
1615
1616 catalog
1617 .update_namespace(&namespace_ident, HashMap::new())
1618 .await
1619 .unwrap();
1620
1621 assert_eq!(
1622 *catalog
1623 .get_namespace(&namespace_ident)
1624 .await
1625 .unwrap()
1626 .properties(),
1627 HashMap::from_iter([("exists".to_string(), "true".to_string())])
1628 )
1629 }
1630
1631 #[tokio::test]
1632 async fn test_update_nested_namespace() {
1633 let warehouse_loc = temp_path();
1634 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1635 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1636 create_namespace(&catalog, &namespace_ident).await;
1637
1638 let mut props = HashMap::from_iter([
1639 ("prop1".to_string(), "val1".to_string()),
1640 ("prop2".into(), "val2".into()),
1641 ]);
1642
1643 catalog
1644 .update_namespace(&namespace_ident, props.clone())
1645 .await
1646 .unwrap();
1647
1648 props.insert("exists".into(), "true".into());
1649
1650 assert_eq!(
1651 *catalog
1652 .get_namespace(&namespace_ident)
1653 .await
1654 .unwrap()
1655 .properties(),
1656 props
1657 )
1658 }
1659
1660 #[tokio::test]
1661 async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1662 let warehouse_loc = temp_path();
1663 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1664 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1665
1666 let props = HashMap::from_iter([
1667 ("prop1".to_string(), "val1".to_string()),
1668 ("prop2".into(), "val2".into()),
1669 ]);
1670
1671 let err = catalog
1672 .update_namespace(&namespace_ident, props)
1673 .await
1674 .unwrap_err();
1675
1676 assert_eq!(
1677 err.message(),
1678 format!("No such namespace: {namespace_ident:?}")
1679 );
1680 }
1681
1682 #[tokio::test]
1683 async fn test_drop_nested_namespace() {
1684 let warehouse_loc = temp_path();
1685 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1686 let namespace_ident_a = NamespaceIdent::new("a".into());
1687 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1688 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1689
1690 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1691
1692 assert!(
1693 !catalog
1694 .namespace_exists(&namespace_ident_a_b)
1695 .await
1696 .unwrap()
1697 );
1698
1699 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1700 }
1701
1702 #[tokio::test]
1703 async fn test_drop_deeply_nested_namespace() {
1704 let warehouse_loc = temp_path();
1705 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1706 let namespace_ident_a = NamespaceIdent::new("a".into());
1707 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1708 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1709 create_namespaces(&catalog, &vec![
1710 &namespace_ident_a,
1711 &namespace_ident_a_b,
1712 &namespace_ident_a_b_c,
1713 ])
1714 .await;
1715
1716 catalog
1717 .drop_namespace(&namespace_ident_a_b_c)
1718 .await
1719 .unwrap();
1720
1721 assert!(
1722 !catalog
1723 .namespace_exists(&namespace_ident_a_b_c)
1724 .await
1725 .unwrap()
1726 );
1727
1728 assert!(
1729 catalog
1730 .namespace_exists(&namespace_ident_a_b)
1731 .await
1732 .unwrap()
1733 );
1734
1735 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1736 }
1737
1738 #[tokio::test]
1739 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1740 let warehouse_loc = temp_path();
1741 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1742 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1743
1744 let non_existent_namespace_ident =
1745 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1746 assert_eq!(
1747 catalog
1748 .drop_namespace(&non_existent_namespace_ident)
1749 .await
1750 .unwrap_err()
1751 .to_string(),
1752 format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1753 )
1754 }
1755
1756 #[tokio::test]
1757 async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1758 let warehouse_loc = temp_path();
1759 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1760 let namespace_ident_a = NamespaceIdent::new("a".into());
1761 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1762 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1763
1764 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1765
1766 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1767
1768 assert!(
1769 catalog
1770 .namespace_exists(&namespace_ident_a_b)
1771 .await
1772 .unwrap()
1773 );
1774 }
1775
1776 #[tokio::test]
1777 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1778 let warehouse_loc = temp_path();
1779 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1780
1781 let namespace_ident = NamespaceIdent::new("a".into());
1782 let mut namespace_properties = HashMap::new();
1783 let namespace_location = temp_path();
1784 namespace_properties.insert(
1785 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1786 namespace_location.to_string(),
1787 );
1788 catalog
1789 .create_namespace(&namespace_ident, namespace_properties)
1790 .await
1791 .unwrap();
1792
1793 let table_name = "tbl1";
1794 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1795 let expected_table_metadata_location_regex =
1796 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1797
1798 let table = catalog
1799 .create_table(
1800 &namespace_ident,
1801 TableCreation::builder()
1802 .name(table_name.into())
1803 .schema(simple_table_schema())
1804 .build(),
1806 )
1807 .await
1808 .unwrap();
1809 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1810 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1811
1812 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1813 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1814 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1815 }
1816
1817 #[tokio::test]
1818 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1819 {
1820 let warehouse_loc = temp_path();
1821 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1822
1823 let namespace_ident = NamespaceIdent::new("a".into());
1824 let mut namespace_properties = HashMap::new();
1825 let namespace_location = temp_path();
1826 namespace_properties.insert(
1827 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1828 namespace_location.to_string(),
1829 );
1830 catalog
1831 .create_namespace(&namespace_ident, namespace_properties)
1832 .await
1833 .unwrap();
1834
1835 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1836 let mut nested_namespace_properties = HashMap::new();
1837 let nested_namespace_location = temp_path();
1838 nested_namespace_properties.insert(
1839 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1840 nested_namespace_location.to_string(),
1841 );
1842 catalog
1843 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1844 .await
1845 .unwrap();
1846
1847 let table_name = "tbl1";
1848 let expected_table_ident =
1849 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1850 let expected_table_metadata_location_regex = format!(
1851 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1852 );
1853
1854 let table = catalog
1855 .create_table(
1856 &nested_namespace_ident,
1857 TableCreation::builder()
1858 .name(table_name.into())
1859 .schema(simple_table_schema())
1860 .build(),
1862 )
1863 .await
1864 .unwrap();
1865 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1866 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1867
1868 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1869 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1870 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1871 }
1872
1873 #[tokio::test]
1874 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1875 {
1876 let warehouse_loc = temp_path();
1877 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1878
1879 let namespace_ident = NamespaceIdent::new("a".into());
1880 let namespace_properties = HashMap::new();
1882 catalog
1883 .create_namespace(&namespace_ident, namespace_properties)
1884 .await
1885 .unwrap();
1886
1887 let table_name = "tbl1";
1888 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1889 let expected_table_metadata_location_regex =
1890 format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1891
1892 let table = catalog
1893 .create_table(
1894 &namespace_ident,
1895 TableCreation::builder()
1896 .name(table_name.into())
1897 .schema(simple_table_schema())
1898 .build(),
1900 )
1901 .await
1902 .unwrap();
1903 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1904 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1905
1906 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1907 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1908 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1909 }
1910
1911 #[tokio::test]
1912 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1913 {
1914 let warehouse_loc = temp_path();
1915 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1916
1917 let namespace_ident = NamespaceIdent::new("a".into());
1918 create_namespace(&catalog, &namespace_ident).await;
1919
1920 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1921 create_namespace(&catalog, &nested_namespace_ident).await;
1922
1923 let table_name = "tbl1";
1924 let expected_table_ident =
1925 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1926 let expected_table_metadata_location_regex =
1927 format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1928
1929 let table = catalog
1930 .create_table(
1931 &nested_namespace_ident,
1932 TableCreation::builder()
1933 .name(table_name.into())
1934 .schema(simple_table_schema())
1935 .build(),
1937 )
1938 .await
1939 .unwrap();
1940 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1941 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1942
1943 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1944 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1945 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1946 }
1947
1948 #[tokio::test]
1949 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1950 let warehouse_loc = temp_path();
1951 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1952 let namespace_ident = NamespaceIdent::new("a".into());
1953 create_namespace(&catalog, &namespace_ident).await;
1954 let table_name = "tbl1";
1955 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1956 create_table(&catalog, &table_ident).await;
1957
1958 let tmp_dir = TempDir::new().unwrap();
1959 let location = tmp_dir.path().to_str().unwrap().to_string();
1960
1961 assert_eq!(
1962 catalog
1963 .create_table(
1964 &namespace_ident,
1965 TableCreation::builder()
1966 .name(table_name.into())
1967 .schema(simple_table_schema())
1968 .location(location)
1969 .build()
1970 )
1971 .await
1972 .unwrap_err()
1973 .to_string(),
1974 format!(
1975 "TableAlreadyExists => Table {:?} already exists.",
1976 &table_ident
1977 )
1978 );
1979 }
1980
1981 #[tokio::test]
1982 async fn test_rename_table_src_table_is_same_as_dst_table() {
1983 let warehouse_loc = temp_path();
1984 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1985 let namespace_ident = NamespaceIdent::new("n1".into());
1986 create_namespace(&catalog, &namespace_ident).await;
1987 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1988 create_table(&catalog, &table_ident).await;
1989
1990 catalog
1991 .rename_table(&table_ident, &table_ident)
1992 .await
1993 .unwrap();
1994
1995 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1996 table_ident
1997 ],);
1998 }
1999
2000 #[tokio::test]
2001 async fn test_rename_table_across_nested_namespaces() {
2002 let warehouse_loc = temp_path();
2003 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2004 let namespace_ident_a = NamespaceIdent::new("a".into());
2005 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2006 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2007 create_namespaces(&catalog, &vec![
2008 &namespace_ident_a,
2009 &namespace_ident_a_b,
2010 &namespace_ident_a_b_c,
2011 ])
2012 .await;
2013
2014 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2015 create_tables(&catalog, vec![&src_table_ident]).await;
2016
2017 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2018 catalog
2019 .rename_table(&src_table_ident, &dst_table_ident)
2020 .await
2021 .unwrap();
2022
2023 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2024
2025 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2026 }
2027
2028 #[tokio::test]
2029 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2030 let warehouse_loc = temp_path();
2031 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2032 let src_namespace_ident = NamespaceIdent::new("n1".into());
2033 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2034 create_namespace(&catalog, &src_namespace_ident).await;
2035 create_table(&catalog, &src_table_ident).await;
2036
2037 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2038 let dst_table_ident =
2039 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2040 assert_eq!(
2041 catalog
2042 .rename_table(&src_table_ident, &dst_table_ident)
2043 .await
2044 .unwrap_err()
2045 .to_string(),
2046 format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
2047 );
2048 }
2049}