1use std::collections::HashMap;
19use std::future::Future;
20
21use async_trait::async_trait;
22use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
23use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
24use aws_sdk_s3tables::operation::get_table::GetTableOutput;
25use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
26use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
27use aws_sdk_s3tables::types::OpenTableFormat;
28use iceberg::io::{FileIO, FileIOBuilder};
29use iceberg::spec::{TableMetadata, TableMetadataBuilder};
30use iceberg::table::Table;
31use iceberg::{
32 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
33 TableCommit, TableCreation, TableIdent,
34};
35
36use crate::utils::create_sdk_config;
37
/// Property key under which the table bucket ARN can be passed to the catalog builder.
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Property key under which an optional endpoint URL can be passed to the catalog builder.
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
42
/// Settings gathered by [`S3TablesCatalogBuilder`] and consumed when an
/// [`S3TablesCatalog`] is constructed.
#[derive(Debug)]
struct S3TablesCatalogConfig {
    /// Catalog name; filled in by `CatalogBuilder::load`.
    name: Option<String>,
    /// ARN of the table bucket, attached to every S3Tables request issued by the catalog.
    table_bucket_arn: String,
    /// Optional endpoint URL, forwarded to `create_sdk_config` when no client is supplied.
    endpoint_url: Option<String>,
    /// Pre-built S3Tables client; when present it is used instead of building a new one.
    client: Option<aws_sdk_s3tables::Client>,
    /// Remaining properties, handed to `create_sdk_config` and to the `FileIO` builder.
    props: HashMap<String, String>,
}
65
/// Builder for [`S3TablesCatalog`]; a thin wrapper around the catalog configuration.
#[derive(Debug)]
pub struct S3TablesCatalogBuilder(S3TablesCatalogConfig);
69
70impl Default for S3TablesCatalogBuilder {
72 fn default() -> Self {
73 Self(S3TablesCatalogConfig {
74 name: None,
75 table_bucket_arn: "".to_string(),
76 endpoint_url: None,
77 client: None,
78 props: HashMap::new(),
79 })
80 }
81}
82
83impl S3TablesCatalogBuilder {
85 pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
94 self.0.endpoint_url = Some(endpoint_url.into());
95 self
96 }
97
98 pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
100 self.0.client = Some(client);
101 self
102 }
103
104 pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
113 self.0.table_bucket_arn = table_bucket_arn.into();
114 self
115 }
116}
117
118impl CatalogBuilder for S3TablesCatalogBuilder {
119 type C = S3TablesCatalog;
120
121 fn load(
122 mut self,
123 name: impl Into<String>,
124 props: HashMap<String, String>,
125 ) -> impl Future<Output = Result<Self::C>> + Send {
126 let catalog_name = name.into();
127 self.0.name = Some(catalog_name.clone());
128
129 if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
130 self.0.table_bucket_arn = props
131 .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
132 .cloned()
133 .unwrap_or_default();
134 }
135
136 if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
137 self.0.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
138 }
139
140 self.0.props = props
142 .into_iter()
143 .filter(|(k, _)| {
144 k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
145 && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
146 })
147 .collect();
148
149 async move {
150 if catalog_name.trim().is_empty() {
151 Err(Error::new(
152 ErrorKind::DataInvalid,
153 "Catalog name cannot be empty",
154 ))
155 } else if self.0.table_bucket_arn.is_empty() {
156 Err(Error::new(
157 ErrorKind::DataInvalid,
158 "Table bucket ARN is required",
159 ))
160 } else {
161 S3TablesCatalog::new(self.0).await
162 }
163 }
164 }
165}
166
/// Iceberg catalog backed by the S3Tables service.
#[derive(Debug)]
pub struct S3TablesCatalog {
    /// Configuration the catalog was built from (table bucket ARN, properties, ...).
    config: S3TablesCatalogConfig,
    /// Client used for every S3Tables API call.
    s3tables_client: aws_sdk_s3tables::Client,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
}
174
175impl S3TablesCatalog {
176 async fn new(config: S3TablesCatalogConfig) -> Result<Self> {
178 let s3tables_client = if let Some(client) = config.client.clone() {
179 client
180 } else {
181 let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
182 aws_sdk_s3tables::Client::new(&aws_config)
183 };
184
185 let file_io = FileIOBuilder::new("s3").with_props(&config.props).build()?;
186
187 Ok(Self {
188 config,
189 s3tables_client,
190 file_io,
191 })
192 }
193
194 async fn load_table_with_version_token(
195 &self,
196 table_ident: &TableIdent,
197 ) -> Result<(Table, String)> {
198 let req = self
199 .s3tables_client
200 .get_table()
201 .table_bucket_arn(self.config.table_bucket_arn.clone())
202 .namespace(table_ident.namespace().to_url_string())
203 .name(table_ident.name());
204 let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
205
206 let metadata_location = resp.metadata_location().ok_or_else(|| {
208 Error::new(
209 ErrorKind::Unexpected,
210 format!(
211 "Table {} does not have metadata location",
212 table_ident.name()
213 ),
214 )
215 })?;
216 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
217
218 let table = Table::builder()
219 .identifier(table_ident.clone())
220 .metadata(metadata)
221 .metadata_location(metadata_location)
222 .file_io(self.file_io.clone())
223 .build()?;
224 Ok((table, resp.version_token))
225 }
226}
227
228#[async_trait]
229impl Catalog for S3TablesCatalog {
230 async fn list_namespaces(
235 &self,
236 parent: Option<&NamespaceIdent>,
237 ) -> Result<Vec<NamespaceIdent>> {
238 if parent.is_some() {
239 return Ok(vec![]);
240 }
241
242 let mut result = Vec::new();
243 let mut continuation_token = None;
244 loop {
245 let mut req = self
246 .s3tables_client
247 .list_namespaces()
248 .table_bucket_arn(self.config.table_bucket_arn.clone());
249 if let Some(token) = continuation_token {
250 req = req.continuation_token(token);
251 }
252 let resp = req.send().await.map_err(from_aws_sdk_error)?;
253 for ns in resp.namespaces() {
254 result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
255 }
256 continuation_token = resp.continuation_token().map(|s| s.to_string());
257 if continuation_token.is_none() {
258 break;
259 }
260 }
261 Ok(result)
262 }
263
264 async fn create_namespace(
281 &self,
282 namespace: &NamespaceIdent,
283 _properties: HashMap<String, String>,
284 ) -> Result<Namespace> {
285 let req = self
286 .s3tables_client
287 .create_namespace()
288 .table_bucket_arn(self.config.table_bucket_arn.clone())
289 .namespace(namespace.to_url_string());
290 req.send().await.map_err(from_aws_sdk_error)?;
291 Ok(Namespace::with_properties(
292 namespace.clone(),
293 HashMap::new(),
294 ))
295 }
296
297 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
307 let req = self
308 .s3tables_client
309 .get_namespace()
310 .table_bucket_arn(self.config.table_bucket_arn.clone())
311 .namespace(namespace.to_url_string());
312 let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
313 let properties = HashMap::new();
314 Ok(Namespace::with_properties(
315 NamespaceIdent::from_vec(resp.namespace().to_vec())?,
316 properties,
317 ))
318 }
319
320 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
333 let req = self
334 .s3tables_client
335 .get_namespace()
336 .table_bucket_arn(self.config.table_bucket_arn.clone())
337 .namespace(namespace.to_url_string());
338 match req.send().await {
339 Ok(_) => Ok(true),
340 Err(err) => {
341 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
342 Ok(false)
343 } else {
344 Err(from_aws_sdk_error(err))
345 }
346 }
347 }
348 }
349
350 async fn update_namespace(
355 &self,
356 _namespace: &NamespaceIdent,
357 _properties: HashMap<String, String>,
358 ) -> Result<()> {
359 Err(Error::new(
360 ErrorKind::FeatureUnsupported,
361 "Update namespace is not supported for s3tables catalog",
362 ))
363 }
364
365 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
374 let req = self
375 .s3tables_client
376 .delete_namespace()
377 .table_bucket_arn(self.config.table_bucket_arn.clone())
378 .namespace(namespace.to_url_string());
379 req.send().await.map_err(from_aws_sdk_error)?;
380 Ok(())
381 }
382
383 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
392 let mut result = Vec::new();
393 let mut continuation_token = None;
394 loop {
395 let mut req = self
396 .s3tables_client
397 .list_tables()
398 .table_bucket_arn(self.config.table_bucket_arn.clone())
399 .namespace(namespace.to_url_string());
400 if let Some(token) = continuation_token {
401 req = req.continuation_token(token);
402 }
403 let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
404 for table in resp.tables() {
405 result.push(TableIdent::new(
406 NamespaceIdent::from_vec(table.namespace().to_vec())?,
407 table.name().to_string(),
408 ));
409 }
410 continuation_token = resp.continuation_token().map(|s| s.to_string());
411 if continuation_token.is_none() {
412 break;
413 }
414 }
415 Ok(result)
416 }
417
418 async fn create_table(
433 &self,
434 namespace: &NamespaceIdent,
435 mut creation: TableCreation,
436 ) -> Result<Table> {
437 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
438
439 let create_resp: CreateTableOutput = self
441 .s3tables_client
442 .create_table()
443 .table_bucket_arn(self.config.table_bucket_arn.clone())
444 .namespace(namespace.to_url_string())
445 .format(OpenTableFormat::Iceberg)
446 .name(table_ident.name())
447 .send()
448 .await
449 .map_err(from_aws_sdk_error)?;
450
451 let metadata_location = match &creation.location {
454 Some(_) => {
455 return Err(Error::new(
456 ErrorKind::DataInvalid,
457 "The location of the table is generated by s3tables catalog, can't be set by user.",
458 ));
459 }
460 None => {
461 let get_resp: GetTableOutput = self
462 .s3tables_client
463 .get_table()
464 .table_bucket_arn(self.config.table_bucket_arn.clone())
465 .namespace(namespace.to_url_string())
466 .name(table_ident.name())
467 .send()
468 .await
469 .map_err(from_aws_sdk_error)?;
470 let warehouse_location = get_resp.warehouse_location().to_string();
471 MetadataLocation::new_with_table_location(warehouse_location).to_string()
472 }
473 };
474
475 creation.location = Some(metadata_location.clone());
477 let metadata = TableMetadataBuilder::from_table_creation(creation)?
478 .build()?
479 .metadata;
480 metadata.write_to(&self.file_io, &metadata_location).await?;
481
482 self.s3tables_client
484 .update_table_metadata_location()
485 .table_bucket_arn(self.config.table_bucket_arn.clone())
486 .namespace(namespace.to_url_string())
487 .name(table_ident.name())
488 .metadata_location(metadata_location.clone())
489 .version_token(create_resp.version_token())
490 .send()
491 .await
492 .map_err(from_aws_sdk_error)?;
493
494 let table = Table::builder()
495 .identifier(table_ident)
496 .metadata_location(metadata_location)
497 .metadata(metadata)
498 .file_io(self.file_io.clone())
499 .build()?;
500 Ok(table)
501 }
502
503 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
514 Ok(self.load_table_with_version_token(table_ident).await?.0)
515 }
516
517 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
526 let req = self
527 .s3tables_client
528 .delete_table()
529 .table_bucket_arn(self.config.table_bucket_arn.clone())
530 .namespace(table.namespace().to_url_string())
531 .name(table.name());
532 req.send().await.map_err(from_aws_sdk_error)?;
533 Ok(())
534 }
535
536 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
549 let req = self
550 .s3tables_client
551 .get_table()
552 .table_bucket_arn(self.config.table_bucket_arn.clone())
553 .namespace(table_ident.namespace().to_url_string())
554 .name(table_ident.name());
555 match req.send().await {
556 Ok(_) => Ok(true),
557 Err(err) => {
558 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
559 Ok(false)
560 } else {
561 Err(from_aws_sdk_error(err))
562 }
563 }
564 }
565 }
566
567 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
576 let req = self
577 .s3tables_client
578 .rename_table()
579 .table_bucket_arn(self.config.table_bucket_arn.clone())
580 .namespace(src.namespace().to_url_string())
581 .name(src.name())
582 .new_namespace_name(dest.namespace().to_url_string())
583 .new_name(dest.name());
584 req.send().await.map_err(from_aws_sdk_error)?;
585 Ok(())
586 }
587
588 async fn register_table(
589 &self,
590 _table_ident: &TableIdent,
591 _metadata_location: String,
592 ) -> Result<Table> {
593 Err(Error::new(
594 ErrorKind::FeatureUnsupported,
595 "Registering a table is not supported yet",
596 ))
597 }
598
599 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
601 let table_ident = commit.identifier().clone();
602 let table_namespace = table_ident.namespace();
603 let (current_table, version_token) =
604 self.load_table_with_version_token(&table_ident).await?;
605
606 let staged_table = commit.apply(current_table)?;
607 let staged_metadata_location = staged_table.metadata_location_result()?;
608
609 staged_table
610 .metadata()
611 .write_to(staged_table.file_io(), staged_metadata_location)
612 .await?;
613
614 let builder = self
615 .s3tables_client
616 .update_table_metadata_location()
617 .table_bucket_arn(&self.config.table_bucket_arn)
618 .namespace(table_namespace.to_url_string())
619 .name(table_ident.name())
620 .version_token(version_token)
621 .metadata_location(staged_metadata_location);
622
623 let _ = builder.send().await.map_err(|e| {
624 let error = e.into_service_error();
625 match error {
626 UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
627 ErrorKind::CatalogCommitConflicts,
628 format!("Commit conflicted for table: {table_ident}"),
629 )
630 .with_retryable(true),
631 UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
632 ErrorKind::TableNotFound,
633 format!("Table {table_ident} is not found"),
634 ),
635 _ => Error::new(
636 ErrorKind::Unexpected,
637 "Operation failed for hitting aws sdk error",
638 ),
639 }
640 .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
641 })?;
642
643 Ok(staged_table)
644 }
645}
646
647pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
649where T: std::fmt::Debug {
650 Error::new(
651 ErrorKind::Unexpected,
652 format!("Operation failed for hitting aws sdk error: {error:?}"),
653 )
654}
655
656#[cfg(test)]
657mod tests {
658 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
659 use iceberg::transaction::{ApplyTransactionAction, Transaction};
660
661 use super::*;
662
663 async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
664 let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
665 Some(table_bucket_arn) => table_bucket_arn,
666 None => return Ok(None),
667 };
668
669 let config = S3TablesCatalogConfig {
670 name: None,
671 table_bucket_arn,
672 endpoint_url: None,
673 client: None,
674 props: HashMap::new(),
675 };
676
677 Ok(Some(S3TablesCatalog::new(config).await?))
678 }
679
680 #[tokio::test]
681 async fn test_s3tables_list_namespace() {
682 let catalog = match load_s3tables_catalog_from_env().await {
683 Ok(Some(catalog)) => catalog,
684 Ok(None) => return,
685 Err(e) => panic!("Error loading catalog: {e}"),
686 };
687
688 let namespaces = catalog.list_namespaces(None).await.unwrap();
689 assert!(!namespaces.is_empty());
690 }
691
692 #[tokio::test]
693 async fn test_s3tables_list_tables() {
694 let catalog = match load_s3tables_catalog_from_env().await {
695 Ok(Some(catalog)) => catalog,
696 Ok(None) => return,
697 Err(e) => panic!("Error loading catalog: {e}"),
698 };
699
700 let tables = catalog
701 .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
702 .await
703 .unwrap();
704 assert!(!tables.is_empty());
705 }
706
707 #[tokio::test]
708 async fn test_s3tables_load_table() {
709 let catalog = match load_s3tables_catalog_from_env().await {
710 Ok(Some(catalog)) => catalog,
711 Ok(None) => return,
712 Err(e) => panic!("Error loading catalog: {e}"),
713 };
714
715 let table = catalog
716 .load_table(&TableIdent::new(
717 NamespaceIdent::new("aws_s3_metadata".to_string()),
718 "query_storage_metadata".to_string(),
719 ))
720 .await
721 .unwrap();
722 println!("{table:?}");
723 }
724
725 #[tokio::test]
726 async fn test_s3tables_create_delete_namespace() {
727 let catalog = match load_s3tables_catalog_from_env().await {
728 Ok(Some(catalog)) => catalog,
729 Ok(None) => return,
730 Err(e) => panic!("Error loading catalog: {e}"),
731 };
732
733 let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
734 catalog
735 .create_namespace(&namespace, HashMap::new())
736 .await
737 .unwrap();
738 assert!(catalog.namespace_exists(&namespace).await.unwrap());
739 catalog.drop_namespace(&namespace).await.unwrap();
740 assert!(!catalog.namespace_exists(&namespace).await.unwrap());
741 }
742
743 #[tokio::test]
744 async fn test_s3tables_create_delete_table() {
745 let catalog = match load_s3tables_catalog_from_env().await {
746 Ok(Some(catalog)) => catalog,
747 Ok(None) => return,
748 Err(e) => panic!("Error loading catalog: {e}"),
749 };
750
751 let creation = {
752 let schema = Schema::builder()
753 .with_schema_id(0)
754 .with_fields(vec![
755 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
756 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
757 ])
758 .build()
759 .unwrap();
760 TableCreation::builder()
761 .name("test_s3tables_create_delete_table".to_string())
762 .properties(HashMap::new())
763 .schema(schema)
764 .build()
765 };
766
767 let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
768 let table_ident = TableIdent::new(
769 namespace.clone(),
770 "test_s3tables_create_delete_table".to_string(),
771 );
772 catalog.drop_namespace(&namespace).await.ok();
773 catalog.drop_table(&table_ident).await.ok();
774
775 catalog
776 .create_namespace(&namespace, HashMap::new())
777 .await
778 .unwrap();
779 catalog.create_table(&namespace, creation).await.unwrap();
780 assert!(catalog.table_exists(&table_ident).await.unwrap());
781 catalog.drop_table(&table_ident).await.unwrap();
782 assert!(!catalog.table_exists(&table_ident).await.unwrap());
783 catalog.drop_namespace(&namespace).await.unwrap();
784 }
785
786 #[tokio::test]
787 async fn test_s3tables_update_table() {
788 let catalog = match load_s3tables_catalog_from_env().await {
789 Ok(Some(catalog)) => catalog,
790 Ok(None) => return,
791 Err(e) => panic!("Error loading catalog: {e}"),
792 };
793
794 let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
796 let table_ident =
797 TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
798
799 catalog.drop_table(&table_ident).await.ok();
801 catalog.drop_namespace(&namespace).await.ok();
802
803 catalog
805 .create_namespace(&namespace, HashMap::new())
806 .await
807 .unwrap();
808
809 let creation = {
810 let schema = Schema::builder()
811 .with_schema_id(0)
812 .with_fields(vec![
813 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
814 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
815 ])
816 .build()
817 .unwrap();
818 TableCreation::builder()
819 .name(table_ident.name().to_string())
820 .properties(HashMap::new())
821 .schema(schema)
822 .build()
823 };
824
825 let table = catalog.create_table(&namespace, creation).await.unwrap();
826
827 let tx = Transaction::new(&table);
829
830 let original_metadata_location = table.metadata_location();
832
833 let tx = tx
835 .update_table_properties()
836 .set("test_property".to_string(), "test_value".to_string())
837 .apply(tx)
838 .unwrap();
839
840 let updated_table = tx.commit(&catalog).await.unwrap();
842
843 assert_eq!(
845 updated_table.metadata().properties().get("test_property"),
846 Some(&"test_value".to_string())
847 );
848
849 assert_ne!(
851 updated_table.metadata_location(),
852 original_metadata_location,
853 "Metadata location should be updated after commit"
854 );
855
856 let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
858
859 assert_eq!(
861 reloaded_table.metadata().properties().get("test_property"),
862 Some(&"test_value".to_string())
863 );
864 assert_eq!(
865 reloaded_table.metadata_location(),
866 updated_table.metadata_location(),
867 "Reloaded table should have the same metadata location as the updated table"
868 );
869 }
870
871 #[tokio::test]
872 async fn test_builder_load_missing_bucket_arn() {
873 let builder = S3TablesCatalogBuilder::default();
874 let result = builder.load("s3tables", HashMap::new()).await;
875
876 assert!(result.is_err());
877 if let Err(err) = result {
878 assert_eq!(err.kind(), ErrorKind::DataInvalid);
879 assert_eq!(err.message(), "Table bucket ARN is required");
880 }
881 }
882
883 #[tokio::test]
884 async fn test_builder_with_endpoint_url_ok() {
885 let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");
886
887 let result = builder
888 .load(
889 "s3tables",
890 HashMap::from([
891 (
892 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
893 "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
894 ),
895 ("some_prop".to_string(), "some_value".to_string()),
896 ]),
897 )
898 .await;
899
900 assert!(result.is_ok());
901 }
902
903 #[tokio::test]
904 async fn test_builder_with_client_ok() {
905 use aws_config::BehaviorVersion;
906
907 let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
908 let client = aws_sdk_s3tables::Client::new(&sdk_config);
909
910 let builder = S3TablesCatalogBuilder::default().with_client(client);
911 let result = builder
912 .load(
913 "s3tables",
914 HashMap::from([(
915 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
916 "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
917 )]),
918 )
919 .await;
920
921 assert!(result.is_ok());
922 }
923
924 #[tokio::test]
925 async fn test_builder_with_table_bucket_arn() {
926 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
927 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
928
929 let result = builder.load("s3tables", HashMap::new()).await;
930
931 assert!(result.is_ok());
932 let catalog = result.unwrap();
933 assert_eq!(catalog.config.table_bucket_arn, test_arn);
934 }
935
936 #[tokio::test]
937 async fn test_builder_empty_table_bucket_arn_edge_cases() {
938 let mut props = HashMap::new();
939 props.insert(
940 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
941 "".to_string(),
942 );
943
944 let builder = S3TablesCatalogBuilder::default();
945 let result = builder.load("s3tables", props).await;
946
947 assert!(result.is_err());
948 if let Err(err) = result {
949 assert_eq!(err.kind(), ErrorKind::DataInvalid);
950 assert_eq!(err.message(), "Table bucket ARN is required");
951 }
952 }
953
954 #[tokio::test]
955 async fn test_endpoint_url_property_overrides_builder_method() {
956 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
957 let builder_endpoint = "http://localhost:4566";
958 let property_endpoint = "http://localhost:8080";
959
960 let builder = S3TablesCatalogBuilder::default()
961 .with_table_bucket_arn(test_arn)
962 .with_endpoint_url(builder_endpoint);
963
964 let mut props = HashMap::new();
965 props.insert(
966 S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
967 property_endpoint.to_string(),
968 );
969
970 let result = builder.load("s3tables", props).await;
971
972 assert!(result.is_ok());
973 let catalog = result.unwrap();
974
975 assert_eq!(
977 catalog.config.endpoint_url,
978 Some(property_endpoint.to_string())
979 );
980 assert_ne!(
981 catalog.config.endpoint_url,
982 Some(builder_endpoint.to_string())
983 );
984 }
985
986 #[tokio::test]
987 async fn test_endpoint_url_builder_method_only() {
988 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
989 let builder_endpoint = "http://localhost:4566";
990
991 let builder = S3TablesCatalogBuilder::default()
992 .with_table_bucket_arn(test_arn)
993 .with_endpoint_url(builder_endpoint);
994
995 let result = builder.load("s3tables", HashMap::new()).await;
996
997 assert!(result.is_ok());
998 let catalog = result.unwrap();
999
1000 assert_eq!(
1001 catalog.config.endpoint_url,
1002 Some(builder_endpoint.to_string())
1003 );
1004 }
1005
1006 #[tokio::test]
1007 async fn test_endpoint_url_property_only() {
1008 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1009 let property_endpoint = "http://localhost:8080";
1010
1011 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1012
1013 let mut props = HashMap::new();
1014 props.insert(
1015 S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1016 property_endpoint.to_string(),
1017 );
1018
1019 let result = builder.load("s3tables", props).await;
1020
1021 assert!(result.is_ok());
1022 let catalog = result.unwrap();
1023
1024 assert_eq!(
1025 catalog.config.endpoint_url,
1026 Some(property_endpoint.to_string())
1027 );
1028 }
1029
1030 #[tokio::test]
1031 async fn test_table_bucket_arn_property_overrides_builder_method() {
1032 let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1033 let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1034
1035 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1036
1037 let mut props = HashMap::new();
1038 props.insert(
1039 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1040 property_arn.to_string(),
1041 );
1042
1043 let result = builder.load("s3tables", props).await;
1044
1045 assert!(result.is_ok());
1046 let catalog = result.unwrap();
1047
1048 assert_eq!(catalog.config.table_bucket_arn, property_arn);
1049 assert_ne!(catalog.config.table_bucket_arn, builder_arn);
1050 }
1051
1052 #[tokio::test]
1053 async fn test_table_bucket_arn_builder_method_only() {
1054 let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1055
1056 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1057
1058 let result = builder.load("s3tables", HashMap::new()).await;
1059
1060 assert!(result.is_ok());
1061 let catalog = result.unwrap();
1062
1063 assert_eq!(catalog.config.table_bucket_arn, builder_arn);
1064 }
1065
1066 #[tokio::test]
1067 async fn test_table_bucket_arn_property_only() {
1068 let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1069
1070 let builder = S3TablesCatalogBuilder::default();
1071
1072 let mut props = HashMap::new();
1073 props.insert(
1074 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1075 property_arn.to_string(),
1076 );
1077
1078 let result = builder.load("s3tables", props).await;
1079
1080 assert!(result.is_ok());
1081 let catalog = result.unwrap();
1082
1083 assert_eq!(catalog.config.table_bucket_arn, property_arn);
1084 }
1085
1086 #[tokio::test]
1087 async fn test_builder_empty_name_validation() {
1088 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1089 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1090
1091 let result = builder.load("", HashMap::new()).await;
1092
1093 assert!(result.is_err());
1094 if let Err(err) = result {
1095 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1096 assert_eq!(err.message(), "Catalog name cannot be empty");
1097 }
1098 }
1099
1100 #[tokio::test]
1101 async fn test_builder_whitespace_only_name_validation() {
1102 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1103 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1104
1105 let result = builder.load(" \t\n ", HashMap::new()).await;
1106
1107 assert!(result.is_err());
1108 if let Err(err) = result {
1109 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1110 assert_eq!(err.message(), "Catalog name cannot be empty");
1111 }
1112 }
1113
1114 #[tokio::test]
1115 async fn test_builder_name_validation_with_missing_arn() {
1116 let builder = S3TablesCatalogBuilder::default();
1117
1118 let result = builder.load("", HashMap::new()).await;
1119
1120 assert!(result.is_err());
1121 if let Err(err) = result {
1122 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1123 assert_eq!(err.message(), "Catalog name cannot be empty");
1124 }
1125 }
1126}