1use std::collections::HashMap;
19use std::future::Future;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
25use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
26use aws_sdk_s3tables::operation::get_table::GetTableOutput;
27use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
28use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
29use aws_sdk_s3tables::types::OpenTableFormat;
30use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
31use iceberg::spec::{TableMetadata, TableMetadataBuilder};
32use iceberg::table::Table;
33use iceberg::{
34 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35 TableCommit, TableCreation, TableIdent,
36};
37use iceberg_storage_opendal::OpenDalStorageFactory;
38
39use crate::utils::create_sdk_config;
40
/// Catalog property key for the S3 Tables bucket ARN (required at load time).
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Catalog property key for a custom AWS endpoint URL (optional, e.g. a local emulator).
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
45
/// Configuration assembled by [`S3TablesCatalogBuilder`] and consumed by
/// [`S3TablesCatalog::new`].
#[derive(Debug)]
struct S3TablesCatalogConfig {
    /// Catalog name, set from the `name` passed to `CatalogBuilder::load`.
    name: Option<String>,
    /// ARN of the S3 Tables bucket; every API call is scoped to it.
    table_bucket_arn: String,
    /// Optional custom endpoint forwarded to the AWS SDK config
    /// (useful for local emulators).
    endpoint_url: Option<String>,
    /// Optional preconfigured client; when `None`, a client is built from
    /// `props`/`endpoint_url` in `S3TablesCatalog::new`.
    client: Option<aws_sdk_s3tables::Client>,
    /// Remaining catalog properties, forwarded to the SDK config and `FileIO`.
    props: HashMap<String, String>,
}
68
/// Builder for [`S3TablesCatalog`]: construct via `Default`, configure with
/// the `with_*` methods, then finish with `CatalogBuilder::load`.
#[derive(Debug)]
pub struct S3TablesCatalogBuilder {
    /// Accumulated catalog configuration.
    config: S3TablesCatalogConfig,
    /// Optional storage factory for `FileIO`; defaults to OpenDAL S3 when unset.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
75
76impl Default for S3TablesCatalogBuilder {
78 fn default() -> Self {
79 Self {
80 config: S3TablesCatalogConfig {
81 name: None,
82 table_bucket_arn: "".to_string(),
83 endpoint_url: None,
84 client: None,
85 props: HashMap::new(),
86 },
87 storage_factory: None,
88 }
89 }
90}
91
92impl S3TablesCatalogBuilder {
94 pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
103 self.config.endpoint_url = Some(endpoint_url.into());
104 self
105 }
106
107 pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
109 self.config.client = Some(client);
110 self
111 }
112
113 pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
122 self.config.table_bucket_arn = table_bucket_arn.into();
123 self
124 }
125}
126
127impl CatalogBuilder for S3TablesCatalogBuilder {
128 type C = S3TablesCatalog;
129
130 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
131 self.storage_factory = Some(storage_factory);
132 self
133 }
134
135 fn load(
136 mut self,
137 name: impl Into<String>,
138 props: HashMap<String, String>,
139 ) -> impl Future<Output = Result<Self::C>> + Send {
140 let catalog_name = name.into();
141 self.config.name = Some(catalog_name.clone());
142
143 if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
144 self.config.table_bucket_arn = props
145 .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
146 .cloned()
147 .unwrap_or_default();
148 }
149
150 if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
151 self.config.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
152 }
153
154 self.config.props = props
156 .into_iter()
157 .filter(|(k, _)| {
158 k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
159 && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
160 })
161 .collect();
162
163 async move {
164 if catalog_name.trim().is_empty() {
165 Err(Error::new(
166 ErrorKind::DataInvalid,
167 "Catalog name cannot be empty",
168 ))
169 } else if self.config.table_bucket_arn.is_empty() {
170 Err(Error::new(
171 ErrorKind::DataInvalid,
172 "Table bucket ARN is required",
173 ))
174 } else {
175 S3TablesCatalog::new(self.config, self.storage_factory).await
176 }
177 }
178 }
179}
180
/// Iceberg [`Catalog`] implementation backed by AWS S3 Tables.
///
/// Namespace/table listing and lifecycle go through the S3Tables API;
/// table metadata files are read and written through `file_io`.
#[derive(Debug)]
pub struct S3TablesCatalog {
    /// Resolved configuration (name, bucket ARN, leftover properties).
    config: S3TablesCatalogConfig,
    /// Client for the S3Tables control-plane API.
    s3tables_client: aws_sdk_s3tables::Client,
    /// File IO used to read/write Iceberg metadata at the warehouse location.
    file_io: FileIO,
}
188
impl S3TablesCatalog {
    /// Builds a catalog from `config`.
    ///
    /// Reuses the preconfigured client when one was supplied, otherwise
    /// constructs one from the SDK config derived from `props` and the
    /// optional endpoint URL. `FileIO` is backed by the given storage
    /// factory, defaulting to OpenDAL S3.
    async fn new(
        config: S3TablesCatalogConfig,
        storage_factory: Option<Arc<dyn StorageFactory>>,
    ) -> Result<Self> {
        let s3tables_client = if let Some(client) = config.client.clone() {
            client
        } else {
            let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
            aws_sdk_s3tables::Client::new(&aws_config)
        };

        // Default to OpenDAL's S3 backend when the caller didn't inject one.
        let factory = storage_factory.unwrap_or_else(|| {
            Arc::new(OpenDalStorageFactory::S3 {
                customized_credential_load: None,
            })
        });
        let file_io = FileIOBuilder::new(factory)
            .with_props(&config.props)
            .build();

        Ok(Self {
            config,
            s3tables_client,
            file_io,
        })
    }

    /// Loads a table and the S3Tables `version_token` associated with its
    /// current state. The token is required for the compare-and-swap
    /// `update_table_metadata_location` call used when committing updates.
    ///
    /// # Errors
    /// Fails if the table has no metadata location (e.g. it was created in
    /// S3Tables but its Iceberg metadata was never written), or if reading
    /// the metadata file fails.
    async fn load_table_with_version_token(
        &self,
        table_ident: &TableIdent,
    ) -> Result<(Table, String)> {
        let req = self
            .s3tables_client
            .get_table()
            .table_bucket_arn(self.config.table_bucket_arn.clone())
            .namespace(table_ident.namespace().to_url_string())
            .name(table_ident.name());
        let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;

        let metadata_location = resp.metadata_location().ok_or_else(|| {
            Error::new(
                ErrorKind::Unexpected,
                format!(
                    "Table {} does not have metadata location",
                    table_ident.name()
                ),
            )
        })?;
        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;

        let table = Table::builder()
            .identifier(table_ident.clone())
            .metadata(metadata)
            .metadata_location(metadata_location)
            .file_io(self.file_io.clone())
            .build()?;
        Ok((table, resp.version_token))
    }
}
252
253#[async_trait]
254impl Catalog for S3TablesCatalog {
255 async fn list_namespaces(
260 &self,
261 parent: Option<&NamespaceIdent>,
262 ) -> Result<Vec<NamespaceIdent>> {
263 if parent.is_some() {
264 return Ok(vec![]);
265 }
266
267 let mut result = Vec::new();
268 let mut continuation_token = None;
269 loop {
270 let mut req = self
271 .s3tables_client
272 .list_namespaces()
273 .table_bucket_arn(self.config.table_bucket_arn.clone());
274 if let Some(token) = continuation_token {
275 req = req.continuation_token(token);
276 }
277 let resp = req.send().await.map_err(from_aws_sdk_error)?;
278 for ns in resp.namespaces() {
279 result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
280 }
281 continuation_token = resp.continuation_token().map(|s| s.to_string());
282 if continuation_token.is_none() {
283 break;
284 }
285 }
286 Ok(result)
287 }
288
289 async fn create_namespace(
306 &self,
307 namespace: &NamespaceIdent,
308 _properties: HashMap<String, String>,
309 ) -> Result<Namespace> {
310 if self.namespace_exists(namespace).await? {
311 return Err(Error::new(
312 ErrorKind::NamespaceAlreadyExists,
313 format!("Namespace {namespace:?} already exists"),
314 ));
315 }
316
317 let req = self
318 .s3tables_client
319 .create_namespace()
320 .table_bucket_arn(self.config.table_bucket_arn.clone())
321 .namespace(namespace.to_url_string());
322 req.send().await.map_err(from_aws_sdk_error)?;
323 Ok(Namespace::with_properties(
324 namespace.clone(),
325 HashMap::new(),
326 ))
327 }
328
329 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
339 if !self.namespace_exists(namespace).await? {
340 return Err(Error::new(
341 ErrorKind::NamespaceNotFound,
342 format!("Namespace {namespace:?} does not exist"),
343 ));
344 }
345
346 let req = self
347 .s3tables_client
348 .get_namespace()
349 .table_bucket_arn(self.config.table_bucket_arn.clone())
350 .namespace(namespace.to_url_string());
351 let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
352 let properties = HashMap::new();
353 Ok(Namespace::with_properties(
354 NamespaceIdent::from_vec(resp.namespace().to_vec())?,
355 properties,
356 ))
357 }
358
359 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
372 let req = self
373 .s3tables_client
374 .get_namespace()
375 .table_bucket_arn(self.config.table_bucket_arn.clone())
376 .namespace(namespace.to_url_string());
377 match req.send().await {
378 Ok(_) => Ok(true),
379 Err(err) => {
380 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
381 Ok(false)
382 } else {
383 Err(from_aws_sdk_error(err))
384 }
385 }
386 }
387 }
388
389 async fn update_namespace(
394 &self,
395 _namespace: &NamespaceIdent,
396 _properties: HashMap<String, String>,
397 ) -> Result<()> {
398 Err(Error::new(
399 ErrorKind::FeatureUnsupported,
400 "Update namespace is not supported for s3tables catalog",
401 ))
402 }
403
404 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
413 if !self.namespace_exists(namespace).await? {
414 return Err(Error::new(
415 ErrorKind::NamespaceNotFound,
416 format!("Namespace {namespace:?} does not exist"),
417 ));
418 }
419
420 let req = self
421 .s3tables_client
422 .delete_namespace()
423 .table_bucket_arn(self.config.table_bucket_arn.clone())
424 .namespace(namespace.to_url_string());
425 req.send().await.map_err(from_aws_sdk_error)?;
426 Ok(())
427 }
428
429 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
438 let mut result = Vec::new();
439 let mut continuation_token = None;
440 loop {
441 let mut req = self
442 .s3tables_client
443 .list_tables()
444 .table_bucket_arn(self.config.table_bucket_arn.clone())
445 .namespace(namespace.to_url_string());
446 if let Some(token) = continuation_token {
447 req = req.continuation_token(token);
448 }
449 let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
450 for table in resp.tables() {
451 result.push(TableIdent::new(
452 NamespaceIdent::from_vec(table.namespace().to_vec())?,
453 table.name().to_string(),
454 ));
455 }
456 continuation_token = resp.continuation_token().map(|s| s.to_string());
457 if continuation_token.is_none() {
458 break;
459 }
460 }
461 Ok(result)
462 }
463
464 async fn create_table(
479 &self,
480 namespace: &NamespaceIdent,
481 mut creation: TableCreation,
482 ) -> Result<Table> {
483 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
484
485 let create_resp: CreateTableOutput = self
487 .s3tables_client
488 .create_table()
489 .table_bucket_arn(self.config.table_bucket_arn.clone())
490 .namespace(namespace.to_url_string())
491 .format(OpenTableFormat::Iceberg)
492 .name(table_ident.name())
493 .send()
494 .await
495 .map_err(from_aws_sdk_error)?;
496
497 let table_location = match &creation.location {
500 Some(_) => {
501 return Err(Error::new(
502 ErrorKind::DataInvalid,
503 "The location of the table is generated by s3tables catalog, can't be set by user.",
504 ));
505 }
506 None => {
507 let get_resp: GetTableOutput = self
508 .s3tables_client
509 .get_table()
510 .table_bucket_arn(self.config.table_bucket_arn.clone())
511 .namespace(namespace.to_url_string())
512 .name(table_ident.name())
513 .send()
514 .await
515 .map_err(from_aws_sdk_error)?;
516 get_resp.warehouse_location().to_string()
517 }
518 };
519
520 creation.location = Some(table_location.clone());
522 let metadata = TableMetadataBuilder::from_table_creation(creation)?
523 .build()?
524 .metadata;
525 let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
526 metadata.write_to(&self.file_io, &metadata_location).await?;
527
528 let metadata_location_str = metadata_location.to_string();
530 self.s3tables_client
531 .update_table_metadata_location()
532 .table_bucket_arn(self.config.table_bucket_arn.clone())
533 .namespace(namespace.to_url_string())
534 .name(table_ident.name())
535 .metadata_location(metadata_location_str.clone())
536 .version_token(create_resp.version_token())
537 .send()
538 .await
539 .map_err(from_aws_sdk_error)?;
540
541 let table = Table::builder()
542 .identifier(table_ident)
543 .metadata_location(metadata_location_str)
544 .metadata(metadata)
545 .file_io(self.file_io.clone())
546 .build()?;
547 Ok(table)
548 }
549
550 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
561 Ok(self.load_table_with_version_token(table_ident).await?.0)
562 }
563
564 async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
568 Err(Error::new(
569 ErrorKind::FeatureUnsupported,
570 "drop_table is not supported for S3Tables; use purge_table instead",
571 ))
572 }
573
574 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
576 let req = self
577 .s3tables_client
578 .delete_table()
579 .table_bucket_arn(self.config.table_bucket_arn.clone())
580 .namespace(table.namespace().to_url_string())
581 .name(table.name());
582 req.send().await.map_err(from_aws_sdk_error)?;
583 Ok(())
584 }
585
586 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
599 let req = self
600 .s3tables_client
601 .get_table()
602 .table_bucket_arn(self.config.table_bucket_arn.clone())
603 .namespace(table_ident.namespace().to_url_string())
604 .name(table_ident.name());
605 match req.send().await {
606 Ok(_) => Ok(true),
607 Err(err) => {
608 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
609 Ok(false)
610 } else {
611 Err(from_aws_sdk_error(err))
612 }
613 }
614 }
615 }
616
617 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
626 let req = self
627 .s3tables_client
628 .rename_table()
629 .table_bucket_arn(self.config.table_bucket_arn.clone())
630 .namespace(src.namespace().to_url_string())
631 .name(src.name())
632 .new_namespace_name(dest.namespace().to_url_string())
633 .new_name(dest.name());
634 req.send().await.map_err(from_aws_sdk_error)?;
635 Ok(())
636 }
637
638 async fn register_table(
639 &self,
640 _table_ident: &TableIdent,
641 _metadata_location: String,
642 ) -> Result<Table> {
643 Err(Error::new(
644 ErrorKind::FeatureUnsupported,
645 "Registering a table is not supported yet",
646 ))
647 }
648
649 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
651 let table_ident = commit.identifier().clone();
652 let table_namespace = table_ident.namespace();
653 let (current_table, version_token) =
654 self.load_table_with_version_token(&table_ident).await?;
655
656 let staged_table = commit.apply(current_table)?;
657 let staged_metadata_location_str = staged_table.metadata_location_result()?;
658 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
659
660 staged_table
661 .metadata()
662 .write_to(staged_table.file_io(), &staged_metadata_location)
663 .await?;
664
665 let builder = self
666 .s3tables_client
667 .update_table_metadata_location()
668 .table_bucket_arn(&self.config.table_bucket_arn)
669 .namespace(table_namespace.to_url_string())
670 .name(table_ident.name())
671 .version_token(version_token)
672 .metadata_location(staged_metadata_location_str);
673
674 let _ = builder.send().await.map_err(|e| {
675 let error = e.into_service_error();
676 match error {
677 UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
678 ErrorKind::CatalogCommitConflicts,
679 format!("Commit conflicted for table: {table_ident}"),
680 )
681 .with_retryable(true),
682 UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
683 ErrorKind::TableNotFound,
684 format!("Table {table_ident} is not found"),
685 ),
686 _ => Error::new(
687 ErrorKind::Unexpected,
688 "Operation failed for hitting aws sdk error",
689 ),
690 }
691 .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
692 })?;
693
694 Ok(staged_table)
695 }
696}
697
698pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
700where T: std::fmt::Debug {
701 Error::new(
702 ErrorKind::Unexpected,
703 format!("Operation failed for hitting aws sdk error: {error:?}"),
704 )
705}
706
#[cfg(test)]
mod tests {
    //! Integration tests against a real S3 Tables bucket.
    //!
    //! All tests are skipped unless the `TABLE_BUCKET_ARN` environment
    //! variable is set; they require live AWS credentials.
    //!
    //! Fix: `drop_table` is unsupported by this catalog (it always returns
    //! `FeatureUnsupported`, directing callers to `purge_table`), so the
    //! tests below now use `purge_table` for cleanup — previously
    //! `drop_table(...).unwrap()` would panic whenever the tests actually ran.

    use futures::TryStreamExt;
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
    use iceberg::transaction::{ApplyTransactionAction, Transaction};

    use super::*;

    /// Builds a catalog from the `TABLE_BUCKET_ARN` env var, or returns
    /// `Ok(None)` so the caller can skip the test.
    async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
        let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
            Some(table_bucket_arn) => table_bucket_arn,
            None => return Ok(None),
        };

        let config = S3TablesCatalogConfig {
            name: None,
            table_bucket_arn,
            endpoint_url: None,
            client: None,
            props: HashMap::new(),
        };

        Ok(Some(S3TablesCatalog::new(config, None).await?))
    }

    #[tokio::test]
    async fn test_s3tables_list_namespace() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let namespaces = catalog.list_namespaces(None).await.unwrap();
        assert!(!namespaces.is_empty());
    }

    #[tokio::test]
    async fn test_s3tables_list_tables() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let tables = catalog
            .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
            .await
            .unwrap();
        assert!(!tables.is_empty());
    }

    #[tokio::test]
    async fn test_s3tables_load_table() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let table = catalog
            .load_table(&TableIdent::new(
                NamespaceIdent::new("aws_s3_metadata".to_string()),
                "query_storage_metadata".to_string(),
            ))
            .await
            .unwrap();
        println!("{table:?}");
    }

    #[tokio::test]
    async fn test_s3tables_create_delete_namespace() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();
        assert!(catalog.namespace_exists(&namespace).await.unwrap());
        catalog.drop_namespace(&namespace).await.unwrap();
        assert!(!catalog.namespace_exists(&namespace).await.unwrap());
    }

    #[tokio::test]
    async fn test_s3tables_create_delete_table() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let creation = {
            let schema = Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap();
            TableCreation::builder()
                .name("test_s3tables_create_delete_table".to_string())
                .properties(HashMap::new())
                .schema(schema)
                .build()
        };

        let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
        let table_ident = TableIdent::new(
            namespace.clone(),
            "test_s3tables_create_delete_table".to_string(),
        );
        // Best-effort cleanup from earlier failed runs: remove the table
        // first so the namespace can be dropped.
        catalog.purge_table(&table_ident).await.ok();
        catalog.drop_namespace(&namespace).await.ok();

        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();
        catalog.create_table(&namespace, creation).await.unwrap();
        assert!(catalog.table_exists(&table_ident).await.unwrap());
        // drop_table is unsupported for S3Tables; purge_table is the
        // supported deletion path.
        catalog.purge_table(&table_ident).await.unwrap();
        assert!(!catalog.table_exists(&table_ident).await.unwrap());
        catalog.drop_namespace(&namespace).await.unwrap();
    }

    #[tokio::test]
    async fn test_s3tables_update_table() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
        let table_ident =
            TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());

        // Best-effort cleanup from earlier failed runs (purge_table, since
        // drop_table is unsupported for this catalog).
        catalog.purge_table(&table_ident).await.ok();
        catalog.drop_namespace(&namespace).await.ok();

        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let creation = {
            let schema = Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap();
            TableCreation::builder()
                .name(table_ident.name().to_string())
                .properties(HashMap::new())
                .schema(schema)
                .build()
        };

        let table = catalog.create_table(&namespace, creation).await.unwrap();

        let tx = Transaction::new(&table);

        let original_metadata_location = table.metadata_location();

        let tx = tx
            .update_table_properties()
            .set("test_property".to_string(), "test_value".to_string())
            .apply(tx)
            .unwrap();

        let updated_table = tx.commit(&catalog).await.unwrap();

        assert_eq!(
            updated_table.metadata().properties().get("test_property"),
            Some(&"test_value".to_string())
        );

        assert_ne!(
            updated_table.metadata_location(),
            original_metadata_location,
            "Metadata location should be updated after commit"
        );

        let reloaded_table = catalog.load_table(&table_ident).await.unwrap();

        assert_eq!(
            reloaded_table.metadata().properties().get("test_property"),
            Some(&"test_value".to_string())
        );
        assert_eq!(
            reloaded_table.metadata_location(),
            updated_table.metadata_location(),
            "Reloaded table should have the same metadata location as the updated table"
        );
    }

    #[tokio::test]
    async fn test_builder_load_missing_bucket_arn() {
        let builder = S3TablesCatalogBuilder::default();
        let result = builder.load("s3tables", HashMap::new()).await;

        assert!(result.is_err());
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::DataInvalid);
            assert_eq!(err.message(), "Table bucket ARN is required");
        }
    }

    #[tokio::test]
    async fn test_builder_with_endpoint_url_ok() {
        let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");

        let result = builder
            .load(
                "s3tables",
                HashMap::from([
                    (
                        S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
                        "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
                    ),
                    ("some_prop".to_string(), "some_value".to_string()),
                ]),
            )
            .await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_builder_with_client_ok() {
        use aws_config::BehaviorVersion;

        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
        let client = aws_sdk_s3tables::Client::new(&sdk_config);

        let builder = S3TablesCatalogBuilder::default().with_client(client);
        let result = builder
            .load(
                "s3tables",
                HashMap::from([(
                    S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
                    "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
                )]),
            )
            .await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_builder_with_table_bucket_arn() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);

        let result = builder.load("s3tables", HashMap::new()).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();
        assert_eq!(catalog.config.table_bucket_arn, test_arn);
    }

    #[tokio::test]
    async fn test_builder_empty_table_bucket_arn_edge_cases() {
        // An explicitly empty ARN property must override the builder value
        // and then be rejected by validation.
        let mut props = HashMap::new();
        props.insert(
            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
            "".to_string(),
        );

        let builder = S3TablesCatalogBuilder::default();
        let result = builder.load("s3tables", props).await;

        assert!(result.is_err());
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::DataInvalid);
            assert_eq!(err.message(), "Table bucket ARN is required");
        }
    }

    #[tokio::test]
    async fn test_endpoint_url_property_overrides_builder_method() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let builder_endpoint = "http://localhost:4566";
        let property_endpoint = "http://localhost:8080";

        let builder = S3TablesCatalogBuilder::default()
            .with_table_bucket_arn(test_arn)
            .with_endpoint_url(builder_endpoint);

        let mut props = HashMap::new();
        props.insert(
            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
            property_endpoint.to_string(),
        );

        let result = builder.load("s3tables", props).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(
            catalog.config.endpoint_url,
            Some(property_endpoint.to_string())
        );
        assert_ne!(
            catalog.config.endpoint_url,
            Some(builder_endpoint.to_string())
        );
    }

    #[tokio::test]
    async fn test_endpoint_url_builder_method_only() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let builder_endpoint = "http://localhost:4566";

        let builder = S3TablesCatalogBuilder::default()
            .with_table_bucket_arn(test_arn)
            .with_endpoint_url(builder_endpoint);

        let result = builder.load("s3tables", HashMap::new()).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(
            catalog.config.endpoint_url,
            Some(builder_endpoint.to_string())
        );
    }

    #[tokio::test]
    async fn test_endpoint_url_property_only() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let property_endpoint = "http://localhost:8080";

        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);

        let mut props = HashMap::new();
        props.insert(
            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
            property_endpoint.to_string(),
        );

        let result = builder.load("s3tables", props).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(
            catalog.config.endpoint_url,
            Some(property_endpoint.to_string())
        );
    }

    #[tokio::test]
    async fn test_table_bucket_arn_property_overrides_builder_method() {
        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";

        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);

        let mut props = HashMap::new();
        props.insert(
            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
            property_arn.to_string(),
        );

        let result = builder.load("s3tables", props).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(catalog.config.table_bucket_arn, property_arn);
        assert_ne!(catalog.config.table_bucket_arn, builder_arn);
    }

    #[tokio::test]
    async fn test_table_bucket_arn_builder_method_only() {
        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";

        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);

        let result = builder.load("s3tables", HashMap::new()).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(catalog.config.table_bucket_arn, builder_arn);
    }

    #[tokio::test]
    async fn test_table_bucket_arn_property_only() {
        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";

        let builder = S3TablesCatalogBuilder::default();

        let mut props = HashMap::new();
        props.insert(
            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
            property_arn.to_string(),
        );

        let result = builder.load("s3tables", props).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(catalog.config.table_bucket_arn, property_arn);
    }

    #[tokio::test]
    async fn test_builder_empty_name_validation() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);

        let result = builder.load("", HashMap::new()).await;

        assert!(result.is_err());
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::DataInvalid);
            assert_eq!(err.message(), "Catalog name cannot be empty");
        }
    }

    #[tokio::test]
    async fn test_builder_whitespace_only_name_validation() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);

        let result = builder.load("   \t\n  ", HashMap::new()).await;

        assert!(result.is_err());
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::DataInvalid);
            assert_eq!(err.message(), "Catalog name cannot be empty");
        }
    }

    #[tokio::test]
    async fn test_builder_name_validation_with_missing_arn() {
        // The empty-name check fires before the missing-ARN check.
        let builder = S3TablesCatalogBuilder::default();

        let result = builder.load("", HashMap::new()).await;

        assert!(result.is_err());
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::DataInvalid);
            assert_eq!(err.message(), "Catalog name cannot be empty");
        }
    }

    /// End-to-end round trip: create a table, write one Parquet data file,
    /// commit via fast-append, reload, and scan the row back.
    #[tokio::test]
    async fn test_s3tables_create_table_write_load_table_read() {
        use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use iceberg::writer::file_writer::ParquetWriterBuilder;
        use iceberg::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
        use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};

        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(c)) => c,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        // Unique namespace per run so concurrent/failed runs can't collide.
        let ns = NamespaceIdent::new(format!("test_rw_{}", uuid::Uuid::new_v4().simple()));
        catalog.create_namespace(&ns, HashMap::new()).await.unwrap();

        let table_name = String::from("table");

        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();
        let creation = TableCreation::builder()
            .name(table_name.clone())
            .schema(schema)
            .build();

        let table = catalog.create_table(&ns, creation).await.unwrap();

        let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
            table
                .metadata()
                .current_schema()
                .as_ref()
                .try_into()
                .unwrap(),
        );
        let batch = arrow_array::RecordBatch::try_new(arrow_schema, vec![Arc::new(
            arrow_array::Int32Array::from(vec![42]),
        )])
        .unwrap();

        let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
        let file_name_generator = DefaultFileNameGenerator::new(
            "test".to_string(),
            None,
            iceberg::spec::DataFileFormat::Parquet,
        );
        let parquet_writer_builder = ParquetWriterBuilder::new(
            parquet::file::properties::WriterProperties::default(),
            table.metadata().current_schema().clone(),
        );
        let rw = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            table.file_io().clone(),
            location_generator,
            file_name_generator,
        );
        let mut writer = DataFileWriterBuilder::new(rw).build(None).await.unwrap();
        writer.write(batch.clone()).await.unwrap();
        let data_files = writer.close().await.unwrap();

        let tx = Transaction::new(&table);
        let tx = tx
            .fast_append()
            .add_data_files(data_files)
            .apply(tx)
            .unwrap();
        tx.commit(&catalog).await.unwrap();

        let table_ident = TableIdent::new(ns.clone(), table_name.clone());
        let reloaded = catalog.load_table(&table_ident).await.unwrap();
        let batches: Vec<arrow_array::RecordBatch> = reloaded
            .scan()
            .select_all()
            .build()
            .expect("scan to be valid (snapshot exists, schema is OK)")
            .to_arrow()
            .await
            .expect("scan tasks should be OK")
            .try_collect()
            .await
            .expect("scan should complete successfully");

        assert_eq!(batches.len(), 1);
        assert_eq!(
            batches[0], batch,
            "read records should match records written earlier"
        );

        catalog.purge_table(&table_ident).await.ok();
        catalog.drop_namespace(&ns).await.ok();
    }
}