1use std::collections::{HashMap, HashSet};
19use std::str::FromStr;
20use std::time::Duration;
21
22use async_trait::async_trait;
23use iceberg::io::FileIO;
24use iceberg::spec::{TableMetadata, TableMetadataBuilder};
25use iceberg::table::Table;
26use iceberg::{
27 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
28 TableCommit, TableCreation, TableIdent,
29};
30use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
31use sqlx::{Any, AnyPool, Row, Transaction};
32
33use crate::error::{
34 from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
35};
36
/// Property key for the database connection URI. When this key is present in the properties
/// seen by `SqlCatalogBuilder::load`, its value replaces any URI set on the builder.
pub const SQL_CATALOG_PROP_URI: &str = "uri";
/// Property key for the warehouse location. When this key is present in the properties seen
/// by `SqlCatalogBuilder::load`, its value replaces any warehouse location set on the builder.
pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key for the SQL bind style. `SqlCatalogBuilder::load` parses the value into a
/// `SqlBindStyle` and fails the load when the value cannot be parsed.
pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style";
43
// Name of the SQL table that holds one row per registered table.
static CATALOG_TABLE_NAME: &str = "iceberg_tables";
// Column holding the catalog name; present in both bookkeeping tables.
static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
// Column holding the table name.
static CATALOG_FIELD_TABLE_NAME: &str = "table_name";
// Column holding the namespace of a table, stored as a dot-joined string.
static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
// Column holding the location of the table's current metadata file.
static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
// Column for the previous metadata location; created with the table but not written by the
// code visible in this file section.
static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
// Column holding the record type of a row.
static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
// Record-type value written for tables; queries also accept rows where the column is NULL.
static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

// Name of the SQL table that holds namespace properties (one row per property).
static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
// Column holding the namespace, stored as a dot-joined string.
static NAMESPACE_FIELD_NAME: &str = "namespace";
// Column holding a namespace property key.
static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
// Column holding a namespace property value.
static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

// Namespace property consulted by `create_table` to derive a default table location.
static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
59
// Pool size used when the `pool.max-connections` property is not set.
static MAX_CONNECTIONS: u32 = 10;
// Idle timeout in seconds used when the `pool.idle-timeout` property is not set.
static IDLE_TIMEOUT: u64 = 10;
// Value used when the `pool.test-before-acquire` property is not set.
static TEST_BEFORE_ACQUIRE: bool = true;

/// Builder for [`SqlCatalog`]; a thin wrapper around the catalog configuration that is
/// filled in through the builder methods and finalized by `CatalogBuilder::load`.
#[derive(Debug)]
pub struct SqlCatalogBuilder(SqlCatalogConfig);
67
68impl Default for SqlCatalogBuilder {
69 fn default() -> Self {
70 Self(SqlCatalogConfig {
71 uri: "".to_string(),
72 name: "".to_string(),
73 warehouse_location: "".to_string(),
74 sql_bind_style: SqlBindStyle::DollarNumeric,
75 props: HashMap::new(),
76 })
77 }
78}
79
80impl SqlCatalogBuilder {
81 pub fn uri(mut self, uri: impl Into<String>) -> Self {
86 self.0.uri = uri.into();
87 self
88 }
89
90 pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
95 self.0.warehouse_location = location.into();
96 self
97 }
98
99 pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
104 self.0.sql_bind_style = sql_bind_style;
105 self
106 }
107
108 pub fn props(mut self, props: HashMap<String, String>) -> Self {
113 for (k, v) in props {
114 self.0.props.insert(k, v);
115 }
116 self
117 }
118
119 pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
126 self.0.props.insert(key.into(), value.into());
127 self
128 }
129}
130
131impl CatalogBuilder for SqlCatalogBuilder {
132 type C = SqlCatalog;
133
134 fn load(
135 mut self,
136 name: impl Into<String>,
137 props: HashMap<String, String>,
138 ) -> impl Future<Output = Result<Self::C>> + Send {
139 let name = name.into();
140
141 for (k, v) in props {
142 self.0.props.insert(k, v);
143 }
144
145 if let Some(uri) = self.0.props.remove(SQL_CATALOG_PROP_URI) {
146 self.0.uri = uri;
147 }
148 if let Some(warehouse_location) = self.0.props.remove(SQL_CATALOG_PROP_WAREHOUSE) {
149 self.0.warehouse_location = warehouse_location;
150 }
151
152 let mut valid_sql_bind_style = true;
153 if let Some(sql_bind_style) = self.0.props.remove(SQL_CATALOG_PROP_BIND_STYLE) {
154 if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) {
155 self.0.sql_bind_style = sql_bind_style;
156 } else {
157 valid_sql_bind_style = false;
158 }
159 }
160
161 async move {
162 if name.trim().is_empty() {
163 Err(Error::new(
164 ErrorKind::DataInvalid,
165 "Catalog name cannot be empty",
166 ))
167 } else if !valid_sql_bind_style {
168 Err(Error::new(
169 ErrorKind::DataInvalid,
170 format!(
171 "`{}` values are valid only if they're `{}` or `{}`",
172 SQL_CATALOG_PROP_BIND_STYLE,
173 SqlBindStyle::DollarNumeric,
174 SqlBindStyle::QMark
175 ),
176 ))
177 } else {
178 SqlCatalog::new(self.0).await
179 }
180 }
181 }
182}
183
/// Configuration assembled by `SqlCatalogBuilder` and consumed by `SqlCatalog::new`.
#[derive(Debug)]
struct SqlCatalogConfig {
    // Connection string handed to the sqlx pool.
    uri: String,
    // Catalog name; copied into `SqlCatalog::name`.
    name: String,
    // Warehouse location; used to build the `FileIO` and copied into the catalog.
    warehouse_location: String,
    // Placeholder style used when rewriting queries.
    sql_bind_style: SqlBindStyle,
    // Remaining properties; `SqlCatalog::new` reads the `pool.*` keys from here.
    props: HashMap<String, String>,
}
200
/// Catalog implementation that keeps table and namespace records in a SQL database reached
/// through an sqlx `AnyPool`.
#[derive(Debug)]
pub struct SqlCatalog {
    // Value bound to the `catalog_name` column in every query.
    name: String,
    // Connection pool all statements are executed against.
    connection: AnyPool,
    // Base location used by `create_table` when neither the table nor its namespace
    // specifies one.
    warehouse_location: String,
    // File IO used to read and write table metadata files.
    fileio: FileIO,
    // Placeholder style applied by `replace_placeholders`.
    sql_bind_style: SqlBindStyle,
}
210
/// Placeholder style used for bound parameters in the SQL sent to the database.
#[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
pub enum SqlBindStyle {
    /// Numbered placeholders: each `?` in a query is rewritten to `$1`, `$2`, ... in order.
    DollarNumeric,
    /// Question-mark placeholders: queries are sent unchanged.
    QMark,
}
219
220impl SqlCatalog {
221 async fn new(config: SqlCatalogConfig) -> Result<Self> {
223 let fileio = FileIO::from_path(&config.warehouse_location)?.build()?;
224 install_default_drivers();
225 let max_connections: u32 = config
226 .props
227 .get("pool.max-connections")
228 .map(|v| v.parse().unwrap())
229 .unwrap_or(MAX_CONNECTIONS);
230 let idle_timeout: u64 = config
231 .props
232 .get("pool.idle-timeout")
233 .map(|v| v.parse().unwrap())
234 .unwrap_or(IDLE_TIMEOUT);
235 let test_before_acquire: bool = config
236 .props
237 .get("pool.test-before-acquire")
238 .map(|v| v.parse().unwrap())
239 .unwrap_or(TEST_BEFORE_ACQUIRE);
240
241 let pool = AnyPoolOptions::new()
242 .max_connections(max_connections)
243 .idle_timeout(Duration::from_secs(idle_timeout))
244 .test_before_acquire(test_before_acquire)
245 .connect(&config.uri)
246 .await
247 .map_err(from_sqlx_error)?;
248
249 sqlx::query(&format!(
250 "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
251 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
252 {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
253 {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
254 {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
255 {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
256 {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
257 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
258 ))
259 .execute(&pool)
260 .await
261 .map_err(from_sqlx_error)?;
262
263 sqlx::query(&format!(
264 "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
265 {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
266 {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
267 {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
268 {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
269 PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
270 ))
271 .execute(&pool)
272 .await
273 .map_err(from_sqlx_error)?;
274
275 Ok(SqlCatalog {
276 name: config.name.to_owned(),
277 connection: pool,
278 warehouse_location: config.warehouse_location,
279 fileio,
280 sql_bind_style: config.sql_bind_style,
281 })
282 }
283
284 fn replace_placeholders(&self, query: &str) -> String {
286 match self.sql_bind_style {
287 SqlBindStyle::DollarNumeric => {
288 let mut count = 1;
289 query
290 .chars()
291 .fold(String::with_capacity(query.len()), |mut acc, c| {
292 if c == '?' {
293 acc.push('$');
294 acc.push_str(&count.to_string());
295 count += 1;
296 } else {
297 acc.push(c);
298 }
299 acc
300 })
301 }
302 _ => query.to_owned(),
303 }
304 }
305
306 async fn fetch_rows(&self, query: &str, args: Vec<Option<&str>>) -> Result<Vec<AnyRow>> {
308 let query_with_placeholders = self.replace_placeholders(query);
309
310 let mut sqlx_query = sqlx::query(&query_with_placeholders);
311 for arg in args {
312 sqlx_query = sqlx_query.bind(arg);
313 }
314
315 sqlx_query
316 .fetch_all(&self.connection)
317 .await
318 .map_err(from_sqlx_error)
319 }
320
321 async fn execute(
323 &self,
324 query: &str,
325 args: Vec<Option<&str>>,
326 transaction: Option<&mut Transaction<'_, Any>>,
327 ) -> Result<AnyQueryResult> {
328 let query_with_placeholders = self.replace_placeholders(query);
329
330 let mut sqlx_query = sqlx::query(&query_with_placeholders);
331 for arg in args {
332 sqlx_query = sqlx_query.bind(arg);
333 }
334
335 match transaction {
336 Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error),
337 None => {
338 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
339 let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error);
340 let _ = tx.commit().await.map_err(from_sqlx_error);
341 result
342 }
343 }
344 }
345}
346
347#[async_trait]
348impl Catalog for SqlCatalog {
349 async fn list_namespaces(
350 &self,
351 parent: Option<&NamespaceIdent>,
352 ) -> Result<Vec<NamespaceIdent>> {
353 let all_namespaces_stmt = format!(
355 "SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
356 FROM {CATALOG_TABLE_NAME}
357 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
358 UNION
359 SELECT {NAMESPACE_FIELD_NAME}
360 FROM {NAMESPACE_TABLE_NAME}
361 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
362 );
363
364 let namespace_rows = self
365 .fetch_rows(&all_namespaces_stmt, vec![
366 Some(&self.name),
367 Some(&self.name),
368 ])
369 .await?;
370
371 let mut namespaces = HashSet::<NamespaceIdent>::with_capacity(namespace_rows.len());
372
373 if let Some(parent) = parent {
374 if self.namespace_exists(parent).await? {
375 let parent_str = parent.join(".");
376
377 for row in namespace_rows.iter() {
378 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
379 if nsp != parent_str && nsp.starts_with(&parent_str) {
381 namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?);
382 }
383 }
384
385 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
386 } else {
387 no_such_namespace_err(parent)
388 }
389 } else {
390 for row in namespace_rows.iter() {
391 let nsp = row.try_get::<String, _>(0).map_err(from_sqlx_error)?;
392 let mut levels = nsp.split(".").collect::<Vec<&str>>();
393 if !levels.is_empty() {
394 let first_level = levels.drain(..1).collect::<Vec<&str>>();
395 namespaces.insert(NamespaceIdent::from_strs(first_level)?);
396 }
397 }
398
399 Ok(namespaces.into_iter().collect::<Vec<NamespaceIdent>>())
400 }
401 }
402
403 async fn create_namespace(
404 &self,
405 namespace: &NamespaceIdent,
406 properties: HashMap<String, String>,
407 ) -> Result<Namespace> {
408 let exists = self.namespace_exists(namespace).await?;
409
410 if exists {
411 return Err(Error::new(
412 iceberg::ErrorKind::Unexpected,
413 format!("Namespace {namespace:?} already exists"),
414 ));
415 }
416
417 let namespace_str = namespace.join(".");
418 let insert = format!(
419 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
420 VALUES (?, ?, ?, ?)");
421 if !properties.is_empty() {
422 let mut insert_properties = properties.clone();
423 insert_properties.insert("exists".to_string(), "true".to_string());
424
425 let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
426 let mut insert_stmt = insert.clone();
427 for (index, (key, value)) in insert_properties.iter().enumerate() {
428 query_args.extend_from_slice(&[
429 Some(self.name.as_str()),
430 Some(namespace_str.as_str()),
431 Some(key.as_str()),
432 Some(value.as_str()),
433 ]);
434 if index > 0 {
435 insert_stmt.push_str(", (?, ?, ?, ?)");
436 }
437 }
438
439 self.execute(&insert_stmt, query_args, None).await?;
440
441 Ok(Namespace::with_properties(
442 namespace.clone(),
443 insert_properties,
444 ))
445 } else {
446 self.execute(
448 &insert,
449 vec![
450 Some(&self.name),
451 Some(&namespace_str),
452 Some("exists"),
453 Some("true"),
454 ],
455 None,
456 )
457 .await?;
458 Ok(Namespace::with_properties(namespace.clone(), properties))
459 }
460 }
461
462 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
463 let exists = self.namespace_exists(namespace).await?;
464 if exists {
465 let namespace_props = self
466 .fetch_rows(
467 &format!(
468 "SELECT
469 {NAMESPACE_FIELD_NAME},
470 {NAMESPACE_FIELD_PROPERTY_KEY},
471 {NAMESPACE_FIELD_PROPERTY_VALUE}
472 FROM {NAMESPACE_TABLE_NAME}
473 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
474 AND {NAMESPACE_FIELD_NAME} = ?"
475 ),
476 vec![Some(&self.name), Some(&namespace.join("."))],
477 )
478 .await?;
479
480 let mut properties = HashMap::with_capacity(namespace_props.len());
481
482 for row in namespace_props {
483 let key = row
484 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_KEY)
485 .map_err(from_sqlx_error)?;
486 let value = row
487 .try_get::<String, _>(NAMESPACE_FIELD_PROPERTY_VALUE)
488 .map_err(from_sqlx_error)?;
489
490 properties.insert(key, value);
491 }
492
493 Ok(Namespace::with_properties(namespace.clone(), properties))
494 } else {
495 no_such_namespace_err(namespace)
496 }
497 }
498
499 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
500 let namespace_str = namespace.join(".");
501
502 let table_namespaces = self
503 .fetch_rows(
504 &format!(
505 "SELECT 1 FROM {CATALOG_TABLE_NAME}
506 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
507 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
508 LIMIT 1"
509 ),
510 vec![Some(&self.name), Some(&namespace_str)],
511 )
512 .await?;
513
514 if !table_namespaces.is_empty() {
515 Ok(true)
516 } else {
517 let namespaces = self
518 .fetch_rows(
519 &format!(
520 "SELECT 1 FROM {NAMESPACE_TABLE_NAME}
521 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
522 AND {NAMESPACE_FIELD_NAME} = ?
523 LIMIT 1"
524 ),
525 vec![Some(&self.name), Some(&namespace_str)],
526 )
527 .await?;
528 if !namespaces.is_empty() {
529 Ok(true)
530 } else {
531 Ok(false)
532 }
533 }
534 }
535
536 async fn update_namespace(
537 &self,
538 namespace: &NamespaceIdent,
539 properties: HashMap<String, String>,
540 ) -> Result<()> {
541 let exists = self.namespace_exists(namespace).await?;
542 if exists {
543 let existing_properties = self.get_namespace(namespace).await?.properties().clone();
544 let namespace_str = namespace.join(".");
545
546 let mut updates = vec![];
547 let mut inserts = vec![];
548
549 for (key, value) in properties.iter() {
550 if existing_properties.contains_key(key) {
551 if existing_properties.get(key) != Some(value) {
552 updates.push((key, value));
553 }
554 } else {
555 inserts.push((key, value));
556 }
557 }
558
559 let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
560 let update_stmt = format!(
561 "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ?
562 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
563 AND {NAMESPACE_FIELD_NAME} = ?
564 AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?"
565 );
566
567 let insert_stmt = format!(
568 "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
569 VALUES (?, ?, ?, ?)"
570 );
571
572 for (key, value) in updates {
573 self.execute(
574 &update_stmt,
575 vec![
576 Some(value),
577 Some(&self.name),
578 Some(&namespace_str),
579 Some(key),
580 ],
581 Some(&mut tx),
582 )
583 .await?;
584 }
585
586 for (key, value) in inserts {
587 self.execute(
588 &insert_stmt,
589 vec![
590 Some(&self.name),
591 Some(&namespace_str),
592 Some(key),
593 Some(value),
594 ],
595 Some(&mut tx),
596 )
597 .await?;
598 }
599
600 let _ = tx.commit().await.map_err(from_sqlx_error)?;
601
602 Ok(())
603 } else {
604 no_such_namespace_err(namespace)
605 }
606 }
607
608 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
609 let exists = self.namespace_exists(namespace).await?;
610 if exists {
611 let tables = self.list_tables(namespace).await?;
613 if !tables.is_empty() {
614 return Err(Error::new(
615 iceberg::ErrorKind::Unexpected,
616 format!(
617 "Namespace {:?} is not empty. {} tables exist.",
618 namespace,
619 tables.len()
620 ),
621 ));
622 }
623
624 self.execute(
625 &format!(
626 "DELETE FROM {NAMESPACE_TABLE_NAME}
627 WHERE {NAMESPACE_FIELD_NAME} = ?
628 AND {CATALOG_FIELD_CATALOG_NAME} = ?"
629 ),
630 vec![Some(&namespace.join(".")), Some(&self.name)],
631 None,
632 )
633 .await?;
634
635 Ok(())
636 } else {
637 no_such_namespace_err(namespace)
638 }
639 }
640
641 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
642 let exists = self.namespace_exists(namespace).await?;
643 if exists {
644 let rows = self
645 .fetch_rows(
646 &format!(
647 "SELECT {CATALOG_FIELD_TABLE_NAME},
648 {CATALOG_FIELD_TABLE_NAMESPACE}
649 FROM {CATALOG_TABLE_NAME}
650 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
651 AND {CATALOG_FIELD_CATALOG_NAME} = ?
652 AND (
653 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
654 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
655 )",
656 ),
657 vec![Some(&namespace.join(".")), Some(&self.name)],
658 )
659 .await?;
660
661 let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
662
663 for row in rows.iter() {
664 let tbl = row
665 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
666 .map_err(from_sqlx_error)?;
667 let ns_strs = row
668 .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
669 .map_err(from_sqlx_error)?;
670 let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
671 tables.insert(TableIdent::new(ns, tbl));
672 }
673
674 Ok(tables.into_iter().collect::<Vec<TableIdent>>())
675 } else {
676 no_such_namespace_err(namespace)
677 }
678 }
679
680 async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
681 let namespace = identifier.namespace().join(".");
682 let table_name = identifier.name();
683 let table_counts = self
684 .fetch_rows(
685 &format!(
686 "SELECT 1
687 FROM {CATALOG_TABLE_NAME}
688 WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
689 AND {CATALOG_FIELD_CATALOG_NAME} = ?
690 AND {CATALOG_FIELD_TABLE_NAME} = ?
691 AND (
692 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
693 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
694 )"
695 ),
696 vec![Some(&namespace), Some(&self.name), Some(table_name)],
697 )
698 .await?;
699
700 if !table_counts.is_empty() {
701 Ok(true)
702 } else {
703 Ok(false)
704 }
705 }
706
707 async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
708 if !self.table_exists(identifier).await? {
709 return no_such_table_err(identifier);
710 }
711
712 self.execute(
713 &format!(
714 "DELETE FROM {CATALOG_TABLE_NAME}
715 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
716 AND {CATALOG_FIELD_TABLE_NAME} = ?
717 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
718 AND (
719 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
720 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
721 )"
722 ),
723 vec![
724 Some(&self.name),
725 Some(identifier.name()),
726 Some(&identifier.namespace().join(".")),
727 ],
728 None,
729 )
730 .await?;
731
732 Ok(())
733 }
734
735 async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
736 if !self.table_exists(identifier).await? {
737 return no_such_table_err(identifier);
738 }
739
740 let rows = self
741 .fetch_rows(
742 &format!(
743 "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
744 FROM {CATALOG_TABLE_NAME}
745 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
746 AND {CATALOG_FIELD_TABLE_NAME} = ?
747 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
748 AND (
749 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
750 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
751 )"
752 ),
753 vec![
754 Some(&self.name),
755 Some(identifier.name()),
756 Some(&identifier.namespace().join(".")),
757 ],
758 )
759 .await?;
760
761 if rows.is_empty() {
762 return no_such_table_err(identifier);
763 }
764
765 let row = &rows[0];
766 let tbl_metadata_location = row
767 .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
768 .map_err(from_sqlx_error)?;
769
770 let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;
771
772 Ok(Table::builder()
773 .file_io(self.fileio.clone())
774 .identifier(identifier.clone())
775 .metadata_location(tbl_metadata_location)
776 .metadata(metadata)
777 .build()?)
778 }
779
780 async fn create_table(
781 &self,
782 namespace: &NamespaceIdent,
783 creation: TableCreation,
784 ) -> Result<Table> {
785 if !self.namespace_exists(namespace).await? {
786 return no_such_namespace_err(namespace);
787 }
788
789 let tbl_name = creation.name.clone();
790 let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
791
792 if self.table_exists(&tbl_ident).await? {
793 return table_already_exists_err(&tbl_ident);
794 }
795
796 let (tbl_creation, location) = match creation.location.clone() {
797 Some(location) => (creation, location),
798 None => {
799 let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
802 let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
803 Some(location) => location.clone(),
804 None => {
805 format!(
806 "{}/{}",
807 self.warehouse_location.clone(),
808 namespace.join("/")
809 )
810 }
811 };
812
813 let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
814
815 (
816 TableCreation {
817 location: Some(tbl_location.clone()),
818 ..creation
819 },
820 tbl_location,
821 )
822 }
823 };
824
825 let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
826 .build()?
827 .metadata;
828 let tbl_metadata_location =
829 MetadataLocation::new_with_table_location(location.clone()).to_string();
830
831 tbl_metadata
832 .write_to(&self.fileio, &tbl_metadata_location)
833 .await?;
834
835 self.execute(&format!(
836 "INSERT INTO {CATALOG_TABLE_NAME}
837 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
838 VALUES (?, ?, ?, ?, ?)
839 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
840
841 Ok(Table::builder()
842 .file_io(self.fileio.clone())
843 .metadata_location(tbl_metadata_location)
844 .identifier(tbl_ident)
845 .metadata(tbl_metadata)
846 .build()?)
847 }
848
849 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
850 if src == dest {
851 return Ok(());
852 }
853
854 if !self.table_exists(src).await? {
855 return no_such_table_err(src);
856 }
857
858 if !self.namespace_exists(dest.namespace()).await? {
859 return no_such_namespace_err(dest.namespace());
860 }
861
862 if self.table_exists(dest).await? {
863 return table_already_exists_err(dest);
864 }
865
866 self.execute(
867 &format!(
868 "UPDATE {CATALOG_TABLE_NAME}
869 SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
870 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
871 AND {CATALOG_FIELD_TABLE_NAME} = ?
872 AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
873 AND (
874 {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
875 OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
876 )"
877 ),
878 vec![
879 Some(dest.name()),
880 Some(&dest.namespace().join(".")),
881 Some(&self.name),
882 Some(src.name()),
883 Some(&src.namespace().join(".")),
884 ],
885 None,
886 )
887 .await?;
888
889 Ok(())
890 }
891
892 async fn register_table(
893 &self,
894 table_ident: &TableIdent,
895 metadata_location: String,
896 ) -> Result<Table> {
897 if self.table_exists(table_ident).await? {
898 return table_already_exists_err(table_ident);
899 }
900
901 let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
902
903 let namespace = table_ident.namespace();
904 let tbl_name = table_ident.name().to_string();
905
906 self.execute(&format!(
907 "INSERT INTO {CATALOG_TABLE_NAME}
908 ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
909 VALUES (?, ?, ?, ?, ?)
910 "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
911
912 Ok(Table::builder()
913 .identifier(table_ident.clone())
914 .metadata_location(metadata_location)
915 .metadata(metadata)
916 .file_io(self.fileio.clone())
917 .build()?)
918 }
919
920 async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
921 Err(Error::new(
922 ErrorKind::FeatureUnsupported,
923 "Updating a table is not supported yet",
924 ))
925 }
926}
927
928#[cfg(test)]
929mod tests {
930 use std::collections::{HashMap, HashSet};
931 use std::hash::Hash;
932
933 use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
934 use iceberg::table::Table;
935 use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
936 use itertools::Itertools;
937 use regex::Regex;
938 use sqlx::migrate::MigrateDatabase;
939 use tempfile::TempDir;
940
941 use crate::catalog::{
942 NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI,
943 SQL_CATALOG_PROP_WAREHOUSE,
944 };
945 use crate::{SqlBindStyle, SqlCatalogBuilder};
946
// Regex fragment matching a hyphenated UUID written with lowercase hex digits (8-4-4-4-12).
const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
948
949 fn temp_path() -> String {
950 let temp_dir = TempDir::new().unwrap();
951 temp_dir.path().to_str().unwrap().to_string()
952 }
953
/// Collects `vec` into a `HashSet`, dropping duplicates and ordering.
fn to_set<T>(vec: Vec<T>) -> HashSet<T>
where
    T: Eq + Hash,
{
    vec.into_iter().collect()
}
957
/// Returns the property map containing only the `exists = true` marker that the catalog
/// stores for every namespace.
fn default_properties() -> HashMap<String, String> {
    let mut properties = HashMap::new();
    properties.insert(String::from("exists"), String::from("true"));
    properties
}
961
962 async fn new_sql_catalog(warehouse_location: String) -> impl Catalog {
963 let sql_lite_uri = format!("sqlite:{}", temp_path());
964 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
965
966 let props = HashMap::from_iter([
967 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()),
968 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
969 (
970 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
971 SqlBindStyle::DollarNumeric.to_string(),
972 ),
973 ]);
974 SqlCatalogBuilder::default()
975 .load("iceberg", props)
976 .await
977 .unwrap()
978 }
979
980 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
981 let _ = catalog
982 .create_namespace(namespace_ident, HashMap::new())
983 .await
984 .unwrap();
985 }
986
987 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
988 for namespace_ident in namespace_idents {
989 let _ = create_namespace(catalog, namespace_ident).await;
990 }
991 }
992
993 fn simple_table_schema() -> Schema {
994 Schema::builder()
995 .with_fields(vec![
996 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
997 ])
998 .build()
999 .unwrap()
1000 }
1001
1002 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
1003 let _ = catalog
1004 .create_table(
1005 &table_ident.namespace,
1006 TableCreation::builder()
1007 .name(table_ident.name().into())
1008 .schema(simple_table_schema())
1009 .location(temp_path())
1010 .build(),
1011 )
1012 .await
1013 .unwrap();
1014 }
1015
1016 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
1017 for table_ident in table_idents {
1018 create_table(catalog, table_ident).await;
1019 }
1020 }
1021
1022 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
1023 assert_eq!(table.identifier(), expected_table_ident);
1024
1025 let metadata = table.metadata();
1026
1027 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
1028
1029 let expected_partition_spec = PartitionSpec::builder(expected_schema.clone())
1030 .with_spec_id(0)
1031 .build()
1032 .unwrap();
1033
1034 assert_eq!(
1035 metadata
1036 .partition_specs_iter()
1037 .map(|p| p.as_ref())
1038 .collect_vec(),
1039 vec![&expected_partition_spec]
1040 );
1041
1042 let expected_sorted_order = SortOrder::builder()
1043 .with_order_id(0)
1044 .with_fields(vec![])
1045 .build(expected_schema)
1046 .unwrap();
1047
1048 assert_eq!(
1049 metadata
1050 .sort_orders_iter()
1051 .map(|s| s.as_ref())
1052 .collect_vec(),
1053 vec![&expected_sorted_order]
1054 );
1055
1056 assert_eq!(metadata.properties(), &HashMap::new());
1057
1058 assert!(!table.readonly());
1059 }
1060
1061 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
1062 let actual = table.metadata_location().unwrap().to_string();
1063 let regex = Regex::new(regex_str).unwrap();
1064 assert!(regex.is_match(&actual))
1065 }
1066
1067 #[tokio::test]
1068 async fn test_initialized() {
1069 let warehouse_loc = temp_path();
1070 new_sql_catalog(warehouse_loc.clone()).await;
1071 new_sql_catalog(warehouse_loc.clone()).await;
1073 new_sql_catalog(warehouse_loc.clone()).await;
1074 }
1075
1076 #[tokio::test]
1077 async fn test_builder_method() {
1078 let sql_lite_uri = format!("sqlite:{}", temp_path());
1079 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1080 let warehouse_location = temp_path();
1081
1082 let catalog = SqlCatalogBuilder::default()
1083 .uri(sql_lite_uri.to_string())
1084 .warehouse_location(warehouse_location.clone())
1085 .sql_bind_style(SqlBindStyle::QMark)
1086 .load("iceberg", HashMap::default())
1087 .await;
1088 assert!(catalog.is_ok());
1089
1090 let catalog = catalog.unwrap();
1091 assert!(catalog.warehouse_location == warehouse_location);
1092 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1093 }
1094
1095 #[tokio::test]
1098 async fn test_builder_props_non_existent_path_fails() {
1099 let sql_lite_uri = format!("sqlite:{}", temp_path());
1100 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1101 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1102 let warehouse_location = temp_path();
1103
1104 let catalog = SqlCatalogBuilder::default()
1105 .uri(sql_lite_uri)
1106 .warehouse_location(warehouse_location)
1107 .load(
1108 "iceberg",
1109 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]),
1110 )
1111 .await;
1112 assert!(catalog.is_err());
1113 }
1114
1115 #[tokio::test]
1119 async fn test_builder_props_set_valid_uri() {
1120 let sql_lite_uri = format!("sqlite:{}", temp_path());
1121 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1122 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1123 let warehouse_location = temp_path();
1124
1125 let catalog = SqlCatalogBuilder::default()
1126 .uri(sql_lite_uri2)
1127 .warehouse_location(warehouse_location)
1128 .load(
1129 "iceberg",
1130 HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]),
1131 )
1132 .await;
1133 assert!(catalog.is_ok());
1134 }
1135
1136 #[tokio::test]
1138 async fn test_builder_props_take_precedence() {
1139 let sql_lite_uri = format!("sqlite:{}", temp_path());
1140 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1141 let warehouse_location = temp_path();
1142 let warehouse_location2 = temp_path();
1143
1144 let catalog = SqlCatalogBuilder::default()
1145 .warehouse_location(warehouse_location2)
1146 .sql_bind_style(SqlBindStyle::DollarNumeric)
1147 .load(
1148 "iceberg",
1149 HashMap::from_iter([
1150 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1151 (
1152 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1153 warehouse_location.clone(),
1154 ),
1155 (
1156 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1157 SqlBindStyle::QMark.to_string(),
1158 ),
1159 ]),
1160 )
1161 .await;
1162
1163 assert!(catalog.is_ok());
1164
1165 let catalog = catalog.unwrap();
1166 assert!(catalog.warehouse_location == warehouse_location);
1167 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1168 }
1169
1170 #[tokio::test]
1172 async fn test_builder_props_take_precedence_props() {
1173 let sql_lite_uri = format!("sqlite:{}", temp_path());
1174 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1175 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1176 let warehouse_location = temp_path();
1177 let warehouse_location2 = temp_path();
1178
1179 let props = HashMap::from_iter([
1180 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1181 (
1182 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1183 warehouse_location.clone(),
1184 ),
1185 (
1186 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1187 SqlBindStyle::QMark.to_string(),
1188 ),
1189 ]);
1190 let props2 = HashMap::from_iter([
1191 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()),
1192 (
1193 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1194 warehouse_location2.clone(),
1195 ),
1196 (
1197 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1198 SqlBindStyle::DollarNumeric.to_string(),
1199 ),
1200 ]);
1201
1202 let catalog = SqlCatalogBuilder::default()
1203 .props(props2)
1204 .load("iceberg", props)
1205 .await;
1206
1207 assert!(catalog.is_ok());
1208
1209 let catalog = catalog.unwrap();
1210 assert!(catalog.warehouse_location == warehouse_location);
1211 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1212 }
1213
1214 #[tokio::test]
1216 async fn test_builder_props_take_precedence_prop() {
1217 let sql_lite_uri = format!("sqlite:{}", temp_path());
1218 let sql_lite_uri2 = format!("sqlite:{}", temp_path());
1219 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1220 let warehouse_location = temp_path();
1221 let warehouse_location2 = temp_path();
1222
1223 let props = HashMap::from_iter([
1224 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()),
1225 (
1226 SQL_CATALOG_PROP_WAREHOUSE.to_string(),
1227 warehouse_location.clone(),
1228 ),
1229 (
1230 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1231 SqlBindStyle::QMark.to_string(),
1232 ),
1233 ]);
1234
1235 let catalog = SqlCatalogBuilder::default()
1236 .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)
1237 .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2)
1238 .prop(
1239 SQL_CATALOG_PROP_BIND_STYLE.to_string(),
1240 SqlBindStyle::DollarNumeric.to_string(),
1241 )
1242 .load("iceberg", props)
1243 .await;
1244
1245 assert!(catalog.is_ok());
1246
1247 let catalog = catalog.unwrap();
1248 assert!(catalog.warehouse_location == warehouse_location);
1249 assert!(catalog.sql_bind_style == SqlBindStyle::QMark);
1250 }
1251
1252 #[tokio::test]
1254 async fn test_builder_props_invalid_bind_style_fails() {
1255 let sql_lite_uri = format!("sqlite:{}", temp_path());
1256 sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
1257 let warehouse_location = temp_path();
1258
1259 let catalog = SqlCatalogBuilder::default()
1260 .load(
1261 "iceberg",
1262 HashMap::from_iter([
1263 (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri),
1264 (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location),
1265 (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()),
1266 ]),
1267 )
1268 .await;
1269
1270 assert!(catalog.is_err());
1271 }
1272
1273 #[tokio::test]
1274 async fn test_list_namespaces_returns_empty_vector() {
1275 let warehouse_loc = temp_path();
1276 let catalog = new_sql_catalog(warehouse_loc).await;
1277
1278 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
1279 }
1280
1281 #[tokio::test]
1282 async fn test_list_namespaces_returns_multiple_namespaces() {
1283 let warehouse_loc = temp_path();
1284 let catalog = new_sql_catalog(warehouse_loc).await;
1285 let namespace_ident_1 = NamespaceIdent::new("a".into());
1286 let namespace_ident_2 = NamespaceIdent::new("b".into());
1287 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1288
1289 assert_eq!(
1290 to_set(catalog.list_namespaces(None).await.unwrap()),
1291 to_set(vec![namespace_ident_1, namespace_ident_2])
1292 );
1293 }
1294
1295 #[tokio::test]
1296 async fn test_list_namespaces_returns_only_top_level_namespaces() {
1297 let warehouse_loc = temp_path();
1298 let catalog = new_sql_catalog(warehouse_loc).await;
1299 let namespace_ident_1 = NamespaceIdent::new("a".into());
1300 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1301 let namespace_ident_3 = NamespaceIdent::new("b".into());
1302 create_namespaces(&catalog, &vec![
1303 &namespace_ident_1,
1304 &namespace_ident_2,
1305 &namespace_ident_3,
1306 ])
1307 .await;
1308
1309 assert_eq!(
1310 to_set(catalog.list_namespaces(None).await.unwrap()),
1311 to_set(vec![namespace_ident_1, namespace_ident_3])
1312 );
1313 }
1314
1315 #[tokio::test]
1316 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
1317 let warehouse_loc = temp_path();
1318 let catalog = new_sql_catalog(warehouse_loc).await;
1319 let namespace_ident_1 = NamespaceIdent::new("a".into());
1320 let namespace_ident_2 = NamespaceIdent::new("b".into());
1321 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1322
1323 assert_eq!(
1324 catalog
1325 .list_namespaces(Some(&namespace_ident_1))
1326 .await
1327 .unwrap(),
1328 vec![]
1329 );
1330 }
1331
1332 #[tokio::test]
1333 async fn test_list_namespaces_returns_namespace_under_parent() {
1334 let warehouse_loc = temp_path();
1335 let catalog = new_sql_catalog(warehouse_loc).await;
1336 let namespace_ident_1 = NamespaceIdent::new("a".into());
1337 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1338 let namespace_ident_3 = NamespaceIdent::new("c".into());
1339 create_namespaces(&catalog, &vec![
1340 &namespace_ident_1,
1341 &namespace_ident_2,
1342 &namespace_ident_3,
1343 ])
1344 .await;
1345
1346 assert_eq!(
1347 to_set(catalog.list_namespaces(None).await.unwrap()),
1348 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
1349 );
1350
1351 assert_eq!(
1352 catalog
1353 .list_namespaces(Some(&namespace_ident_1))
1354 .await
1355 .unwrap(),
1356 vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()]
1357 );
1358 }
1359
1360 #[tokio::test]
1361 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
1362 let warehouse_loc = temp_path();
1363 let catalog = new_sql_catalog(warehouse_loc).await;
1364 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
1365 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
1366 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1367 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
1368 let namespace_ident_5 = NamespaceIdent::new("b".into());
1369 create_namespaces(&catalog, &vec![
1370 &namespace_ident_1,
1371 &namespace_ident_2,
1372 &namespace_ident_3,
1373 &namespace_ident_4,
1374 &namespace_ident_5,
1375 ])
1376 .await;
1377
1378 assert_eq!(
1379 to_set(
1380 catalog
1381 .list_namespaces(Some(&namespace_ident_1))
1382 .await
1383 .unwrap()
1384 ),
1385 to_set(vec![
1386 NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(),
1387 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(),
1388 NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(),
1389 ])
1390 );
1391 }
1392
1393 #[tokio::test]
1394 async fn test_namespace_exists_returns_false() {
1395 let warehouse_loc = temp_path();
1396 let catalog = new_sql_catalog(warehouse_loc).await;
1397 let namespace_ident = NamespaceIdent::new("a".into());
1398 create_namespace(&catalog, &namespace_ident).await;
1399
1400 assert!(
1401 !catalog
1402 .namespace_exists(&NamespaceIdent::new("b".into()))
1403 .await
1404 .unwrap()
1405 );
1406 }
1407
1408 #[tokio::test]
1409 async fn test_namespace_exists_returns_true() {
1410 let warehouse_loc = temp_path();
1411 let catalog = new_sql_catalog(warehouse_loc).await;
1412 let namespace_ident = NamespaceIdent::new("a".into());
1413 create_namespace(&catalog, &namespace_ident).await;
1414
1415 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
1416 }
1417
1418 #[tokio::test]
1419 async fn test_create_namespace_with_properties() {
1420 let warehouse_loc = temp_path();
1421 let catalog = new_sql_catalog(warehouse_loc).await;
1422 let namespace_ident = NamespaceIdent::new("abc".into());
1423
1424 let mut properties = default_properties();
1425 properties.insert("k".into(), "v".into());
1426
1427 assert_eq!(
1428 catalog
1429 .create_namespace(&namespace_ident, properties.clone())
1430 .await
1431 .unwrap(),
1432 Namespace::with_properties(namespace_ident.clone(), properties.clone())
1433 );
1434
1435 assert_eq!(
1436 catalog.get_namespace(&namespace_ident).await.unwrap(),
1437 Namespace::with_properties(namespace_ident, properties)
1438 );
1439 }
1440
1441 #[tokio::test]
1442 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
1443 let warehouse_loc = temp_path();
1444 let catalog = new_sql_catalog(warehouse_loc).await;
1445 let namespace_ident = NamespaceIdent::new("a".into());
1446 create_namespace(&catalog, &namespace_ident).await;
1447
1448 assert_eq!(
1449 catalog
1450 .create_namespace(&namespace_ident, HashMap::new())
1451 .await
1452 .unwrap_err()
1453 .to_string(),
1454 format!(
1455 "Unexpected => Namespace {:?} already exists",
1456 &namespace_ident
1457 )
1458 );
1459
1460 assert_eq!(
1461 catalog.get_namespace(&namespace_ident).await.unwrap(),
1462 Namespace::with_properties(namespace_ident, default_properties())
1463 );
1464 }
1465
1466 #[tokio::test]
1467 async fn test_create_nested_namespace() {
1468 let warehouse_loc = temp_path();
1469 let catalog = new_sql_catalog(warehouse_loc).await;
1470 let parent_namespace_ident = NamespaceIdent::new("a".into());
1471 create_namespace(&catalog, &parent_namespace_ident).await;
1472
1473 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1474
1475 assert_eq!(
1476 catalog
1477 .create_namespace(&child_namespace_ident, HashMap::new())
1478 .await
1479 .unwrap(),
1480 Namespace::new(child_namespace_ident.clone())
1481 );
1482
1483 assert_eq!(
1484 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
1485 Namespace::with_properties(child_namespace_ident, default_properties())
1486 );
1487 }
1488
1489 #[tokio::test]
1490 async fn test_create_deeply_nested_namespace() {
1491 let warehouse_loc = temp_path();
1492 let catalog = new_sql_catalog(warehouse_loc).await;
1493 let namespace_ident_a = NamespaceIdent::new("a".into());
1494 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1495 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1496
1497 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1498
1499 assert_eq!(
1500 catalog
1501 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
1502 .await
1503 .unwrap(),
1504 Namespace::new(namespace_ident_a_b_c.clone())
1505 );
1506
1507 assert_eq!(
1508 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
1509 Namespace::with_properties(namespace_ident_a_b_c, default_properties())
1510 );
1511 }
1512
1513 #[tokio::test]
1514 async fn test_update_namespace_noop() {
1515 let warehouse_loc = temp_path();
1516 let catalog = new_sql_catalog(warehouse_loc).await;
1517 let namespace_ident = NamespaceIdent::new("a".into());
1518 create_namespace(&catalog, &namespace_ident).await;
1519
1520 catalog
1521 .update_namespace(&namespace_ident, HashMap::new())
1522 .await
1523 .unwrap();
1524
1525 assert_eq!(
1526 *catalog
1527 .get_namespace(&namespace_ident)
1528 .await
1529 .unwrap()
1530 .properties(),
1531 HashMap::from_iter([("exists".to_string(), "true".to_string())])
1532 )
1533 }
1534
1535 #[tokio::test]
1536 async fn test_update_namespace() {
1537 let warehouse_loc = temp_path();
1538 let catalog = new_sql_catalog(warehouse_loc).await;
1539 let namespace_ident = NamespaceIdent::new("a".into());
1540 create_namespace(&catalog, &namespace_ident).await;
1541
1542 let mut props = HashMap::from_iter([
1543 ("prop1".to_string(), "val1".to_string()),
1544 ("prop2".into(), "val2".into()),
1545 ]);
1546
1547 catalog
1548 .update_namespace(&namespace_ident, props.clone())
1549 .await
1550 .unwrap();
1551
1552 props.insert("exists".into(), "true".into());
1553
1554 assert_eq!(
1555 *catalog
1556 .get_namespace(&namespace_ident)
1557 .await
1558 .unwrap()
1559 .properties(),
1560 props
1561 )
1562 }
1563
1564 #[tokio::test]
1565 async fn test_update_nested_namespace() {
1566 let warehouse_loc = temp_path();
1567 let catalog = new_sql_catalog(warehouse_loc).await;
1568 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1569 create_namespace(&catalog, &namespace_ident).await;
1570
1571 let mut props = HashMap::from_iter([
1572 ("prop1".to_string(), "val1".to_string()),
1573 ("prop2".into(), "val2".into()),
1574 ]);
1575
1576 catalog
1577 .update_namespace(&namespace_ident, props.clone())
1578 .await
1579 .unwrap();
1580
1581 props.insert("exists".into(), "true".into());
1582
1583 assert_eq!(
1584 *catalog
1585 .get_namespace(&namespace_ident)
1586 .await
1587 .unwrap()
1588 .properties(),
1589 props
1590 )
1591 }
1592
1593 #[tokio::test]
1594 async fn test_update_namespace_errors_if_namespace_doesnt_exist() {
1595 let warehouse_loc = temp_path();
1596 let catalog = new_sql_catalog(warehouse_loc).await;
1597 let namespace_ident = NamespaceIdent::new("a".into());
1598
1599 let props = HashMap::from_iter([
1600 ("prop1".to_string(), "val1".to_string()),
1601 ("prop2".into(), "val2".into()),
1602 ]);
1603
1604 let err = catalog
1605 .update_namespace(&namespace_ident, props)
1606 .await
1607 .unwrap_err();
1608
1609 assert_eq!(
1610 err.message(),
1611 format!("No such namespace: {namespace_ident:?}")
1612 );
1613 }
1614
1615 #[tokio::test]
1616 async fn test_update_namespace_errors_if_nested_namespace_doesnt_exist() {
1617 let warehouse_loc = temp_path();
1618 let catalog = new_sql_catalog(warehouse_loc).await;
1619 let namespace_ident = NamespaceIdent::from_strs(["a", "b"]).unwrap();
1620
1621 let props = HashMap::from_iter([
1622 ("prop1".to_string(), "val1".to_string()),
1623 ("prop2".into(), "val2".into()),
1624 ]);
1625
1626 let err = catalog
1627 .update_namespace(&namespace_ident, props)
1628 .await
1629 .unwrap_err();
1630
1631 assert_eq!(
1632 err.message(),
1633 format!("No such namespace: {namespace_ident:?}")
1634 );
1635 }
1636
1637 #[tokio::test]
1638 async fn test_drop_namespace() {
1639 let warehouse_loc = temp_path();
1640 let catalog = new_sql_catalog(warehouse_loc).await;
1641 let namespace_ident = NamespaceIdent::new("abc".into());
1642 create_namespace(&catalog, &namespace_ident).await;
1643
1644 catalog.drop_namespace(&namespace_ident).await.unwrap();
1645
1646 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1647 }
1648
1649 #[tokio::test]
1650 async fn test_drop_nested_namespace() {
1651 let warehouse_loc = temp_path();
1652 let catalog = new_sql_catalog(warehouse_loc).await;
1653 let namespace_ident_a = NamespaceIdent::new("a".into());
1654 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1655 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1656
1657 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1658
1659 assert!(
1660 !catalog
1661 .namespace_exists(&namespace_ident_a_b)
1662 .await
1663 .unwrap()
1664 );
1665
1666 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1667 }
1668
1669 #[tokio::test]
1670 async fn test_drop_deeply_nested_namespace() {
1671 let warehouse_loc = temp_path();
1672 let catalog = new_sql_catalog(warehouse_loc).await;
1673 let namespace_ident_a = NamespaceIdent::new("a".into());
1674 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1675 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1676 create_namespaces(&catalog, &vec![
1677 &namespace_ident_a,
1678 &namespace_ident_a_b,
1679 &namespace_ident_a_b_c,
1680 ])
1681 .await;
1682
1683 catalog
1684 .drop_namespace(&namespace_ident_a_b_c)
1685 .await
1686 .unwrap();
1687
1688 assert!(
1689 !catalog
1690 .namespace_exists(&namespace_ident_a_b_c)
1691 .await
1692 .unwrap()
1693 );
1694
1695 assert!(
1696 catalog
1697 .namespace_exists(&namespace_ident_a_b)
1698 .await
1699 .unwrap()
1700 );
1701
1702 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1703 }
1704
1705 #[tokio::test]
1706 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1707 let warehouse_loc = temp_path();
1708 let catalog = new_sql_catalog(warehouse_loc).await;
1709
1710 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1711 assert_eq!(
1712 catalog
1713 .drop_namespace(&non_existent_namespace_ident)
1714 .await
1715 .unwrap_err()
1716 .to_string(),
1717 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1718 )
1719 }
1720
1721 #[tokio::test]
1722 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1723 let warehouse_loc = temp_path();
1724 let catalog = new_sql_catalog(warehouse_loc).await;
1725 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1726
1727 let non_existent_namespace_ident =
1728 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1729 assert_eq!(
1730 catalog
1731 .drop_namespace(&non_existent_namespace_ident)
1732 .await
1733 .unwrap_err()
1734 .to_string(),
1735 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}")
1736 )
1737 }
1738
1739 #[tokio::test]
1740 async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
1741 let warehouse_loc = temp_path();
1742 let catalog = new_sql_catalog(warehouse_loc).await;
1743 let namespace_ident_a = NamespaceIdent::new("a".into());
1744 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1745 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1746
1747 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1748
1749 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1750
1751 assert!(
1752 catalog
1753 .namespace_exists(&namespace_ident_a_b)
1754 .await
1755 .unwrap()
1756 );
1757 }
1758
1759 #[tokio::test]
1760 async fn test_list_tables_returns_empty_vector() {
1761 let warehouse_loc = temp_path();
1762 let catalog = new_sql_catalog(warehouse_loc).await;
1763 let namespace_ident = NamespaceIdent::new("a".into());
1764 create_namespace(&catalog, &namespace_ident).await;
1765
1766 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1767 }
1768
1769 #[tokio::test]
1770 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1771 let warehouse_loc = temp_path();
1772 let catalog = new_sql_catalog(warehouse_loc).await;
1773
1774 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1775
1776 assert_eq!(
1777 catalog
1778 .list_tables(&non_existent_namespace_ident)
1779 .await
1780 .unwrap_err()
1781 .to_string(),
1782 format!("Unexpected => No such namespace: {non_existent_namespace_ident:?}"),
1783 );
1784 }
1785
1786 #[tokio::test]
1787 async fn test_create_table_with_location() {
1788 let warehouse_loc = temp_path();
1789 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1790 let namespace_ident = NamespaceIdent::new("a".into());
1791 create_namespace(&catalog, &namespace_ident).await;
1792
1793 let table_name = "abc";
1794 let location = warehouse_loc.clone();
1795 let table_creation = TableCreation::builder()
1796 .name(table_name.into())
1797 .location(location.clone())
1798 .schema(simple_table_schema())
1799 .build();
1800
1801 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1802
1803 assert_table_eq(
1804 &catalog
1805 .create_table(&namespace_ident, table_creation)
1806 .await
1807 .unwrap(),
1808 &expected_table_ident,
1809 &simple_table_schema(),
1810 );
1811
1812 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1813
1814 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1815
1816 assert!(
1817 table
1818 .metadata_location()
1819 .unwrap()
1820 .to_string()
1821 .starts_with(&location)
1822 )
1823 }
1824
1825 #[tokio::test]
1826 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1827 let warehouse_loc = temp_path();
1828 let catalog = new_sql_catalog(warehouse_loc).await;
1829
1830 let namespace_ident = NamespaceIdent::new("a".into());
1831 let mut namespace_properties = HashMap::new();
1832 let namespace_location = temp_path();
1833 namespace_properties.insert(
1834 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1835 namespace_location.to_string(),
1836 );
1837 catalog
1838 .create_namespace(&namespace_ident, namespace_properties)
1839 .await
1840 .unwrap();
1841
1842 let table_name = "tbl1";
1843 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1844 let expected_table_metadata_location_regex =
1845 format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1846
1847 let table = catalog
1848 .create_table(
1849 &namespace_ident,
1850 TableCreation::builder()
1851 .name(table_name.into())
1852 .schema(simple_table_schema())
1853 .build(),
1855 )
1856 .await
1857 .unwrap();
1858 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1859 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1860
1861 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1862 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1863 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1864 }
1865
1866 #[tokio::test]
1867 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1868 {
1869 let warehouse_loc = temp_path();
1870 let catalog = new_sql_catalog(warehouse_loc).await;
1871
1872 let namespace_ident = NamespaceIdent::new("a".into());
1873 let mut namespace_properties = HashMap::new();
1874 let namespace_location = temp_path();
1875 namespace_properties.insert(
1876 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1877 namespace_location.to_string(),
1878 );
1879 catalog
1880 .create_namespace(&namespace_ident, namespace_properties)
1881 .await
1882 .unwrap();
1883
1884 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1885 let mut nested_namespace_properties = HashMap::new();
1886 let nested_namespace_location = temp_path();
1887 nested_namespace_properties.insert(
1888 NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
1889 nested_namespace_location.to_string(),
1890 );
1891 catalog
1892 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1893 .await
1894 .unwrap();
1895
1896 let table_name = "tbl1";
1897 let expected_table_ident =
1898 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1899 let expected_table_metadata_location_regex = format!(
1900 "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1901 );
1902
1903 let table = catalog
1904 .create_table(
1905 &nested_namespace_ident,
1906 TableCreation::builder()
1907 .name(table_name.into())
1908 .schema(simple_table_schema())
1909 .build(),
1911 )
1912 .await
1913 .unwrap();
1914 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1915 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1916
1917 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1918 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1919 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1920 }
1921
1922 #[tokio::test]
1923 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1924 {
1925 let warehouse_loc = temp_path();
1926 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1927
1928 let namespace_ident = NamespaceIdent::new("a".into());
1929 let namespace_properties = HashMap::new();
1931 catalog
1932 .create_namespace(&namespace_ident, namespace_properties)
1933 .await
1934 .unwrap();
1935
1936 let table_name = "tbl1";
1937 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1938 let expected_table_metadata_location_regex =
1939 format!("^{warehouse_loc}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1940
1941 let table = catalog
1942 .create_table(
1943 &namespace_ident,
1944 TableCreation::builder()
1945 .name(table_name.into())
1946 .schema(simple_table_schema())
1947 .build(),
1949 )
1950 .await
1951 .unwrap();
1952 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1953 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1954
1955 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1956 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1957 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1958 }
1959
1960 #[tokio::test]
1961 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1962 {
1963 let warehouse_loc = temp_path();
1964 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
1965
1966 let namespace_ident = NamespaceIdent::new("a".into());
1967 create_namespace(&catalog, &namespace_ident).await;
1968
1969 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1970 create_namespace(&catalog, &nested_namespace_ident).await;
1971
1972 let table_name = "tbl1";
1973 let expected_table_ident =
1974 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1975 let expected_table_metadata_location_regex =
1976 format!("^{warehouse_loc}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1977
1978 let table = catalog
1979 .create_table(
1980 &nested_namespace_ident,
1981 TableCreation::builder()
1982 .name(table_name.into())
1983 .schema(simple_table_schema())
1984 .build(),
1986 )
1987 .await
1988 .unwrap();
1989 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1990 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1991
1992 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1993 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1994 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1995 }
1996
1997 #[tokio::test]
1998 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1999 let warehouse_loc = temp_path();
2000 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2001 let namespace_ident = NamespaceIdent::new("a".into());
2002 create_namespace(&catalog, &namespace_ident).await;
2003 let table_name = "tbl1";
2004 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2005 create_table(&catalog, &table_ident).await;
2006
2007 let tmp_dir = TempDir::new().unwrap();
2008 let location = tmp_dir.path().to_str().unwrap().to_string();
2009
2010 assert_eq!(
2011 catalog
2012 .create_table(
2013 &namespace_ident,
2014 TableCreation::builder()
2015 .name(table_name.into())
2016 .schema(simple_table_schema())
2017 .location(location)
2018 .build()
2019 )
2020 .await
2021 .unwrap_err()
2022 .to_string(),
2023 format!("Unexpected => Table {:?} already exists.", &table_ident)
2024 );
2025 }
2026
2027 #[tokio::test]
2028 async fn test_rename_table_in_same_namespace() {
2029 let warehouse_loc = temp_path();
2030 let catalog = new_sql_catalog(warehouse_loc).await;
2031 let namespace_ident = NamespaceIdent::new("n1".into());
2032 create_namespace(&catalog, &namespace_ident).await;
2033 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2034 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2035 create_table(&catalog, &src_table_ident).await;
2036
2037 catalog
2038 .rename_table(&src_table_ident, &dst_table_ident)
2039 .await
2040 .unwrap();
2041
2042 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2043 dst_table_ident
2044 ],);
2045 }
2046
2047 #[tokio::test]
2048 async fn test_rename_table_across_namespaces() {
2049 let warehouse_loc = temp_path();
2050 let catalog = new_sql_catalog(warehouse_loc).await;
2051 let src_namespace_ident = NamespaceIdent::new("a".into());
2052 let dst_namespace_ident = NamespaceIdent::new("b".into());
2053 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
2054 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2055 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
2056 create_table(&catalog, &src_table_ident).await;
2057
2058 catalog
2059 .rename_table(&src_table_ident, &dst_table_ident)
2060 .await
2061 .unwrap();
2062
2063 assert_eq!(
2064 catalog.list_tables(&src_namespace_ident).await.unwrap(),
2065 vec![],
2066 );
2067
2068 assert_eq!(
2069 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
2070 vec![dst_table_ident],
2071 );
2072 }
2073
2074 #[tokio::test]
2075 async fn test_rename_table_src_table_is_same_as_dst_table() {
2076 let warehouse_loc = temp_path();
2077 let catalog = new_sql_catalog(warehouse_loc).await;
2078 let namespace_ident = NamespaceIdent::new("n1".into());
2079 create_namespace(&catalog, &namespace_ident).await;
2080 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
2081 create_table(&catalog, &table_ident).await;
2082
2083 catalog
2084 .rename_table(&table_ident, &table_ident)
2085 .await
2086 .unwrap();
2087
2088 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
2089 table_ident
2090 ],);
2091 }
2092
2093 #[tokio::test]
2094 async fn test_rename_table_across_nested_namespaces() {
2095 let warehouse_loc = temp_path();
2096 let catalog = new_sql_catalog(warehouse_loc).await;
2097 let namespace_ident_a = NamespaceIdent::new("a".into());
2098 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
2099 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
2100 create_namespaces(&catalog, &vec![
2101 &namespace_ident_a,
2102 &namespace_ident_a_b,
2103 &namespace_ident_a_b_c,
2104 ])
2105 .await;
2106
2107 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
2108 create_tables(&catalog, vec![&src_table_ident]).await;
2109
2110 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
2111 catalog
2112 .rename_table(&src_table_ident, &dst_table_ident)
2113 .await
2114 .unwrap();
2115
2116 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
2117
2118 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
2119 }
2120
2121 #[tokio::test]
2122 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
2123 let warehouse_loc = temp_path();
2124 let catalog = new_sql_catalog(warehouse_loc).await;
2125 let src_namespace_ident = NamespaceIdent::new("n1".into());
2126 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
2127 create_namespace(&catalog, &src_namespace_ident).await;
2128 create_table(&catalog, &src_table_ident).await;
2129
2130 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
2131 let dst_table_ident =
2132 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
2133 assert_eq!(
2134 catalog
2135 .rename_table(&src_table_ident, &dst_table_ident)
2136 .await
2137 .unwrap_err()
2138 .to_string(),
2139 format!("Unexpected => No such namespace: {non_existent_dst_namespace_ident:?}"),
2140 );
2141 }
2142
2143 #[tokio::test]
2144 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
2145 let warehouse_loc = temp_path();
2146 let catalog = new_sql_catalog(warehouse_loc).await;
2147 let namespace_ident = NamespaceIdent::new("n1".into());
2148 create_namespace(&catalog, &namespace_ident).await;
2149 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2150 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2151
2152 assert_eq!(
2153 catalog
2154 .rename_table(&src_table_ident, &dst_table_ident)
2155 .await
2156 .unwrap_err()
2157 .to_string(),
2158 format!("Unexpected => No such table: {src_table_ident:?}"),
2159 );
2160 }
2161
2162 #[tokio::test]
2163 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
2164 let warehouse_loc = temp_path();
2165 let catalog = new_sql_catalog(warehouse_loc).await;
2166 let namespace_ident = NamespaceIdent::new("n1".into());
2167 create_namespace(&catalog, &namespace_ident).await;
2168 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2169 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
2170 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
2171
2172 assert_eq!(
2173 catalog
2174 .rename_table(&src_table_ident, &dst_table_ident)
2175 .await
2176 .unwrap_err()
2177 .to_string(),
2178 format!("Unexpected => Table {:?} already exists.", &dst_table_ident),
2179 );
2180 }
2181
2182 #[tokio::test]
2183 async fn test_drop_table_throws_error_if_table_not_exist() {
2184 let warehouse_loc = temp_path();
2185 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2186 let namespace_ident = NamespaceIdent::new("a".into());
2187 let table_name = "tbl1";
2188 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2189 create_namespace(&catalog, &namespace_ident).await;
2190
2191 let err = catalog
2192 .drop_table(&table_ident)
2193 .await
2194 .unwrap_err()
2195 .to_string();
2196 assert_eq!(
2197 err,
2198 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2199 );
2200 }
2201
2202 #[tokio::test]
2203 async fn test_drop_table() {
2204 let warehouse_loc = temp_path();
2205 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2206 let namespace_ident = NamespaceIdent::new("a".into());
2207 let table_name = "tbl1";
2208 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2209 create_namespace(&catalog, &namespace_ident).await;
2210
2211 let location = warehouse_loc.clone();
2212 let table_creation = TableCreation::builder()
2213 .name(table_name.into())
2214 .location(location.clone())
2215 .schema(simple_table_schema())
2216 .build();
2217
2218 catalog
2219 .create_table(&namespace_ident, table_creation)
2220 .await
2221 .unwrap();
2222
2223 let table = catalog.load_table(&table_ident).await.unwrap();
2224 assert_table_eq(&table, &table_ident, &simple_table_schema());
2225
2226 catalog.drop_table(&table_ident).await.unwrap();
2227 let err = catalog
2228 .load_table(&table_ident)
2229 .await
2230 .unwrap_err()
2231 .to_string();
2232 assert_eq!(
2233 err,
2234 "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
2235 );
2236 }
2237
2238 #[tokio::test]
2239 async fn test_register_table_throws_error_if_table_with_same_name_already_exists() {
2240 let warehouse_loc = temp_path();
2241 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2242 let namespace_ident = NamespaceIdent::new("a".into());
2243 create_namespace(&catalog, &namespace_ident).await;
2244 let table_name = "tbl1";
2245 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2246 create_table(&catalog, &table_ident).await;
2247
2248 assert_eq!(
2249 catalog
2250 .register_table(&table_ident, warehouse_loc)
2251 .await
2252 .unwrap_err()
2253 .to_string(),
2254 format!("Unexpected => Table {:?} already exists.", &table_ident)
2255 );
2256 }
2257
2258 #[tokio::test]
2259 async fn test_register_table() {
2260 let warehouse_loc = temp_path();
2261 let catalog = new_sql_catalog(warehouse_loc.clone()).await;
2262 let namespace_ident = NamespaceIdent::new("a".into());
2263 create_namespace(&catalog, &namespace_ident).await;
2264
2265 let table_name = "abc";
2266 let location = warehouse_loc.clone();
2267 let table_creation = TableCreation::builder()
2268 .name(table_name.into())
2269 .location(location.clone())
2270 .schema(simple_table_schema())
2271 .build();
2272
2273 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
2274 let expected_table = catalog
2275 .create_table(&namespace_ident, table_creation)
2276 .await
2277 .unwrap();
2278
2279 let metadata_location = expected_table
2280 .metadata_location()
2281 .expect("Expected metadata location to be set")
2282 .to_string();
2283
2284 assert_table_eq(&expected_table, &table_ident, &simple_table_schema());
2285
2286 let _ = catalog.drop_table(&table_ident).await;
2287
2288 let table = catalog
2289 .register_table(&table_ident, metadata_location.clone())
2290 .await
2291 .unwrap();
2292
2293 assert_eq!(table.identifier(), expected_table.identifier());
2294 assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
2295 }
2296}