1use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::time::Duration;
21
22use async_trait::async_trait;
23use iceberg::io::FileIO;
24use iceberg::spec::{TableMetadata, TableMetadataBuilder};
25use iceberg::table::Table;
26use iceberg::{
27 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
28 TableCommit, TableCreation, TableIdent,
29};
30use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
31use sqlx::{Any, AnyPool, Row, Transaction};
32
33use crate::error::{
34 from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
35};
36
/// Catalog property key whose value is used as the database connection URI.
pub const SQL_CATALOG_PROP_URI: &str = "uri";
/// Catalog property key whose value is used as the warehouse location.
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Catalog property key whose value is parsed into a `SqlBindStyle`.
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";

// Name and column names of the SQL table holding one row per catalog table.
static CATALOG_TABLE_NAME: &str = "iceberg_tables";
static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
static CATALOG_FIELD_TABLE_NAME: &str = "table_name";
static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
// Value written to the record-type column for table rows.
static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Name and column names of the SQL table holding namespace properties
// (one row per namespace/property-key pair).
static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
static NAMESPACE_FIELD_NAME: &str = "namespace";
static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

// Namespace property consulted for the base location of new tables that do
// not specify their own location.
static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
59
/// Pool size used when the `pool.max-connections` property is absent.
static MAX_CONNECTIONS: u32 = 10;
/// Idle timeout in seconds used when the `pool.idle-timeout` property is absent.
static IDLE_TIMEOUT: u64 = 10;
/// Value used when the `pool.test-before-acquire` property is absent.
static TEST_BEFORE_ACQUIRE: bool = true;

/// Builder for [`SqlCatalog`]; a thin wrapper around the configuration it accumulates.
#[derive(Debug)]
pub struct SqlCatalogBuilder(SqlCatalogConfig);
67
68impl Default for SqlCatalogBuilder {
69 fn default() -> Self {
70 Self(SqlCatalogConfig {
71 uri: "".to_string(),
72 name: "".to_string(),
73 warehouse_location: "".to_string(),
74 sql_bind_style: SqlBindStyle::DollarNumeric,
75 props: HashMap::new(),
76 })
77 }
78}
79
80impl SqlCatalogBuilder {
81 pub fn uri(mut self, uri: impl Into<String>) -> Self {
86 self.0.uri = uri.into();
87 self
88 }
89
90 pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
95 self.0.warehouse_location = location.into();
96 self
97 }
98
99 pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
104 self.0.sql_bind_style = sql_bind_style;
105 self
106 }
107
108 pub fn props(mut self, props: HashMap<String, String>) -> Self {
113 for (k, v) in props {
114 self.0.props.insert(k, v);
115 }
116 self
117 }
118
119 pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
126 self.0.props.insert(key.into(), value.into());
127 self
128 }
129}
130
131impl CatalogBuilder for SqlCatalogBuilder {
132 type C = SqlCatalog;
133
134 fn load(
135 mut self,
136 name: impl Into<String>,
137 props: HashMap<String, String>,
138 ) -> impl Future<Output = Result<Self::C>> + Send {
139 let name = name.into();
140
141 for (k, v) in props {
142 self.0.props.insert(k, v);
143 }
144
145 if let Some(uri) = self.0.props.remove(SQL_CATALOG_PROP_URI) {
146 self.0.uri = uri;
147 }
148 if let Some(warehouse_location) = self.0.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
149 self.0.warehouse_location = warehouse_location;
150 }
151
152 let mut valid_sql_bind_style = true;
153 if let Some(sql_bind_style) = self.0.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
154 if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
155 self.0.sql_bind_style = sql_bind_style;
156 } else {
157 valid_sql_bind_style = false;
158 }
159 }
160
161 async move {
162 if name.trim().is_empty() {
163 Err(Error::new(
164 ErrorKind::DataInvalid,
165 "Catalog name cannot be empty",
166 ))
167 } else if !valid_sql_bind_style {
168 Err(Error::new(
169 ErrorKind::DataInvalid,
170 format!(
171 "`{}` values are valid only if they're `{}` or `{}`",
172 SQL_CATALOG_PROP_BIND_STYLE,
173 SqlBindStyle::DollarNumeric,
174 SqlBindStyle::QMark
175 ),
176 ))
177 } else {
178 SqlCatalog::new(self.0).await
179 }
180 }
181 }
182}
183
/// Configuration collected by [`SqlCatalogBuilder`] and consumed by `SqlCatalog::new`.
#[derive(Debug)]
struct SqlCatalogConfig {
    /// Database connection URI handed to the connection pool.
    uri: String,
    /// Catalog name; bound to the `catalog_name` column in the catalog's queries.
    name: String,
    /// Warehouse location; used to build the `FileIO` and default table locations.
    warehouse_location: String,
    /// Placeholder style used when rewriting queries.
    sql_bind_style: SqlBindStyle,
    /// Remaining properties; the `pool.*` keys are read when the pool is created.
    props: HashMap<String, String>,
}
200
/// Iceberg catalog that keeps table and namespace records in a SQL database.
#[derive(Debug)]
pub struct SqlCatalog {
    /// Catalog name; bound to the `catalog_name` column in every query.
    name: String,
    /// Connection pool used for all queries.
    connection: AnyPool,
    /// Base location used for tables created without an explicit or namespace location.
    warehouse_location: String,
    /// File IO used to read and write table metadata files.
    fileio: FileIO,
    /// Placeholder style applied to queries before they are sent to the database.
    sql_bind_style: SqlBindStyle,
}
210
/// Placeholder syntax used for bound parameters in SQL statements.
#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
pub enum SqlBindStyle {
    /// Each `?` in a query is rewritten to `$1`, `$2`, … in order of appearance.
    DollarNumeric,
    /// Queries are sent unchanged, keeping `?` placeholders.
    QMark,
}
219
220impl SqlCatalog {
221 async fn new(config: SqlCatalogConfig) -> Result<Self> {
223 let fileio = FileIO::from_path(&config.warehouse_location)?.build()?;
224 install_default_drivers();
225 let max_connections: u32 = config
226 .props
227 .get("pool.max-connections")
228 .map(|v| v.parse().unwrap())
229 .unwrap_or(MAX_CONNECTIONS);
230 let idle_timeout: u64 = config
231 .props
232 .get("pool.idle-timeout")
233 .map(|v| v.parse().unwrap())
234 .unwrap_or(IDLE_TIMEOUT);
235 let test_before_acquire: bool = config
236 .props
237 .get("pool.test-before-acquire")
238 .map(|v| v.parse().unwrap())
239 .unwrap_or(TEST_BEFORE_ACQUIRE);
240
241 let pool = AnyPoolOptions::new()
242 .max_connections(max_connections)
243 .idle_timeout(Duration::from_secs(idle_timeout))
244 .test_before_acquire(test_before_acquire)
245 .connect(&config.uri)
246 .await
247 .map_err(from_sqlx_error)?;
248
249 sqlx::query(&format!(
250 "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
251 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
252 {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
253 {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
254 {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
255 {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
256 {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
257 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
258 ))
259 .execute(&pool)
260 .await
261 .map_err(from_sqlx_error)?;
262
263 sqlx::query(&format!(
264 "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
265 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
266 {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
267 {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
268 {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
269 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
270 ))
271 .execute(&pool)
272 .await
273 .map_err(from_sqlx_error)?;
274
275 Ok(SqlCatalog {
276 name: config.name.to_owned(),
277 connection: pool,
278 warehouse_location: config.warehouse_location,
279 fileio,
280 sql_bind_style: config.sql_bind_style,
281 })
282 }
283
284 fn replace_placeholders(&self, query: &str) -> String {
286 match self.sql_bind_style {
287 SqlBindStyle::DollarNumeric => {
288 let mut count = 1;
289 query
290 .chars()
291 .fold(String::with_capacity(query.len()), |mut acc, c| {
292 if c == '?' {
293 acc.push('$');
294 acc.push_str(&count.to_string());
295 count += 1;
296 } else {
297 acc.push(c);
298 }
299 acc
300 })
301 }
302 _ => query.to_owned(),
303 }
304 }
305
306 async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
308 let query_with_placeholders = self.replace_placeholders(query);
309
310 let mut sqlx_query = sqlx::query(&query_with_placeholders);
311 for arg in args {
312 sqlx_query = sqlx_query.bind(arg);
313 }
314
315 sqlx_query
316 .fetch_all(&self.connection)
317 .await
318 .map_err(from_sqlx_error)
319 }
320
321 async fn execute(
323 &self,
324 query: &str,
325 args: Vec<Option<&str>>,
326 transaction: Option<&mut Transaction<'_, Any>>,
327 ) -> Result<AnyQueryResult> {
328 let query_with_placeholders = self.replace_placeholders(query);
329
330 let mut sqlx_query = sqlx::query(&query_with_placeholders);
331 for arg in args {
332 sqlx_query = sqlx_query.bind(arg);
333 }
334
335 match transaction {
336 Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
337 None => {
338 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
339 let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
340 let _ = tx.commit().await.map_err(from_sqlx_error);
341 result
342 }
343 }
344 }
345}
346
347#[async_trait]
348impl Catalog for SqlCatalog {
349 async fn list_namespaces(
350 &self,
351 parent: Option<&NamespaceIdent>,
352 ) -> Result<Vec<NamespaceIdent>> {
353 let all_namespaces_stmt = format!(
355 "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
356 FROM {CATALOG_TABLE_NAME}
357 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
358 UNION
359 SELECT {NAMESPACE_FIELD_NAME}
360 FROM {NAMESPACE_TABLE_NAME}
361 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
362 );
363
364 let namespace_rows = self
365 .fetch_rows(&all_namespaces_stmt, vec![
366 Some(&self.name),
367 Some(&self.name),
368 ])
369 .await?;
370
371 let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
372
373 if let Some(parent) = parent {
374 if self.namespace_exists(parent).await? {
375 let parent_str = parent.join(".");
376
377 for row in namespace_rows.iter() {
378 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
379 if nsp != parent_str && nsp.starts_with(&parent_str) {
381 namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
382 }
383 }
384
385 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
386 } else {
387 no_such_namespace_err(parent)
388 }
389 } else {
390 for row in namespace_rows.iter() {
391 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
392 let mut levels = nsp.split(".").collect::<Vec<&str>>();
393 if !levels.is_empty() {
394 let first_level = levels.drain(..1).collect::<Vec<&str>>();
395 namespaces.insert(NamespaceIdent::from_strs(first_level)?);
396 }
397 }
398
399 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
400 }
401 }
402
403 async fn create_namespace(
404 &self,
405 namespace: &NamespaceIdent,
406 properties: HashMap<String, String>,
407 ) -> Result<Namespace> {
408 let exists = self.namespace_exists(namespace).await?;
409
410 if exists {
411 return Err(Error::new(
412 iceberg::ErrorKind::Unexpected,
413 format!("Namespace {namespace:?} already exists"),
414 ));
415 }
416
417 let namespace_str = namespace.join(".");
418 let insert = format!(
419 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
420 VALUES (?, ?, ?, ?)");
421 if !properties.is_empty() {
422 let mut insert_properties = properties.clone();
423 insert_properties.insert("exists".to_string(), "true".to_string());
424
425 let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
426 let mut insert_stmt = insert.clone();
427 for (index, (key, value)) in insert_properties.iter().enumerate() {
428 query_args.extend_from_slice(&[
429 Some(self.name.as_str()),
430 Some(namespace_str.as_str()),
431 Some(key.as_str()),
432 Some(value.as_str()),
433 ]);
434 if index > 0 {
435 insert_stmt.push_str(", (?, ?, ?, ?)");
436 }
437 }
438
439 self.execute(&insert_stmt, query_args, None).await?;
440
441 Ok(Namespace::with_properties(
442 namespace.clone(),
443 insert_properties,
444 ))
445 } else {
446 self.execute(
448 &insert,
449 vec![
450 Some(&self.name),
451 Some(&namespace_str),
452 Some("exists"),
453 Some("true"),
454 ],
455 None,
456 )
457 .await?;
458 Ok(Namespace::with_properties(namespace.clone(), properties))
459 }
460 }
461
462 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
463 let exists = self.namespace_exists(namespace).await?;
464 if exists {
465 let namespace_props = self
466 .fetch_rows(
467 &format!(
468 "SELECT
469 {NAMESPACE_FIELD_NAME},
470 {NAMESPACE_FIELD_PROPERTY_KEY},
471 {NAMESPACE_FIELD_PROPERTY_VALUE}
472 FROM {NAMESPACE_TABLE_NAME}
473 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
474 AND {NAMESPACE_FIELD_NAME} = ?"
475 ),
476 vec![Some(&self.name), Some(&namespace.join("."))],
477 )
478 .await?;
479
480 let mut properties = HashMap::with_capacity(namespace_props.len());
481
482 for row in namespace_props {
483 let key = row
484 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
485 .map_err(from_sqlx_error)?;
486 let value = row
487 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
488 .map_err(from_sqlx_error)?;
489
490 properties.insert(key, value);
491 }
492
493 Ok(Namespace::with_properties(namespace.clone(), properties))
494 } else {
495 no_such_namespace_err(namespace)
496 }
497 }
498
499 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
500 let namespace_str = namespace.join(".");
501
502 let table_namespaces = self
503 .fetch_rows(
504 &format!(
505 "SELECT 1 FROM {CATALOG_TABLE_NAME}
506 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
507 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
508 LIMIT 1"
509 ),
510 vec![Some(&self.name), Some(&namespace_str)],
511 )
512 .await?;
513
514 if !table_namespaces.is_empty() {
515 Ok(true)
516 } else {
517 let namespaces = self
518 .fetch_rows(
519 &format!(
520 "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
521 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
522 AND {NAMESPACE_FIELD_NAME} = ?
523 LIMIT 1"
524 ),
525 vec![Some(&self.name), Some(&namespace_str)],
526 )
527 .await?;
528 if !namespaces.is_empty() {
529 Ok(true)
530 } else {
531 Ok(false)
532 }
533 }
534 }
535
536 async fn update_namespace(
537 &self,
538 namespace: &NamespaceIdent,
539 properties: HashMap<String, String>,
540 ) -> Result<()> {
541 let exists = self.namespace_exists(namespace).await?;
542 if exists {
543 let existing_properties = self.get_namespace(namespace).await?.properties().clone();
544 let namespace_str = namespace.join(".");
545
546 let mut updates = vec![];
547 let mut inserts = vec![];
548
549 for (key, value) in properties.iter() {
550 if existing_properties.contains_key(key) {
551 if existing_properties.get(key) != Some(value) {
552 updates.push((key, value));
553 }
554 } else {
555 inserts.push((key, value));
556 }
557 }
558
559 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
560 let update_stmt = format!(
561 "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
562 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
563 AND {NAMESPACE_FIELD_NAME} = ?
564 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
565 );
566
567 let insert_stmt = format!(
568 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
569 VALUES (?, ?, ?, ?)"
570 );
571
572 for (key, value) in updates {
573 self.execute(
574 &update_stmt,
575 vec![
576 Some(value),
577 Some(&self.name),
578 Some(&namespace_str),
579 Some(key),
580 ],
581 Some(&mut tx),
582 )
583 .await?;
584 }
585
586 for (key, value) in inserts {
587 self.execute(
588 &insert_stmt,
589 vec![
590 Some(&self.name),
591 Some(&namespace_str),
592 Some(key),
593 Some(value),
594 ],
595 Some(&mut tx),
596 )
597 .await?;
598 }
599
600 let _ = tx.commit().await.map_err(from_sqlx_error)?;
601
602 Ok(())
603 } else {
604 no_such_namespace_err(namespace)
605 }
606 }
607
608 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
609 let exists = self.namespace_exists(namespace).await?;
610 if exists {
611 let tables = self.list_tables(namespace).await?;
613 if !tables.is_empty() {
614 return Err(Error::new(
615 iceberg::ErrorKind::Unexpected,
616 format!(
617 "Namespace {:?} is not empty. {} tables exist.",
618 namespace,
619 tables.len()
620 ),
621 ));
622 }
623
624 self.execute(
625 &format!(
626 "DELETE FROM {NAMESPACE_TABLE_NAME}
627 WHERE {NAMESPACE_FIELD_NAME} = ?
628 AND {CATALOG_FIELD_CATALOG_NAME} = ?"
629 ),
630 vec![Some(&namespace.join(".")), Some(&self.name)],
631 None,
632 )
633 .await?;
634
635 Ok(())
636 } else {
637 no_such_namespace_err(namespace)
638 }
639 }
640
641 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
642 let exists = self.namespace_exists(namespace).await?;
643 if exists {
644 let rows = self
645 .fetch_rows(
646 &format!(
647 "SELECT {CATALOG_FIELD_TABLE_NAME},
648 {CATALOG_FIELD_TABLE_NAMESPACE}
649 FROM {CATALOG_TABLE_NAME}
650 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
651 AND {CATALOG_FIELD_CATALOG_NAME} = ?
652 AND (
653 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
654 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
655 )",
656 ),
657 vec![Some(&namespace.join(".")), Some(&self.name)],
658 )
659 .await?;
660
661 let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
662
663 for row in rows.iter() {
664 let tbl = row
665 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
666 .map_err(from_sqlx_error)?;
667 let ns_strs = row
668 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
669 .map_err(from_sqlx_error)?;
670 let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
671 tables.insert(TableIdent::new(ns, tbl));
672 }
673
674 Ok(tables.into_iter().collect::<Vec<TableIdent>>())
675 } else {
676 no_such_namespace_err(namespace)
677 }
678 }
679
680 async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
681 let namespace = identifier.namespace().join(".");
682 let table_name = identifier.name();
683 let table_counts = self
684 .fetch_rows(
685 &format!(
686 "SELECT 1
687 FROM {CATALOG_TABLE_NAME}
688 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
689 AND {CATALOG_FIELD_CATALOG_NAME} = ?
690 AND {CATALOG_FIELD_TABLE_NAME} = ?
691 AND (
692 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
693 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
694 )"
695 ),
696 vec![Some(&namespace), Some(&self.name), Some(table_name)],
697 )
698 .await?;
699
700 if !table_counts.is_empty() {
701 Ok(true)
702 } else {
703 Ok(false)
704 }
705 }
706
707 async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
708 if !self.table_exists(identifier).await? {
709 return no_such_table_err(identifier);
710 }
711
712 self.execute(
713 &format!(
714 "DELETE FROM {CATALOG_TABLE_NAME}
715 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
716 AND {CATALOG_FIELD_TABLE_NAME} = ?
717 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
718 AND (
719 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
720 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
721 )"
722 ),
723 vec![
724 Some(&self.name),
725 Some(identifier.name()),
726 Some(&identifier.namespace().join(".")),
727 ],
728 None,
729 )
730 .await?;
731
732 Ok(())
733 }
734
735 async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
736 if !self.table_exists(identifier).await? {
737 return no_such_table_err(identifier);
738 }
739
740 let rows = self
741 .fetch_rows(
742 &format!(
743 "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
744 FROM {CATALOG_TABLE_NAME}
745 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
746 AND {CATALOG_FIELD_TABLE_NAME} = ?
747 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
748 AND (
749 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
750 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
751 )"
752 ),
753 vec![
754 Some(&self.name),
755 Some(identifier.name()),
756 Some(&identifier.namespace().join(".")),
757 ],
758 )
759 .await?;
760
761 if rows.is_empty() {
762 return no_such_table_err(identifier);
763 }
764
765 let row = &rows[0];
766 let tbl_metadata_location = row
767 .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
768 .map_err(from_sqlx_error)?;
769
770 let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
771
772 Ok(Table::builder()
773 .file_io(self.fileio.clone())
774 .identifier(identifier.clone())
775 .metadata_location(tbl_metadata_location)
776 .metadata(metadata)
777 .build()?)
778 }
779
780 async fn create_table(
781 &self,
782 namespace: &NamespaceIdent,
783 creation: TableCreation,
784 ) -> Result<Table> {
785 if !self.namespace_exists(namespace).await? {
786 return no_such_namespace_err(namespace);
787 }
788
789 let tbl_name = creation.name.clone();
790 let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
791
792 if self.table_exists(&tbl_ident).await? {
793 return table_already_exists_err(&tbl_ident);
794 }
795
796 let (tbl_creation, location) = match creation.location.clone() {
797 Some(location) => (creation, location),
798 None => {
799 let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
802 let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
803 Some(location) => location.clone(),
804 None => {
805 format!(
806 "{}/{}",
807 self.warehouse_location.clone(),
808 namespace.join("/")
809 )
810 }
811 };
812
813 let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
814
815 (
816 TableCreation {
817 location: Some(tbl_location.clone()),
818 ..creation
819 },
820 tbl_location,
821 )
822 }
823 };
824
825 let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
826 .build()?
827 .metadata;
828 let tbl_metadata_location =
829 MetadataLocation::new_with_table_location(location.clone()).to_string();
830
831 tbl_metadata
832 .write_to(&self.fileio, &tbl_metadata_location)
833 .await?;
834
835 self.execute(&format!(
836 "INSERT INTO {CATALOG_TABLE_NAME}
837 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
838 VALUES (?, ?, ?, ?, ?)
839 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
840
841 Ok(Table::builder()
842 .file_io(self.fileio.clone())
843 .metadata_location(tbl_metadata_location)
844 .identifier(tbl_ident)
845 .metadata(tbl_metadata)
846 .build()?)
847 }
848
849 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
850 if src == dest {
851 return Ok(());
852 }
853
854 if !self.table_exists(src).await? {
855 return no_such_table_err(src);
856 }
857
858 if !self.namespace_exists(dest.namespace()).await? {
859 return no_such_namespace_err(dest.namespace());
860 }
861
862 if self.table_exists(dest).await? {
863 return table_already_exists_err(dest);
864 }
865
866 self.execute(
867 &format!(
868 "UPDATE {CATALOG_TABLE_NAME}
869 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
870 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
871 AND {CATALOG_FIELD_TABLE_NAME} = ?
872 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
873 AND (
874 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
875 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
876 )"
877 ),
878 vec![
879 Some(dest.name()),
880 Some(&dest.namespace().join(".")),
881 Some(&self.name),
882 Some(src.name()),
883 Some(&src.namespace().join(".")),
884 ],
885 None,
886 )
887 .await?;
888
889 Ok(())
890 }
891
892 async fn register_table(
893 &self,
894 table_ident: &TableIdent,
895 metadata_location: String,
896 ) -> Result<Table> {
897 if self.table_exists(table_ident).await? {
898 return table_already_exists_err(table_ident);
899 }
900
901 let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
902
903 let namespace = table_ident.namespace();
904 let tbl_name = table_ident.name().to_string();
905
906 self.execute(&format!(
907 "INSERT INTO {CATALOG_TABLE_NAME}
908 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
909 VALUES (?, ?, ?, ?, ?)
910 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
911
912 Ok(Table::builder()
913 .identifier(table_ident.clone())
914 .metadata_location(metadata_location)
915 .metadata(metadata)
916 .file_io(self.fileio.clone())
917 .build()?)
918 }
919
920 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
922 let table_ident = commit.identifier().clone();
923 let current_table = self.load_table(&table_ident).await?;
924 let current_metadata_location = current_table.metadata_location_result()?.to_string();
925
926 let staged_table = commit.apply(current_table)?;
927 let staged_metadata_location = staged_table.metadata_location_result()?;
928
929 staged_table
930 .metadata()
931 .write_to(staged_table.file_io(), &staged_metadata_location)
932 .await?;
933
934 let update_result = self
935 .execute(
936 &format!(
937 "UPDATE {CATALOG_TABLE_NAME}
938 SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
939 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
940 AND {CATALOG_FIELD_TABLE_NAME} = ?
941 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
942 AND (
943 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
944 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
945 )
946 AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
947 ),
948 vec![
949 Some(staged_metadata_location),
950 Some(current_metadata_location.as_str()),
951 Some(&self.name),
952 Some(table_ident.name()),
953 Some(&table_ident.namespace().join(".")),
954 Some(current_metadata_location.as_str()),
955 ],
956 None,
957 )
958 .await?;
959
960 if update_result.rows_affected() == 0 {
961 return Err(Error::new(
962 ErrorKind::CatalogCommitConflicts,
963 format!("Commit conflicted for table: {table_ident}"),
964 )
965 .with_retryable(true));
966 }
967
968 Ok(staged_table)
969 }
970}
971
972#[cfg(test)]
973mod tests {
974 use std::collections::{HashMap, HashSet};
975 use std::hash::Hash;
976
977 use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
978 use iceberg::table::Table;
979 use iceberg::transaction::{ApplyTransactionAction, Transaction};
980 use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
981 use itertools::Itertools;
982 use regex::Regex;
983 use sqlx::migrate::MigrateDatabase;
984 use tempfile::TempDir;
985
986 use crate::catalog::{
987 NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
988 SQL_CATALOG_PROP_WAREHOUSE,
989 };
990 use crate::{SqlBindStyle, SqlCatalogBuilder};
991
    /// Regex fragment matching a lowercase hyphenated UUID (8-4-4-4-12 hex digits).
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
993
994 fn temp_path() -> String {
995 let temp_dir = TempDir::new().unwrap();
996 temp_dir.path().to_str().unwrap().to_string()
997 }
998
999 fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
1000 HashSet::from_iter(vec)
1001 }
1002
1003 fn default_properties() -> HashMap<String, String> {
1004 HashMap::from([("exists".to_string(), "true".to_string())])
1005 }
1006
1007 async fn new_sql_catalog(warehouse_location: String) -> impl Catalog {
1008 let sql_lite_uri = format!("sqlite:{}", temp_path());
1009 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1010
1011 let props = HashMap::from_iter([
1012 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
1013 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1014 (
1015 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1016 SqlBindStyle::DollarNumeric.to_string(),
1017 ),
1018 ]);
1019 SqlCatalogBuilder::default()
1020 .load("iceberg", props)
1021 .await
1022 .unwrap()
1023 }
1024
1025 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
1026 let _ = catalog
1027 .create_namespace(namespace_ident, HashMap::new())
1028 .await
1029 .unwrap();
1030 }
1031
1032 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
1033 for namespace_ident in namespace_idents {
1034 let _ = create_namespace(catalog, namespace_ident).await;
1035 }
1036 }
1037
1038 fn simple_table_schema() -> Schema {
1039 Schema::builder()
1040 .with_fields(vec![
1041 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
1042 ])
1043 .build()
1044 .unwrap()
1045 }
1046
1047 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1048 let _ = catalog
1049 .create_table(
1050 &table_ident.namespace,
1051 TableCreation::builder()
1052 .name(table_ident.name().into())
1053 .schema(simple_table_schema())
1054 .location(temp_path())
1055 .build(),
1056 )
1057 .await
1058 .unwrap();
1059 }
1060
1061 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1062 for table_ident in table_idents {
1063 create_table(catalog, table_ident).await;
1064 }
1065 }
1066
1067 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1068 assert_eq!(table.identifier(), expected_table_ident);
1069
1070 let metadata = table.metadata();
1071
1072 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1073
1074 let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1075 .with_spec_id(0)
1076 .build()
1077 .unwrap();
1078
1079 assert_eq!(
1080 metadata
1081 .partition_specs_iter()
1082 .map(|p| p.as_ref())
1083 .collect_vec(),
1084 vec![&expected_partition_spec]
1085 );
1086
1087 let expected_sorted_order = SortOrder::builder()
1088 .with_order_id(0)
1089 .with_fields(vec![])
1090 .build(expected_schema)
1091 .unwrap();
1092
1093 assert_eq!(
1094 metadata
1095 .sort_orders_iter()
1096 .map(|s| s.as_ref())
1097 .collect_vec(),
1098 vec![&expected_sorted_order]
1099 );
1100
1101 assert_eq!(metadata.properties(), &HashMap::new());
1102
1103 assert!(!table.readonly());
1104 }
1105
1106 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1107 let actual = table.metadata_location().unwrap().to_string();
1108 let regex = Regex::new(regex_str).unwrap();
1109 assert!(regex.is_match(&actual))
1110 }
1111
1112 #[tokio::test]
1113 async fn test_initialized() {
1114 let warehouse_loc = temp_path();
1115 new_sql_catalog(warehouse_loc.clone()).await;
1116 new_sql_catalog(warehouse_loc.clone()).await;
1118 new_sql_catalog(warehouse_loc.clone()).await;
1119 }
1120
1121 #[tokio::test]
1122 async fn test_builder_method() {
1123 let sql_lite_uri = format!("sqlite:{}", temp_path());
1124 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1125 let warehouse_location = temp_path();
1126
1127 let catalog = SqlCatalogBuilder::default()
1128 .uri(sql_lite_uri.to_string())
1129 .warehouse_location(warehouse_location.clone())
1130 .sql_bind_style(SqlBindStyle::QMark)
1131 .load("iceberg", HashMap::default())
1132 .await;
1133 assert!(catalog.is_ok());
1134
1135 let catalog = catalog.unwrap();
1136 assert!(catalog.warehouse_location == warehouse_location);
1137 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1138 }
1139
1140 #[tokio::test]
1143 async fn test_builder_props_non_existent_path_fails() {
1144 let sql_lite_uri = format!("sqlite:{}", temp_path());
1145 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1146 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1147 let warehouse_location = temp_path();
1148
1149 let catalog = SqlCatalogBuilder::default()
1150 .uri(sql_lite_uri)
1151 .warehouse_location(warehouse_location)
1152 .load(
1153 "iceberg",
1154 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1155 )
1156 .await;
1157 assert!(catalog.is_err());
1158 }
1159
1160 #[tokio::test]
1164 async fn test_builder_props_set_valid_uri() {
1165 let sql_lite_uri = format!("sqlite:{}", temp_path());
1166 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1167 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1168 let warehouse_location = temp_path();
1169
1170 let catalog = SqlCatalogBuilder::default()
1171 .uri(sql_lite_uri2)
1172 .warehouse_location(warehouse_location)
1173 .load(
1174 "iceberg",
1175 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1176 )
1177 .await;
1178 assert!(catalog.is_ok());
1179 }
1180
1181 #[tokio::test]
1183 async fn test_builder_props_take_precedence() {
1184 let sql_lite_uri = format!("sqlite:{}", temp_path());
1185 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1186 let warehouse_location = temp_path();
1187 let warehouse_location2 = temp_path();
1188
1189 let catalog = SqlCatalogBuilder::default()
1190 .warehouse_location(warehouse_location2)
1191 .sql_bind_style(SqlBindStyle::DollarNumeric)
1192 .load(
1193 "iceberg",
1194 HashMap::from_iter([
1195 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1196 (
1197 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1198 warehouse_location.clone(),
1199 ),
1200 (
1201 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1202 SqlBindStyle::QMark.to_string(),
1203 ),
1204 ]),
1205 )
1206 .await;
1207
1208 assert!(catalog.is_ok());
1209
1210 let catalog = catalog.unwrap();
1211 assert!(catalog.warehouse_location == warehouse_location);
1212 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1213 }
1214
1215 #[tokio::test]
1217 async fn test_builder_props_take_precedence_props() {
1218 let sql_lite_uri = format!("sqlite:{}", temp_path());
1219 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1220 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1221 let warehouse_location = temp_path();
1222 let warehouse_location2 = temp_path();
1223
1224 let props = HashMap::from_iter([
1225 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1226 (
1227 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1228 warehouse_location.clone(),
1229 ),
1230 (
1231 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1232 SqlBindStyle::QMark.to_string(),
1233 ),
1234 ]);
1235 let props2 = HashMap::from_iter([
1236 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1237 (
1238 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1239 warehouse_location2.clone(),
1240 ),
1241 (
1242 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1243 SqlBindStyle::DollarNumeric.to_string(),
1244 ),
1245 ]);
1246
1247 let catalog = SqlCatalogBuilder::default()
1248 .props(props2)
1249 .load("iceberg", props)
1250 .await;
1251
1252 assert!(catalog.is_ok());
1253
1254 let catalog = catalog.unwrap();
1255 assert!(catalog.warehouse_location == warehouse_location);
1256 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1257 }
1258
1259 #[tokio::test]
1261 async fn test_builder_props_take_precedence_prop() {
1262 let sql_lite_uri = format!("sqlite:{}", temp_path());
1263 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1264 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1265 let warehouse_location = temp_path();
1266 let warehouse_location2 = temp_path();
1267
1268 let props = HashMap::from_iter([
1269 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1270 (
1271 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1272 warehouse_location.clone(),
1273 ),
1274 (
1275 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1276 SqlBindStyle::QMark.to_string(),
1277 ),
1278 ]);
1279
1280 let catalog = SqlCatalogBuilder::default()
1281 .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1282 .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1283 .prop(
1284 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1285 SqlBindStyle::DollarNumeric.to_string(),
1286 )
1287 .load("iceberg", props)
1288 .await;
1289
1290 assert!(catalog.is_ok());
1291
1292 let catalog = catalog.unwrap();
1293 assert!(catalog.warehouse_location == warehouse_location);
1294 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1295 }
1296
1297 #[tokio::test]
1299 async fn test_builder_props_invalid_bind_style_fails() {
1300 let sql_lite_uri = format!("sqlite:{}", temp_path());
1301 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1302 let warehouse_location = temp_path();
1303
1304 let catalog = SqlCatalogBuilder::default()
1305 .load(
1306 "iceberg",
1307 HashMap::from_iter([
1308 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1309 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1310 (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1311 ]),
1312 )
1313 .await;
1314
1315 assert!(catalog.is_err());
1316 }
1317
1318 #[tokio::test]
1319 async fn test_list_namespaces_returns_empty_vector() {
1320 let warehouse_loc = temp_path();
1321 let catalog = new_sql_catalog(warehouse_loc).await;
1322
1323 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1324 }
1325
1326 #[tokio::test]
1327 async fn test_list_namespaces_returns_multiple_namespaces() {
1328 let warehouse_loc = temp_path();
1329 let catalog = new_sql_catalog(warehouse_loc).await;
1330 let namespace_ident_1 = NamespaceIdent::new("a".into());
1331 let namespace_ident_2 = NamespaceIdent::new("b".into());
1332 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1333
1334 assert_eq!(
1335 to_set(catalog.list_namespaces(None).await.unwrap()),
1336 to_set(vec![namespace_ident_1, namespace_ident_2])
1337 );
1338 }
1339
1340 #[tokio::test]
1341 async fn test_list_namespaces_returns_only_top_level_namespaces() {
1342 let warehouse_loc = temp_path();
1343 let catalog = new_sql_catalog(warehouse_loc).await;
1344 let namespace_ident_1 = NamespaceIdent::new("a".into());
1345 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1346 let namespace_ident_3 = NamespaceIdent::new("b".into());
1347 create_namespaces(&catalog, &vec![
1348 &namespace_ident_1,
1349 &namespace_ident_2,
1350 &namespace_ident_3,
1351 ])
1352 .await;
1353
1354 assert_eq!(
1355 to_set(catalog.list_namespaces(None).await.unwrap()),
1356 to_set(vec![namespace_ident_1, namespace_ident_3])
1357 );
1358 }
1359
1360 #[tokio::test]
1361 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1362 let warehouse_loc = temp_path();
1363 let catalog = new_sql_catalog(warehouse_loc).await;
1364 let namespace_ident_1 = NamespaceIdent::new("a".into());
1365 let namespace_ident_2 = NamespaceIdent::new("b".into());
1366 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1367
1368 assert_eq!(
1369 catalog
1370 .list_namespaces(Some(&namespace_ident_1))
1371 .await
1372 .unwrap(),
1373 vec![]
1374 );
1375 }
1376
1377 #[tokio::test]
1378 async fn test_list_namespaces_returns_namespace_under_parent() {
1379 let warehouse_loc = temp_path();
1380 let catalog = new_sql_catalog(warehouse_loc).await;
1381 let namespace_ident_1 = NamespaceIdent::new("a".into());
1382 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1383 let namespace_ident_3 = NamespaceIdent::new("c".into());
1384 create_namespaces(&catalog, &vec![
1385 &namespace_ident_1,
1386 &namespace_ident_2,
1387 &namespace_ident_3,
1388 ])
1389 .await;
1390
1391 assert_eq!(
1392 to_set(catalog.list_namespaces(None).await.unwrap()),
1393 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1394 );
1395
1396 assert_eq!(
1397 catalog
1398 .list_namespaces(Some(&namespace_ident_1))
1399 .await
1400 .unwrap(),
1401 vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1402 );
1403 }
1404
1405 #[tokio::test]
1406 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1407 let warehouse_loc = temp_path();
1408 let catalog = new_sql_catalog(warehouse_loc).await;
1409 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1410 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1411 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1412 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1413 let namespace_ident_5 = NamespaceIdent::new("b".into());
1414 create_namespaces(&catalog, &vec![
1415 &namespace_ident_1,
1416 &namespace_ident_2,
1417 &namespace_ident_3,
1418 &namespace_ident_4,
1419 &namespace_ident_5,
1420 ])
1421 .await;
1422
1423 assert_eq!(
1424 to_set(
1425 catalog
1426 .list_namespaces(Some(&namespace_ident_1))
1427 .await
1428 .unwrap()
1429 ),
1430 to_set(vec![
1431 NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1432 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1433 NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1434 ])
1435 );
1436 }
1437
1438 #[tokio::test]
1439 async fn test_namespace_exists_returns_false() {
1440 let warehouse_loc = temp_path();
1441 let catalog = new_sql_catalog(warehouse_loc).await;
1442 let namespace_ident = NamespaceIdent::new("a".into());
1443 create_namespace(&catalog, &namespace_ident).await;
1444
1445 assert!(
1446 !catalog
1447 .namespace_exists(&NamespaceIdent::new("b".into()))
1448 .await
1449 .unwrap()
1450 );
1451 }
1452
1453 #[tokio::test]
1454 async fn test_namespace_exists_returns_true() {
1455 let warehouse_loc = temp_path();
1456 let catalog = new_sql_catalog(warehouse_loc).await;
1457 let namespace_ident = NamespaceIdent::new("a".into());
1458 create_namespace(&catalog, &namespace_ident).await;
1459
1460 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1461 }
1462
1463 #[tokio::test]
1464 async fn test_create_namespace_with_properties() {
1465 let warehouse_loc = temp_path();
1466 let catalog = new_sql_catalog(warehouse_loc).await;
1467 let namespace_ident = NamespaceIdent::new("abc".into());
1468
1469 let mut properties = default_properties();
1470 properties.insert("k".into(), "v".into());
1471
1472 assert_eq!(
1473 catalog
1474 .create_namespace(&namespace_ident, properties.clone())
1475 .await
1476 .unwrap(),
1477 Namespace::with_properties(namespace_ident.clone(), properties.clone())
1478 );
1479
1480 assert_eq!(
1481 catalog.get_namespace(&namespace_ident).await.unwrap(),
1482 Namespace::with_properties(namespace_ident, properties)
1483 );
1484 }
1485
1486 #[tokio::test]
1487 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
1488 let warehouse_loc = temp_path();
1489 let catalog = new_sql_catalog(warehouse_loc).await;
1490 let namespace_ident = NamespaceIdent::new("a".into());
1491 create_namespace(&catalog, &namespace_ident).await;
1492
1493 assert_eq!(
1494 catalog
1495 .create_namespace(&namespace_ident, HashMap::new())
1496 .await
1497 .unwrap_err()
1498 .to_string(),
1499 format!(
1500 "Unexpected => Namespace {:?} already exists",
1501 &namespace_ident
1502 )
1503 );
1504
1505 assert_eq!(
1506 catalog.get_namespace(&namespace_ident).await.unwrap(),
1507 Namespace::with_properties(namespace_ident, default_properties())
1508 );
1509 }
1510
1511 #[tokio::test]
1512 async fn test_create_nested_namespace() {
1513 let warehouse_loc = temp_path();
1514 let catalog = new_sql_catalog(warehouse_loc).await;
1515 let parent_namespace_ident = NamespaceIdent::new("a".into());
1516 create_namespace(&catalog, &parent_namespace_ident).await;
1517
1518 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1519
1520 assert_eq!(
1521 catalog
1522 .create_namespace(&child_namespace_ident, HashMap::new())
1523 .await
1524 .unwrap(),
1525 Namespace::new(child_namespace_ident.clone())
1526 );
1527
1528 assert_eq!(
1529 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1530 Namespace::with_properties(child_namespace_ident, default_properties())
1531 );
1532 }
1533
1534 #[tokio::test]
1535 async fn test_create_deeply_nested_namespace() {
1536 let warehouse_loc = temp_path();
1537 let catalog = new_sql_catalog(warehouse_loc).await;
1538 let namespace_ident_a = NamespaceIdent::new("a".into());
1539 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1540 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1541
1542 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1543
1544 assert_eq!(
1545 catalog
1546 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1547 .await
1548 .unwrap(),
1549 Namespace::new(namespace_ident_a_b_c.clone())
1550 );
1551
1552 assert_eq!(
1553 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1554 Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1555 );
1556 }
1557
1558 #[tokio::test]
1559 async fn test_update_namespace_noop() {
1560 let warehouse_loc = temp_path();
1561 let catalog = new_sql_catalog(warehouse_loc).await;
1562 let namespace_ident = NamespaceIdent::new("a".into());
1563 create_namespace(&catalog, &namespace_ident).await;
1564
1565 catalog
1566 .update_namespace(&namespace_ident, HashMap::new())
1567 .await
1568 .unwrap();
1569
1570 assert_eq!(
1571 *catalog
1572 .get_namespace(&namespace_ident)
1573 .await
1574 .unwrap()
1575 .properties(),
1576 HashMap::from_iter([("exists".to_string(), "true".to_string())])
1577 )
1578 }
1579
1580 #[tokio::test]
1581 async fn test_update_namespace() {
1582 let warehouse_loc = temp_path();
1583 let catalog = new_sql_catalog(warehouse_loc).await;
1584 let namespace_ident = NamespaceIdent::new("a".into());
1585 create_namespace(&catalog, &namespace_ident).await;
1586
1587 let mut props = HashMap::from_iter([
1588 ("prop1".to_string(), "val1".to_string()),
1589 ("prop2".into(), "val2".into()),
1590 ]);
1591
1592 catalog
1593 .update_namespace(&namespace_ident, props.clone())
1594 .await
1595 .unwrap();
1596
1597 props.insert("exists".into(), "true".into());
1598
1599 assert_eq!(
1600 *catalog
1601 .get_namespace(&namespace_ident)
1602 .await
1603 .unwrap()
1604 .properties(),
1605 props
1606 )
1607 }
1608
1609 #[tokio::test]
1610 async fn test_update_nested_namespace() {
1611 let warehouse_loc = temp_path();
1612 let catalog = new_sql_catalog(warehouse_loc).await;
1613 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1614 create_namespace(&catalog, &namespace_ident).await;
1615
1616 let mut props = HashMap::from_iter([
1617 ("prop1".to_string(), "val1".to_string()),
1618 ("prop2".into(), "val2".into()),
1619 ]);
1620
1621 catalog
1622 .update_namespace(&namespace_ident, props.clone())
1623 .await
1624 .unwrap();
1625
1626 props.insert("exists".into(), "true".into());
1627
1628 assert_eq!(
1629 *catalog
1630 .get_namespace(&namespace_ident)
1631 .await
1632 .unwrap()
1633 .properties(),
1634 props
1635 )
1636 }
1637
1638 #[tokio::test]
1639 async fn test_update_namespace_errors_if_namespace_doesnt_exist() {
1640 let warehouse_loc = temp_path();
1641 let catalog = new_sql_catalog(warehouse_loc).await;
1642 let namespace_ident = NamespaceIdent::new("a".into());
1643
1644 let props = HashMap::from_iter([
1645 ("prop1".to_string(), "val1".to_string()),
1646 ("prop2".into(), "val2".into()),
1647 ]);
1648
1649 let err = catalog
1650 .update_namespace(&namespace_ident, props)
1651 .await
1652 .unwrap_err();
1653
1654 assert_eq!(
1655 err.message(),
1656 format!("No such namespace: {namespace_ident:?}")
1657 );
1658 }
1659
1660 #[tokio::test]
1661 async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1662 let warehouse_loc = temp_path();
1663 let catalog = new_sql_catalog(warehouse_loc).await;
1664 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1665
1666 let props = HashMap::from_iter([
1667 ("prop1".to_string(), "val1".to_string()),
1668 ("prop2".into(), "val2".into()),
1669 ]);
1670
1671 let err = catalog
1672 .update_namespace(&namespace_ident, props)
1673 .await
1674 .unwrap_err();
1675
1676 assert_eq!(
1677 err.message(),
1678 format!("No such namespace: {namespace_ident:?}")
1679 );
1680 }
1681
1682 #[tokio::test]
1683 async fn test_drop_namespace() {
1684 let warehouse_loc = temp_path();
1685 let catalog = new_sql_catalog(warehouse_loc).await;
1686 let namespace_ident = NamespaceIdent::new("abc".into());
1687 create_namespace(&catalog, &namespace_ident).await;
1688
1689 catalog.drop_namespace(&namespace_ident).await.unwrap();
1690
1691 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1692 }
1693
1694 #[tokio::test]
1695 async fn test_drop_nested_namespace() {
1696 let warehouse_loc = temp_path();
1697 let catalog = new_sql_catalog(warehouse_loc).await;
1698 let namespace_ident_a = NamespaceIdent::new("a".into());
1699 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1700 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1701
1702 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1703
1704 assert!(
1705 !catalog
1706 .namespace_exists(&namespace_ident_a_b)
1707 .await
1708 .unwrap()
1709 );
1710
1711 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1712 }
1713
1714 #[tokio::test]
1715 async fn test_drop_deeply_nested_namespace() {
1716 let warehouse_loc = temp_path();
1717 let catalog = new_sql_catalog(warehouse_loc).await;
1718 let namespace_ident_a = NamespaceIdent::new("a".into());
1719 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1720 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1721 create_namespaces(&catalog, &vec![
1722 &namespace_ident_a,
1723 &namespace_ident_a_b,
1724 &namespace_ident_a_b_c,
1725 ])
1726 .await;
1727
1728 catalog
1729 .drop_namespace(&namespace_ident_a_b_c)
1730 .await
1731 .unwrap();
1732
1733 assert!(
1734 !catalog
1735 .namespace_exists(&namespace_ident_a_b_c)
1736 .await
1737 .unwrap()
1738 );
1739
1740 assert!(
1741 catalog
1742 .namespace_exists(&namespace_ident_a_b)
1743 .await
1744 .unwrap()
1745 );
1746
1747 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1748 }
1749
1750 #[tokio::test]
1751 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1752 let warehouse_loc = temp_path();
1753 let catalog = new_sql_catalog(warehouse_loc).await;
1754
1755 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1756 assert_eq!(
1757 catalog
1758 .drop_namespace(&non_existent_namespace_ident)
1759 .await
1760 .unwrap_err()
1761 .to_string(),
1762 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1763 )
1764 }
1765
1766 #[tokio::test]
1767 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1768 let warehouse_loc = temp_path();
1769 let catalog = new_sql_catalog(warehouse_loc).await;
1770 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1771
1772 let non_existent_namespace_ident =
1773 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1774 assert_eq!(
1775 catalog
1776 .drop_namespace(&non_existent_namespace_ident)
1777 .await
1778 .unwrap_err()
1779 .to_string(),
1780 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1781 )
1782 }
1783
1784 #[tokio::test]
1785 async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1786 let warehouse_loc = temp_path();
1787 let catalog = new_sql_catalog(warehouse_loc).await;
1788 let namespace_ident_a = NamespaceIdent::new("a".into());
1789 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1790 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1791
1792 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1793
1794 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1795
1796 assert!(
1797 catalog
1798 .namespace_exists(&namespace_ident_a_b)
1799 .await
1800 .unwrap()
1801 );
1802 }
1803
1804 #[tokio::test]
1805 async fn test_list_tables_returns_empty_vector() {
1806 let warehouse_loc = temp_path();
1807 let catalog = new_sql_catalog(warehouse_loc).await;
1808 let namespace_ident = NamespaceIdent::new("a".into());
1809 create_namespace(&catalog, &namespace_ident).await;
1810
1811 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1812 }
1813
1814 #[tokio::test]
1815 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1816 let warehouse_loc = temp_path();
1817 let catalog = new_sql_catalog(warehouse_loc).await;
1818
1819 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1820
1821 assert_eq!(
1822 catalog
1823 .list_tables(&non_existent_namespace_ident)
1824 .await
1825 .unwrap_err()
1826 .to_string(),
1827 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}"),
1828 );
1829 }
1830
1831 #[tokio::test]
1832 async fn test_create_table_with_location() {
1833 let warehouse_loc = temp_path();
1834 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1835 let namespace_ident = NamespaceIdent::new("a".into());
1836 create_namespace(&catalog, &namespace_ident).await;
1837
1838 let table_name = "abc";
1839 let location = warehouse_loc.clone();
1840 let table_creation = TableCreation::builder()
1841 .name(table_name.into())
1842 .location(location.clone())
1843 .schema(simple_table_schema())
1844 .build();
1845
1846 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1847
1848 assert_table_eq(
1849 &catalog
1850 .create_table(&namespace_ident, table_creation)
1851 .await
1852 .unwrap(),
1853 &expected_table_ident,
1854 &simple_table_schema(),
1855 );
1856
1857 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1858
1859 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1860
1861 assert!(
1862 table
1863 .metadata_location()
1864 .unwrap()
1865 .to_string()
1866 .starts_with(&location)
1867 )
1868 }
1869
1870 #[tokio::test]
1871 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1872 let warehouse_loc = temp_path();
1873 let catalog = new_sql_catalog(warehouse_loc).await;
1874
1875 let namespace_ident = NamespaceIdent::new("a".into());
1876 let mut namespace_properties = HashMap::new();
1877 let namespace_location = temp_path();
1878 namespace_properties.insert(
1879 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1880 namespace_location.to_string(),
1881 );
1882 catalog
1883 .create_namespace(&namespace_ident, namespace_properties)
1884 .await
1885 .unwrap();
1886
1887 let table_name = "tbl1";
1888 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1889 let expected_table_metadata_location_regex =
1890 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1891
1892 let table = catalog
1893 .create_table(
1894 &namespace_ident,
1895 TableCreation::builder()
1896 .name(table_name.into())
1897 .schema(simple_table_schema())
1898 .build(),
1900 )
1901 .await
1902 .unwrap();
1903 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1904 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1905
1906 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1907 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1908 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1909 }
1910
1911 #[tokio::test]
1912 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1913 {
1914 let warehouse_loc = temp_path();
1915 let catalog = new_sql_catalog(warehouse_loc).await;
1916
1917 let namespace_ident = NamespaceIdent::new("a".into());
1918 let mut namespace_properties = HashMap::new();
1919 let namespace_location = temp_path();
1920 namespace_properties.insert(
1921 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1922 namespace_location.to_string(),
1923 );
1924 catalog
1925 .create_namespace(&namespace_ident, namespace_properties)
1926 .await
1927 .unwrap();
1928
1929 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1930 let mut nested_namespace_properties = HashMap::new();
1931 let nested_namespace_location = temp_path();
1932 nested_namespace_properties.insert(
1933 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1934 nested_namespace_location.to_string(),
1935 );
1936 catalog
1937 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1938 .await
1939 .unwrap();
1940
1941 let table_name = "tbl1";
1942 let expected_table_ident =
1943 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1944 let expected_table_metadata_location_regex = format!(
1945 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1946 );
1947
1948 let table = catalog
1949 .create_table(
1950 &nested_namespace_ident,
1951 TableCreation::builder()
1952 .name(table_name.into())
1953 .schema(simple_table_schema())
1954 .build(),
1956 )
1957 .await
1958 .unwrap();
1959 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1960 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1961
1962 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1963 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1964 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1965 }
1966
1967 #[tokio::test]
1968 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1969 {
1970 let warehouse_loc = temp_path();
1971 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1972
1973 let namespace_ident = NamespaceIdent::new("a".into());
1974 let namespace_properties = HashMap::new();
1976 catalog
1977 .create_namespace(&namespace_ident, namespace_properties)
1978 .await
1979 .unwrap();
1980
1981 let table_name = "tbl1";
1982 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1983 let expected_table_metadata_location_regex =
1984 format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1985
1986 let table = catalog
1987 .create_table(
1988 &namespace_ident,
1989 TableCreation::builder()
1990 .name(table_name.into())
1991 .schema(simple_table_schema())
1992 .build(),
1994 )
1995 .await
1996 .unwrap();
1997 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1998 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1999
2000 let table = catalog.load_table(&expected_table_ident).await.unwrap();
2001 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2002 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2003 }
2004
2005 #[tokio::test]
2006 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
2007 {
2008 let warehouse_loc = temp_path();
2009 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2010
2011 let namespace_ident = NamespaceIdent::new("a".into());
2012 create_namespace(&catalog, &namespace_ident).await;
2013
2014 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2015 create_namespace(&catalog, &nested_namespace_ident).await;
2016
2017 let table_name = "tbl1";
2018 let expected_table_ident =
2019 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
2020 let expected_table_metadata_location_regex =
2021 format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
2022
2023 let table = catalog
2024 .create_table(
2025 &nested_namespace_ident,
2026 TableCreation::builder()
2027 .name(table_name.into())
2028 .schema(simple_table_schema())
2029 .build(),
2031 )
2032 .await
2033 .unwrap();
2034 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2035 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2036
2037 let table = catalog.load_table(&expected_table_ident).await.unwrap();
2038 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
2039 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
2040 }
2041
2042 #[tokio::test]
2043 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
2044 let warehouse_loc = temp_path();
2045 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2046 let namespace_ident = NamespaceIdent::new("a".into());
2047 create_namespace(&catalog, &namespace_ident).await;
2048 let table_name = "tbl1";
2049 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2050 create_table(&catalog, &table_ident).await;
2051
2052 let tmp_dir = TempDir::new().unwrap();
2053 let location = tmp_dir.path().to_str().unwrap().to_string();
2054
2055 assert_eq!(
2056 catalog
2057 .create_table(
2058 &namespace_ident,
2059 TableCreation::builder()
2060 .name(table_name.into())
2061 .schema(simple_table_schema())
2062 .location(location)
2063 .build()
2064 )
2065 .await
2066 .unwrap_err()
2067 .to_string(),
2068 format!("Unexpected => Table {:?} already exists.", &table_ident)
2069 );
2070 }
2071
2072 #[tokio::test]
2073 async fn test_rename_table_in_same_namespace() {
2074 let warehouse_loc = temp_path();
2075 let catalog = new_sql_catalog(warehouse_loc).await;
2076 let namespace_ident = NamespaceIdent::new("n1".into());
2077 create_namespace(&catalog, &namespace_ident).await;
2078 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2079 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2080 create_table(&catalog, &src_table_ident).await;
2081
2082 catalog
2083 .rename_table(&src_table_ident, &dst_table_ident)
2084 .await
2085 .unwrap();
2086
2087 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2088 dst_table_ident
2089 ],);
2090 }
2091
2092 #[tokio::test]
2093 async fn test_rename_table_across_namespaces() {
2094 let warehouse_loc = temp_path();
2095 let catalog = new_sql_catalog(warehouse_loc).await;
2096 let src_namespace_ident = NamespaceIdent::new("a".into());
2097 let dst_namespace_ident = NamespaceIdent::new("b".into());
2098 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
2099 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2100 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
2101 create_table(&catalog, &src_table_ident).await;
2102
2103 catalog
2104 .rename_table(&src_table_ident, &dst_table_ident)
2105 .await
2106 .unwrap();
2107
2108 assert_eq!(
2109 catalog.list_tables(&src_namespace_ident).await.unwrap(),
2110 vec![],
2111 );
2112
2113 assert_eq!(
2114 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
2115 vec![dst_table_ident],
2116 );
2117 }
2118
2119 #[tokio::test]
2120 async fn test_rename_table_src_table_is_same_as_dst_table() {
2121 let warehouse_loc = temp_path();
2122 let catalog = new_sql_catalog(warehouse_loc).await;
2123 let namespace_ident = NamespaceIdent::new("n1".into());
2124 create_namespace(&catalog, &namespace_ident).await;
2125 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
2126 create_table(&catalog, &table_ident).await;
2127
2128 catalog
2129 .rename_table(&table_ident, &table_ident)
2130 .await
2131 .unwrap();
2132
2133 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2134 table_ident
2135 ],);
2136 }
2137
2138 #[tokio::test]
2139 async fn test_rename_table_across_nested_namespaces() {
2140 let warehouse_loc = temp_path();
2141 let catalog = new_sql_catalog(warehouse_loc).await;
2142 let namespace_ident_a = NamespaceIdent::new("a".into());
2143 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2144 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2145 create_namespaces(&catalog, &vec![
2146 &namespace_ident_a,
2147 &namespace_ident_a_b,
2148 &namespace_ident_a_b_c,
2149 ])
2150 .await;
2151
2152 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2153 create_tables(&catalog, vec![&src_table_ident]).await;
2154
2155 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2156 catalog
2157 .rename_table(&src_table_ident, &dst_table_ident)
2158 .await
2159 .unwrap();
2160
2161 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2162
2163 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2164 }
2165
2166 #[tokio::test]
2167 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2168 let warehouse_loc = temp_path();
2169 let catalog = new_sql_catalog(warehouse_loc).await;
2170 let src_namespace_ident = NamespaceIdent::new("n1".into());
2171 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2172 create_namespace(&catalog, &src_namespace_ident).await;
2173 create_table(&catalog, &src_table_ident).await;
2174
2175 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2176 let dst_table_ident =
2177 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2178 assert_eq!(
2179 catalog
2180 .rename_table(&src_table_ident, &dst_table_ident)
2181 .await
2182 .unwrap_err()
2183 .to_string(),
2184 format!("Unexpected => No such namespace: {non_existent_dst_namespace_ident:?}"),
2185 );
2186 }
2187
2188 #[tokio::test]
2189 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
2190 let warehouse_loc = temp_path();
2191 let catalog = new_sql_catalog(warehouse_loc).await;
2192 let namespace_ident = NamespaceIdent::new("n1".into());
2193 create_namespace(&catalog, &namespace_ident).await;
2194 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2195 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2196
2197 assert_eq!(
2198 catalog
2199 .rename_table(&src_table_ident, &dst_table_ident)
2200 .await
2201 .unwrap_err()
2202 .to_string(),
2203 format!("Unexpected => No such table: {src_table_ident:?}"),
2204 );
2205 }
2206
2207 #[tokio::test]
2208 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
2209 let warehouse_loc = temp_path();
2210 let catalog = new_sql_catalog(warehouse_loc).await;
2211 let namespace_ident = NamespaceIdent::new("n1".into());
2212 create_namespace(&catalog, &namespace_ident).await;
2213 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2214 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2215 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
2216
2217 assert_eq!(
2218 catalog
2219 .rename_table(&src_table_ident, &dst_table_ident)
2220 .await
2221 .unwrap_err()
2222 .to_string(),
2223 format!("Unexpected => Table {:?} already exists.", &dst_table_ident),
2224 );
2225 }
2226
2227 #[tokio::test]
2228 async fn test_drop_table_throws_error_if_table_not_exist() {
2229 let warehouse_loc = temp_path();
2230 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2231 let namespace_ident = NamespaceIdent::new("a".into());
2232 let table_name = "tbl1";
2233 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2234 create_namespace(&catalog, &namespace_ident).await;
2235
2236 let err = catalog
2237 .drop_table(&table_ident)
2238 .await
2239 .unwrap_err()
2240 .to_string();
2241 assert_eq!(
2242 err,
2243 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2244 );
2245 }
2246
2247 #[tokio::test]
2248 async fn test_drop_table() {
2249 let warehouse_loc = temp_path();
2250 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2251 let namespace_ident = NamespaceIdent::new("a".into());
2252 let table_name = "tbl1";
2253 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2254 create_namespace(&catalog, &namespace_ident).await;
2255
2256 let location = warehouse_loc.clone();
2257 let table_creation = TableCreation::builder()
2258 .name(table_name.into())
2259 .location(location.clone())
2260 .schema(simple_table_schema())
2261 .build();
2262
2263 catalog
2264 .create_table(&namespace_ident, table_creation)
2265 .await
2266 .unwrap();
2267
2268 let table = catalog.load_table(&table_ident).await.unwrap();
2269 assert_table_eq(&table, &table_ident, &simple_table_schema());
2270
2271 catalog.drop_table(&table_ident).await.unwrap();
2272 let err = catalog
2273 .load_table(&table_ident)
2274 .await
2275 .unwrap_err()
2276 .to_string();
2277 assert_eq!(
2278 err,
2279 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2280 );
2281 }
2282
2283 #[tokio::test]
2284 async fn test_register_table_throws_error_if_table_with_same_name_already_exists() {
2285 let warehouse_loc = temp_path();
2286 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2287 let namespace_ident = NamespaceIdent::new("a".into());
2288 create_namespace(&catalog, &namespace_ident).await;
2289 let table_name = "tbl1";
2290 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2291 create_table(&catalog, &table_ident).await;
2292
2293 assert_eq!(
2294 catalog
2295 .register_table(&table_ident, warehouse_loc)
2296 .await
2297 .unwrap_err()
2298 .to_string(),
2299 format!("Unexpected => Table {:?} already exists.", &table_ident)
2300 );
2301 }
2302
2303 #[tokio::test]
2304 async fn test_register_table() {
2305 let warehouse_loc = temp_path();
2306 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2307 let namespace_ident = NamespaceIdent::new("a".into());
2308 create_namespace(&catalog, &namespace_ident).await;
2309
2310 let table_name = "abc";
2311 let location = warehouse_loc.clone();
2312 let table_creation = TableCreation::builder()
2313 .name(table_name.into())
2314 .location(location.clone())
2315 .schema(simple_table_schema())
2316 .build();
2317
2318 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2319 let expected_table = catalog
2320 .create_table(&namespace_ident, table_creation)
2321 .await
2322 .unwrap();
2323
2324 let metadata_location = expected_table
2325 .metadata_location()
2326 .expect("Expected metadata location to be set")
2327 .to_string();
2328
2329 assert_table_eq(&expected_table, &table_ident, &simple_table_schema());
2330
2331 let _ = catalog.drop_table(&table_ident).await;
2332
2333 let table = catalog
2334 .register_table(&table_ident, metadata_location.clone())
2335 .await
2336 .unwrap();
2337
2338 assert_eq!(table.identifier(), expected_table.identifier());
2339 assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
2340 }
2341
2342 #[tokio::test]
2343 async fn test_update_table() {
2344 let warehouse_loc = temp_path();
2345 let catalog = new_sql_catalog(warehouse_loc).await;
2346
2347 let namespace_ident = NamespaceIdent::new("ns1".into());
2349 create_namespace(&catalog, &namespace_ident).await;
2350 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2351 create_table(&catalog, &table_ident).await;
2352
2353 let table = catalog.load_table(&table_ident).await.unwrap();
2354
2355 let original_metadata_location = table.metadata_location().unwrap().to_string();
2357
2358 let tx = Transaction::new(&table);
2360 let tx = tx
2361 .update_table_properties()
2362 .set("test_property".to_string(), "test_value".to_string())
2363 .apply(tx)
2364 .unwrap();
2365
2366 let updated_table = tx.commit(&catalog).await.unwrap();
2368
2369 assert_eq!(
2371 updated_table.metadata().properties().get("test_property"),
2372 Some(&"test_value".to_string())
2373 );
2374 assert_ne!(
2376 updated_table.metadata_location().unwrap(),
2377 original_metadata_location.as_str()
2378 );
2379
2380 let reloaded = catalog.load_table(&table_ident).await.unwrap();
2382
2383 assert_eq!(
2385 reloaded.metadata().properties().get("test_property"),
2386 Some(&"test_value".to_string())
2387 );
2388 assert_eq!(
2389 reloaded.metadata_location(),
2390 updated_table.metadata_location()
2391 );
2392 }
2393}