1use std::collections::HashMap;
19use std::future::Future;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
25use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
26use aws_sdk_s3tables::operation::get_table::{GetTableError, GetTableOutput};
27use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
28use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
29use aws_sdk_s3tables::types::OpenTableFormat;
30use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
31use iceberg::spec::{TableMetadata, TableMetadataBuilder};
32use iceberg::table::Table;
33use iceberg::{
34 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35 Runtime, TableCommit, TableCreation, TableIdent,
36};
37use iceberg_storage_opendal::OpenDalStorageFactory;
38
39use crate::utils::create_sdk_config;
40
/// Property key under which the table bucket ARN is passed to the catalog builder.
/// A value supplied under this key replaces any ARN set on the builder beforehand.
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Property key under which an endpoint URL is passed to the catalog builder.
/// A value supplied under this key replaces any endpoint set on the builder beforehand.
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
45
/// Settings gathered by [`S3TablesCatalogBuilder`] and kept by the catalog it builds.
#[derive(Debug)]
struct S3TablesCatalogConfig {
    /// Catalog name; set from the `name` argument when the builder is loaded.
    name: Option<String>,
    /// ARN of the table bucket sent with every S3 Tables request.
    /// The builder refuses to load while this is empty.
    table_bucket_arn: String,
    /// Optional endpoint URL handed to `create_sdk_config` when the catalog
    /// creates its own client.
    endpoint_url: Option<String>,
    /// Optional caller-provided client; when present it is used instead of
    /// creating one from `props` and `endpoint_url`.
    client: Option<aws_sdk_s3tables::Client>,
    /// Remaining properties (the bucket ARN and endpoint URL keys are removed
    /// on load); passed to `create_sdk_config` and to the `FileIOBuilder`.
    props: HashMap<String, String>,
}
68
/// Builder for [`S3TablesCatalog`].
#[derive(Debug)]
pub struct S3TablesCatalogBuilder {
    /// Catalog configuration accumulated through the `with_*` methods and `load`.
    config: S3TablesCatalogConfig,
    /// Optional storage factory; when absent the catalog falls back to the
    /// OpenDAL S3 storage factory.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Optional runtime; when absent `load` uses `Runtime::try_current()`.
    runtime: Option<Runtime>,
}
76
77impl Default for S3TablesCatalogBuilder {
79 fn default() -> Self {
80 Self {
81 config: S3TablesCatalogConfig {
82 name: None,
83 table_bucket_arn: "".to_string(),
84 endpoint_url: None,
85 client: None,
86 props: HashMap::new(),
87 },
88 storage_factory: None,
89 runtime: None,
90 }
91 }
92}
93
94impl S3TablesCatalogBuilder {
96 pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
105 self.config.endpoint_url = Some(endpoint_url.into());
106 self
107 }
108
109 pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
111 self.config.client = Some(client);
112 self
113 }
114
115 pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
124 self.config.table_bucket_arn = table_bucket_arn.into();
125 self
126 }
127}
128
129impl CatalogBuilder for S3TablesCatalogBuilder {
130 type C = S3TablesCatalog;
131
132 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
133 self.storage_factory = Some(storage_factory);
134 self
135 }
136
137 fn with_runtime(mut self, runtime: Runtime) -> Self {
138 self.runtime = Some(runtime);
139 self
140 }
141
142 fn load(
143 mut self,
144 name: impl Into<String>,
145 props: HashMap<String, String>,
146 ) -> impl Future<Output = Result<Self::C>> + Send {
147 let catalog_name = name.into();
148 self.config.name = Some(catalog_name.clone());
149
150 if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
151 self.config.table_bucket_arn = props
152 .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
153 .cloned()
154 .unwrap_or_default();
155 }
156
157 if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
158 self.config.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
159 }
160
161 self.config.props = props
163 .into_iter()
164 .filter(|(k, _)| {
165 k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
166 && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
167 })
168 .collect();
169
170 async move {
171 if catalog_name.trim().is_empty() {
172 Err(Error::new(
173 ErrorKind::DataInvalid,
174 "Catalog name cannot be empty",
175 ))
176 } else if self.config.table_bucket_arn.is_empty() {
177 Err(Error::new(
178 ErrorKind::DataInvalid,
179 "Table bucket ARN is required",
180 ))
181 } else {
182 let runtime = match self.runtime {
183 Some(rt) => rt,
184 None => Runtime::try_current()?,
185 };
186 S3TablesCatalog::new(self.config, self.storage_factory, runtime).await
187 }
188 }
189 }
190}
191
/// Iceberg catalog implementation that talks to the S3 Tables service.
#[derive(Debug)]
pub struct S3TablesCatalog {
    /// Configuration the catalog was built with; supplies the table bucket ARN
    /// sent with each request.
    config: S3TablesCatalogConfig,
    /// Client used for all S3 Tables API calls.
    s3tables_client: aws_sdk_s3tables::Client,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
    /// Runtime handed to every `Table` this catalog builds.
    runtime: Runtime,
}
200
201impl S3TablesCatalog {
202 async fn new(
204 config: S3TablesCatalogConfig,
205 storage_factory: Option<Arc<dyn StorageFactory>>,
206 runtime: Runtime,
207 ) -> Result<Self> {
208 let s3tables_client = if let Some(client) = config.client.clone() {
209 client
210 } else {
211 let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
212 aws_sdk_s3tables::Client::new(&aws_config)
213 };
214
215 let factory = storage_factory.unwrap_or_else(|| {
217 Arc::new(OpenDalStorageFactory::S3 {
218 customized_credential_load: None,
219 })
220 });
221 let file_io = FileIOBuilder::new(factory)
222 .with_props(&config.props)
223 .build();
224
225 Ok(Self {
226 config,
227 s3tables_client,
228 file_io,
229 runtime,
230 })
231 }
232
233 async fn load_table_with_version_token(
234 &self,
235 table_ident: &TableIdent,
236 ) -> Result<(Table, String)> {
237 let req = self
238 .s3tables_client
239 .get_table()
240 .table_bucket_arn(self.config.table_bucket_arn.clone())
241 .namespace(table_ident.namespace().to_url_string())
242 .name(table_ident.name());
243
244 let resp = req.send().await.map_err(|err| {
245 if err
246 .as_service_error()
247 .is_some_and(GetTableError::is_not_found_exception)
248 {
249 Error::new(
252 ErrorKind::TableNotFound,
253 format!("Table {table_ident} is not found, either because the namespace or table did not exist"),
254 )
255 .with_source(err)
256 } else {
257 from_aws_sdk_error(err)
258 }
259 })?;
260
261 let metadata_location = resp.metadata_location().ok_or_else(|| {
263 Error::new(
264 ErrorKind::Unexpected,
265 format!(
266 "Table {} does not have metadata location",
267 table_ident.name()
268 ),
269 )
270 })?;
271 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
272
273 let table = Table::builder()
274 .identifier(table_ident.clone())
275 .metadata(metadata)
276 .metadata_location(metadata_location)
277 .file_io(self.file_io.clone())
278 .runtime(self.runtime.clone())
279 .build()?;
280 Ok((table, resp.version_token))
281 }
282}
283
284#[async_trait]
285impl Catalog for S3TablesCatalog {
286 async fn list_namespaces(
291 &self,
292 parent: Option<&NamespaceIdent>,
293 ) -> Result<Vec<NamespaceIdent>> {
294 if parent.is_some() {
295 return Ok(vec![]);
296 }
297
298 let mut result = Vec::new();
299 let mut continuation_token = None;
300 loop {
301 let mut req = self
302 .s3tables_client
303 .list_namespaces()
304 .table_bucket_arn(self.config.table_bucket_arn.clone());
305 if let Some(token) = continuation_token {
306 req = req.continuation_token(token);
307 }
308 let resp = req.send().await.map_err(from_aws_sdk_error)?;
309 for ns in resp.namespaces() {
310 result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
311 }
312 continuation_token = resp.continuation_token().map(|s| s.to_string());
313 if continuation_token.is_none() {
314 break;
315 }
316 }
317 Ok(result)
318 }
319
320 async fn create_namespace(
337 &self,
338 namespace: &NamespaceIdent,
339 _properties: HashMap<String, String>,
340 ) -> Result<Namespace> {
341 if self.namespace_exists(namespace).await? {
342 return Err(Error::new(
343 ErrorKind::NamespaceAlreadyExists,
344 format!("Namespace {namespace:?} already exists"),
345 ));
346 }
347
348 let req = self
349 .s3tables_client
350 .create_namespace()
351 .table_bucket_arn(self.config.table_bucket_arn.clone())
352 .namespace(namespace.to_url_string());
353 req.send().await.map_err(from_aws_sdk_error)?;
354 Ok(Namespace::with_properties(
355 namespace.clone(),
356 HashMap::new(),
357 ))
358 }
359
360 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
370 if !self.namespace_exists(namespace).await? {
371 return Err(Error::new(
372 ErrorKind::NamespaceNotFound,
373 format!("Namespace {namespace:?} does not exist"),
374 ));
375 }
376
377 let req = self
378 .s3tables_client
379 .get_namespace()
380 .table_bucket_arn(self.config.table_bucket_arn.clone())
381 .namespace(namespace.to_url_string());
382 let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
383 let properties = HashMap::new();
384 Ok(Namespace::with_properties(
385 NamespaceIdent::from_vec(resp.namespace().to_vec())?,
386 properties,
387 ))
388 }
389
390 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
403 let req = self
404 .s3tables_client
405 .get_namespace()
406 .table_bucket_arn(self.config.table_bucket_arn.clone())
407 .namespace(namespace.to_url_string());
408 match req.send().await {
409 Ok(_) => Ok(true),
410 Err(err) => {
411 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
412 Ok(false)
413 } else {
414 Err(from_aws_sdk_error(err))
415 }
416 }
417 }
418 }
419
420 async fn update_namespace(
425 &self,
426 _namespace: &NamespaceIdent,
427 _properties: HashMap<String, String>,
428 ) -> Result<()> {
429 Err(Error::new(
430 ErrorKind::FeatureUnsupported,
431 "Update namespace is not supported for s3tables catalog",
432 ))
433 }
434
435 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
444 if !self.namespace_exists(namespace).await? {
445 return Err(Error::new(
446 ErrorKind::NamespaceNotFound,
447 format!("Namespace {namespace:?} does not exist"),
448 ));
449 }
450
451 let req = self
452 .s3tables_client
453 .delete_namespace()
454 .table_bucket_arn(self.config.table_bucket_arn.clone())
455 .namespace(namespace.to_url_string());
456 req.send().await.map_err(from_aws_sdk_error)?;
457 Ok(())
458 }
459
460 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
469 let mut result = Vec::new();
470 let mut continuation_token = None;
471 loop {
472 let mut req = self
473 .s3tables_client
474 .list_tables()
475 .table_bucket_arn(self.config.table_bucket_arn.clone())
476 .namespace(namespace.to_url_string());
477 if let Some(token) = continuation_token {
478 req = req.continuation_token(token);
479 }
480 let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
481 for table in resp.tables() {
482 result.push(TableIdent::new(
483 NamespaceIdent::from_vec(table.namespace().to_vec())?,
484 table.name().to_string(),
485 ));
486 }
487 continuation_token = resp.continuation_token().map(|s| s.to_string());
488 if continuation_token.is_none() {
489 break;
490 }
491 }
492 Ok(result)
493 }
494
495 async fn create_table(
510 &self,
511 namespace: &NamespaceIdent,
512 mut creation: TableCreation,
513 ) -> Result<Table> {
514 let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
515
516 let create_resp: CreateTableOutput = self
518 .s3tables_client
519 .create_table()
520 .table_bucket_arn(self.config.table_bucket_arn.clone())
521 .namespace(namespace.to_url_string())
522 .format(OpenTableFormat::Iceberg)
523 .name(table_ident.name())
524 .send()
525 .await
526 .map_err(from_aws_sdk_error)?;
527
528 let table_location = match &creation.location {
531 Some(_) => {
532 return Err(Error::new(
533 ErrorKind::DataInvalid,
534 "The location of the table is generated by s3tables catalog, can't be set by user.",
535 ));
536 }
537 None => {
538 let get_resp: GetTableOutput = self
539 .s3tables_client
540 .get_table()
541 .table_bucket_arn(self.config.table_bucket_arn.clone())
542 .namespace(namespace.to_url_string())
543 .name(table_ident.name())
544 .send()
545 .await
546 .map_err(from_aws_sdk_error)?;
547 get_resp.warehouse_location().to_string()
548 }
549 };
550
551 creation.location = Some(table_location.clone());
553 let metadata = TableMetadataBuilder::from_table_creation(creation)?
554 .build()?
555 .metadata;
556 let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
557 metadata.write_to(&self.file_io, &metadata_location).await?;
558
559 let metadata_location_str = metadata_location.to_string();
561 self.s3tables_client
562 .update_table_metadata_location()
563 .table_bucket_arn(self.config.table_bucket_arn.clone())
564 .namespace(namespace.to_url_string())
565 .name(table_ident.name())
566 .metadata_location(metadata_location_str.clone())
567 .version_token(create_resp.version_token())
568 .send()
569 .await
570 .map_err(from_aws_sdk_error)?;
571
572 let table = Table::builder()
573 .identifier(table_ident)
574 .metadata_location(metadata_location_str)
575 .metadata(metadata)
576 .file_io(self.file_io.clone())
577 .runtime(self.runtime.clone())
578 .build()?;
579 Ok(table)
580 }
581
582 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
593 Ok(self.load_table_with_version_token(table_ident).await?.0)
594 }
595
596 async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
600 Err(Error::new(
601 ErrorKind::FeatureUnsupported,
602 "drop_table is not supported for S3Tables; use purge_table instead",
603 ))
604 }
605
606 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
608 let req = self
609 .s3tables_client
610 .delete_table()
611 .table_bucket_arn(self.config.table_bucket_arn.clone())
612 .namespace(table.namespace().to_url_string())
613 .name(table.name());
614 req.send().await.map_err(from_aws_sdk_error)?;
615 Ok(())
616 }
617
618 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
631 let req = self
632 .s3tables_client
633 .get_table()
634 .table_bucket_arn(self.config.table_bucket_arn.clone())
635 .namespace(table_ident.namespace().to_url_string())
636 .name(table_ident.name());
637 match req.send().await {
638 Ok(_) => Ok(true),
639 Err(err) => {
640 if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
641 Ok(false)
642 } else {
643 Err(from_aws_sdk_error(err))
644 }
645 }
646 }
647 }
648
649 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
658 let req = self
659 .s3tables_client
660 .rename_table()
661 .table_bucket_arn(self.config.table_bucket_arn.clone())
662 .namespace(src.namespace().to_url_string())
663 .name(src.name())
664 .new_namespace_name(dest.namespace().to_url_string())
665 .new_name(dest.name());
666 req.send().await.map_err(from_aws_sdk_error)?;
667 Ok(())
668 }
669
670 async fn register_table(
671 &self,
672 _table_ident: &TableIdent,
673 _metadata_location: String,
674 ) -> Result<Table> {
675 Err(Error::new(
676 ErrorKind::FeatureUnsupported,
677 "Registering a table is not supported yet",
678 ))
679 }
680
681 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
683 let table_ident = commit.identifier().clone();
684 let table_namespace = table_ident.namespace();
685 let (current_table, version_token) =
686 self.load_table_with_version_token(&table_ident).await?;
687
688 let staged_table = commit.apply(current_table)?;
689 let staged_metadata_location_str = staged_table.metadata_location_result()?;
690 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
691
692 staged_table
693 .metadata()
694 .write_to(staged_table.file_io(), &staged_metadata_location)
695 .await?;
696
697 let builder = self
698 .s3tables_client
699 .update_table_metadata_location()
700 .table_bucket_arn(&self.config.table_bucket_arn)
701 .namespace(table_namespace.to_url_string())
702 .name(table_ident.name())
703 .version_token(version_token)
704 .metadata_location(staged_metadata_location_str);
705
706 let _ = builder.send().await.map_err(|e| {
707 let error = e.into_service_error();
708 match error {
709 UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
710 ErrorKind::CatalogCommitConflicts,
711 format!("Commit conflicted for table: {table_ident}"),
712 )
713 .with_retryable(true),
714 UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
715 ErrorKind::TableNotFound,
716 format!("Table {table_ident} is not found"),
717 ),
718 _ => Error::new(
719 ErrorKind::Unexpected,
720 "Operation failed for hitting aws sdk error",
721 ),
722 }
723 .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
724 })?;
725
726 Ok(staged_table)
727 }
728}
729
730pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
732where T: std::fmt::Debug {
733 Error::new(
734 ErrorKind::Unexpected,
735 format!("Operation failed for hitting aws sdk error: {error:?}"),
736 )
737}
738
#[cfg(test)]
mod tests {
    use futures::TryStreamExt;
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
    use iceberg::transaction::{ApplyTransactionAction, Transaction};

    use super::*;

    /// Builds a catalog against the table bucket named by the `TABLE_BUCKET_ARN`
    /// environment variable, or returns `Ok(None)` when the variable is unset so
    /// that the service-backed tests can skip themselves.
    async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
        let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
            Some(table_bucket_arn) => table_bucket_arn,
            None => return Ok(None),
        };

        let config = S3TablesCatalogConfig {
            name: None,
            table_bucket_arn,
            endpoint_url: None,
            client: None,
            props: HashMap::new(),
        };

        Ok(Some(
            S3TablesCatalog::new(config, None, Runtime::current()).await?,
        ))
    }

    #[tokio::test]
    async fn test_s3tables_list_namespace() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let namespaces = catalog.list_namespaces(None).await.unwrap();
        assert!(!namespaces.is_empty());
    }

    #[tokio::test]
    async fn test_s3tables_list_tables() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let tables = catalog
            .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
            .await
            .unwrap();
        assert!(!tables.is_empty());
    }

    #[tokio::test]
    async fn test_s3tables_load_table() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let _table = catalog
            .load_table(&TableIdent::new(
                NamespaceIdent::new("aws_s3_metadata".to_string()),
                "query_storage_metadata".to_string(),
            ))
            .await
            .expect("table that exists should be loaded");

        let load_table_err = catalog
            .load_table(&TableIdent::new(
                NamespaceIdent::new("not_a_namespace".to_string()),
                "not_a_table_name".to_string(),
            ))
            .await
            .expect_err("loading a table that does not exist should fail");
        assert_eq!(
            load_table_err.kind(),
            ErrorKind::TableNotFound,
            "must return table not found error for non-existent table"
        );
    }

    #[tokio::test]
    async fn test_s3tables_create_delete_namespace() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();
        assert!(catalog.namespace_exists(&namespace).await.unwrap());
        catalog.drop_namespace(&namespace).await.unwrap();
        assert!(!catalog.namespace_exists(&namespace).await.unwrap());
    }

    #[tokio::test]
    async fn test_s3tables_create_delete_table() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let creation = {
            let schema = Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap();
            TableCreation::builder()
                .name("test_s3tables_create_delete_table".to_string())
                .properties(HashMap::new())
                .schema(schema)
                .build()
        };

        let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
        let table_ident = TableIdent::new(
            namespace.clone(),
            "test_s3tables_create_delete_table".to_string(),
        );
        // Best-effort removal of leftovers from an earlier run. `drop_table` is
        // unsupported by this catalog, so the table is removed with
        // `purge_table`, and it goes first because the service presumably
        // refuses to delete a namespace that still contains tables.
        catalog.purge_table(&table_ident).await.ok();
        catalog.drop_namespace(&namespace).await.ok();

        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();
        catalog.create_table(&namespace, creation).await.unwrap();
        assert!(catalog.table_exists(&table_ident).await.unwrap());
        catalog.purge_table(&table_ident).await.unwrap();
        assert!(!catalog.table_exists(&table_ident).await.unwrap());
        catalog.drop_namespace(&namespace).await.unwrap();
    }

    #[tokio::test]
    async fn test_s3tables_update_table() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
        let table_ident =
            TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());

        // Best-effort removal of leftovers from an earlier run; `drop_table` is
        // unsupported by this catalog, so the table is removed with `purge_table`.
        catalog.purge_table(&table_ident).await.ok();
        catalog.drop_namespace(&namespace).await.ok();

        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();

        let creation = {
            let schema = Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap();
            TableCreation::builder()
                .name(table_ident.name().to_string())
                .properties(HashMap::new())
                .schema(schema)
                .build()
        };

        let table = catalog.create_table(&namespace, creation).await.unwrap();

        let tx = Transaction::new(&table);

        let original_metadata_location = table.metadata_location();

        let tx = tx
            .update_table_properties()
            .set("test_property".to_string(), "test_value".to_string())
            .apply(tx)
            .unwrap();

        let updated_table = tx.commit(&catalog).await.unwrap();

        assert_eq!(
            updated_table.metadata().properties().get("test_property"),
            Some(&"test_value".to_string())
        );

        assert_ne!(
            updated_table.metadata_location(),
            original_metadata_location,
            "Metadata location should be updated after commit"
        );

        let reloaded_table = catalog.load_table(&table_ident).await.unwrap();

        assert_eq!(
            reloaded_table.metadata().properties().get("test_property"),
            Some(&"test_value".to_string())
        );
        assert_eq!(
            reloaded_table.metadata_location(),
            updated_table.metadata_location(),
            "Reloaded table should have the same metadata location as the updated table"
        );

        // Best-effort cleanup so the test does not leave resources behind.
        catalog.purge_table(&table_ident).await.ok();
        catalog.drop_namespace(&namespace).await.ok();
    }

    #[tokio::test]
    async fn test_builder_load_missing_bucket_arn() {
        let builder = S3TablesCatalogBuilder::default();
        let result = builder.load("s3tables", HashMap::new()).await;

        assert!(result.is_err());
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::DataInvalid);
            assert_eq!(err.message(), "Table bucket ARN is required");
        }
    }

    #[tokio::test]
    async fn test_builder_with_endpoint_url_ok() {
        let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");

        let result = builder
            .load(
                "s3tables",
                HashMap::from([
                    (
                        S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
                        "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
                    ),
                    ("some_prop".to_string(), "some_value".to_string()),
                ]),
            )
            .await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_builder_with_client_ok() {
        use aws_config::BehaviorVersion;

        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
        let client = aws_sdk_s3tables::Client::new(&sdk_config);

        let builder = S3TablesCatalogBuilder::default().with_client(client);
        let result = builder
            .load(
                "s3tables",
                HashMap::from([(
                    S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
                    "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
                )]),
            )
            .await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_builder_with_table_bucket_arn() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);

        let result = builder.load("s3tables", HashMap::new()).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();
        assert_eq!(catalog.config.table_bucket_arn, test_arn);
    }

    #[tokio::test]
    async fn test_builder_empty_table_bucket_arn_edge_cases() {
        let mut props = HashMap::new();
        props.insert(
            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
            "".to_string(),
        );

        let builder = S3TablesCatalogBuilder::default();
        let result = builder.load("s3tables", props).await;

        assert!(result.is_err());
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::DataInvalid);
            assert_eq!(err.message(), "Table bucket ARN is required");
        }
    }

    #[tokio::test]
    async fn test_endpoint_url_property_overrides_builder_method() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let builder_endpoint = "http://localhost:4566";
        let property_endpoint = "http://localhost:8080";

        let builder = S3TablesCatalogBuilder::default()
            .with_table_bucket_arn(test_arn)
            .with_endpoint_url(builder_endpoint);

        let mut props = HashMap::new();
        props.insert(
            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
            property_endpoint.to_string(),
        );

        let result = builder.load("s3tables", props).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(
            catalog.config.endpoint_url,
            Some(property_endpoint.to_string())
        );
        assert_ne!(
            catalog.config.endpoint_url,
            Some(builder_endpoint.to_string())
        );
    }

    #[tokio::test]
    async fn test_endpoint_url_builder_method_only() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let builder_endpoint = "http://localhost:4566";

        let builder = S3TablesCatalogBuilder::default()
            .with_table_bucket_arn(test_arn)
            .with_endpoint_url(builder_endpoint);

        let result = builder.load("s3tables", HashMap::new()).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(
            catalog.config.endpoint_url,
            Some(builder_endpoint.to_string())
        );
    }

    #[tokio::test]
    async fn test_endpoint_url_property_only() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let property_endpoint = "http://localhost:8080";

        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);

        let mut props = HashMap::new();
        props.insert(
            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
            property_endpoint.to_string(),
        );

        let result = builder.load("s3tables", props).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(
            catalog.config.endpoint_url,
            Some(property_endpoint.to_string())
        );
    }

    #[tokio::test]
    async fn test_table_bucket_arn_property_overrides_builder_method() {
        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";

        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);

        let mut props = HashMap::new();
        props.insert(
            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
            property_arn.to_string(),
        );

        let result = builder.load("s3tables", props).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(catalog.config.table_bucket_arn, property_arn);
        assert_ne!(catalog.config.table_bucket_arn, builder_arn);
    }

    #[tokio::test]
    async fn test_table_bucket_arn_builder_method_only() {
        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";

        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);

        let result = builder.load("s3tables", HashMap::new()).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(catalog.config.table_bucket_arn, builder_arn);
    }

    #[tokio::test]
    async fn test_table_bucket_arn_property_only() {
        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";

        let builder = S3TablesCatalogBuilder::default();

        let mut props = HashMap::new();
        props.insert(
            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
            property_arn.to_string(),
        );

        let result = builder.load("s3tables", props).await;

        assert!(result.is_ok());
        let catalog = result.unwrap();

        assert_eq!(catalog.config.table_bucket_arn, property_arn);
    }

    #[tokio::test]
    async fn test_builder_empty_name_validation() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);

        let result = builder.load("", HashMap::new()).await;

        assert!(result.is_err());
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::DataInvalid);
            assert_eq!(err.message(), "Catalog name cannot be empty");
        }
    }

    #[tokio::test]
    async fn test_builder_whitespace_only_name_validation() {
        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);

        let result = builder.load(" \t\n ", HashMap::new()).await;

        assert!(result.is_err());
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::DataInvalid);
            assert_eq!(err.message(), "Catalog name cannot be empty");
        }
    }

    #[tokio::test]
    async fn test_builder_name_validation_with_missing_arn() {
        let builder = S3TablesCatalogBuilder::default();

        let result = builder.load("", HashMap::new()).await;

        assert!(result.is_err());
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::DataInvalid);
            assert_eq!(err.message(), "Catalog name cannot be empty");
        }
    }

    // End-to-end round trip: create a table, append one data file through a
    // transaction, reload the table and check that a scan returns the row.
    #[tokio::test]
    async fn test_s3tables_create_table_write_load_table_read() {
        use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use iceberg::writer::file_writer::ParquetWriterBuilder;
        use iceberg::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
        use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};

        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(c)) => c,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        // A random namespace name keeps concurrent or repeated runs apart.
        let ns = NamespaceIdent::new(format!("test_rw_{}", uuid::Uuid::new_v4().simple()));
        catalog.create_namespace(&ns, HashMap::new()).await.unwrap();

        let table_name = String::from("table");

        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();
        let creation = TableCreation::builder()
            .name(table_name.clone())
            .schema(schema)
            .build();

        let table = catalog.create_table(&ns, creation).await.unwrap();

        let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
            table
                .metadata()
                .current_schema()
                .as_ref()
                .try_into()
                .unwrap(),
        );
        let batch = arrow_array::RecordBatch::try_new(arrow_schema, vec![Arc::new(
            arrow_array::Int32Array::from(vec![42]),
        )])
        .unwrap();

        let location_generator = DefaultLocationGenerator::new(table.metadata()).unwrap();
        let file_name_generator = DefaultFileNameGenerator::new(
            "test".to_string(),
            None,
            iceberg::spec::DataFileFormat::Parquet,
        );
        let parquet_writer_builder = ParquetWriterBuilder::new(
            parquet::file::properties::WriterProperties::default(),
            table.metadata().current_schema().clone(),
        );
        let rw = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            table.file_io().clone(),
            location_generator,
            file_name_generator,
        );
        let mut writer = DataFileWriterBuilder::new(rw).build(None).await.unwrap();
        writer.write(batch.clone()).await.unwrap();
        let data_files = writer.close().await.unwrap();

        let tx = Transaction::new(&table);
        let tx = tx
            .fast_append()
            .add_data_files(data_files)
            .apply(tx)
            .unwrap();
        tx.commit(&catalog).await.unwrap();

        let table_ident = TableIdent::new(ns.clone(), table_name.clone());
        let reloaded = catalog.load_table(&table_ident).await.unwrap();
        let batches: Vec<arrow_array::RecordBatch> = reloaded
            .scan()
            .select_all()
            .build()
            .expect("scan to be valid (snapshot exists, schema is OK)")
            .to_arrow()
            .await
            .expect("scan tasks should be OK")
            .try_collect()
            .await
            .expect("scan should complete successfully");

        assert_eq!(batches.len(), 1);
        assert_eq!(
            batches[0], batch,
            "read records should match records written earlier"
        );

        // Best-effort cleanup: table first, then its namespace.
        catalog.purge_table(&table_ident).await.ok();
        catalog.drop_namespace(&ns).await.ok();
    }
}