1use std::collections::HashMap;
19use std::future::Future;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
25use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
26use aws_sdk_s3tables::operation::get_table::GetTableOutput;
27use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
28use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
29use aws_sdk_s3tables::types::OpenTableFormat;
30use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
31use iceberg::spec::{TableMetadata, TableMetadataBuilder};
32use iceberg::table::Table;
33use iceberg::{
34 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35 TableCommit, TableCreation, TableIdent,
36};
37use iceberg_storage_opendal::OpenDalStorageFactory;
38
39use crate::utils::create_sdk_config;
40
/// Property key holding the table bucket ARN. When present in the properties passed to
/// `S3TablesCatalogBuilder::load`, its value replaces any ARN set on the builder.
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Property key holding the endpoint URL. When present in the properties passed to
/// `S3TablesCatalogBuilder::load`, its value replaces any endpoint URL set on the builder.
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
45
/// Configuration assembled by `S3TablesCatalogBuilder` and owned by `S3TablesCatalog`.
#[derive(Debug)]
struct S3TablesCatalogConfig {
    /// Catalog name; populated from the `name` argument of `load`.
    name: Option<String>,
    /// ARN of the table bucket; sent with every S3 Tables request made by the catalog.
    table_bucket_arn: String,
    /// Optional endpoint URL handed to `create_sdk_config` when no client was supplied.
    endpoint_url: Option<String>,
    /// Optional pre-built S3 Tables client; when set it is used instead of creating one.
    client: Option<aws_sdk_s3tables::Client>,
    /// Remaining catalog properties (without the ARN / endpoint keys); passed to
    /// `create_sdk_config` and to the `FileIOBuilder`.
    props: HashMap<String, String>,
}
68
/// Builder for `S3TablesCatalog`; finish it with `CatalogBuilder::load`.
#[derive(Debug)]
pub struct S3TablesCatalogBuilder {
    /// Configuration accumulated through the `with_*` methods and `load`.
    config: S3TablesCatalogConfig,
    /// Optional storage factory used to build the catalog's `FileIO`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
75
76impl Default for S3TablesCatalogBuilder {
78 fn default() -> Self {
79 Self {
80 config: S3TablesCatalogConfig {
81 name: None,
82 table_bucket_arn: "".to_string(),
83 endpoint_url: None,
84 client: None,
85 props: HashMap::new(),
86 },
87 storage_factory: None,
88 }
89 }
90}
91
92impl S3TablesCatalogBuilder {
94 pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
103 self.config.endpoint_url = Some(endpoint_url.into());
104 self
105 }
106
107 pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
109 self.config.client = Some(client);
110 self
111 }
112
113 pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
122 self.config.table_bucket_arn = table_bucket_arn.into();
123 self
124 }
125}
126
127impl CatalogBuilder for S3TablesCatalogBuilder {
128 type C = S3TablesCatalog;
129
130 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
131 self.storage_factory = Some(storage_factory);
132 self
133 }
134
135 fn load(
136 mut self,
137 name: impl Into<String>,
138 props: HashMap<String, String>,
139 ) -> impl Future<Output = Result<Self::C>> + Send {
140 let catalog_name = name.into();
141 self.config.name = Some(catalog_name.clone());
142
143 if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
144 self.config.table_bucket_arn = props
145 .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
146 .cloned()
147 .unwrap_or_default();
148 }
149
150 if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
151 self.config.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
152 }
153
154 self.config.props = props
156 .into_iter()
157 .filter(|(k, _)| {
158 k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
159 && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
160 })
161 .collect();
162
163 async move {
164 if catalog_name.trim().is_empty() {
165 Err(Error::new(
166 ErrorKind::DataInvalid,
167 "Catalog name cannot be empty",
168 ))
169 } else if self.config.table_bucket_arn.is_empty() {
170 Err(Error::new(
171 ErrorKind::DataInvalid,
172 "Table bucket ARN is required",
173 ))
174 } else {
175 S3TablesCatalog::new(self.config, self.storage_factory).await
176 }
177 }
178 }
179}
180
/// Iceberg `Catalog` implementation that talks to the S3 Tables service.
#[derive(Debug)]
pub struct S3TablesCatalog {
    /// Configuration the catalog was built from (table bucket ARN, properties, ...).
    config: S3TablesCatalogConfig,
    /// Client used for all S3 Tables requests.
    s3tables_client: aws_sdk_s3tables::Client,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
}
188
189impl S3TablesCatalog {
190 async fn new(
192 config: S3TablesCatalogConfig,
193 storage_factory: Option<Arc<dyn StorageFactory>>,
194 ) -> Result<Self> {
195 let s3tables_client = if let Some(client) = config.client.clone() {
196 client
197 } else {
198 let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
199 aws_sdk_s3tables::Client::new(&aws_config)
200 };
201
202 let factory = storage_factory.unwrap_or_else(|| {
204 Arc::new(OpenDalStorageFactory::S3 {
205 configured_scheme: "s3a".to_string(),
206 customized_credential_load: None,
207 })
208 });
209 let file_io = FileIOBuilder::new(factory)
210 .with_props(&config.props)
211 .build();
212
213 Ok(Self {
214 config,
215 s3tables_client,
216 file_io,
217 })
218 }
219
220 async fn load_table_with_version_token(
221 &self,
222 table_ident: &TableIdent,
223 ) -> Result<(Table, String)> {
224 let req = self
225 .s3tables_client
226 .get_table()
227 .table_bucket_arn(self.config.table_bucket_arn.clone())
228 .namespace(table_ident.namespace().to_url_string())
229 .name(table_ident.name());
230 let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
231
232 let metadata_location = resp.metadata_location().ok_or_else(|| {
234 Error::new(
235 ErrorKind::Unexpected,
236 format!(
237 "Table {} does not have metadata location",
238 table_ident.name()
239 ),
240 )
241 })?;
242 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
243
244 let table = Table::builder()
245 .identifier(table_ident.clone())
246 .metadata(metadata)
247 .metadata_location(metadata_location)
248 .file_io(self.file_io.clone())
249 .build()?;
250 Ok((table, resp.version_token))
251 }
252}
253
254#[async_trait]
255impl Catalog for S3TablesCatalog {
256 async fn list_namespaces(
261 &self,
262 parent: Option<&NamespaceIdent>,
263 ) -> Result<Vec<NamespaceIdent>> {
264 if parent.is_some() {
265 return Ok(vec![]);
266 }
267
268 let mut result = Vec::new();
269 let mut continuation_token = None;
270 loop {
271 let mut req = self
272 .s3tables_client
273 .list_namespaces()
274 .table_bucket_arn(self.config.table_bucket_arn.clone());
275 if let Some(token) = continuation_token {
276 req = req.continuation_token(token);
277 }
278 let resp = req.send().await.map_err(from_aws_sdk_error)?;
279 for ns in resp.namespaces() {
280 result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
281 }
282 continuation_token = resp.continuation_token().map(|s| s.to_string());
283 if continuation_token.is_none() {
284 break;
285 }
286 }
287 Ok(result)
288 }
289
290 async fn create_namespace(
307 &self,
308 namespace: &NamespaceIdent,
309 _properties: HashMap<String, String>,
310 ) -> Result<Namespace> {
311 if self.namespace_exists(namespace).await? {
312 return Err(Error::new(
313 ErrorKind::NamespaceAlreadyExists,
314 format!("Namespace {namespace:?} already exists"),
315 ));
316 }
317
318 let req = self
319 .s3tables_client
320 .create_namespace()
321 .table_bucket_arn(self.config.table_bucket_arn.clone())
322 .namespace(namespace.to_url_string());
323 req.send().await.map_err(from_aws_sdk_error)?;
324 Ok(Namespace::with_properties(
325 namespace.clone(),
326 HashMap::new(),
327 ))
328 }
329
330 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
340 if !self.namespace_exists(namespace).await? {
341 return Err(Error::new(
342 ErrorKind::NamespaceNotFound,
343 format!("Namespace {namespace:?} does not exist"),
344 ));
345 }
346
347 let req = self
348 .s3tables_client
349 .get_namespace()
350 .table_bucket_arn(self.config.table_bucket_arn.clone())
351 .namespace(namespace.to_url_string());
352 let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
353 let properties = HashMap::new();
354 Ok(Namespace::with_properties(
355 NamespaceIdent::from_vec(resp.namespace().to_vec())?,
356 properties,
357 ))
358 }
359
360 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
373 let req = self
374 .s3tables_client
375 .get_namespace()
376 .table_bucket_arn(self.config.table_bucket_arn.clone())
377 .namespace(namespace.to_url_string());
378 match req.send().await {
379 Ok(_) => Ok(true),
380 Err(err) => {
381 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
382 Ok(false)
383 } else {
384 Err(from_aws_sdk_error(err))
385 }
386 }
387 }
388 }
389
390 async fn update_namespace(
395 &self,
396 _namespace: &NamespaceIdent,
397 _properties: HashMap<String, String>,
398 ) -> Result<()> {
399 Err(Error::new(
400 ErrorKind::FeatureUnsupported,
401 "Update namespace is not supported for s3tables catalog",
402 ))
403 }
404
405 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
414 if !self.namespace_exists(namespace).await? {
415 return Err(Error::new(
416 ErrorKind::NamespaceNotFound,
417 format!("Namespace {namespace:?} does not exist"),
418 ));
419 }
420
421 let req = self
422 .s3tables_client
423 .delete_namespace()
424 .table_bucket_arn(self.config.table_bucket_arn.clone())
425 .namespace(namespace.to_url_string());
426 req.send().await.map_err(from_aws_sdk_error)?;
427 Ok(())
428 }
429
430 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
439 let mut result = Vec::new();
440 let mut continuation_token = None;
441 loop {
442 let mut req = self
443 .s3tables_client
444 .list_tables()
445 .table_bucket_arn(self.config.table_bucket_arn.clone())
446 .namespace(namespace.to_url_string());
447 if let Some(token) = continuation_token {
448 req = req.continuation_token(token);
449 }
450 let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
451 for table in resp.tables() {
452 result.push(TableIdent::new(
453 NamespaceIdent::from_vec(table.namespace().to_vec())?,
454 table.name().to_string(),
455 ));
456 }
457 continuation_token = resp.continuation_token().map(|s| s.to_string());
458 if continuation_token.is_none() {
459 break;
460 }
461 }
462 Ok(result)
463 }
464
465 async fn create_table(
480 &self,
481 namespace: &NamespaceIdent,
482 mut creation: TableCreation,
483 ) -> Result<Table> {
484 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
485
486 let create_resp: CreateTableOutput = self
488 .s3tables_client
489 .create_table()
490 .table_bucket_arn(self.config.table_bucket_arn.clone())
491 .namespace(namespace.to_url_string())
492 .format(OpenTableFormat::Iceberg)
493 .name(table_ident.name())
494 .send()
495 .await
496 .map_err(from_aws_sdk_error)?;
497
498 let table_location = match &creation.location {
501 Some(_) => {
502 return Err(Error::new(
503 ErrorKind::DataInvalid,
504 "The location of the table is generated by s3tables catalog, can't be set by user.",
505 ));
506 }
507 None => {
508 let get_resp: GetTableOutput = self
509 .s3tables_client
510 .get_table()
511 .table_bucket_arn(self.config.table_bucket_arn.clone())
512 .namespace(namespace.to_url_string())
513 .name(table_ident.name())
514 .send()
515 .await
516 .map_err(from_aws_sdk_error)?;
517 get_resp.warehouse_location().to_string()
518 }
519 };
520
521 creation.location = Some(table_location.clone());
523 let metadata = TableMetadataBuilder::from_table_creation(creation)?
524 .build()?
525 .metadata;
526 let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
527 metadata.write_to(&self.file_io, &metadata_location).await?;
528
529 let metadata_location_str = metadata_location.to_string();
531 self.s3tables_client
532 .update_table_metadata_location()
533 .table_bucket_arn(self.config.table_bucket_arn.clone())
534 .namespace(namespace.to_url_string())
535 .name(table_ident.name())
536 .metadata_location(metadata_location_str.clone())
537 .version_token(create_resp.version_token())
538 .send()
539 .await
540 .map_err(from_aws_sdk_error)?;
541
542 let table = Table::builder()
543 .identifier(table_ident)
544 .metadata_location(metadata_location_str)
545 .metadata(metadata)
546 .file_io(self.file_io.clone())
547 .build()?;
548 Ok(table)
549 }
550
551 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
562 Ok(self.load_table_with_version_token(table_ident).await?.0)
563 }
564
565 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
574 let req = self
575 .s3tables_client
576 .delete_table()
577 .table_bucket_arn(self.config.table_bucket_arn.clone())
578 .namespace(table.namespace().to_url_string())
579 .name(table.name());
580 req.send().await.map_err(from_aws_sdk_error)?;
581 Ok(())
582 }
583
584 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
597 let req = self
598 .s3tables_client
599 .get_table()
600 .table_bucket_arn(self.config.table_bucket_arn.clone())
601 .namespace(table_ident.namespace().to_url_string())
602 .name(table_ident.name());
603 match req.send().await {
604 Ok(_) => Ok(true),
605 Err(err) => {
606 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
607 Ok(false)
608 } else {
609 Err(from_aws_sdk_error(err))
610 }
611 }
612 }
613 }
614
615 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
624 let req = self
625 .s3tables_client
626 .rename_table()
627 .table_bucket_arn(self.config.table_bucket_arn.clone())
628 .namespace(src.namespace().to_url_string())
629 .name(src.name())
630 .new_namespace_name(dest.namespace().to_url_string())
631 .new_name(dest.name());
632 req.send().await.map_err(from_aws_sdk_error)?;
633 Ok(())
634 }
635
636 async fn register_table(
637 &self,
638 _table_ident: &TableIdent,
639 _metadata_location: String,
640 ) -> Result<Table> {
641 Err(Error::new(
642 ErrorKind::FeatureUnsupported,
643 "Registering a table is not supported yet",
644 ))
645 }
646
647 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
649 let table_ident = commit.identifier().clone();
650 let table_namespace = table_ident.namespace();
651 let (current_table, version_token) =
652 self.load_table_with_version_token(&table_ident).await?;
653
654 let staged_table = commit.apply(current_table)?;
655 let staged_metadata_location_str = staged_table.metadata_location_result()?;
656 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
657
658 staged_table
659 .metadata()
660 .write_to(staged_table.file_io(), &staged_metadata_location)
661 .await?;
662
663 let builder = self
664 .s3tables_client
665 .update_table_metadata_location()
666 .table_bucket_arn(&self.config.table_bucket_arn)
667 .namespace(table_namespace.to_url_string())
668 .name(table_ident.name())
669 .version_token(version_token)
670 .metadata_location(staged_metadata_location_str);
671
672 let _ = builder.send().await.map_err(|e| {
673 let error = e.into_service_error();
674 match error {
675 UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
676 ErrorKind::CatalogCommitConflicts,
677 format!("Commit conflicted for table: {table_ident}"),
678 )
679 .with_retryable(true),
680 UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
681 ErrorKind::TableNotFound,
682 format!("Table {table_ident} is not found"),
683 ),
684 _ => Error::new(
685 ErrorKind::Unexpected,
686 "Operation failed for hitting aws sdk error",
687 ),
688 }
689 .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
690 })?;
691
692 Ok(staged_table)
693 }
694}
695
696pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
698where T: std::fmt::Debug {
699 Error::new(
700 ErrorKind::Unexpected,
701 format!("Operation failed for hitting aws sdk error: {error:?}"),
702 )
703}
704
705#[cfg(test)]
706mod tests {
707 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
708 use iceberg::transaction::{ApplyTransactionAction, Transaction};
709
710 use super::*;
711
712 async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
713 let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
714 Some(table_bucket_arn) => table_bucket_arn,
715 None => return Ok(None),
716 };
717
718 let config = S3TablesCatalogConfig {
719 name: None,
720 table_bucket_arn,
721 endpoint_url: None,
722 client: None,
723 props: HashMap::new(),
724 };
725
726 Ok(Some(S3TablesCatalog::new(config, None).await?))
727 }
728
729 #[tokio::test]
730 async fn test_s3tables_list_namespace() {
731 let catalog = match load_s3tables_catalog_from_env().await {
732 Ok(Some(catalog)) => catalog,
733 Ok(None) => return,
734 Err(e) => panic!("Error loading catalog: {e}"),
735 };
736
737 let namespaces = catalog.list_namespaces(None).await.unwrap();
738 assert!(!namespaces.is_empty());
739 }
740
741 #[tokio::test]
742 async fn test_s3tables_list_tables() {
743 let catalog = match load_s3tables_catalog_from_env().await {
744 Ok(Some(catalog)) => catalog,
745 Ok(None) => return,
746 Err(e) => panic!("Error loading catalog: {e}"),
747 };
748
749 let tables = catalog
750 .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
751 .await
752 .unwrap();
753 assert!(!tables.is_empty());
754 }
755
756 #[tokio::test]
757 async fn test_s3tables_load_table() {
758 let catalog = match load_s3tables_catalog_from_env().await {
759 Ok(Some(catalog)) => catalog,
760 Ok(None) => return,
761 Err(e) => panic!("Error loading catalog: {e}"),
762 };
763
764 let table = catalog
765 .load_table(&TableIdent::new(
766 NamespaceIdent::new("aws_s3_metadata".to_string()),
767 "query_storage_metadata".to_string(),
768 ))
769 .await
770 .unwrap();
771 println!("{table:?}");
772 }
773
774 #[tokio::test]
775 async fn test_s3tables_create_delete_namespace() {
776 let catalog = match load_s3tables_catalog_from_env().await {
777 Ok(Some(catalog)) => catalog,
778 Ok(None) => return,
779 Err(e) => panic!("Error loading catalog: {e}"),
780 };
781
782 let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
783 catalog
784 .create_namespace(&namespace, HashMap::new())
785 .await
786 .unwrap();
787 assert!(catalog.namespace_exists(&namespace).await.unwrap());
788 catalog.drop_namespace(&namespace).await.unwrap();
789 assert!(!catalog.namespace_exists(&namespace).await.unwrap());
790 }
791
792 #[tokio::test]
793 async fn test_s3tables_create_delete_table() {
794 let catalog = match load_s3tables_catalog_from_env().await {
795 Ok(Some(catalog)) => catalog,
796 Ok(None) => return,
797 Err(e) => panic!("Error loading catalog: {e}"),
798 };
799
800 let creation = {
801 let schema = Schema::builder()
802 .with_schema_id(0)
803 .with_fields(vec![
804 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
805 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
806 ])
807 .build()
808 .unwrap();
809 TableCreation::builder()
810 .name("test_s3tables_create_delete_table".to_string())
811 .properties(HashMap::new())
812 .schema(schema)
813 .build()
814 };
815
816 let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
817 let table_ident = TableIdent::new(
818 namespace.clone(),
819 "test_s3tables_create_delete_table".to_string(),
820 );
821 catalog.drop_namespace(&namespace).await.ok();
822 catalog.drop_table(&table_ident).await.ok();
823
824 catalog
825 .create_namespace(&namespace, HashMap::new())
826 .await
827 .unwrap();
828 catalog.create_table(&namespace, creation).await.unwrap();
829 assert!(catalog.table_exists(&table_ident).await.unwrap());
830 catalog.drop_table(&table_ident).await.unwrap();
831 assert!(!catalog.table_exists(&table_ident).await.unwrap());
832 catalog.drop_namespace(&namespace).await.unwrap();
833 }
834
835 #[tokio::test]
836 async fn test_s3tables_update_table() {
837 let catalog = match load_s3tables_catalog_from_env().await {
838 Ok(Some(catalog)) => catalog,
839 Ok(None) => return,
840 Err(e) => panic!("Error loading catalog: {e}"),
841 };
842
843 let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
845 let table_ident =
846 TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
847
848 catalog.drop_table(&table_ident).await.ok();
850 catalog.drop_namespace(&namespace).await.ok();
851
852 catalog
854 .create_namespace(&namespace, HashMap::new())
855 .await
856 .unwrap();
857
858 let creation = {
859 let schema = Schema::builder()
860 .with_schema_id(0)
861 .with_fields(vec![
862 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
863 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
864 ])
865 .build()
866 .unwrap();
867 TableCreation::builder()
868 .name(table_ident.name().to_string())
869 .properties(HashMap::new())
870 .schema(schema)
871 .build()
872 };
873
874 let table = catalog.create_table(&namespace, creation).await.unwrap();
875
876 let tx = Transaction::new(&table);
878
879 let original_metadata_location = table.metadata_location();
881
882 let tx = tx
884 .update_table_properties()
885 .set("test_property".to_string(), "test_value".to_string())
886 .apply(tx)
887 .unwrap();
888
889 let updated_table = tx.commit(&catalog).await.unwrap();
891
892 assert_eq!(
894 updated_table.metadata().properties().get("test_property"),
895 Some(&"test_value".to_string())
896 );
897
898 assert_ne!(
900 updated_table.metadata_location(),
901 original_metadata_location,
902 "Metadata location should be updated after commit"
903 );
904
905 let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
907
908 assert_eq!(
910 reloaded_table.metadata().properties().get("test_property"),
911 Some(&"test_value".to_string())
912 );
913 assert_eq!(
914 reloaded_table.metadata_location(),
915 updated_table.metadata_location(),
916 "Reloaded table should have the same metadata location as the updated table"
917 );
918 }
919
920 #[tokio::test]
921 async fn test_builder_load_missing_bucket_arn() {
922 let builder = S3TablesCatalogBuilder::default();
923 let result = builder.load("s3tables", HashMap::new()).await;
924
925 assert!(result.is_err());
926 if let Err(err) = result {
927 assert_eq!(err.kind(), ErrorKind::DataInvalid);
928 assert_eq!(err.message(), "Table bucket ARN is required");
929 }
930 }
931
932 #[tokio::test]
933 async fn test_builder_with_endpoint_url_ok() {
934 let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");
935
936 let result = builder
937 .load(
938 "s3tables",
939 HashMap::from([
940 (
941 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
942 "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
943 ),
944 ("some_prop".to_string(), "some_value".to_string()),
945 ]),
946 )
947 .await;
948
949 assert!(result.is_ok());
950 }
951
952 #[tokio::test]
953 async fn test_builder_with_client_ok() {
954 use aws_config::BehaviorVersion;
955
956 let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
957 let client = aws_sdk_s3tables::Client::new(&sdk_config);
958
959 let builder = S3TablesCatalogBuilder::default().with_client(client);
960 let result = builder
961 .load(
962 "s3tables",
963 HashMap::from([(
964 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
965 "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
966 )]),
967 )
968 .await;
969
970 assert!(result.is_ok());
971 }
972
973 #[tokio::test]
974 async fn test_builder_with_table_bucket_arn() {
975 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
976 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
977
978 let result = builder.load("s3tables", HashMap::new()).await;
979
980 assert!(result.is_ok());
981 let catalog = result.unwrap();
982 assert_eq!(catalog.config.table_bucket_arn, test_arn);
983 }
984
985 #[tokio::test]
986 async fn test_builder_empty_table_bucket_arn_edge_cases() {
987 let mut props = HashMap::new();
988 props.insert(
989 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
990 "".to_string(),
991 );
992
993 let builder = S3TablesCatalogBuilder::default();
994 let result = builder.load("s3tables", props).await;
995
996 assert!(result.is_err());
997 if let Err(err) = result {
998 assert_eq!(err.kind(), ErrorKind::DataInvalid);
999 assert_eq!(err.message(), "Table bucket ARN is required");
1000 }
1001 }
1002
1003 #[tokio::test]
1004 async fn test_endpoint_url_property_overrides_builder_method() {
1005 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1006 let builder_endpoint = "http://localhost:4566";
1007 let property_endpoint = "http://localhost:8080";
1008
1009 let builder = S3TablesCatalogBuilder::default()
1010 .with_table_bucket_arn(test_arn)
1011 .with_endpoint_url(builder_endpoint);
1012
1013 let mut props = HashMap::new();
1014 props.insert(
1015 S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1016 property_endpoint.to_string(),
1017 );
1018
1019 let result = builder.load("s3tables", props).await;
1020
1021 assert!(result.is_ok());
1022 let catalog = result.unwrap();
1023
1024 assert_eq!(
1026 catalog.config.endpoint_url,
1027 Some(property_endpoint.to_string())
1028 );
1029 assert_ne!(
1030 catalog.config.endpoint_url,
1031 Some(builder_endpoint.to_string())
1032 );
1033 }
1034
1035 #[tokio::test]
1036 async fn test_endpoint_url_builder_method_only() {
1037 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1038 let builder_endpoint = "http://localhost:4566";
1039
1040 let builder = S3TablesCatalogBuilder::default()
1041 .with_table_bucket_arn(test_arn)
1042 .with_endpoint_url(builder_endpoint);
1043
1044 let result = builder.load("s3tables", HashMap::new()).await;
1045
1046 assert!(result.is_ok());
1047 let catalog = result.unwrap();
1048
1049 assert_eq!(
1050 catalog.config.endpoint_url,
1051 Some(builder_endpoint.to_string())
1052 );
1053 }
1054
1055 #[tokio::test]
1056 async fn test_endpoint_url_property_only() {
1057 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1058 let property_endpoint = "http://localhost:8080";
1059
1060 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1061
1062 let mut props = HashMap::new();
1063 props.insert(
1064 S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1065 property_endpoint.to_string(),
1066 );
1067
1068 let result = builder.load("s3tables", props).await;
1069
1070 assert!(result.is_ok());
1071 let catalog = result.unwrap();
1072
1073 assert_eq!(
1074 catalog.config.endpoint_url,
1075 Some(property_endpoint.to_string())
1076 );
1077 }
1078
1079 #[tokio::test]
1080 async fn test_table_bucket_arn_property_overrides_builder_method() {
1081 let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1082 let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1083
1084 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1085
1086 let mut props = HashMap::new();
1087 props.insert(
1088 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1089 property_arn.to_string(),
1090 );
1091
1092 let result = builder.load("s3tables", props).await;
1093
1094 assert!(result.is_ok());
1095 let catalog = result.unwrap();
1096
1097 assert_eq!(catalog.config.table_bucket_arn, property_arn);
1098 assert_ne!(catalog.config.table_bucket_arn, builder_arn);
1099 }
1100
1101 #[tokio::test]
1102 async fn test_table_bucket_arn_builder_method_only() {
1103 let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1104
1105 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1106
1107 let result = builder.load("s3tables", HashMap::new()).await;
1108
1109 assert!(result.is_ok());
1110 let catalog = result.unwrap();
1111
1112 assert_eq!(catalog.config.table_bucket_arn, builder_arn);
1113 }
1114
1115 #[tokio::test]
1116 async fn test_table_bucket_arn_property_only() {
1117 let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1118
1119 let builder = S3TablesCatalogBuilder::default();
1120
1121 let mut props = HashMap::new();
1122 props.insert(
1123 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1124 property_arn.to_string(),
1125 );
1126
1127 let result = builder.load("s3tables", props).await;
1128
1129 assert!(result.is_ok());
1130 let catalog = result.unwrap();
1131
1132 assert_eq!(catalog.config.table_bucket_arn, property_arn);
1133 }
1134
1135 #[tokio::test]
1136 async fn test_builder_empty_name_validation() {
1137 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1138 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1139
1140 let result = builder.load("", HashMap::new()).await;
1141
1142 assert!(result.is_err());
1143 if let Err(err) = result {
1144 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1145 assert_eq!(err.message(), "Catalog name cannot be empty");
1146 }
1147 }
1148
1149 #[tokio::test]
1150 async fn test_builder_whitespace_only_name_validation() {
1151 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1152 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1153
1154 let result = builder.load(" \t\n ", HashMap::new()).await;
1155
1156 assert!(result.is_err());
1157 if let Err(err) = result {
1158 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1159 assert_eq!(err.message(), "Catalog name cannot be empty");
1160 }
1161 }
1162
1163 #[tokio::test]
1164 async fn test_builder_name_validation_with_missing_arn() {
1165 let builder = S3TablesCatalogBuilder::default();
1166
1167 let result = builder.load("", HashMap::new()).await;
1168
1169 assert!(result.is_err());
1170 if let Err(err) = result {
1171 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1172 assert_eq!(err.message(), "Catalog name cannot be empty");
1173 }
1174 }
1175}