1use std::collections::HashMap;
19use std::future::Future;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
25use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
26use aws_sdk_s3tables::operation::get_table::GetTableOutput;
27use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
28use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
29use aws_sdk_s3tables::types::OpenTableFormat;
30use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
31use iceberg::spec::{TableMetadata, TableMetadataBuilder};
32use iceberg::table::Table;
33use iceberg::{
34 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35 TableCommit, TableCreation, TableIdent,
36};
37use iceberg_storage_opendal::OpenDalStorageFactory;
38
39use crate::utils::create_sdk_config;
40
/// Property key that `S3TablesCatalogBuilder::load` reads the table bucket ARN from.
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Property key that `S3TablesCatalogBuilder::load` reads the endpoint URL from.
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
45
/// Settings collected by `S3TablesCatalogBuilder` and kept by `S3TablesCatalog`.
#[derive(Debug)]
struct S3TablesCatalogConfig {
    /// Catalog name; `S3TablesCatalogBuilder::load` stores the name it was given here.
    name: Option<String>,
    /// ARN of the table bucket, sent as `table_bucket_arn` on the catalog's S3 Tables requests.
    table_bucket_arn: String,
    /// Optional endpoint URL, handed to `create_sdk_config` when no client was supplied.
    endpoint_url: Option<String>,
    /// Pre-built S3 Tables client; when present it is used instead of constructing one.
    client: Option<aws_sdk_s3tables::Client>,
    /// Remaining properties, passed to `create_sdk_config` and to the `FileIOBuilder`.
    props: HashMap<String, String>,
}
68
/// Builder for `S3TablesCatalog`.
///
/// Options can be set through the `with_*` methods and through the property map
/// given to `load`.
#[derive(Debug)]
pub struct S3TablesCatalogBuilder {
    /// Configuration accumulated so far.
    config: S3TablesCatalogConfig,
    /// Optional storage factory; when `None` the catalog falls back to an
    /// `OpenDalStorageFactory::S3` instance.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
75
76impl Default for S3TablesCatalogBuilder {
78 fn default() -> Self {
79 Self {
80 config: S3TablesCatalogConfig {
81 name: None,
82 table_bucket_arn: "".to_string(),
83 endpoint_url: None,
84 client: None,
85 props: HashMap::new(),
86 },
87 storage_factory: None,
88 }
89 }
90}
91
92impl S3TablesCatalogBuilder {
94 pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
103 self.config.endpoint_url = Some(endpoint_url.into());
104 self
105 }
106
107 pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
109 self.config.client = Some(client);
110 self
111 }
112
113 pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
122 self.config.table_bucket_arn = table_bucket_arn.into();
123 self
124 }
125}
126
127impl CatalogBuilder for S3TablesCatalogBuilder {
128 type C = S3TablesCatalog;
129
130 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
131 self.storage_factory = Some(storage_factory);
132 self
133 }
134
135 fn load(
136 mut self,
137 name: impl Into<String>,
138 props: HashMap<String, String>,
139 ) -> impl Future<Output = Result<Self::C>> + Send {
140 let catalog_name = name.into();
141 self.config.name = Some(catalog_name.clone());
142
143 if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
144 self.config.table_bucket_arn = props
145 .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
146 .cloned()
147 .unwrap_or_default();
148 }
149
150 if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
151 self.config.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
152 }
153
154 self.config.props = props
156 .into_iter()
157 .filter(|(k, _)| {
158 k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
159 && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
160 })
161 .collect();
162
163 async move {
164 if catalog_name.trim().is_empty() {
165 Err(Error::new(
166 ErrorKind::DataInvalid,
167 "Catalog name cannot be empty",
168 ))
169 } else if self.config.table_bucket_arn.is_empty() {
170 Err(Error::new(
171 ErrorKind::DataInvalid,
172 "Table bucket ARN is required",
173 ))
174 } else {
175 S3TablesCatalog::new(self.config, self.storage_factory).await
176 }
177 }
178 }
179}
180
/// Iceberg catalog implementation that talks to the S3 Tables API.
#[derive(Debug)]
pub struct S3TablesCatalog {
    /// Configuration the catalog was built from.
    config: S3TablesCatalogConfig,
    /// Client used for the S3 Tables API requests issued by this catalog.
    s3tables_client: aws_sdk_s3tables::Client,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
}
188
189impl S3TablesCatalog {
190 async fn new(
192 config: S3TablesCatalogConfig,
193 storage_factory: Option<Arc<dyn StorageFactory>>,
194 ) -> Result<Self> {
195 let s3tables_client = if let Some(client) = config.client.clone() {
196 client
197 } else {
198 let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
199 aws_sdk_s3tables::Client::new(&aws_config)
200 };
201
202 let factory = storage_factory.unwrap_or_else(|| {
204 Arc::new(OpenDalStorageFactory::S3 {
205 configured_scheme: "s3a".to_string(),
206 customized_credential_load: None,
207 })
208 });
209 let file_io = FileIOBuilder::new(factory)
210 .with_props(&config.props)
211 .build();
212
213 Ok(Self {
214 config,
215 s3tables_client,
216 file_io,
217 })
218 }
219
220 async fn load_table_with_version_token(
221 &self,
222 table_ident: &TableIdent,
223 ) -> Result<(Table, String)> {
224 let req = self
225 .s3tables_client
226 .get_table()
227 .table_bucket_arn(self.config.table_bucket_arn.clone())
228 .namespace(table_ident.namespace().to_url_string())
229 .name(table_ident.name());
230 let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
231
232 let metadata_location = resp.metadata_location().ok_or_else(|| {
234 Error::new(
235 ErrorKind::Unexpected,
236 format!(
237 "Table {} does not have metadata location",
238 table_ident.name()
239 ),
240 )
241 })?;
242 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
243
244 let table = Table::builder()
245 .identifier(table_ident.clone())
246 .metadata(metadata)
247 .metadata_location(metadata_location)
248 .file_io(self.file_io.clone())
249 .build()?;
250 Ok((table, resp.version_token))
251 }
252}
253
254#[async_trait]
255impl Catalog for S3TablesCatalog {
256 async fn list_namespaces(
261 &self,
262 parent: Option<&NamespaceIdent>,
263 ) -> Result<Vec<NamespaceIdent>> {
264 if parent.is_some() {
265 return Ok(vec![]);
266 }
267
268 let mut result = Vec::new();
269 let mut continuation_token = None;
270 loop {
271 let mut req = self
272 .s3tables_client
273 .list_namespaces()
274 .table_bucket_arn(self.config.table_bucket_arn.clone());
275 if let Some(token) = continuation_token {
276 req = req.continuation_token(token);
277 }
278 let resp = req.send().await.map_err(from_aws_sdk_error)?;
279 for ns in resp.namespaces() {
280 result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
281 }
282 continuation_token = resp.continuation_token().map(|s| s.to_string());
283 if continuation_token.is_none() {
284 break;
285 }
286 }
287 Ok(result)
288 }
289
290 async fn create_namespace(
307 &self,
308 namespace: &NamespaceIdent,
309 _properties: HashMap<String, String>,
310 ) -> Result<Namespace> {
311 if self.namespace_exists(namespace).await? {
312 return Err(Error::new(
313 ErrorKind::NamespaceAlreadyExists,
314 format!("Namespace {namespace:?} already exists"),
315 ));
316 }
317
318 let req = self
319 .s3tables_client
320 .create_namespace()
321 .table_bucket_arn(self.config.table_bucket_arn.clone())
322 .namespace(namespace.to_url_string());
323 req.send().await.map_err(from_aws_sdk_error)?;
324 Ok(Namespace::with_properties(
325 namespace.clone(),
326 HashMap::new(),
327 ))
328 }
329
330 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
340 if !self.namespace_exists(namespace).await? {
341 return Err(Error::new(
342 ErrorKind::NamespaceNotFound,
343 format!("Namespace {namespace:?} does not exist"),
344 ));
345 }
346
347 let req = self
348 .s3tables_client
349 .get_namespace()
350 .table_bucket_arn(self.config.table_bucket_arn.clone())
351 .namespace(namespace.to_url_string());
352 let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
353 let properties = HashMap::new();
354 Ok(Namespace::with_properties(
355 NamespaceIdent::from_vec(resp.namespace().to_vec())?,
356 properties,
357 ))
358 }
359
360 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
373 let req = self
374 .s3tables_client
375 .get_namespace()
376 .table_bucket_arn(self.config.table_bucket_arn.clone())
377 .namespace(namespace.to_url_string());
378 match req.send().await {
379 Ok(_) => Ok(true),
380 Err(err) => {
381 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
382 Ok(false)
383 } else {
384 Err(from_aws_sdk_error(err))
385 }
386 }
387 }
388 }
389
390 async fn update_namespace(
395 &self,
396 _namespace: &NamespaceIdent,
397 _properties: HashMap<String, String>,
398 ) -> Result<()> {
399 Err(Error::new(
400 ErrorKind::FeatureUnsupported,
401 "Update namespace is not supported for s3tables catalog",
402 ))
403 }
404
405 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
414 if !self.namespace_exists(namespace).await? {
415 return Err(Error::new(
416 ErrorKind::NamespaceNotFound,
417 format!("Namespace {namespace:?} does not exist"),
418 ));
419 }
420
421 let req = self
422 .s3tables_client
423 .delete_namespace()
424 .table_bucket_arn(self.config.table_bucket_arn.clone())
425 .namespace(namespace.to_url_string());
426 req.send().await.map_err(from_aws_sdk_error)?;
427 Ok(())
428 }
429
430 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
439 let mut result = Vec::new();
440 let mut continuation_token = None;
441 loop {
442 let mut req = self
443 .s3tables_client
444 .list_tables()
445 .table_bucket_arn(self.config.table_bucket_arn.clone())
446 .namespace(namespace.to_url_string());
447 if let Some(token) = continuation_token {
448 req = req.continuation_token(token);
449 }
450 let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
451 for table in resp.tables() {
452 result.push(TableIdent::new(
453 NamespaceIdent::from_vec(table.namespace().to_vec())?,
454 table.name().to_string(),
455 ));
456 }
457 continuation_token = resp.continuation_token().map(|s| s.to_string());
458 if continuation_token.is_none() {
459 break;
460 }
461 }
462 Ok(result)
463 }
464
465 async fn create_table(
480 &self,
481 namespace: &NamespaceIdent,
482 mut creation: TableCreation,
483 ) -> Result<Table> {
484 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
485
486 let create_resp: CreateTableOutput = self
488 .s3tables_client
489 .create_table()
490 .table_bucket_arn(self.config.table_bucket_arn.clone())
491 .namespace(namespace.to_url_string())
492 .format(OpenTableFormat::Iceberg)
493 .name(table_ident.name())
494 .send()
495 .await
496 .map_err(from_aws_sdk_error)?;
497
498 let table_location = match &creation.location {
501 Some(_) => {
502 return Err(Error::new(
503 ErrorKind::DataInvalid,
504 "The location of the table is generated by s3tables catalog, can't be set by user.",
505 ));
506 }
507 None => {
508 let get_resp: GetTableOutput = self
509 .s3tables_client
510 .get_table()
511 .table_bucket_arn(self.config.table_bucket_arn.clone())
512 .namespace(namespace.to_url_string())
513 .name(table_ident.name())
514 .send()
515 .await
516 .map_err(from_aws_sdk_error)?;
517 get_resp.warehouse_location().to_string()
518 }
519 };
520
521 creation.location = Some(table_location.clone());
523 let metadata = TableMetadataBuilder::from_table_creation(creation)?
524 .build()?
525 .metadata;
526 let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
527 metadata.write_to(&self.file_io, &metadata_location).await?;
528
529 let metadata_location_str = metadata_location.to_string();
531 self.s3tables_client
532 .update_table_metadata_location()
533 .table_bucket_arn(self.config.table_bucket_arn.clone())
534 .namespace(namespace.to_url_string())
535 .name(table_ident.name())
536 .metadata_location(metadata_location_str.clone())
537 .version_token(create_resp.version_token())
538 .send()
539 .await
540 .map_err(from_aws_sdk_error)?;
541
542 let table = Table::builder()
543 .identifier(table_ident)
544 .metadata_location(metadata_location_str)
545 .metadata(metadata)
546 .file_io(self.file_io.clone())
547 .build()?;
548 Ok(table)
549 }
550
551 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
562 Ok(self.load_table_with_version_token(table_ident).await?.0)
563 }
564
565 async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
569 Err(Error::new(
570 ErrorKind::FeatureUnsupported,
571 "drop_table is not supported for S3Tables; use purge_table instead",
572 ))
573 }
574
575 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
577 let req = self
578 .s3tables_client
579 .delete_table()
580 .table_bucket_arn(self.config.table_bucket_arn.clone())
581 .namespace(table.namespace().to_url_string())
582 .name(table.name());
583 req.send().await.map_err(from_aws_sdk_error)?;
584 Ok(())
585 }
586
587 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
600 let req = self
601 .s3tables_client
602 .get_table()
603 .table_bucket_arn(self.config.table_bucket_arn.clone())
604 .namespace(table_ident.namespace().to_url_string())
605 .name(table_ident.name());
606 match req.send().await {
607 Ok(_) => Ok(true),
608 Err(err) => {
609 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
610 Ok(false)
611 } else {
612 Err(from_aws_sdk_error(err))
613 }
614 }
615 }
616 }
617
618 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
627 let req = self
628 .s3tables_client
629 .rename_table()
630 .table_bucket_arn(self.config.table_bucket_arn.clone())
631 .namespace(src.namespace().to_url_string())
632 .name(src.name())
633 .new_namespace_name(dest.namespace().to_url_string())
634 .new_name(dest.name());
635 req.send().await.map_err(from_aws_sdk_error)?;
636 Ok(())
637 }
638
639 async fn register_table(
640 &self,
641 _table_ident: &TableIdent,
642 _metadata_location: String,
643 ) -> Result<Table> {
644 Err(Error::new(
645 ErrorKind::FeatureUnsupported,
646 "Registering a table is not supported yet",
647 ))
648 }
649
650 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
652 let table_ident = commit.identifier().clone();
653 let table_namespace = table_ident.namespace();
654 let (current_table, version_token) =
655 self.load_table_with_version_token(&table_ident).await?;
656
657 let staged_table = commit.apply(current_table)?;
658 let staged_metadata_location_str = staged_table.metadata_location_result()?;
659 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
660
661 staged_table
662 .metadata()
663 .write_to(staged_table.file_io(), &staged_metadata_location)
664 .await?;
665
666 let builder = self
667 .s3tables_client
668 .update_table_metadata_location()
669 .table_bucket_arn(&self.config.table_bucket_arn)
670 .namespace(table_namespace.to_url_string())
671 .name(table_ident.name())
672 .version_token(version_token)
673 .metadata_location(staged_metadata_location_str);
674
675 let _ = builder.send().await.map_err(|e| {
676 let error = e.into_service_error();
677 match error {
678 UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
679 ErrorKind::CatalogCommitConflicts,
680 format!("Commit conflicted for table: {table_ident}"),
681 )
682 .with_retryable(true),
683 UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
684 ErrorKind::TableNotFound,
685 format!("Table {table_ident} is not found"),
686 ),
687 _ => Error::new(
688 ErrorKind::Unexpected,
689 "Operation failed for hitting aws sdk error",
690 ),
691 }
692 .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
693 })?;
694
695 Ok(staged_table)
696 }
697}
698
699pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
701where T: std::fmt::Debug {
702 Error::new(
703 ErrorKind::Unexpected,
704 format!("Operation failed for hitting aws sdk error: {error:?}"),
705 )
706}
707
708#[cfg(test)]
709mod tests {
710 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
711 use iceberg::transaction::{ApplyTransactionAction, Transaction};
712
713 use super::*;
714
715 async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
716 let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
717 Some(table_bucket_arn) => table_bucket_arn,
718 None => return Ok(None),
719 };
720
721 let config = S3TablesCatalogConfig {
722 name: None,
723 table_bucket_arn,
724 endpoint_url: None,
725 client: None,
726 props: HashMap::new(),
727 };
728
729 Ok(Some(S3TablesCatalog::new(config, None).await?))
730 }
731
732 #[tokio::test]
733 async fn test_s3tables_list_namespace() {
734 let catalog = match load_s3tables_catalog_from_env().await {
735 Ok(Some(catalog)) => catalog,
736 Ok(None) => return,
737 Err(e) => panic!("Error loading catalog: {e}"),
738 };
739
740 let namespaces = catalog.list_namespaces(None).await.unwrap();
741 assert!(!namespaces.is_empty());
742 }
743
744 #[tokio::test]
745 async fn test_s3tables_list_tables() {
746 let catalog = match load_s3tables_catalog_from_env().await {
747 Ok(Some(catalog)) => catalog,
748 Ok(None) => return,
749 Err(e) => panic!("Error loading catalog: {e}"),
750 };
751
752 let tables = catalog
753 .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
754 .await
755 .unwrap();
756 assert!(!tables.is_empty());
757 }
758
759 #[tokio::test]
760 async fn test_s3tables_load_table() {
761 let catalog = match load_s3tables_catalog_from_env().await {
762 Ok(Some(catalog)) => catalog,
763 Ok(None) => return,
764 Err(e) => panic!("Error loading catalog: {e}"),
765 };
766
767 let table = catalog
768 .load_table(&TableIdent::new(
769 NamespaceIdent::new("aws_s3_metadata".to_string()),
770 "query_storage_metadata".to_string(),
771 ))
772 .await
773 .unwrap();
774 println!("{table:?}");
775 }
776
777 #[tokio::test]
778 async fn test_s3tables_create_delete_namespace() {
779 let catalog = match load_s3tables_catalog_from_env().await {
780 Ok(Some(catalog)) => catalog,
781 Ok(None) => return,
782 Err(e) => panic!("Error loading catalog: {e}"),
783 };
784
785 let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
786 catalog
787 .create_namespace(&namespace, HashMap::new())
788 .await
789 .unwrap();
790 assert!(catalog.namespace_exists(&namespace).await.unwrap());
791 catalog.drop_namespace(&namespace).await.unwrap();
792 assert!(!catalog.namespace_exists(&namespace).await.unwrap());
793 }
794
795 #[tokio::test]
796 async fn test_s3tables_create_delete_table() {
797 let catalog = match load_s3tables_catalog_from_env().await {
798 Ok(Some(catalog)) => catalog,
799 Ok(None) => return,
800 Err(e) => panic!("Error loading catalog: {e}"),
801 };
802
803 let creation = {
804 let schema = Schema::builder()
805 .with_schema_id(0)
806 .with_fields(vec![
807 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
808 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
809 ])
810 .build()
811 .unwrap();
812 TableCreation::builder()
813 .name("test_s3tables_create_delete_table".to_string())
814 .properties(HashMap::new())
815 .schema(schema)
816 .build()
817 };
818
819 let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
820 let table_ident = TableIdent::new(
821 namespace.clone(),
822 "test_s3tables_create_delete_table".to_string(),
823 );
824 catalog.drop_namespace(&namespace).await.ok();
825 catalog.drop_table(&table_ident).await.ok();
826
827 catalog
828 .create_namespace(&namespace, HashMap::new())
829 .await
830 .unwrap();
831 catalog.create_table(&namespace, creation).await.unwrap();
832 assert!(catalog.table_exists(&table_ident).await.unwrap());
833 catalog.drop_table(&table_ident).await.unwrap();
834 assert!(!catalog.table_exists(&table_ident).await.unwrap());
835 catalog.drop_namespace(&namespace).await.unwrap();
836 }
837
838 #[tokio::test]
839 async fn test_s3tables_update_table() {
840 let catalog = match load_s3tables_catalog_from_env().await {
841 Ok(Some(catalog)) => catalog,
842 Ok(None) => return,
843 Err(e) => panic!("Error loading catalog: {e}"),
844 };
845
846 let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
848 let table_ident =
849 TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
850
851 catalog.drop_table(&table_ident).await.ok();
853 catalog.drop_namespace(&namespace).await.ok();
854
855 catalog
857 .create_namespace(&namespace, HashMap::new())
858 .await
859 .unwrap();
860
861 let creation = {
862 let schema = Schema::builder()
863 .with_schema_id(0)
864 .with_fields(vec![
865 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
866 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
867 ])
868 .build()
869 .unwrap();
870 TableCreation::builder()
871 .name(table_ident.name().to_string())
872 .properties(HashMap::new())
873 .schema(schema)
874 .build()
875 };
876
877 let table = catalog.create_table(&namespace, creation).await.unwrap();
878
879 let tx = Transaction::new(&table);
881
882 let original_metadata_location = table.metadata_location();
884
885 let tx = tx
887 .update_table_properties()
888 .set("test_property".to_string(), "test_value".to_string())
889 .apply(tx)
890 .unwrap();
891
892 let updated_table = tx.commit(&catalog).await.unwrap();
894
895 assert_eq!(
897 updated_table.metadata().properties().get("test_property"),
898 Some(&"test_value".to_string())
899 );
900
901 assert_ne!(
903 updated_table.metadata_location(),
904 original_metadata_location,
905 "Metadata location should be updated after commit"
906 );
907
908 let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
910
911 assert_eq!(
913 reloaded_table.metadata().properties().get("test_property"),
914 Some(&"test_value".to_string())
915 );
916 assert_eq!(
917 reloaded_table.metadata_location(),
918 updated_table.metadata_location(),
919 "Reloaded table should have the same metadata location as the updated table"
920 );
921 }
922
923 #[tokio::test]
924 async fn test_builder_load_missing_bucket_arn() {
925 let builder = S3TablesCatalogBuilder::default();
926 let result = builder.load("s3tables", HashMap::new()).await;
927
928 assert!(result.is_err());
929 if let Err(err) = result {
930 assert_eq!(err.kind(), ErrorKind::DataInvalid);
931 assert_eq!(err.message(), "Table bucket ARN is required");
932 }
933 }
934
935 #[tokio::test]
936 async fn test_builder_with_endpoint_url_ok() {
937 let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");
938
939 let result = builder
940 .load(
941 "s3tables",
942 HashMap::from([
943 (
944 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
945 "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
946 ),
947 ("some_prop".to_string(), "some_value".to_string()),
948 ]),
949 )
950 .await;
951
952 assert!(result.is_ok());
953 }
954
955 #[tokio::test]
956 async fn test_builder_with_client_ok() {
957 use aws_config::BehaviorVersion;
958
959 let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
960 let client = aws_sdk_s3tables::Client::new(&sdk_config);
961
962 let builder = S3TablesCatalogBuilder::default().with_client(client);
963 let result = builder
964 .load(
965 "s3tables",
966 HashMap::from([(
967 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
968 "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
969 )]),
970 )
971 .await;
972
973 assert!(result.is_ok());
974 }
975
976 #[tokio::test]
977 async fn test_builder_with_table_bucket_arn() {
978 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
979 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
980
981 let result = builder.load("s3tables", HashMap::new()).await;
982
983 assert!(result.is_ok());
984 let catalog = result.unwrap();
985 assert_eq!(catalog.config.table_bucket_arn, test_arn);
986 }
987
988 #[tokio::test]
989 async fn test_builder_empty_table_bucket_arn_edge_cases() {
990 let mut props = HashMap::new();
991 props.insert(
992 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
993 "".to_string(),
994 );
995
996 let builder = S3TablesCatalogBuilder::default();
997 let result = builder.load("s3tables", props).await;
998
999 assert!(result.is_err());
1000 if let Err(err) = result {
1001 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1002 assert_eq!(err.message(), "Table bucket ARN is required");
1003 }
1004 }
1005
1006 #[tokio::test]
1007 async fn test_endpoint_url_property_overrides_builder_method() {
1008 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1009 let builder_endpoint = "http://localhost:4566";
1010 let property_endpoint = "http://localhost:8080";
1011
1012 let builder = S3TablesCatalogBuilder::default()
1013 .with_table_bucket_arn(test_arn)
1014 .with_endpoint_url(builder_endpoint);
1015
1016 let mut props = HashMap::new();
1017 props.insert(
1018 S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1019 property_endpoint.to_string(),
1020 );
1021
1022 let result = builder.load("s3tables", props).await;
1023
1024 assert!(result.is_ok());
1025 let catalog = result.unwrap();
1026
1027 assert_eq!(
1029 catalog.config.endpoint_url,
1030 Some(property_endpoint.to_string())
1031 );
1032 assert_ne!(
1033 catalog.config.endpoint_url,
1034 Some(builder_endpoint.to_string())
1035 );
1036 }
1037
1038 #[tokio::test]
1039 async fn test_endpoint_url_builder_method_only() {
1040 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1041 let builder_endpoint = "http://localhost:4566";
1042
1043 let builder = S3TablesCatalogBuilder::default()
1044 .with_table_bucket_arn(test_arn)
1045 .with_endpoint_url(builder_endpoint);
1046
1047 let result = builder.load("s3tables", HashMap::new()).await;
1048
1049 assert!(result.is_ok());
1050 let catalog = result.unwrap();
1051
1052 assert_eq!(
1053 catalog.config.endpoint_url,
1054 Some(builder_endpoint.to_string())
1055 );
1056 }
1057
1058 #[tokio::test]
1059 async fn test_endpoint_url_property_only() {
1060 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1061 let property_endpoint = "http://localhost:8080";
1062
1063 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1064
1065 let mut props = HashMap::new();
1066 props.insert(
1067 S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1068 property_endpoint.to_string(),
1069 );
1070
1071 let result = builder.load("s3tables", props).await;
1072
1073 assert!(result.is_ok());
1074 let catalog = result.unwrap();
1075
1076 assert_eq!(
1077 catalog.config.endpoint_url,
1078 Some(property_endpoint.to_string())
1079 );
1080 }
1081
1082 #[tokio::test]
1083 async fn test_table_bucket_arn_property_overrides_builder_method() {
1084 let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1085 let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1086
1087 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1088
1089 let mut props = HashMap::new();
1090 props.insert(
1091 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1092 property_arn.to_string(),
1093 );
1094
1095 let result = builder.load("s3tables", props).await;
1096
1097 assert!(result.is_ok());
1098 let catalog = result.unwrap();
1099
1100 assert_eq!(catalog.config.table_bucket_arn, property_arn);
1101 assert_ne!(catalog.config.table_bucket_arn, builder_arn);
1102 }
1103
1104 #[tokio::test]
1105 async fn test_table_bucket_arn_builder_method_only() {
1106 let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1107
1108 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1109
1110 let result = builder.load("s3tables", HashMap::new()).await;
1111
1112 assert!(result.is_ok());
1113 let catalog = result.unwrap();
1114
1115 assert_eq!(catalog.config.table_bucket_arn, builder_arn);
1116 }
1117
1118 #[tokio::test]
1119 async fn test_table_bucket_arn_property_only() {
1120 let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1121
1122 let builder = S3TablesCatalogBuilder::default();
1123
1124 let mut props = HashMap::new();
1125 props.insert(
1126 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1127 property_arn.to_string(),
1128 );
1129
1130 let result = builder.load("s3tables", props).await;
1131
1132 assert!(result.is_ok());
1133 let catalog = result.unwrap();
1134
1135 assert_eq!(catalog.config.table_bucket_arn, property_arn);
1136 }
1137
1138 #[tokio::test]
1139 async fn test_builder_empty_name_validation() {
1140 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1141 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1142
1143 let result = builder.load("", HashMap::new()).await;
1144
1145 assert!(result.is_err());
1146 if let Err(err) = result {
1147 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1148 assert_eq!(err.message(), "Catalog name cannot be empty");
1149 }
1150 }
1151
1152 #[tokio::test]
1153 async fn test_builder_whitespace_only_name_validation() {
1154 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1155 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1156
1157 let result = builder.load(" \t\n ", HashMap::new()).await;
1158
1159 assert!(result.is_err());
1160 if let Err(err) = result {
1161 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1162 assert_eq!(err.message(), "Catalog name cannot be empty");
1163 }
1164 }
1165
1166 #[tokio::test]
1167 async fn test_builder_name_validation_with_missing_arn() {
1168 let builder = S3TablesCatalogBuilder::default();
1169
1170 let result = builder.load("", HashMap::new()).await;
1171
1172 assert!(result.is_err());
1173 if let Err(err) = result {
1174 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1175 assert_eq!(err.message(), "Catalog name cannot be empty");
1176 }
1177 }
1178}