1use std::collections::HashMap;
19use std::future::Future;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
25use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
26use aws_sdk_s3tables::operation::get_table::GetTableOutput;
27use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
28use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
29use aws_sdk_s3tables::types::OpenTableFormat;
30use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
31use iceberg::spec::{TableMetadata, TableMetadataBuilder};
32use iceberg::table::Table;
33use iceberg::{
34 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35 Runtime, TableCommit, TableCreation, TableIdent,
36};
37use iceberg_storage_opendal::OpenDalStorageFactory;
38
39use crate::utils::create_sdk_config;
40
/// Property key carrying the ARN of the table bucket.
///
/// When this key is present in the properties handed to `load`, its value replaces any ARN
/// previously set through [`S3TablesCatalogBuilder::with_table_bucket_arn`].
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Property key carrying a custom endpoint URL.
///
/// When this key is present in the properties handed to `load`, its value replaces any endpoint
/// previously set through [`S3TablesCatalogBuilder::with_endpoint_url`].
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
45
/// Settings collected by [`S3TablesCatalogBuilder`] and handed to [`S3TablesCatalog`].
#[derive(Debug)]
struct S3TablesCatalogConfig {
    /// Catalog name; `None` until `load` records the name it was given.
    name: Option<String>,
    /// ARN of the table bucket that every request is issued against.
    table_bucket_arn: String,
    /// Optional endpoint URL, forwarded to `create_sdk_config` when no client was supplied.
    endpoint_url: Option<String>,
    /// Optional pre-built client; when set it is used instead of building one from `props`.
    client: Option<aws_sdk_s3tables::Client>,
    /// Remaining properties (ARN and endpoint keys removed), used for the SDK config and `FileIO`.
    props: HashMap<String, String>,
}
68
/// Builder for [`S3TablesCatalog`].
#[derive(Debug)]
pub struct S3TablesCatalogBuilder {
    /// Catalog configuration accumulated by the `with_*` setters and by `load`.
    config: S3TablesCatalogConfig,
    /// Optional storage factory; when `None`, the catalog falls back to an OpenDAL S3 factory.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Optional runtime; when `None`, `load` resolves one via `Runtime::try_current`.
    runtime: Option<Runtime>,
}
76
77impl Default for S3TablesCatalogBuilder {
79 fn default() -> Self {
80 Self {
81 config: S3TablesCatalogConfig {
82 name: None,
83 table_bucket_arn: "".to_string(),
84 endpoint_url: None,
85 client: None,
86 props: HashMap::new(),
87 },
88 storage_factory: None,
89 runtime: None,
90 }
91 }
92}
93
94impl S3TablesCatalogBuilder {
96 pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
105 self.config.endpoint_url = Some(endpoint_url.into());
106 self
107 }
108
109 pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
111 self.config.client = Some(client);
112 self
113 }
114
115 pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
124 self.config.table_bucket_arn = table_bucket_arn.into();
125 self
126 }
127}
128
129impl CatalogBuilder for S3TablesCatalogBuilder {
130 type C = S3TablesCatalog;
131
132 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
133 self.storage_factory = Some(storage_factory);
134 self
135 }
136
137 fn with_runtime(mut self, runtime: Runtime) -> Self {
138 self.runtime = Some(runtime);
139 self
140 }
141
142 fn load(
143 mut self,
144 name: impl Into<String>,
145 props: HashMap<String, String>,
146 ) -> impl Future<Output = Result<Self::C>> + Send {
147 let catalog_name = name.into();
148 self.config.name = Some(catalog_name.clone());
149
150 if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
151 self.config.table_bucket_arn = props
152 .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
153 .cloned()
154 .unwrap_or_default();
155 }
156
157 if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
158 self.config.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
159 }
160
161 self.config.props = props
163 .into_iter()
164 .filter(|(k, _)| {
165 k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
166 && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
167 })
168 .collect();
169
170 async move {
171 if catalog_name.trim().is_empty() {
172 Err(Error::new(
173 ErrorKind::DataInvalid,
174 "Catalog name cannot be empty",
175 ))
176 } else if self.config.table_bucket_arn.is_empty() {
177 Err(Error::new(
178 ErrorKind::DataInvalid,
179 "Table bucket ARN is required",
180 ))
181 } else {
182 let runtime = match self.runtime {
183 Some(rt) => rt,
184 None => Runtime::try_current()?,
185 };
186 S3TablesCatalog::new(self.config, self.storage_factory, runtime).await
187 }
188 }
189 }
190}
191
/// Iceberg catalog that talks to S3 Tables through an `aws_sdk_s3tables` client.
#[derive(Debug)]
pub struct S3TablesCatalog {
    /// Configuration the catalog was built from (notably the table bucket ARN).
    config: S3TablesCatalogConfig,
    /// Client used for every namespace and table request.
    s3tables_client: aws_sdk_s3tables::Client,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
    /// Runtime attached to every `Table` this catalog builds.
    runtime: Runtime,
}
200
201impl S3TablesCatalog {
202 async fn new(
204 config: S3TablesCatalogConfig,
205 storage_factory: Option<Arc<dyn StorageFactory>>,
206 runtime: Runtime,
207 ) -> Result<Self> {
208 let s3tables_client = if let Some(client) = config.client.clone() {
209 client
210 } else {
211 let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
212 aws_sdk_s3tables::Client::new(&aws_config)
213 };
214
215 let factory = storage_factory.unwrap_or_else(|| {
217 Arc::new(OpenDalStorageFactory::S3 {
218 customized_credential_load: None,
219 })
220 });
221 let file_io = FileIOBuilder::new(factory)
222 .with_props(&config.props)
223 .build();
224
225 Ok(Self {
226 config,
227 s3tables_client,
228 file_io,
229 runtime,
230 })
231 }
232
233 async fn load_table_with_version_token(
234 &self,
235 table_ident: &TableIdent,
236 ) -> Result<(Table, String)> {
237 let req = self
238 .s3tables_client
239 .get_table()
240 .table_bucket_arn(self.config.table_bucket_arn.clone())
241 .namespace(table_ident.namespace().to_url_string())
242 .name(table_ident.name());
243 let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
244
245 let metadata_location = resp.metadata_location().ok_or_else(|| {
247 Error::new(
248 ErrorKind::Unexpected,
249 format!(
250 "Table {} does not have metadata location",
251 table_ident.name()
252 ),
253 )
254 })?;
255 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
256
257 let table = Table::builder()
258 .identifier(table_ident.clone())
259 .metadata(metadata)
260 .metadata_location(metadata_location)
261 .file_io(self.file_io.clone())
262 .runtime(self.runtime.clone())
263 .build()?;
264 Ok((table, resp.version_token))
265 }
266}
267
268#[async_trait]
269impl Catalog for S3TablesCatalog {
270 async fn list_namespaces(
275 &self,
276 parent: Option<&NamespaceIdent>,
277 ) -> Result<Vec<NamespaceIdent>> {
278 if parent.is_some() {
279 return Ok(vec![]);
280 }
281
282 let mut result = Vec::new();
283 let mut continuation_token = None;
284 loop {
285 let mut req = self
286 .s3tables_client
287 .list_namespaces()
288 .table_bucket_arn(self.config.table_bucket_arn.clone());
289 if let Some(token) = continuation_token {
290 req = req.continuation_token(token);
291 }
292 let resp = req.send().await.map_err(from_aws_sdk_error)?;
293 for ns in resp.namespaces() {
294 result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
295 }
296 continuation_token = resp.continuation_token().map(|s| s.to_string());
297 if continuation_token.is_none() {
298 break;
299 }
300 }
301 Ok(result)
302 }
303
304 async fn create_namespace(
321 &self,
322 namespace: &NamespaceIdent,
323 _properties: HashMap<String, String>,
324 ) -> Result<Namespace> {
325 if self.namespace_exists(namespace).await? {
326 return Err(Error::new(
327 ErrorKind::NamespaceAlreadyExists,
328 format!("Namespace {namespace:?} already exists"),
329 ));
330 }
331
332 let req = self
333 .s3tables_client
334 .create_namespace()
335 .table_bucket_arn(self.config.table_bucket_arn.clone())
336 .namespace(namespace.to_url_string());
337 req.send().await.map_err(from_aws_sdk_error)?;
338 Ok(Namespace::with_properties(
339 namespace.clone(),
340 HashMap::new(),
341 ))
342 }
343
344 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
354 if !self.namespace_exists(namespace).await? {
355 return Err(Error::new(
356 ErrorKind::NamespaceNotFound,
357 format!("Namespace {namespace:?} does not exist"),
358 ));
359 }
360
361 let req = self
362 .s3tables_client
363 .get_namespace()
364 .table_bucket_arn(self.config.table_bucket_arn.clone())
365 .namespace(namespace.to_url_string());
366 let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
367 let properties = HashMap::new();
368 Ok(Namespace::with_properties(
369 NamespaceIdent::from_vec(resp.namespace().to_vec())?,
370 properties,
371 ))
372 }
373
374 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
387 let req = self
388 .s3tables_client
389 .get_namespace()
390 .table_bucket_arn(self.config.table_bucket_arn.clone())
391 .namespace(namespace.to_url_string());
392 match req.send().await {
393 Ok(_) => Ok(true),
394 Err(err) => {
395 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
396 Ok(false)
397 } else {
398 Err(from_aws_sdk_error(err))
399 }
400 }
401 }
402 }
403
404 async fn update_namespace(
409 &self,
410 _namespace: &NamespaceIdent,
411 _properties: HashMap<String, String>,
412 ) -> Result<()> {
413 Err(Error::new(
414 ErrorKind::FeatureUnsupported,
415 "Update namespace is not supported for s3tables catalog",
416 ))
417 }
418
419 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
428 if !self.namespace_exists(namespace).await? {
429 return Err(Error::new(
430 ErrorKind::NamespaceNotFound,
431 format!("Namespace {namespace:?} does not exist"),
432 ));
433 }
434
435 let req = self
436 .s3tables_client
437 .delete_namespace()
438 .table_bucket_arn(self.config.table_bucket_arn.clone())
439 .namespace(namespace.to_url_string());
440 req.send().await.map_err(from_aws_sdk_error)?;
441 Ok(())
442 }
443
444 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
453 let mut result = Vec::new();
454 let mut continuation_token = None;
455 loop {
456 let mut req = self
457 .s3tables_client
458 .list_tables()
459 .table_bucket_arn(self.config.table_bucket_arn.clone())
460 .namespace(namespace.to_url_string());
461 if let Some(token) = continuation_token {
462 req = req.continuation_token(token);
463 }
464 let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
465 for table in resp.tables() {
466 result.push(TableIdent::new(
467 NamespaceIdent::from_vec(table.namespace().to_vec())?,
468 table.name().to_string(),
469 ));
470 }
471 continuation_token = resp.continuation_token().map(|s| s.to_string());
472 if continuation_token.is_none() {
473 break;
474 }
475 }
476 Ok(result)
477 }
478
479 async fn create_table(
494 &self,
495 namespace: &NamespaceIdent,
496 mut creation: TableCreation,
497 ) -> Result<Table> {
498 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
499
500 let create_resp: CreateTableOutput = self
502 .s3tables_client
503 .create_table()
504 .table_bucket_arn(self.config.table_bucket_arn.clone())
505 .namespace(namespace.to_url_string())
506 .format(OpenTableFormat::Iceberg)
507 .name(table_ident.name())
508 .send()
509 .await
510 .map_err(from_aws_sdk_error)?;
511
512 let table_location = match &creation.location {
515 Some(_) => {
516 return Err(Error::new(
517 ErrorKind::DataInvalid,
518 "The location of the table is generated by s3tables catalog, can't be set by user.",
519 ));
520 }
521 None => {
522 let get_resp: GetTableOutput = self
523 .s3tables_client
524 .get_table()
525 .table_bucket_arn(self.config.table_bucket_arn.clone())
526 .namespace(namespace.to_url_string())
527 .name(table_ident.name())
528 .send()
529 .await
530 .map_err(from_aws_sdk_error)?;
531 get_resp.warehouse_location().to_string()
532 }
533 };
534
535 creation.location = Some(table_location.clone());
537 let metadata = TableMetadataBuilder::from_table_creation(creation)?
538 .build()?
539 .metadata;
540 let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
541 metadata.write_to(&self.file_io, &metadata_location).await?;
542
543 let metadata_location_str = metadata_location.to_string();
545 self.s3tables_client
546 .update_table_metadata_location()
547 .table_bucket_arn(self.config.table_bucket_arn.clone())
548 .namespace(namespace.to_url_string())
549 .name(table_ident.name())
550 .metadata_location(metadata_location_str.clone())
551 .version_token(create_resp.version_token())
552 .send()
553 .await
554 .map_err(from_aws_sdk_error)?;
555
556 let table = Table::builder()
557 .identifier(table_ident)
558 .metadata_location(metadata_location_str)
559 .metadata(metadata)
560 .file_io(self.file_io.clone())
561 .runtime(self.runtime.clone())
562 .build()?;
563 Ok(table)
564 }
565
566 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
577 Ok(self.load_table_with_version_token(table_ident).await?.0)
578 }
579
580 async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
584 Err(Error::new(
585 ErrorKind::FeatureUnsupported,
586 "drop_table is not supported for S3Tables; use purge_table instead",
587 ))
588 }
589
590 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
592 let req = self
593 .s3tables_client
594 .delete_table()
595 .table_bucket_arn(self.config.table_bucket_arn.clone())
596 .namespace(table.namespace().to_url_string())
597 .name(table.name());
598 req.send().await.map_err(from_aws_sdk_error)?;
599 Ok(())
600 }
601
602 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
615 let req = self
616 .s3tables_client
617 .get_table()
618 .table_bucket_arn(self.config.table_bucket_arn.clone())
619 .namespace(table_ident.namespace().to_url_string())
620 .name(table_ident.name());
621 match req.send().await {
622 Ok(_) => Ok(true),
623 Err(err) => {
624 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
625 Ok(false)
626 } else {
627 Err(from_aws_sdk_error(err))
628 }
629 }
630 }
631 }
632
633 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
642 let req = self
643 .s3tables_client
644 .rename_table()
645 .table_bucket_arn(self.config.table_bucket_arn.clone())
646 .namespace(src.namespace().to_url_string())
647 .name(src.name())
648 .new_namespace_name(dest.namespace().to_url_string())
649 .new_name(dest.name());
650 req.send().await.map_err(from_aws_sdk_error)?;
651 Ok(())
652 }
653
654 async fn register_table(
655 &self,
656 _table_ident: &TableIdent,
657 _metadata_location: String,
658 ) -> Result<Table> {
659 Err(Error::new(
660 ErrorKind::FeatureUnsupported,
661 "Registering a table is not supported yet",
662 ))
663 }
664
665 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
667 let table_ident = commit.identifier().clone();
668 let table_namespace = table_ident.namespace();
669 let (current_table, version_token) =
670 self.load_table_with_version_token(&table_ident).await?;
671
672 let staged_table = commit.apply(current_table)?;
673 let staged_metadata_location_str = staged_table.metadata_location_result()?;
674 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
675
676 staged_table
677 .metadata()
678 .write_to(staged_table.file_io(), &staged_metadata_location)
679 .await?;
680
681 let builder = self
682 .s3tables_client
683 .update_table_metadata_location()
684 .table_bucket_arn(&self.config.table_bucket_arn)
685 .namespace(table_namespace.to_url_string())
686 .name(table_ident.name())
687 .version_token(version_token)
688 .metadata_location(staged_metadata_location_str);
689
690 let _ = builder.send().await.map_err(|e| {
691 let error = e.into_service_error();
692 match error {
693 UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
694 ErrorKind::CatalogCommitConflicts,
695 format!("Commit conflicted for table: {table_ident}"),
696 )
697 .with_retryable(true),
698 UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
699 ErrorKind::TableNotFound,
700 format!("Table {table_ident} is not found"),
701 ),
702 _ => Error::new(
703 ErrorKind::Unexpected,
704 "Operation failed for hitting aws sdk error",
705 ),
706 }
707 .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
708 })?;
709
710 Ok(staged_table)
711 }
712}
713
714pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
716where T: std::fmt::Debug {
717 Error::new(
718 ErrorKind::Unexpected,
719 format!("Operation failed for hitting aws sdk error: {error:?}"),
720 )
721}
722
723#[cfg(test)]
724mod tests {
725 use futures::TryStreamExt;
726 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
727 use iceberg::transaction::{ApplyTransactionAction, Transaction};
728
729 use super::*;
730
731 async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
732 let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
733 Some(table_bucket_arn) => table_bucket_arn,
734 None => return Ok(None),
735 };
736
737 let config = S3TablesCatalogConfig {
738 name: None,
739 table_bucket_arn,
740 endpoint_url: None,
741 client: None,
742 props: HashMap::new(),
743 };
744
745 Ok(Some(
746 S3TablesCatalog::new(config, None, Runtime::current()).await?,
747 ))
748 }
749
750 #[tokio::test]
751 async fn test_s3tables_list_namespace() {
752 let catalog = match load_s3tables_catalog_from_env().await {
753 Ok(Some(catalog)) => catalog,
754 Ok(None) => return,
755 Err(e) => panic!("Error loading catalog: {e}"),
756 };
757
758 let namespaces = catalog.list_namespaces(None).await.unwrap();
759 assert!(!namespaces.is_empty());
760 }
761
762 #[tokio::test]
763 async fn test_s3tables_list_tables() {
764 let catalog = match load_s3tables_catalog_from_env().await {
765 Ok(Some(catalog)) => catalog,
766 Ok(None) => return,
767 Err(e) => panic!("Error loading catalog: {e}"),
768 };
769
770 let tables = catalog
771 .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
772 .await
773 .unwrap();
774 assert!(!tables.is_empty());
775 }
776
777 #[tokio::test]
778 async fn test_s3tables_load_table() {
779 let catalog = match load_s3tables_catalog_from_env().await {
780 Ok(Some(catalog)) => catalog,
781 Ok(None) => return,
782 Err(e) => panic!("Error loading catalog: {e}"),
783 };
784
785 let table = catalog
786 .load_table(&TableIdent::new(
787 NamespaceIdent::new("aws_s3_metadata".to_string()),
788 "query_storage_metadata".to_string(),
789 ))
790 .await
791 .unwrap();
792 println!("{table:?}");
793 }
794
795 #[tokio::test]
796 async fn test_s3tables_create_delete_namespace() {
797 let catalog = match load_s3tables_catalog_from_env().await {
798 Ok(Some(catalog)) => catalog,
799 Ok(None) => return,
800 Err(e) => panic!("Error loading catalog: {e}"),
801 };
802
803 let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
804 catalog
805 .create_namespace(&namespace, HashMap::new())
806 .await
807 .unwrap();
808 assert!(catalog.namespace_exists(&namespace).await.unwrap());
809 catalog.drop_namespace(&namespace).await.unwrap();
810 assert!(!catalog.namespace_exists(&namespace).await.unwrap());
811 }
812
813 #[tokio::test]
814 async fn test_s3tables_create_delete_table() {
815 let catalog = match load_s3tables_catalog_from_env().await {
816 Ok(Some(catalog)) => catalog,
817 Ok(None) => return,
818 Err(e) => panic!("Error loading catalog: {e}"),
819 };
820
821 let creation = {
822 let schema = Schema::builder()
823 .with_schema_id(0)
824 .with_fields(vec![
825 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
826 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
827 ])
828 .build()
829 .unwrap();
830 TableCreation::builder()
831 .name("test_s3tables_create_delete_table".to_string())
832 .properties(HashMap::new())
833 .schema(schema)
834 .build()
835 };
836
837 let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
838 let table_ident = TableIdent::new(
839 namespace.clone(),
840 "test_s3tables_create_delete_table".to_string(),
841 );
842 catalog.drop_namespace(&namespace).await.ok();
843 catalog.drop_table(&table_ident).await.ok();
844
845 catalog
846 .create_namespace(&namespace, HashMap::new())
847 .await
848 .unwrap();
849 catalog.create_table(&namespace, creation).await.unwrap();
850 assert!(catalog.table_exists(&table_ident).await.unwrap());
851 catalog.drop_table(&table_ident).await.unwrap();
852 assert!(!catalog.table_exists(&table_ident).await.unwrap());
853 catalog.drop_namespace(&namespace).await.unwrap();
854 }
855
856 #[tokio::test]
857 async fn test_s3tables_update_table() {
858 let catalog = match load_s3tables_catalog_from_env().await {
859 Ok(Some(catalog)) => catalog,
860 Ok(None) => return,
861 Err(e) => panic!("Error loading catalog: {e}"),
862 };
863
864 let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
866 let table_ident =
867 TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
868
869 catalog.drop_table(&table_ident).await.ok();
871 catalog.drop_namespace(&namespace).await.ok();
872
873 catalog
875 .create_namespace(&namespace, HashMap::new())
876 .await
877 .unwrap();
878
879 let creation = {
880 let schema = Schema::builder()
881 .with_schema_id(0)
882 .with_fields(vec![
883 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
884 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
885 ])
886 .build()
887 .unwrap();
888 TableCreation::builder()
889 .name(table_ident.name().to_string())
890 .properties(HashMap::new())
891 .schema(schema)
892 .build()
893 };
894
895 let table = catalog.create_table(&namespace, creation).await.unwrap();
896
897 let tx = Transaction::new(&table);
899
900 let original_metadata_location = table.metadata_location();
902
903 let tx = tx
905 .update_table_properties()
906 .set("test_property".to_string(), "test_value".to_string())
907 .apply(tx)
908 .unwrap();
909
910 let updated_table = tx.commit(&catalog).await.unwrap();
912
913 assert_eq!(
915 updated_table.metadata().properties().get("test_property"),
916 Some(&"test_value".to_string())
917 );
918
919 assert_ne!(
921 updated_table.metadata_location(),
922 original_metadata_location,
923 "Metadata location should be updated after commit"
924 );
925
926 let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
928
929 assert_eq!(
931 reloaded_table.metadata().properties().get("test_property"),
932 Some(&"test_value".to_string())
933 );
934 assert_eq!(
935 reloaded_table.metadata_location(),
936 updated_table.metadata_location(),
937 "Reloaded table should have the same metadata location as the updated table"
938 );
939 }
940
941 #[tokio::test]
942 async fn test_builder_load_missing_bucket_arn() {
943 let builder = S3TablesCatalogBuilder::default();
944 let result = builder.load("s3tables", HashMap::new()).await;
945
946 assert!(result.is_err());
947 if let Err(err) = result {
948 assert_eq!(err.kind(), ErrorKind::DataInvalid);
949 assert_eq!(err.message(), "Table bucket ARN is required");
950 }
951 }
952
953 #[tokio::test]
954 async fn test_builder_with_endpoint_url_ok() {
955 let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");
956
957 let result = builder
958 .load(
959 "s3tables",
960 HashMap::from([
961 (
962 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
963 "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
964 ),
965 ("some_prop".to_string(), "some_value".to_string()),
966 ]),
967 )
968 .await;
969
970 assert!(result.is_ok());
971 }
972
973 #[tokio::test]
974 async fn test_builder_with_client_ok() {
975 use aws_config::BehaviorVersion;
976
977 let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
978 let client = aws_sdk_s3tables::Client::new(&sdk_config);
979
980 let builder = S3TablesCatalogBuilder::default().with_client(client);
981 let result = builder
982 .load(
983 "s3tables",
984 HashMap::from([(
985 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
986 "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
987 )]),
988 )
989 .await;
990
991 assert!(result.is_ok());
992 }
993
994 #[tokio::test]
995 async fn test_builder_with_table_bucket_arn() {
996 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
997 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
998
999 let result = builder.load("s3tables", HashMap::new()).await;
1000
1001 assert!(result.is_ok());
1002 let catalog = result.unwrap();
1003 assert_eq!(catalog.config.table_bucket_arn, test_arn);
1004 }
1005
1006 #[tokio::test]
1007 async fn test_builder_empty_table_bucket_arn_edge_cases() {
1008 let mut props = HashMap::new();
1009 props.insert(
1010 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1011 "".to_string(),
1012 );
1013
1014 let builder = S3TablesCatalogBuilder::default();
1015 let result = builder.load("s3tables", props).await;
1016
1017 assert!(result.is_err());
1018 if let Err(err) = result {
1019 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1020 assert_eq!(err.message(), "Table bucket ARN is required");
1021 }
1022 }
1023
1024 #[tokio::test]
1025 async fn test_endpoint_url_property_overrides_builder_method() {
1026 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1027 let builder_endpoint = "http://localhost:4566";
1028 let property_endpoint = "http://localhost:8080";
1029
1030 let builder = S3TablesCatalogBuilder::default()
1031 .with_table_bucket_arn(test_arn)
1032 .with_endpoint_url(builder_endpoint);
1033
1034 let mut props = HashMap::new();
1035 props.insert(
1036 S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1037 property_endpoint.to_string(),
1038 );
1039
1040 let result = builder.load("s3tables", props).await;
1041
1042 assert!(result.is_ok());
1043 let catalog = result.unwrap();
1044
1045 assert_eq!(
1047 catalog.config.endpoint_url,
1048 Some(property_endpoint.to_string())
1049 );
1050 assert_ne!(
1051 catalog.config.endpoint_url,
1052 Some(builder_endpoint.to_string())
1053 );
1054 }
1055
1056 #[tokio::test]
1057 async fn test_endpoint_url_builder_method_only() {
1058 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1059 let builder_endpoint = "http://localhost:4566";
1060
1061 let builder = S3TablesCatalogBuilder::default()
1062 .with_table_bucket_arn(test_arn)
1063 .with_endpoint_url(builder_endpoint);
1064
1065 let result = builder.load("s3tables", HashMap::new()).await;
1066
1067 assert!(result.is_ok());
1068 let catalog = result.unwrap();
1069
1070 assert_eq!(
1071 catalog.config.endpoint_url,
1072 Some(builder_endpoint.to_string())
1073 );
1074 }
1075
1076 #[tokio::test]
1077 async fn test_endpoint_url_property_only() {
1078 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1079 let property_endpoint = "http://localhost:8080";
1080
1081 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1082
1083 let mut props = HashMap::new();
1084 props.insert(
1085 S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1086 property_endpoint.to_string(),
1087 );
1088
1089 let result = builder.load("s3tables", props).await;
1090
1091 assert!(result.is_ok());
1092 let catalog = result.unwrap();
1093
1094 assert_eq!(
1095 catalog.config.endpoint_url,
1096 Some(property_endpoint.to_string())
1097 );
1098 }
1099
1100 #[tokio::test]
1101 async fn test_table_bucket_arn_property_overrides_builder_method() {
1102 let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1103 let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1104
1105 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1106
1107 let mut props = HashMap::new();
1108 props.insert(
1109 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1110 property_arn.to_string(),
1111 );
1112
1113 let result = builder.load("s3tables", props).await;
1114
1115 assert!(result.is_ok());
1116 let catalog = result.unwrap();
1117
1118 assert_eq!(catalog.config.table_bucket_arn, property_arn);
1119 assert_ne!(catalog.config.table_bucket_arn, builder_arn);
1120 }
1121
1122 #[tokio::test]
1123 async fn test_table_bucket_arn_builder_method_only() {
1124 let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1125
1126 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1127
1128 let result = builder.load("s3tables", HashMap::new()).await;
1129
1130 assert!(result.is_ok());
1131 let catalog = result.unwrap();
1132
1133 assert_eq!(catalog.config.table_bucket_arn, builder_arn);
1134 }
1135
1136 #[tokio::test]
1137 async fn test_table_bucket_arn_property_only() {
1138 let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1139
1140 let builder = S3TablesCatalogBuilder::default();
1141
1142 let mut props = HashMap::new();
1143 props.insert(
1144 S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1145 property_arn.to_string(),
1146 );
1147
1148 let result = builder.load("s3tables", props).await;
1149
1150 assert!(result.is_ok());
1151 let catalog = result.unwrap();
1152
1153 assert_eq!(catalog.config.table_bucket_arn, property_arn);
1154 }
1155
1156 #[tokio::test]
1157 async fn test_builder_empty_name_validation() {
1158 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1159 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1160
1161 let result = builder.load("", HashMap::new()).await;
1162
1163 assert!(result.is_err());
1164 if let Err(err) = result {
1165 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1166 assert_eq!(err.message(), "Catalog name cannot be empty");
1167 }
1168 }
1169
1170 #[tokio::test]
1171 async fn test_builder_whitespace_only_name_validation() {
1172 let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1173 let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1174
1175 let result = builder.load(" \t\n ", HashMap::new()).await;
1176
1177 assert!(result.is_err());
1178 if let Err(err) = result {
1179 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1180 assert_eq!(err.message(), "Catalog name cannot be empty");
1181 }
1182 }
1183
1184 #[tokio::test]
1185 async fn test_builder_name_validation_with_missing_arn() {
1186 let builder = S3TablesCatalogBuilder::default();
1187
1188 let result = builder.load("", HashMap::new()).await;
1189
1190 assert!(result.is_err());
1191 if let Err(err) = result {
1192 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1193 assert_eq!(err.message(), "Catalog name cannot be empty");
1194 }
1195 }
1196
1197 #[tokio::test]
1199 async fn test_s3tables_create_table_write_load_table_read() {
1200 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
1201 use iceberg::writer::file_writer::ParquetWriterBuilder;
1202 use iceberg::writer::file_writer::location_generator::{
1203 DefaultFileNameGenerator, DefaultLocationGenerator,
1204 };
1205 use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
1206 use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
1207
1208 let catalog = match load_s3tables_catalog_from_env().await {
1209 Ok(Some(c)) => c,
1210 Ok(None) => return,
1211 Err(e) => panic!("Error loading catalog: {e}"),
1212 };
1213
1214 let ns = NamespaceIdent::new(format!("test_rw_{}", uuid::Uuid::new_v4().simple()));
1215 catalog.create_namespace(&ns, HashMap::new()).await.unwrap();
1216
1217 let table_name = String::from("table");
1218
1219 let schema = Schema::builder()
1220 .with_fields(vec![
1221 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1222 ])
1223 .build()
1224 .unwrap();
1225 let creation = TableCreation::builder()
1226 .name(table_name.clone())
1227 .schema(schema)
1228 .build();
1229
1230 let table = catalog.create_table(&ns, creation).await.unwrap();
1231
1232 let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
1234 table
1235 .metadata()
1236 .current_schema()
1237 .as_ref()
1238 .try_into()
1239 .unwrap(),
1240 );
1241 let batch = arrow_array::RecordBatch::try_new(arrow_schema, vec![Arc::new(
1242 arrow_array::Int32Array::from(vec![42]),
1243 )])
1244 .unwrap();
1245
1246 let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
1248 let file_name_generator = DefaultFileNameGenerator::new(
1249 "test".to_string(),
1250 None,
1251 iceberg::spec::DataFileFormat::Parquet,
1252 );
1253 let parquet_writer_builder = ParquetWriterBuilder::new(
1254 parquet::file::properties::WriterProperties::default(),
1255 table.metadata().current_schema().clone(),
1256 );
1257 let rw = RollingFileWriterBuilder::new_with_default_file_size(
1258 parquet_writer_builder,
1259 table.file_io().clone(),
1260 location_generator,
1261 file_name_generator,
1262 );
1263 let mut writer = DataFileWriterBuilder::new(rw).build(None).await.unwrap();
1264 writer.write(batch.clone()).await.unwrap();
1265 let data_files = writer.close().await.unwrap();
1266
1267 let tx = Transaction::new(&table);
1268 let tx = tx
1269 .fast_append()
1270 .add_data_files(data_files)
1271 .apply(tx)
1272 .unwrap();
1273 tx.commit(&catalog).await.unwrap();
1274
1275 let table_ident = TableIdent::new(ns.clone(), table_name.clone());
1277 let reloaded = catalog.load_table(&table_ident).await.unwrap();
1278 let batches: Vec<arrow_array::RecordBatch> = reloaded
1279 .scan()
1280 .select_all()
1281 .build()
1282 .expect("scan to be valid (snapshot exists, schema is OK)")
1283 .to_arrow()
1284 .await
1285 .expect("scan tasks should be OK")
1286 .try_collect()
1287 .await
1288 .expect("scan should complete successfully");
1289
1290 assert_eq!(batches.len(), 1);
1291 assert_eq!(
1292 batches[0], batch,
1293 "read records should match records written earlier"
1294 );
1295
1296 catalog.purge_table(&table_ident).await.ok();
1298 catalog.drop_namespace(&ns).await.ok();
1299 }
1300}