1use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::time::Duration;
21
22use async_trait::async_trait;
23use iceberg::io::FileIO;
24use iceberg::spec::{TableMetadata, TableMetadataBuilder};
25use iceberg::table::Table;
26use iceberg::{
27 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
28 TableCommit, TableCreation, TableIdent,
29};
30use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
31use sqlx::{Any, AnyPool, Row, Transaction};
32
33use crate::error::{
34 from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
35};
36
37pub const SQL_CATALOG_PROP_URI: &str = "uri";
39pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
41pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";
43
44static CATALOG_TABLE_NAME: &str = "iceberg_tables";
45static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
46static CATALOG_FIELD_TABLE_NAME: &str = "table_name";
47static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
48static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
49static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
50static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
51static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";
52
53static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
54static NAMESPACE_FIELD_NAME: &str = "namespace";
55static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
56static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";
57
58static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
59
60static MAX_CONNECTIONS: u32 = 10; static IDLE_TIMEOUT: u64 = 10; static TEST_BEFORE_ACQUIRE: bool = true; #[derive(Debug)]
66pub struct SqlCatalogBuilder(SqlCatalogConfig);
67
68impl Default for SqlCatalogBuilder {
69 fn default() -> Self {
70 Self(SqlCatalogConfig {
71 uri: "".to_string(),
72 name: "".to_string(),
73 warehouse_location: "".to_string(),
74 sql_bind_style: SqlBindStyle::DollarNumeric,
75 props: HashMap::new(),
76 })
77 }
78}
79
80impl SqlCatalogBuilder {
81 pub fn uri(mut self, uri: impl Into<String>) -> Self {
86 self.0.uri = uri.into();
87 self
88 }
89
90 pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
95 self.0.warehouse_location = location.into();
96 self
97 }
98
99 pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
104 self.0.sql_bind_style = sql_bind_style;
105 self
106 }
107
108 pub fn props(mut self, props: HashMap<String, String>) -> Self {
113 for (k, v) in props {
114 self.0.props.insert(k, v);
115 }
116 self
117 }
118
119 pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
126 self.0.props.insert(key.into(), value.into());
127 self
128 }
129}
130
131impl CatalogBuilder for SqlCatalogBuilder {
132 type C = SqlCatalog;
133
134 fn load(
135 mut self,
136 name: impl Into<String>,
137 props: HashMap<String, String>,
138 ) -> impl Future<Output = Result<Self::C>> + Send {
139 for (k, v) in props {
140 self.0.props.insert(k, v);
141 }
142
143 if let Some(uri) = self.0.props.remove(SQL_CATALOG_PROP_URI) {
144 self.0.uri = uri;
145 }
146 if let Some(warehouse_location) = self.0.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
147 self.0.warehouse_location = warehouse_location;
148 }
149
150 let name = name.into();
151
152 let mut valid_sql_bind_style = true;
153 if let Some(sql_bind_style) = self.0.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
154 if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
155 self.0.sql_bind_style = sql_bind_style;
156 } else {
157 valid_sql_bind_style = false;
158 }
159 }
160
161 let valid_name = !name.trim().is_empty();
162
163 async move {
164 if !valid_name {
165 Err(Error::new(
166 ErrorKind::DataInvalid,
167 "Catalog name cannot be empty",
168 ))
169 } else if !valid_sql_bind_style {
170 Err(Error::new(
171 ErrorKind::DataInvalid,
172 format!(
173 "`{}` values are valid only if they're `{}` or `{}`",
174 SQL_CATALOG_PROP_BIND_STYLE,
175 SqlBindStyle::DollarNumeric,
176 SqlBindStyle::QMark
177 ),
178 ))
179 } else {
180 self.0.name = name;
181 SqlCatalog::new(self.0).await
182 }
183 }
184 }
185}
186
187#[derive(Debug)]
196struct SqlCatalogConfig {
197 uri: String,
198 name: String,
199 warehouse_location: String,
200 sql_bind_style: SqlBindStyle,
201 props: HashMap<String, String>,
202}
203
204#[derive(Debug)]
205pub struct SqlCatalog {
207 name: String,
208 connection: AnyPool,
209 warehouse_location: String,
210 fileio: FileIO,
211 sql_bind_style: SqlBindStyle,
212}
213
214#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
215pub enum SqlBindStyle {
217 DollarNumeric,
219 QMark,
221}
222
223impl SqlCatalog {
224 async fn new(config: SqlCatalogConfig) -> Result<Self> {
226 let fileio = FileIO::from_path(&config.warehouse_location)?.build()?;
227 install_default_drivers();
228 let max_connections: u32 = config
229 .props
230 .get("pool.max-connections")
231 .map(|v| v.parse().unwrap())
232 .unwrap_or(MAX_CONNECTIONS);
233 let idle_timeout: u64 = config
234 .props
235 .get("pool.idle-timeout")
236 .map(|v| v.parse().unwrap())
237 .unwrap_or(IDLE_TIMEOUT);
238 let test_before_acquire: bool = config
239 .props
240 .get("pool.test-before-acquire")
241 .map(|v| v.parse().unwrap())
242 .unwrap_or(TEST_BEFORE_ACQUIRE);
243
244 let pool = AnyPoolOptions::new()
245 .max_connections(max_connections)
246 .idle_timeout(Duration::from_secs(idle_timeout))
247 .test_before_acquire(test_before_acquire)
248 .connect(&config.uri)
249 .await
250 .map_err(from_sqlx_error)?;
251
252 sqlx::query(&format!(
253 "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
254 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
255 {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
256 {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
257 {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
258 {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
259 {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
260 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
261 ))
262 .execute(&pool)
263 .await
264 .map_err(from_sqlx_error)?;
265
266 sqlx::query(&format!(
267 "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
268 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
269 {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
270 {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
271 {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
272 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
273 ))
274 .execute(&pool)
275 .await
276 .map_err(from_sqlx_error)?;
277
278 Ok(SqlCatalog {
279 name: config.name.to_owned(),
280 connection: pool,
281 warehouse_location: config.warehouse_location,
282 fileio,
283 sql_bind_style: config.sql_bind_style,
284 })
285 }
286
287 fn replace_placeholders(&self, query: &str) -> String {
289 match self.sql_bind_style {
290 SqlBindStyle::DollarNumeric => {
291 let mut count = 1;
292 query
293 .chars()
294 .fold(String::with_capacity(query.len()), |mut acc, c| {
295 if c == '?' {
296 acc.push('$');
297 acc.push_str(&count.to_string());
298 count += 1;
299 } else {
300 acc.push(c);
301 }
302 acc
303 })
304 }
305 _ => query.to_owned(),
306 }
307 }
308
309 async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
311 let query_with_placeholders = self.replace_placeholders(query);
312
313 let mut sqlx_query = sqlx::query(&query_with_placeholders);
314 for arg in args {
315 sqlx_query = sqlx_query.bind(arg);
316 }
317
318 sqlx_query
319 .fetch_all(&self.connection)
320 .await
321 .map_err(from_sqlx_error)
322 }
323
324 async fn execute(
326 &self,
327 query: &str,
328 args: Vec<Option<&str>>,
329 transaction: Option<&mut Transaction<'_, Any>>,
330 ) -> Result<AnyQueryResult> {
331 let query_with_placeholders = self.replace_placeholders(query);
332
333 let mut sqlx_query = sqlx::query(&query_with_placeholders);
334 for arg in args {
335 sqlx_query = sqlx_query.bind(arg);
336 }
337
338 match transaction {
339 Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
340 None => {
341 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
342 let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
343 let _ = tx.commit().await.map_err(from_sqlx_error);
344 result
345 }
346 }
347 }
348}
349
350#[async_trait]
351impl Catalog for SqlCatalog {
352 async fn list_namespaces(
353 &self,
354 parent: Option<&NamespaceIdent>,
355 ) -> Result<Vec<NamespaceIdent>> {
356 let all_namespaces_stmt = format!(
358 "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
359 FROM {CATALOG_TABLE_NAME}
360 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
361 UNION
362 SELECT {NAMESPACE_FIELD_NAME}
363 FROM {NAMESPACE_TABLE_NAME}
364 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
365 );
366
367 let namespace_rows = self
368 .fetch_rows(&all_namespaces_stmt, vec![
369 Some(&self.name),
370 Some(&self.name),
371 ])
372 .await?;
373
374 let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
375
376 if let Some(parent) = parent {
377 if self.namespace_exists(parent).await? {
378 let parent_str = parent.join(".");
379
380 for row in namespace_rows.iter() {
381 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
382 if nsp != parent_str && nsp.starts_with(&parent_str) {
384 namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
385 }
386 }
387
388 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
389 } else {
390 no_such_namespace_err(parent)
391 }
392 } else {
393 for row in namespace_rows.iter() {
394 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
395 let mut levels = nsp.split(".").collect::<Vec<&str>>();
396 if !levels.is_empty() {
397 let first_level = levels.drain(..1).collect::<Vec<&str>>();
398 namespaces.insert(NamespaceIdent::from_strs(first_level)?);
399 }
400 }
401
402 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
403 }
404 }
405
406 async fn create_namespace(
407 &self,
408 namespace: &NamespaceIdent,
409 properties: HashMap<String, String>,
410 ) -> Result<Namespace> {
411 let exists = self.namespace_exists(namespace).await?;
412
413 if exists {
414 return Err(Error::new(
415 iceberg::ErrorKind::Unexpected,
416 format!("Namespace {namespace:?} already exists"),
417 ));
418 }
419
420 let namespace_str = namespace.join(".");
421 let insert = format!(
422 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
423 VALUES (?, ?, ?, ?)");
424 if !properties.is_empty() {
425 let mut insert_properties = properties.clone();
426 insert_properties.insert("exists".to_string(), "true".to_string());
427
428 let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
429 let mut insert_stmt = insert.clone();
430 for (index, (key, value)) in insert_properties.iter().enumerate() {
431 query_args.extend_from_slice(&[
432 Some(self.name.as_str()),
433 Some(namespace_str.as_str()),
434 Some(key.as_str()),
435 Some(value.as_str()),
436 ]);
437 if index > 0 {
438 insert_stmt.push_str(", (?, ?, ?, ?)");
439 }
440 }
441
442 self.execute(&insert_stmt, query_args, None).await?;
443
444 Ok(Namespace::with_properties(
445 namespace.clone(),
446 insert_properties,
447 ))
448 } else {
449 self.execute(
451 &insert,
452 vec![
453 Some(&self.name),
454 Some(&namespace_str),
455 Some("exists"),
456 Some("true"),
457 ],
458 None,
459 )
460 .await?;
461 Ok(Namespace::with_properties(namespace.clone(), properties))
462 }
463 }
464
465 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
466 let exists = self.namespace_exists(namespace).await?;
467 if exists {
468 let namespace_props = self
469 .fetch_rows(
470 &format!(
471 "SELECT
472 {NAMESPACE_FIELD_NAME},
473 {NAMESPACE_FIELD_PROPERTY_KEY},
474 {NAMESPACE_FIELD_PROPERTY_VALUE}
475 FROM {NAMESPACE_TABLE_NAME}
476 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
477 AND {NAMESPACE_FIELD_NAME} = ?"
478 ),
479 vec![Some(&self.name), Some(&namespace.join("."))],
480 )
481 .await?;
482
483 let mut properties = HashMap::with_capacity(namespace_props.len());
484
485 for row in namespace_props {
486 let key = row
487 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
488 .map_err(from_sqlx_error)?;
489 let value = row
490 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
491 .map_err(from_sqlx_error)?;
492
493 properties.insert(key, value);
494 }
495
496 Ok(Namespace::with_properties(namespace.clone(), properties))
497 } else {
498 no_such_namespace_err(namespace)
499 }
500 }
501
502 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
503 let namespace_str = namespace.join(".");
504
505 let table_namespaces = self
506 .fetch_rows(
507 &format!(
508 "SELECT 1 FROM {CATALOG_TABLE_NAME}
509 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
510 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
511 LIMIT 1"
512 ),
513 vec![Some(&self.name), Some(&namespace_str)],
514 )
515 .await?;
516
517 if !table_namespaces.is_empty() {
518 Ok(true)
519 } else {
520 let namespaces = self
521 .fetch_rows(
522 &format!(
523 "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
524 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
525 AND {NAMESPACE_FIELD_NAME} = ?
526 LIMIT 1"
527 ),
528 vec![Some(&self.name), Some(&namespace_str)],
529 )
530 .await?;
531 if !namespaces.is_empty() {
532 Ok(true)
533 } else {
534 Ok(false)
535 }
536 }
537 }
538
539 async fn update_namespace(
540 &self,
541 namespace: &NamespaceIdent,
542 properties: HashMap<String, String>,
543 ) -> Result<()> {
544 let exists = self.namespace_exists(namespace).await?;
545 if exists {
546 let existing_properties = self.get_namespace(namespace).await?.properties().clone();
547 let namespace_str = namespace.join(".");
548
549 let mut updates = vec![];
550 let mut inserts = vec![];
551
552 for (key, value) in properties.iter() {
553 if existing_properties.contains_key(key) {
554 if existing_properties.get(key) != Some(value) {
555 updates.push((key, value));
556 }
557 } else {
558 inserts.push((key, value));
559 }
560 }
561
562 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
563 let update_stmt = format!(
564 "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
565 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
566 AND {NAMESPACE_FIELD_NAME} = ?
567 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
568 );
569
570 let insert_stmt = format!(
571 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
572 VALUES (?, ?, ?, ?)"
573 );
574
575 for (key, value) in updates {
576 self.execute(
577 &update_stmt,
578 vec![
579 Some(value),
580 Some(&self.name),
581 Some(&namespace_str),
582 Some(key),
583 ],
584 Some(&mut tx),
585 )
586 .await?;
587 }
588
589 for (key, value) in inserts {
590 self.execute(
591 &insert_stmt,
592 vec![
593 Some(&self.name),
594 Some(&namespace_str),
595 Some(key),
596 Some(value),
597 ],
598 Some(&mut tx),
599 )
600 .await?;
601 }
602
603 let _ = tx.commit().await.map_err(from_sqlx_error)?;
604
605 Ok(())
606 } else {
607 no_such_namespace_err(namespace)
608 }
609 }
610
611 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
612 let exists = self.namespace_exists(namespace).await?;
613 if exists {
614 let tables = self.list_tables(namespace).await?;
616 if !tables.is_empty() {
617 return Err(Error::new(
618 iceberg::ErrorKind::Unexpected,
619 format!(
620 "Namespace {:?} is not empty. {} tables exist.",
621 namespace,
622 tables.len()
623 ),
624 ));
625 }
626
627 self.execute(
628 &format!(
629 "DELETE FROM {NAMESPACE_TABLE_NAME}
630 WHERE {NAMESPACE_FIELD_NAME} = ?
631 AND {CATALOG_FIELD_CATALOG_NAME} = ?"
632 ),
633 vec![Some(&namespace.join(".")), Some(&self.name)],
634 None,
635 )
636 .await?;
637
638 Ok(())
639 } else {
640 no_such_namespace_err(namespace)
641 }
642 }
643
644 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
645 let exists = self.namespace_exists(namespace).await?;
646 if exists {
647 let rows = self
648 .fetch_rows(
649 &format!(
650 "SELECT {CATALOG_FIELD_TABLE_NAME},
651 {CATALOG_FIELD_TABLE_NAMESPACE}
652 FROM {CATALOG_TABLE_NAME}
653 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
654 AND {CATALOG_FIELD_CATALOG_NAME} = ?
655 AND (
656 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
657 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
658 )",
659 ),
660 vec![Some(&namespace.join(".")), Some(&self.name)],
661 )
662 .await?;
663
664 let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
665
666 for row in rows.iter() {
667 let tbl = row
668 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
669 .map_err(from_sqlx_error)?;
670 let ns_strs = row
671 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
672 .map_err(from_sqlx_error)?;
673 let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
674 tables.insert(TableIdent::new(ns, tbl));
675 }
676
677 Ok(tables.into_iter().collect::<Vec<TableIdent>>())
678 } else {
679 no_such_namespace_err(namespace)
680 }
681 }
682
683 async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
684 let namespace = identifier.namespace().join(".");
685 let table_name = identifier.name();
686 let table_counts = self
687 .fetch_rows(
688 &format!(
689 "SELECT 1
690 FROM {CATALOG_TABLE_NAME}
691 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
692 AND {CATALOG_FIELD_CATALOG_NAME} = ?
693 AND {CATALOG_FIELD_TABLE_NAME} = ?
694 AND (
695 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
696 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
697 )"
698 ),
699 vec![Some(&namespace), Some(&self.name), Some(table_name)],
700 )
701 .await?;
702
703 if !table_counts.is_empty() {
704 Ok(true)
705 } else {
706 Ok(false)
707 }
708 }
709
710 async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
711 if !self.table_exists(identifier).await? {
712 return no_such_table_err(identifier);
713 }
714
715 self.execute(
716 &format!(
717 "DELETE FROM {CATALOG_TABLE_NAME}
718 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
719 AND {CATALOG_FIELD_TABLE_NAME} = ?
720 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
721 AND (
722 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
723 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
724 )"
725 ),
726 vec![
727 Some(&self.name),
728 Some(identifier.name()),
729 Some(&identifier.namespace().join(".")),
730 ],
731 None,
732 )
733 .await?;
734
735 Ok(())
736 }
737
738 async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
739 if !self.table_exists(identifier).await? {
740 return no_such_table_err(identifier);
741 }
742
743 let rows = self
744 .fetch_rows(
745 &format!(
746 "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
747 FROM {CATALOG_TABLE_NAME}
748 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
749 AND {CATALOG_FIELD_TABLE_NAME} = ?
750 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
751 AND (
752 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
753 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
754 )"
755 ),
756 vec![
757 Some(&self.name),
758 Some(identifier.name()),
759 Some(&identifier.namespace().join(".")),
760 ],
761 )
762 .await?;
763
764 if rows.is_empty() {
765 return no_such_table_err(identifier);
766 }
767
768 let row = &rows[0];
769 let tbl_metadata_location = row
770 .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
771 .map_err(from_sqlx_error)?;
772
773 let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
774
775 Ok(Table::builder()
776 .file_io(self.fileio.clone())
777 .identifier(identifier.clone())
778 .metadata_location(tbl_metadata_location)
779 .metadata(metadata)
780 .build()?)
781 }
782
783 async fn create_table(
784 &self,
785 namespace: &NamespaceIdent,
786 creation: TableCreation,
787 ) -> Result<Table> {
788 if !self.namespace_exists(namespace).await? {
789 return no_such_namespace_err(namespace);
790 }
791
792 let tbl_name = creation.name.clone();
793 let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
794
795 if self.table_exists(&tbl_ident).await? {
796 return table_already_exists_err(&tbl_ident);
797 }
798
799 let (tbl_creation, location) = match creation.location.clone() {
800 Some(location) => (creation, location),
801 None => {
802 let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
805 let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
806 Some(location) => location.clone(),
807 None => {
808 format!(
809 "{}/{}",
810 self.warehouse_location.clone(),
811 namespace.join("/")
812 )
813 }
814 };
815
816 let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
817
818 (
819 TableCreation {
820 location: Some(tbl_location.clone()),
821 ..creation
822 },
823 tbl_location,
824 )
825 }
826 };
827
828 let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
829 .build()?
830 .metadata;
831 let tbl_metadata_location =
832 MetadataLocation::new_with_table_location(location.clone()).to_string();
833
834 tbl_metadata
835 .write_to(&self.fileio, &tbl_metadata_location)
836 .await?;
837
838 self.execute(&format!(
839 "INSERT INTO {CATALOG_TABLE_NAME}
840 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
841 VALUES (?, ?, ?, ?, ?)
842 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
843
844 Ok(Table::builder()
845 .file_io(self.fileio.clone())
846 .metadata_location(tbl_metadata_location)
847 .identifier(tbl_ident)
848 .metadata(tbl_metadata)
849 .build()?)
850 }
851
852 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
853 if src == dest {
854 return Ok(());
855 }
856
857 if !self.table_exists(src).await? {
858 return no_such_table_err(src);
859 }
860
861 if !self.namespace_exists(dest.namespace()).await? {
862 return no_such_namespace_err(dest.namespace());
863 }
864
865 if self.table_exists(dest).await? {
866 return table_already_exists_err(dest);
867 }
868
869 self.execute(
870 &format!(
871 "UPDATE {CATALOG_TABLE_NAME}
872 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
873 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
874 AND {CATALOG_FIELD_TABLE_NAME} = ?
875 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
876 AND (
877 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
878 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
879 )"
880 ),
881 vec![
882 Some(dest.name()),
883 Some(&dest.namespace().join(".")),
884 Some(&self.name),
885 Some(src.name()),
886 Some(&src.namespace().join(".")),
887 ],
888 None,
889 )
890 .await?;
891
892 Ok(())
893 }
894
895 async fn register_table(
896 &self,
897 table_ident: &TableIdent,
898 metadata_location: String,
899 ) -> Result<Table> {
900 if self.table_exists(table_ident).await? {
901 return table_already_exists_err(table_ident);
902 }
903
904 let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
905
906 let namespace = table_ident.namespace();
907 let tbl_name = table_ident.name().to_string();
908
909 self.execute(&format!(
910 "INSERT INTO {CATALOG_TABLE_NAME}
911 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
912 VALUES (?, ?, ?, ?, ?)
913 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
914
915 Ok(Table::builder()
916 .identifier(table_ident.clone())
917 .metadata_location(metadata_location)
918 .metadata(metadata)
919 .file_io(self.fileio.clone())
920 .build()?)
921 }
922
923 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
925 let table_ident = commit.identifier().clone();
926 let current_table = self.load_table(&table_ident).await?;
927 let current_metadata_location = current_table.metadata_location_result()?.to_string();
928
929 let staged_table = commit.apply(current_table)?;
930 let staged_metadata_location = staged_table.metadata_location_result()?;
931
932 staged_table
933 .metadata()
934 .write_to(staged_table.file_io(), &staged_metadata_location)
935 .await?;
936
937 let update_result = self
938 .execute(
939 &format!(
940 "UPDATE {CATALOG_TABLE_NAME}
941 SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
942 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
943 AND {CATALOG_FIELD_TABLE_NAME} = ?
944 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
945 AND (
946 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
947 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
948 )
949 AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
950 ),
951 vec![
952 Some(staged_metadata_location),
953 Some(current_metadata_location.as_str()),
954 Some(&self.name),
955 Some(table_ident.name()),
956 Some(&table_ident.namespace().join(".")),
957 Some(current_metadata_location.as_str()),
958 ],
959 None,
960 )
961 .await?;
962
963 if update_result.rows_affected() == 0 {
964 return Err(Error::new(
965 ErrorKind::CatalogCommitConflicts,
966 format!("Commit conflicted for table: {table_ident}"),
967 )
968 .with_retryable(true));
969 }
970
971 Ok(staged_table)
972 }
973}
974
975#[cfg(test)]
976mod tests {
977 use std::collections::{HashMap, HashSet};
978 use std::hash::Hash;
979
980 use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
981 use iceberg::table::Table;
982 use iceberg::transaction::{ApplyTransactionAction, Transaction};
983 use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
984 use itertools::Itertools;
985 use regex::Regex;
986 use sqlx::migrate::MigrateDatabase;
987 use tempfile::TempDir;
988
989 use crate::catalog::{
990 NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
991 SQL_CATALOG_PROP_WAREHOUSE,
992 };
993 use crate::{SqlBindStyle, SqlCatalogBuilder};
994
995 const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
996
997 fn temp_path() -> String {
998 let temp_dir = TempDir::new().unwrap();
999 temp_dir.path().to_str().unwrap().to_string()
1000 }
1001
1002 fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1003 HashSet::from_iter(vec)
1004 }
1005
1006 fn default_properties() -> HashMap<String, String> {
1007 HashMap::from([("exists".to_string(), "true".to_string())])
1008 }
1009
1010 async fn new_sql_catalog(
1012 warehouse_location: String,
1013 name: Option<impl ToString>,
1014 ) -> impl Catalog {
1015 let name = if let Some(name) = name {
1016 name.to_string()
1017 } else {
1018 "iceberg".to_string()
1019 };
1020 let sql_lite_uri = format!("sqlite:{}", temp_path());
1021 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1022
1023 let props = HashMap::from_iter([
1024 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1025 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1026 (
1027 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1028 SqlBindStyle::DollarNumeric.to_string(),
1029 ),
1030 ]);
1031 SqlCatalogBuilder::default()
1032 .load(&name, props)
1033 .await
1034 .unwrap()
1035 }
1036
1037 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1038 let _ = catalog
1039 .create_namespace(namespace_ident, HashMap::new())
1040 .await
1041 .unwrap();
1042 }
1043
1044 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1045 for namespace_ident in namespace_idents {
1046 let _ = create_namespace(catalog, namespace_ident).await;
1047 }
1048 }
1049
1050 fn simple_table_schema() -> Schema {
1051 Schema::builder()
1052 .with_fields(vec![
1053 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1054 ])
1055 .build()
1056 .unwrap()
1057 }
1058
1059 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1060 let _ = catalog
1061 .create_table(
1062 &table_ident.namespace,
1063 TableCreation::builder()
1064 .name(table_ident.name().into())
1065 .schema(simple_table_schema())
1066 .location(temp_path())
1067 .build(),
1068 )
1069 .await
1070 .unwrap();
1071 }
1072
1073 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1074 for table_ident in table_idents {
1075 create_table(catalog, table_ident).await;
1076 }
1077 }
1078
1079 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1080 assert_eq!(table.identifier(), expected_table_ident);
1081
1082 let metadata = table.metadata();
1083
1084 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1085
1086 let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1087 .with_spec_id(0)
1088 .build()
1089 .unwrap();
1090
1091 assert_eq!(
1092 metadata
1093 .partition_specs_iter()
1094 .map(|p| p.as_ref())
1095 .collect_vec(),
1096 vec![&expected_partition_spec]
1097 );
1098
1099 let expected_sorted_order = SortOrder::builder()
1100 .with_order_id(0)
1101 .with_fields(vec![])
1102 .build(expected_schema)
1103 .unwrap();
1104
1105 assert_eq!(
1106 metadata
1107 .sort_orders_iter()
1108 .map(|s| s.as_ref())
1109 .collect_vec(),
1110 vec![&expected_sorted_order]
1111 );
1112
1113 assert_eq!(metadata.properties(), &HashMap::new());
1114
1115 assert!(!table.readonly());
1116 }
1117
1118 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1119 let actual = table.metadata_location().unwrap().to_string();
1120 let regex = Regex::new(regex_str).unwrap();
1121 assert!(regex.is_match(&actual))
1122 }
1123
1124 #[tokio::test]
1125 async fn test_initialized() {
1126 let warehouse_loc = temp_path();
1127 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1128 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1130 new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1131 }
1132
1133 #[tokio::test]
1134 async fn test_builder_method() {
1135 let sql_lite_uri = format!("sqlite:{}", temp_path());
1136 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1137 let warehouse_location = temp_path();
1138
1139 let catalog = SqlCatalogBuilder::default()
1140 .uri(sql_lite_uri.to_string())
1141 .warehouse_location(warehouse_location.clone())
1142 .sql_bind_style(SqlBindStyle::QMark)
1143 .load("iceberg", HashMap::default())
1144 .await;
1145 assert!(catalog.is_ok());
1146
1147 let catalog = catalog.unwrap();
1148 assert!(catalog.warehouse_location == warehouse_location);
1149 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1150 }
1151
1152 #[tokio::test]
1155 async fn test_builder_props_non_existent_path_fails() {
1156 let sql_lite_uri = format!("sqlite:{}", temp_path());
1157 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1158 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1159 let warehouse_location = temp_path();
1160
1161 let catalog = SqlCatalogBuilder::default()
1162 .uri(sql_lite_uri)
1163 .warehouse_location(warehouse_location)
1164 .load(
1165 "iceberg",
1166 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1167 )
1168 .await;
1169 assert!(catalog.is_err());
1170 }
1171
1172 #[tokio::test]
1176 async fn test_builder_props_set_valid_uri() {
1177 let sql_lite_uri = format!("sqlite:{}", temp_path());
1178 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1179 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1180 let warehouse_location = temp_path();
1181
1182 let catalog = SqlCatalogBuilder::default()
1183 .uri(sql_lite_uri2)
1184 .warehouse_location(warehouse_location)
1185 .load(
1186 "iceberg",
1187 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1188 )
1189 .await;
1190 assert!(catalog.is_ok());
1191 }
1192
1193 #[tokio::test]
1195 async fn test_builder_props_take_precedence() {
1196 let sql_lite_uri = format!("sqlite:{}", temp_path());
1197 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1198 let warehouse_location = temp_path();
1199 let warehouse_location2 = temp_path();
1200
1201 let catalog = SqlCatalogBuilder::default()
1202 .warehouse_location(warehouse_location2)
1203 .sql_bind_style(SqlBindStyle::DollarNumeric)
1204 .load(
1205 "iceberg",
1206 HashMap::from_iter([
1207 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1208 (
1209 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1210 warehouse_location.clone(),
1211 ),
1212 (
1213 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1214 SqlBindStyle::QMark.to_string(),
1215 ),
1216 ]),
1217 )
1218 .await;
1219
1220 assert!(catalog.is_ok());
1221
1222 let catalog = catalog.unwrap();
1223 assert!(catalog.warehouse_location == warehouse_location);
1224 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1225 }
1226
1227 #[tokio::test]
1229 async fn test_builder_props_take_precedence_props() {
1230 let sql_lite_uri = format!("sqlite:{}", temp_path());
1231 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1232 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1233 let warehouse_location = temp_path();
1234 let warehouse_location2 = temp_path();
1235
1236 let props = HashMap::from_iter([
1237 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1238 (
1239 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1240 warehouse_location.clone(),
1241 ),
1242 (
1243 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1244 SqlBindStyle::QMark.to_string(),
1245 ),
1246 ]);
1247 let props2 = HashMap::from_iter([
1248 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1249 (
1250 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1251 warehouse_location2.clone(),
1252 ),
1253 (
1254 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1255 SqlBindStyle::DollarNumeric.to_string(),
1256 ),
1257 ]);
1258
1259 let catalog = SqlCatalogBuilder::default()
1260 .props(props2)
1261 .load("iceberg", props)
1262 .await;
1263
1264 assert!(catalog.is_ok());
1265
1266 let catalog = catalog.unwrap();
1267 assert!(catalog.warehouse_location == warehouse_location);
1268 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1269 }
1270
1271 #[tokio::test]
1273 async fn test_builder_props_take_precedence_prop() {
1274 let sql_lite_uri = format!("sqlite:{}", temp_path());
1275 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1276 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1277 let warehouse_location = temp_path();
1278 let warehouse_location2 = temp_path();
1279
1280 let props = HashMap::from_iter([
1281 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1282 (
1283 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1284 warehouse_location.clone(),
1285 ),
1286 (
1287 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1288 SqlBindStyle::QMark.to_string(),
1289 ),
1290 ]);
1291
1292 let catalog = SqlCatalogBuilder::default()
1293 .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1294 .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1295 .prop(
1296 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1297 SqlBindStyle::DollarNumeric.to_string(),
1298 )
1299 .load("iceberg", props)
1300 .await;
1301
1302 assert!(catalog.is_ok());
1303
1304 let catalog = catalog.unwrap();
1305 assert!(catalog.warehouse_location == warehouse_location);
1306 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1307 }
1308
1309 #[tokio::test]
1311 async fn test_builder_props_invalid_bind_style_fails() {
1312 let sql_lite_uri = format!("sqlite:{}", temp_path());
1313 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1314 let warehouse_location = temp_path();
1315
1316 let catalog = SqlCatalogBuilder::default()
1317 .load(
1318 "iceberg",
1319 HashMap::from_iter([
1320 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1321 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1322 (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1323 ]),
1324 )
1325 .await;
1326
1327 assert!(catalog.is_err());
1328 }
1329
1330 #[tokio::test]
1331 async fn test_list_namespaces_returns_empty_vector() {
1332 let warehouse_loc = temp_path();
1333 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1334
1335 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1336 }
1337
1338 #[tokio::test]
1339 async fn test_list_namespaces_returns_empty_different_name() {
1340 let warehouse_loc = temp_path();
1341 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1342 let namespace_ident_1 = NamespaceIdent::new("a".into());
1343 let namespace_ident_2 = NamespaceIdent::new("b".into());
1344 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1345 assert_eq!(
1346 to_set(catalog.list_namespaces(None).await.unwrap()),
1347 to_set(vec![namespace_ident_1, namespace_ident_2])
1348 );
1349
1350 let catalog2 = new_sql_catalog(warehouse_loc, Some("test")).await;
1351 assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
1352 }
1353
1354 #[tokio::test]
1355 async fn test_list_namespaces_returns_multiple_namespaces() {
1356 let warehouse_loc = temp_path();
1357 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1358 let namespace_ident_1 = NamespaceIdent::new("a".into());
1359 let namespace_ident_2 = NamespaceIdent::new("b".into());
1360 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1361
1362 assert_eq!(
1363 to_set(catalog.list_namespaces(None).await.unwrap()),
1364 to_set(vec![namespace_ident_1, namespace_ident_2])
1365 );
1366 }
1367
1368 #[tokio::test]
1369 async fn test_list_namespaces_returns_only_top_level_namespaces() {
1370 let warehouse_loc = temp_path();
1371 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1372 let namespace_ident_1 = NamespaceIdent::new("a".into());
1373 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1374 let namespace_ident_3 = NamespaceIdent::new("b".into());
1375 create_namespaces(&catalog, &vec![
1376 &namespace_ident_1,
1377 &namespace_ident_2,
1378 &namespace_ident_3,
1379 ])
1380 .await;
1381
1382 assert_eq!(
1383 to_set(catalog.list_namespaces(None).await.unwrap()),
1384 to_set(vec![namespace_ident_1, namespace_ident_3])
1385 );
1386 }
1387
1388 #[tokio::test]
1389 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1390 let warehouse_loc = temp_path();
1391 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1392 let namespace_ident_1 = NamespaceIdent::new("a".into());
1393 let namespace_ident_2 = NamespaceIdent::new("b".into());
1394 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1395
1396 assert_eq!(
1397 catalog
1398 .list_namespaces(Some(&namespace_ident_1))
1399 .await
1400 .unwrap(),
1401 vec![]
1402 );
1403 }
1404
1405 #[tokio::test]
1406 async fn test_list_namespaces_returns_namespace_under_parent() {
1407 let warehouse_loc = temp_path();
1408 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1409 let namespace_ident_1 = NamespaceIdent::new("a".into());
1410 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1411 let namespace_ident_3 = NamespaceIdent::new("c".into());
1412 create_namespaces(&catalog, &vec![
1413 &namespace_ident_1,
1414 &namespace_ident_2,
1415 &namespace_ident_3,
1416 ])
1417 .await;
1418
1419 assert_eq!(
1420 to_set(catalog.list_namespaces(None).await.unwrap()),
1421 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1422 );
1423
1424 assert_eq!(
1425 catalog
1426 .list_namespaces(Some(&namespace_ident_1))
1427 .await
1428 .unwrap(),
1429 vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1430 );
1431 }
1432
1433 #[tokio::test]
1434 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1435 let warehouse_loc = temp_path();
1436 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1437 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1438 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1439 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1440 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1441 let namespace_ident_5 = NamespaceIdent::new("b".into());
1442 create_namespaces(&catalog, &vec![
1443 &namespace_ident_1,
1444 &namespace_ident_2,
1445 &namespace_ident_3,
1446 &namespace_ident_4,
1447 &namespace_ident_5,
1448 ])
1449 .await;
1450
1451 assert_eq!(
1452 to_set(
1453 catalog
1454 .list_namespaces(Some(&namespace_ident_1))
1455 .await
1456 .unwrap()
1457 ),
1458 to_set(vec![
1459 NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1460 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1461 NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1462 ])
1463 );
1464 }
1465
1466 #[tokio::test]
1467 async fn test_namespace_exists_returns_false() {
1468 let warehouse_loc = temp_path();
1469 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1470 let namespace_ident = NamespaceIdent::new("a".into());
1471 create_namespace(&catalog, &namespace_ident).await;
1472
1473 assert!(
1474 !catalog
1475 .namespace_exists(&NamespaceIdent::new("b".into()))
1476 .await
1477 .unwrap()
1478 );
1479 }
1480
1481 #[tokio::test]
1482 async fn test_namespace_exists_returns_true() {
1483 let warehouse_loc = temp_path();
1484 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1485 let namespace_ident = NamespaceIdent::new("a".into());
1486 create_namespace(&catalog, &namespace_ident).await;
1487
1488 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1489 }
1490
1491 #[tokio::test]
1492 async fn test_create_namespace_with_properties() {
1493 let warehouse_loc = temp_path();
1494 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1495 let namespace_ident = NamespaceIdent::new("abc".into());
1496
1497 let mut properties = default_properties();
1498 properties.insert("k".into(), "v".into());
1499
1500 assert_eq!(
1501 catalog
1502 .create_namespace(&namespace_ident, properties.clone())
1503 .await
1504 .unwrap(),
1505 Namespace::with_properties(namespace_ident.clone(), properties.clone())
1506 );
1507
1508 assert_eq!(
1509 catalog.get_namespace(&namespace_ident).await.unwrap(),
1510 Namespace::with_properties(namespace_ident, properties)
1511 );
1512 }
1513
1514 #[tokio::test]
1515 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
1516 let warehouse_loc = temp_path();
1517 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1518 let namespace_ident = NamespaceIdent::new("a".into());
1519 create_namespace(&catalog, &namespace_ident).await;
1520
1521 assert_eq!(
1522 catalog
1523 .create_namespace(&namespace_ident, HashMap::new())
1524 .await
1525 .unwrap_err()
1526 .to_string(),
1527 format!(
1528 "Unexpected => Namespace {:?} already exists",
1529 &namespace_ident
1530 )
1531 );
1532
1533 assert_eq!(
1534 catalog.get_namespace(&namespace_ident).await.unwrap(),
1535 Namespace::with_properties(namespace_ident, default_properties())
1536 );
1537 }
1538
1539 #[tokio::test]
1540 async fn test_create_nested_namespace() {
1541 let warehouse_loc = temp_path();
1542 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1543 let parent_namespace_ident = NamespaceIdent::new("a".into());
1544 create_namespace(&catalog, &parent_namespace_ident).await;
1545
1546 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1547
1548 assert_eq!(
1549 catalog
1550 .create_namespace(&child_namespace_ident, HashMap::new())
1551 .await
1552 .unwrap(),
1553 Namespace::new(child_namespace_ident.clone())
1554 );
1555
1556 assert_eq!(
1557 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1558 Namespace::with_properties(child_namespace_ident, default_properties())
1559 );
1560 }
1561
1562 #[tokio::test]
1563 async fn test_create_deeply_nested_namespace() {
1564 let warehouse_loc = temp_path();
1565 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1566 let namespace_ident_a = NamespaceIdent::new("a".into());
1567 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1568 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1569
1570 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1571
1572 assert_eq!(
1573 catalog
1574 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1575 .await
1576 .unwrap(),
1577 Namespace::new(namespace_ident_a_b_c.clone())
1578 );
1579
1580 assert_eq!(
1581 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1582 Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1583 );
1584 }
1585
1586 #[tokio::test]
1587 async fn test_update_namespace_noop() {
1588 let warehouse_loc = temp_path();
1589 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1590 let namespace_ident = NamespaceIdent::new("a".into());
1591 create_namespace(&catalog, &namespace_ident).await;
1592
1593 catalog
1594 .update_namespace(&namespace_ident, HashMap::new())
1595 .await
1596 .unwrap();
1597
1598 assert_eq!(
1599 *catalog
1600 .get_namespace(&namespace_ident)
1601 .await
1602 .unwrap()
1603 .properties(),
1604 HashMap::from_iter([("exists".to_string(), "true".to_string())])
1605 )
1606 }
1607
1608 #[tokio::test]
1609 async fn test_update_namespace() {
1610 let warehouse_loc = temp_path();
1611 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1612 let namespace_ident = NamespaceIdent::new("a".into());
1613 create_namespace(&catalog, &namespace_ident).await;
1614
1615 let mut props = HashMap::from_iter([
1616 ("prop1".to_string(), "val1".to_string()),
1617 ("prop2".into(), "val2".into()),
1618 ]);
1619
1620 catalog
1621 .update_namespace(&namespace_ident, props.clone())
1622 .await
1623 .unwrap();
1624
1625 props.insert("exists".into(), "true".into());
1626
1627 assert_eq!(
1628 *catalog
1629 .get_namespace(&namespace_ident)
1630 .await
1631 .unwrap()
1632 .properties(),
1633 props
1634 )
1635 }
1636
1637 #[tokio::test]
1638 async fn test_update_nested_namespace() {
1639 let warehouse_loc = temp_path();
1640 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1641 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1642 create_namespace(&catalog, &namespace_ident).await;
1643
1644 let mut props = HashMap::from_iter([
1645 ("prop1".to_string(), "val1".to_string()),
1646 ("prop2".into(), "val2".into()),
1647 ]);
1648
1649 catalog
1650 .update_namespace(&namespace_ident, props.clone())
1651 .await
1652 .unwrap();
1653
1654 props.insert("exists".into(), "true".into());
1655
1656 assert_eq!(
1657 *catalog
1658 .get_namespace(&namespace_ident)
1659 .await
1660 .unwrap()
1661 .properties(),
1662 props
1663 )
1664 }
1665
1666 #[tokio::test]
1667 async fn test_update_namespace_errors_if_namespace_doesnt_exist() {
1668 let warehouse_loc = temp_path();
1669 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1670 let namespace_ident = NamespaceIdent::new("a".into());
1671
1672 let props = HashMap::from_iter([
1673 ("prop1".to_string(), "val1".to_string()),
1674 ("prop2".into(), "val2".into()),
1675 ]);
1676
1677 let err = catalog
1678 .update_namespace(&namespace_ident, props)
1679 .await
1680 .unwrap_err();
1681
1682 assert_eq!(
1683 err.message(),
1684 format!("No such namespace: {namespace_ident:?}")
1685 );
1686 }
1687
1688 #[tokio::test]
1689 async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1690 let warehouse_loc = temp_path();
1691 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1692 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1693
1694 let props = HashMap::from_iter([
1695 ("prop1".to_string(), "val1".to_string()),
1696 ("prop2".into(), "val2".into()),
1697 ]);
1698
1699 let err = catalog
1700 .update_namespace(&namespace_ident, props)
1701 .await
1702 .unwrap_err();
1703
1704 assert_eq!(
1705 err.message(),
1706 format!("No such namespace: {namespace_ident:?}")
1707 );
1708 }
1709
1710 #[tokio::test]
1711 async fn test_drop_namespace() {
1712 let warehouse_loc = temp_path();
1713 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1714 let namespace_ident = NamespaceIdent::new("abc".into());
1715 create_namespace(&catalog, &namespace_ident).await;
1716
1717 catalog.drop_namespace(&namespace_ident).await.unwrap();
1718
1719 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1720 }
1721
1722 #[tokio::test]
1723 async fn test_drop_nested_namespace() {
1724 let warehouse_loc = temp_path();
1725 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1726 let namespace_ident_a = NamespaceIdent::new("a".into());
1727 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1728 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1729
1730 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1731
1732 assert!(
1733 !catalog
1734 .namespace_exists(&namespace_ident_a_b)
1735 .await
1736 .unwrap()
1737 );
1738
1739 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1740 }
1741
1742 #[tokio::test]
1743 async fn test_drop_deeply_nested_namespace() {
1744 let warehouse_loc = temp_path();
1745 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1746 let namespace_ident_a = NamespaceIdent::new("a".into());
1747 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1748 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1749 create_namespaces(&catalog, &vec![
1750 &namespace_ident_a,
1751 &namespace_ident_a_b,
1752 &namespace_ident_a_b_c,
1753 ])
1754 .await;
1755
1756 catalog
1757 .drop_namespace(&namespace_ident_a_b_c)
1758 .await
1759 .unwrap();
1760
1761 assert!(
1762 !catalog
1763 .namespace_exists(&namespace_ident_a_b_c)
1764 .await
1765 .unwrap()
1766 );
1767
1768 assert!(
1769 catalog
1770 .namespace_exists(&namespace_ident_a_b)
1771 .await
1772 .unwrap()
1773 );
1774
1775 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1776 }
1777
1778 #[tokio::test]
1779 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1780 let warehouse_loc = temp_path();
1781 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1782
1783 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1784 assert_eq!(
1785 catalog
1786 .drop_namespace(&non_existent_namespace_ident)
1787 .await
1788 .unwrap_err()
1789 .to_string(),
1790 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1791 )
1792 }
1793
1794 #[tokio::test]
1795 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1796 let warehouse_loc = temp_path();
1797 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1798 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1799
1800 let non_existent_namespace_ident =
1801 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1802 assert_eq!(
1803 catalog
1804 .drop_namespace(&non_existent_namespace_ident)
1805 .await
1806 .unwrap_err()
1807 .to_string(),
1808 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1809 )
1810 }
1811
1812 #[tokio::test]
1813 async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1814 let warehouse_loc = temp_path();
1815 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1816 let namespace_ident_a = NamespaceIdent::new("a".into());
1817 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1818 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1819
1820 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1821
1822 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1823
1824 assert!(
1825 catalog
1826 .namespace_exists(&namespace_ident_a_b)
1827 .await
1828 .unwrap()
1829 );
1830 }
1831
1832 #[tokio::test]
1833 async fn test_list_tables_returns_empty_vector() {
1834 let warehouse_loc = temp_path();
1835 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1836 let namespace_ident = NamespaceIdent::new("a".into());
1837 create_namespace(&catalog, &namespace_ident).await;
1838
1839 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1840 }
1841
1842 #[tokio::test]
1843 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1844 let warehouse_loc = temp_path();
1845 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1846
1847 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1848
1849 assert_eq!(
1850 catalog
1851 .list_tables(&non_existent_namespace_ident)
1852 .await
1853 .unwrap_err()
1854 .to_string(),
1855 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}"),
1856 );
1857 }
1858
1859 #[tokio::test]
1860 async fn test_create_table_with_location() {
1861 let warehouse_loc = temp_path();
1862 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
1863 let namespace_ident = NamespaceIdent::new("a".into());
1864 create_namespace(&catalog, &namespace_ident).await;
1865
1866 let table_name = "abc";
1867 let location = warehouse_loc.clone();
1868 let table_creation = TableCreation::builder()
1869 .name(table_name.into())
1870 .location(location.clone())
1871 .schema(simple_table_schema())
1872 .build();
1873
1874 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1875
1876 assert_table_eq(
1877 &catalog
1878 .create_table(&namespace_ident, table_creation)
1879 .await
1880 .unwrap(),
1881 &expected_table_ident,
1882 &simple_table_schema(),
1883 );
1884
1885 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1886
1887 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1888
1889 assert!(
1890 table
1891 .metadata_location()
1892 .unwrap()
1893 .to_string()
1894 .starts_with(&location)
1895 )
1896 }
1897
1898 #[tokio::test]
1899 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1900 let warehouse_loc = temp_path();
1901 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1902
1903 let namespace_ident = NamespaceIdent::new("a".into());
1904 let mut namespace_properties = HashMap::new();
1905 let namespace_location = temp_path();
1906 namespace_properties.insert(
1907 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1908 namespace_location.to_string(),
1909 );
1910 catalog
1911 .create_namespace(&namespace_ident, namespace_properties)
1912 .await
1913 .unwrap();
1914
1915 let table_name = "tbl1";
1916 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1917 let expected_table_metadata_location_regex =
1918 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1919
1920 let table = catalog
1921 .create_table(
1922 &namespace_ident,
1923 TableCreation::builder()
1924 .name(table_name.into())
1925 .schema(simple_table_schema())
1926 .build(),
1928 )
1929 .await
1930 .unwrap();
1931 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1932 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1933
1934 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1935 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1936 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1937 }
1938
1939 #[tokio::test]
1940 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1941 {
1942 let warehouse_loc = temp_path();
1943 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
1944
1945 let namespace_ident = NamespaceIdent::new("a".into());
1946 let mut namespace_properties = HashMap::new();
1947 let namespace_location = temp_path();
1948 namespace_properties.insert(
1949 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1950 namespace_location.to_string(),
1951 );
1952 catalog
1953 .create_namespace(&namespace_ident, namespace_properties)
1954 .await
1955 .unwrap();
1956
1957 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1958 let mut nested_namespace_properties = HashMap::new();
1959 let nested_namespace_location = temp_path();
1960 nested_namespace_properties.insert(
1961 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1962 nested_namespace_location.to_string(),
1963 );
1964 catalog
1965 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1966 .await
1967 .unwrap();
1968
1969 let table_name = "tbl1";
1970 let expected_table_ident =
1971 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1972 let expected_table_metadata_location_regex = format!(
1973 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1974 );
1975
1976 let table = catalog
1977 .create_table(
1978 &nested_namespace_ident,
1979 TableCreation::builder()
1980 .name(table_name.into())
1981 .schema(simple_table_schema())
1982 .build(),
1984 )
1985 .await
1986 .unwrap();
1987 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1988 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1989
1990 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1991 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1992 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1993 }
1994
1995 #[tokio::test]
1996 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1997 {
1998 let warehouse_loc = temp_path();
1999 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2000
2001 let namespace_ident = NamespaceIdent::new("a".into());
2002 let namespace_properties = HashMap::new();
2004 catalog
2005 .create_namespace(&namespace_ident, namespace_properties)
2006 .await
2007 .unwrap();
2008
2009 let table_name = "tbl1";
2010 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2011 let expected_table_metadata_location_regex =
2012 format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
2013
2014 let table = catalog
2015 .create_table(
2016 &namespace_ident,
2017 TableCreation::builder()
2018 .name(table_name.into())
2019 .schema(simple_table_schema())
2020 .build(),
2022 )
2023 .await
2024 .unwrap();
2025 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2026 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2027
2028 let table = catalog.load_table(&expected_table_ident).await.unwrap();
2029 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2030 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2031 }
2032
2033 #[tokio::test]
2034 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
2035 {
2036 let warehouse_loc = temp_path();
2037 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2038
2039 let namespace_ident = NamespaceIdent::new("a".into());
2040 create_namespace(&catalog, &namespace_ident).await;
2041
2042 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2043 create_namespace(&catalog, &nested_namespace_ident).await;
2044
2045 let table_name = "tbl1";
2046 let expected_table_ident =
2047 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
2048 let expected_table_metadata_location_regex =
2049 format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
2050
2051 let table = catalog
2052 .create_table(
2053 &nested_namespace_ident,
2054 TableCreation::builder()
2055 .name(table_name.into())
2056 .schema(simple_table_schema())
2057 .build(),
2059 )
2060 .await
2061 .unwrap();
2062 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2063 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2064
2065 let table = catalog.load_table(&expected_table_ident).await.unwrap();
2066 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2067 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2068 }
2069
2070 #[tokio::test]
2071 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
2072 let warehouse_loc = temp_path();
2073 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2074 let namespace_ident = NamespaceIdent::new("a".into());
2075 create_namespace(&catalog, &namespace_ident).await;
2076 let table_name = "tbl1";
2077 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2078 create_table(&catalog, &table_ident).await;
2079
2080 let tmp_dir = TempDir::new().unwrap();
2081 let location = tmp_dir.path().to_str().unwrap().to_string();
2082
2083 assert_eq!(
2084 catalog
2085 .create_table(
2086 &namespace_ident,
2087 TableCreation::builder()
2088 .name(table_name.into())
2089 .schema(simple_table_schema())
2090 .location(location)
2091 .build()
2092 )
2093 .await
2094 .unwrap_err()
2095 .to_string(),
2096 format!("Unexpected => Table {:?} already exists.", &table_ident)
2097 );
2098 }
2099
2100 #[tokio::test]
2101 async fn test_rename_table_in_same_namespace() {
2102 let warehouse_loc = temp_path();
2103 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2104 let namespace_ident = NamespaceIdent::new("n1".into());
2105 create_namespace(&catalog, &namespace_ident).await;
2106 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2107 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2108 create_table(&catalog, &src_table_ident).await;
2109
2110 catalog
2111 .rename_table(&src_table_ident, &dst_table_ident)
2112 .await
2113 .unwrap();
2114
2115 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2116 dst_table_ident
2117 ],);
2118 }
2119
2120 #[tokio::test]
2121 async fn test_rename_table_across_namespaces() {
2122 let warehouse_loc = temp_path();
2123 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2124 let src_namespace_ident = NamespaceIdent::new("a".into());
2125 let dst_namespace_ident = NamespaceIdent::new("b".into());
2126 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
2127 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2128 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
2129 create_table(&catalog, &src_table_ident).await;
2130
2131 catalog
2132 .rename_table(&src_table_ident, &dst_table_ident)
2133 .await
2134 .unwrap();
2135
2136 assert_eq!(
2137 catalog.list_tables(&src_namespace_ident).await.unwrap(),
2138 vec![],
2139 );
2140
2141 assert_eq!(
2142 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
2143 vec![dst_table_ident],
2144 );
2145 }
2146
2147 #[tokio::test]
2148 async fn test_rename_table_src_table_is_same_as_dst_table() {
2149 let warehouse_loc = temp_path();
2150 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2151 let namespace_ident = NamespaceIdent::new("n1".into());
2152 create_namespace(&catalog, &namespace_ident).await;
2153 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
2154 create_table(&catalog, &table_ident).await;
2155
2156 catalog
2157 .rename_table(&table_ident, &table_ident)
2158 .await
2159 .unwrap();
2160
2161 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2162 table_ident
2163 ],);
2164 }
2165
2166 #[tokio::test]
2167 async fn test_rename_table_across_nested_namespaces() {
2168 let warehouse_loc = temp_path();
2169 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2170 let namespace_ident_a = NamespaceIdent::new("a".into());
2171 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2172 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2173 create_namespaces(&catalog, &vec![
2174 &namespace_ident_a,
2175 &namespace_ident_a_b,
2176 &namespace_ident_a_b_c,
2177 ])
2178 .await;
2179
2180 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2181 create_tables(&catalog, vec![&src_table_ident]).await;
2182
2183 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2184 catalog
2185 .rename_table(&src_table_ident, &dst_table_ident)
2186 .await
2187 .unwrap();
2188
2189 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2190
2191 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2192 }
2193
2194 #[tokio::test]
2195 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2196 let warehouse_loc = temp_path();
2197 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2198 let src_namespace_ident = NamespaceIdent::new("n1".into());
2199 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2200 create_namespace(&catalog, &src_namespace_ident).await;
2201 create_table(&catalog, &src_table_ident).await;
2202
2203 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2204 let dst_table_ident =
2205 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2206 assert_eq!(
2207 catalog
2208 .rename_table(&src_table_ident, &dst_table_ident)
2209 .await
2210 .unwrap_err()
2211 .to_string(),
2212 format!("Unexpected => No such namespace: {non_existent_dst_namespace_ident:?}"),
2213 );
2214 }
2215
2216 #[tokio::test]
2217 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
2218 let warehouse_loc = temp_path();
2219 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2220 let namespace_ident = NamespaceIdent::new("n1".into());
2221 create_namespace(&catalog, &namespace_ident).await;
2222 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2223 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2224
2225 assert_eq!(
2226 catalog
2227 .rename_table(&src_table_ident, &dst_table_ident)
2228 .await
2229 .unwrap_err()
2230 .to_string(),
2231 format!("Unexpected => No such table: {src_table_ident:?}"),
2232 );
2233 }
2234
2235 #[tokio::test]
2236 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
2237 let warehouse_loc = temp_path();
2238 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2239 let namespace_ident = NamespaceIdent::new("n1".into());
2240 create_namespace(&catalog, &namespace_ident).await;
2241 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2242 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2243 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
2244
2245 assert_eq!(
2246 catalog
2247 .rename_table(&src_table_ident, &dst_table_ident)
2248 .await
2249 .unwrap_err()
2250 .to_string(),
2251 format!("Unexpected => Table {:?} already exists.", &dst_table_ident),
2252 );
2253 }
2254
2255 #[tokio::test]
2256 async fn test_drop_table_throws_error_if_table_not_exist() {
2257 let warehouse_loc = temp_path();
2258 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2259 let namespace_ident = NamespaceIdent::new("a".into());
2260 let table_name = "tbl1";
2261 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2262 create_namespace(&catalog, &namespace_ident).await;
2263
2264 let err = catalog
2265 .drop_table(&table_ident)
2266 .await
2267 .unwrap_err()
2268 .to_string();
2269 assert_eq!(
2270 err,
2271 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2272 );
2273 }
2274
2275 #[tokio::test]
2276 async fn test_drop_table() {
2277 let warehouse_loc = temp_path();
2278 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2279 let namespace_ident = NamespaceIdent::new("a".into());
2280 let table_name = "tbl1";
2281 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2282 create_namespace(&catalog, &namespace_ident).await;
2283
2284 let location = warehouse_loc.clone();
2285 let table_creation = TableCreation::builder()
2286 .name(table_name.into())
2287 .location(location.clone())
2288 .schema(simple_table_schema())
2289 .build();
2290
2291 catalog
2292 .create_table(&namespace_ident, table_creation)
2293 .await
2294 .unwrap();
2295
2296 let table = catalog.load_table(&table_ident).await.unwrap();
2297 assert_table_eq(&table, &table_ident, &simple_table_schema());
2298
2299 catalog.drop_table(&table_ident).await.unwrap();
2300 let err = catalog
2301 .load_table(&table_ident)
2302 .await
2303 .unwrap_err()
2304 .to_string();
2305 assert_eq!(
2306 err,
2307 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2308 );
2309 }
2310
2311 #[tokio::test]
2312 async fn test_register_table_throws_error_if_table_with_same_name_already_exists() {
2313 let warehouse_loc = temp_path();
2314 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2315 let namespace_ident = NamespaceIdent::new("a".into());
2316 create_namespace(&catalog, &namespace_ident).await;
2317 let table_name = "tbl1";
2318 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2319 create_table(&catalog, &table_ident).await;
2320
2321 assert_eq!(
2322 catalog
2323 .register_table(&table_ident, warehouse_loc)
2324 .await
2325 .unwrap_err()
2326 .to_string(),
2327 format!("Unexpected => Table {:?} already exists.", &table_ident)
2328 );
2329 }
2330
2331 #[tokio::test]
2332 async fn test_register_table() {
2333 let warehouse_loc = temp_path();
2334 let catalog = new_sql_catalog(warehouse_loc.clone(), Some("iceberg")).await;
2335 let namespace_ident = NamespaceIdent::new("a".into());
2336 create_namespace(&catalog, &namespace_ident).await;
2337
2338 let table_name = "abc";
2339 let location = warehouse_loc.clone();
2340 let table_creation = TableCreation::builder()
2341 .name(table_name.into())
2342 .location(location.clone())
2343 .schema(simple_table_schema())
2344 .build();
2345
2346 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2347 let expected_table = catalog
2348 .create_table(&namespace_ident, table_creation)
2349 .await
2350 .unwrap();
2351
2352 let metadata_location = expected_table
2353 .metadata_location()
2354 .expect("Expected metadata location to be set")
2355 .to_string();
2356
2357 assert_table_eq(&expected_table, &table_ident, &simple_table_schema());
2358
2359 let _ = catalog.drop_table(&table_ident).await;
2360
2361 let table = catalog
2362 .register_table(&table_ident, metadata_location.clone())
2363 .await
2364 .unwrap();
2365
2366 assert_eq!(table.identifier(), expected_table.identifier());
2367 assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
2368 }
2369
2370 #[tokio::test]
2371 async fn test_update_table() {
2372 let warehouse_loc = temp_path();
2373 let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
2374
2375 let namespace_ident = NamespaceIdent::new("ns1".into());
2377 create_namespace(&catalog, &namespace_ident).await;
2378 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2379 create_table(&catalog, &table_ident).await;
2380
2381 let table = catalog.load_table(&table_ident).await.unwrap();
2382
2383 let original_metadata_location = table.metadata_location().unwrap().to_string();
2385
2386 let tx = Transaction::new(&table);
2388 let tx = tx
2389 .update_table_properties()
2390 .set("test_property".to_string(), "test_value".to_string())
2391 .apply(tx)
2392 .unwrap();
2393
2394 let updated_table = tx.commit(&catalog).await.unwrap();
2396
2397 assert_eq!(
2399 updated_table.metadata().properties().get("test_property"),
2400 Some(&"test_value".to_string())
2401 );
2402 assert_ne!(
2404 updated_table.metadata_location().unwrap(),
2405 original_metadata_location.as_str()
2406 );
2407
2408 let reloaded = catalog.load_table(&table_ident).await.unwrap();
2410
2411 assert_eq!(
2413 reloaded.metadata().properties().get("test_property"),
2414 Some(&"test_value".to_string())
2415 );
2416 assert_eq!(
2417 reloaded.metadata_location(),
2418 updated_table.metadata_location()
2419 );
2420 }
2421}