iceberg_catalog_s3tables/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
25use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
26use aws_sdk_s3tables::operation::get_table::GetTableOutput;
27use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
28use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
29use aws_sdk_s3tables::types::OpenTableFormat;
30use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
31use iceberg::spec::{TableMetadata, TableMetadataBuilder};
32use iceberg::table::Table;
33use iceberg::{
34    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35    TableCommit, TableCreation, TableIdent,
36};
37use iceberg_storage_opendal::OpenDalStorageFactory;
38
39use crate::utils::create_sdk_config;
40
/// Property key carrying the S3Tables table bucket ARN.
///
/// When this key is present in the properties handed to the builder's `load()`,
/// its value replaces any ARN previously set with `with_table_bucket_arn`.
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Property key carrying the S3Tables endpoint URL.
///
/// When this key is present in the properties handed to the builder's `load()`,
/// its value replaces any URL previously set with `with_endpoint_url`.
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
45
/// S3Tables catalog configuration.
///
/// Assembled by [`S3TablesCatalogBuilder`] and then owned by the catalog.
///
/// NOTE(review): `Debug` is derived, so formatting this struct with `{:?}` prints
/// `props` verbatim — including credential entries such as `aws_secret_access_key`
/// when they are supplied. Consider redacting before logging.
#[derive(Debug)]
struct S3TablesCatalogConfig {
    /// Catalog name. `None` until the builder's `load()` assigns it.
    name: Option<String>,
    /// Unlike other buckets, S3Tables bucket is not a physical bucket, but a virtual bucket
    /// that is managed by s3tables. We can't directly access the bucket with path like
    /// s3://{bucket_name}/{file_path}, all the operations are done with respect of the bucket
    /// ARN.
    table_bucket_arn: String,
    /// Endpoint URL for the catalog. Passed to `create_sdk_config` when no
    /// pre-built client is supplied.
    endpoint_url: Option<String>,
    /// Optional pre-configured AWS SDK client for S3Tables. When set, it is used
    /// as-is and no SDK configuration is derived from `props`/`endpoint_url`.
    client: Option<aws_sdk_s3tables::Client>,
    /// Properties for the catalog. The available properties are:
    /// - `profile_name`: The name of the AWS profile to use.
    /// - `region_name`: The AWS region to use.
    /// - `aws_access_key_id`: The AWS access key ID to use.
    /// - `aws_secret_access_key`: The AWS secret access key to use.
    /// - `aws_session_token`: The AWS session token to use.
    ///
    /// The same map is also forwarded to the `FileIO` builder.
    props: HashMap<String, String>,
}
68
/// Builder for [`S3TablesCatalog`].
///
/// Collects the catalog configuration and an optional storage factory; the
/// catalog itself is constructed by `load()`.
#[derive(Debug)]
pub struct S3TablesCatalogBuilder {
    /// Configuration accumulated through the builder methods and `load()`.
    config: S3TablesCatalogConfig,
    /// Storage factory used to build the catalog's `FileIO`; `None` selects the
    /// default chosen when the catalog is constructed.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
75
76/// Default builder for [`S3TablesCatalog`].
77impl Default for S3TablesCatalogBuilder {
78    fn default() -> Self {
79        Self {
80            config: S3TablesCatalogConfig {
81                name: None,
82                table_bucket_arn: "".to_string(),
83                endpoint_url: None,
84                client: None,
85                props: HashMap::new(),
86            },
87            storage_factory: None,
88        }
89    }
90}
91
92/// Builder methods for [`S3TablesCatalog`].
93impl S3TablesCatalogBuilder {
94    /// Configure the catalog with a custom endpoint URL (useful for local testing/mocking).
95    ///
96    /// # Behavior with Properties
97    ///
98    /// If both this method and the `endpoint_url` property are provided during catalog loading,
99    /// the property value will take precedence and overwrite the value set by this method.
100    /// This follows the general pattern where properties specified in the `load()` method
101    /// have higher priority than builder method configurations.
102    pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
103        self.config.endpoint_url = Some(endpoint_url.into());
104        self
105    }
106
107    /// Configure the catalog with a pre-built AWS SDK client.
108    pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
109        self.config.client = Some(client);
110        self
111    }
112
113    /// Configure the catalog with a table bucket ARN.
114    ///
115    /// # Behavior with Properties
116    ///
117    /// If both this method and the `table_bucket_arn` property are provided during catalog loading,
118    /// the property value will take precedence and overwrite the value set by this method.
119    /// This follows the general pattern where properties specified in the `load()` method
120    /// have higher priority than builder method configurations.
121    pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
122        self.config.table_bucket_arn = table_bucket_arn.into();
123        self
124    }
125}
126
127impl CatalogBuilder for S3TablesCatalogBuilder {
128    type C = S3TablesCatalog;
129
130    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
131        self.storage_factory = Some(storage_factory);
132        self
133    }
134
135    fn load(
136        mut self,
137        name: impl Into<String>,
138        props: HashMap<String, String>,
139    ) -> impl Future<Output = Result<Self::C>> + Send {
140        let catalog_name = name.into();
141        self.config.name = Some(catalog_name.clone());
142
143        if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
144            self.config.table_bucket_arn = props
145                .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
146                .cloned()
147                .unwrap_or_default();
148        }
149
150        if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
151            self.config.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
152        }
153
154        // Collect other remaining properties
155        self.config.props = props
156            .into_iter()
157            .filter(|(k, _)| {
158                k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
159                    && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
160            })
161            .collect();
162
163        async move {
164            if catalog_name.trim().is_empty() {
165                Err(Error::new(
166                    ErrorKind::DataInvalid,
167                    "Catalog name cannot be empty",
168                ))
169            } else if self.config.table_bucket_arn.is_empty() {
170                Err(Error::new(
171                    ErrorKind::DataInvalid,
172                    "Table bucket ARN is required",
173                ))
174            } else {
175                S3TablesCatalog::new(self.config, self.storage_factory).await
176            }
177        }
178    }
179}
180
/// S3Tables catalog implementation.
///
/// Implements the `Catalog` trait on top of the AWS S3Tables API.
#[derive(Debug)]
pub struct S3TablesCatalog {
    /// Configuration the catalog was built from; supplies the table bucket ARN
    /// attached to every request.
    config: S3TablesCatalogConfig,
    /// AWS SDK client used for all S3Tables calls.
    s3tables_client: aws_sdk_s3tables::Client,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
}
188
189impl S3TablesCatalog {
190    /// Creates a new S3Tables catalog.
191    async fn new(
192        config: S3TablesCatalogConfig,
193        storage_factory: Option<Arc<dyn StorageFactory>>,
194    ) -> Result<Self> {
195        let s3tables_client = if let Some(client) = config.client.clone() {
196            client
197        } else {
198            let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
199            aws_sdk_s3tables::Client::new(&aws_config)
200        };
201
202        // Use provided factory or default to OpenDalStorageFactory::S3
203        let factory = storage_factory.unwrap_or_else(|| {
204            Arc::new(OpenDalStorageFactory::S3 {
205                configured_scheme: "s3a".to_string(),
206                customized_credential_load: None,
207            })
208        });
209        let file_io = FileIOBuilder::new(factory)
210            .with_props(&config.props)
211            .build();
212
213        Ok(Self {
214            config,
215            s3tables_client,
216            file_io,
217        })
218    }
219
220    async fn load_table_with_version_token(
221        &self,
222        table_ident: &TableIdent,
223    ) -> Result<(Table, String)> {
224        let req = self
225            .s3tables_client
226            .get_table()
227            .table_bucket_arn(self.config.table_bucket_arn.clone())
228            .namespace(table_ident.namespace().to_url_string())
229            .name(table_ident.name());
230        let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
231
232        // when a table is created, it's possible that the metadata location is not set.
233        let metadata_location = resp.metadata_location().ok_or_else(|| {
234            Error::new(
235                ErrorKind::Unexpected,
236                format!(
237                    "Table {} does not have metadata location",
238                    table_ident.name()
239                ),
240            )
241        })?;
242        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
243
244        let table = Table::builder()
245            .identifier(table_ident.clone())
246            .metadata(metadata)
247            .metadata_location(metadata_location)
248            .file_io(self.file_io.clone())
249            .build()?;
250        Ok((table, resp.version_token))
251    }
252}
253
254#[async_trait]
255impl Catalog for S3TablesCatalog {
256    /// List namespaces from s3tables catalog.
257    ///
258    /// S3Tables doesn't support nested namespaces. If parent is provided, it will
259    /// return an empty list.
260    async fn list_namespaces(
261        &self,
262        parent: Option<&NamespaceIdent>,
263    ) -> Result<Vec<NamespaceIdent>> {
264        if parent.is_some() {
265            return Ok(vec![]);
266        }
267
268        let mut result = Vec::new();
269        let mut continuation_token = None;
270        loop {
271            let mut req = self
272                .s3tables_client
273                .list_namespaces()
274                .table_bucket_arn(self.config.table_bucket_arn.clone());
275            if let Some(token) = continuation_token {
276                req = req.continuation_token(token);
277            }
278            let resp = req.send().await.map_err(from_aws_sdk_error)?;
279            for ns in resp.namespaces() {
280                result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
281            }
282            continuation_token = resp.continuation_token().map(|s| s.to_string());
283            if continuation_token.is_none() {
284                break;
285            }
286        }
287        Ok(result)
288    }
289
290    /// Creates a new namespace with the given identifier and properties.
291    ///
292    /// Attempts to create a namespace defined by the `namespace`. The `properties`
293    /// parameter is ignored.
294    ///
295    /// The following naming rules apply to namespaces:
296    ///
297    /// - Names must be between 3 (min) and 63 (max) characters long.
298    /// - Names can consist only of lowercase letters, numbers, and underscores (_).
299    /// - Names must begin and end with a letter or number.
300    /// - Names must not contain hyphens (-) or periods (.).
301    ///
302    /// This function can return an error in the following situations:
303    ///
304    /// - Errors from the underlying database creation process, converted using
305    /// `from_aws_sdk_error`.
306    async fn create_namespace(
307        &self,
308        namespace: &NamespaceIdent,
309        _properties: HashMap<String, String>,
310    ) -> Result<Namespace> {
311        if self.namespace_exists(namespace).await? {
312            return Err(Error::new(
313                ErrorKind::NamespaceAlreadyExists,
314                format!("Namespace {namespace:?} already exists"),
315            ));
316        }
317
318        let req = self
319            .s3tables_client
320            .create_namespace()
321            .table_bucket_arn(self.config.table_bucket_arn.clone())
322            .namespace(namespace.to_url_string());
323        req.send().await.map_err(from_aws_sdk_error)?;
324        Ok(Namespace::with_properties(
325            namespace.clone(),
326            HashMap::new(),
327        ))
328    }
329
330    /// Retrieves a namespace by its identifier.
331    ///
332    /// Validates the given namespace identifier and then queries the
333    /// underlying database client to fetch the corresponding namespace data.
334    /// Constructs a `Namespace` object with the retrieved data and returns it.
335    ///
336    /// This function can return an error in any of the following situations:
337    /// - If there is an error querying the database, returned by
338    /// `from_aws_sdk_error`.
339    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
340        if !self.namespace_exists(namespace).await? {
341            return Err(Error::new(
342                ErrorKind::NamespaceNotFound,
343                format!("Namespace {namespace:?} does not exist"),
344            ));
345        }
346
347        let req = self
348            .s3tables_client
349            .get_namespace()
350            .table_bucket_arn(self.config.table_bucket_arn.clone())
351            .namespace(namespace.to_url_string());
352        let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
353        let properties = HashMap::new();
354        Ok(Namespace::with_properties(
355            NamespaceIdent::from_vec(resp.namespace().to_vec())?,
356            properties,
357        ))
358    }
359
360    /// Checks if a namespace exists within the s3tables catalog.
361    ///
362    /// Validates the namespace identifier by querying the s3tables catalog
363    /// to determine if the specified namespace exists.
364    ///
365    /// # Returns
366    /// A `Result<bool>` indicating the outcome of the check:
367    /// - `Ok(true)` if the namespace exists.
368    /// - `Ok(false)` if the namespace does not exist, identified by a specific
369    /// `IsNotFoundException` variant.
370    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
371    /// query, with the error encapsulating the issue.
372    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
373        let req = self
374            .s3tables_client
375            .get_namespace()
376            .table_bucket_arn(self.config.table_bucket_arn.clone())
377            .namespace(namespace.to_url_string());
378        match req.send().await {
379            Ok(_) => Ok(true),
380            Err(err) => {
381                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
382                    Ok(false)
383                } else {
384                    Err(from_aws_sdk_error(err))
385                }
386            }
387        }
388    }
389
390    /// Updates the properties of an existing namespace.
391    ///
392    /// S3Tables doesn't support updating namespace properties, so this function
393    /// will always return an error.
394    async fn update_namespace(
395        &self,
396        _namespace: &NamespaceIdent,
397        _properties: HashMap<String, String>,
398    ) -> Result<()> {
399        Err(Error::new(
400            ErrorKind::FeatureUnsupported,
401            "Update namespace is not supported for s3tables catalog",
402        ))
403    }
404
405    /// Drops an existing namespace from the s3tables catalog.
406    ///
407    /// Validates the namespace identifier and then deletes the corresponding
408    /// namespace from the s3tables catalog.
409    ///
410    /// This function can return an error in the following situations:
411    /// - Errors from the underlying database deletion process, converted using
412    /// `from_aws_sdk_error`.
413    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
414        if !self.namespace_exists(namespace).await? {
415            return Err(Error::new(
416                ErrorKind::NamespaceNotFound,
417                format!("Namespace {namespace:?} does not exist"),
418            ));
419        }
420
421        let req = self
422            .s3tables_client
423            .delete_namespace()
424            .table_bucket_arn(self.config.table_bucket_arn.clone())
425            .namespace(namespace.to_url_string());
426        req.send().await.map_err(from_aws_sdk_error)?;
427        Ok(())
428    }
429
430    /// Lists all tables within a given namespace.
431    ///
432    /// Retrieves all tables associated with the specified namespace and returns
433    /// their identifiers.
434    ///
435    /// This function can return an error in the following situations:
436    /// - Errors from the underlying database query process, converted using
437    /// `from_aws_sdk_error`.
438    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
439        let mut result = Vec::new();
440        let mut continuation_token = None;
441        loop {
442            let mut req = self
443                .s3tables_client
444                .list_tables()
445                .table_bucket_arn(self.config.table_bucket_arn.clone())
446                .namespace(namespace.to_url_string());
447            if let Some(token) = continuation_token {
448                req = req.continuation_token(token);
449            }
450            let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
451            for table in resp.tables() {
452                result.push(TableIdent::new(
453                    NamespaceIdent::from_vec(table.namespace().to_vec())?,
454                    table.name().to_string(),
455                ));
456            }
457            continuation_token = resp.continuation_token().map(|s| s.to_string());
458            if continuation_token.is_none() {
459                break;
460            }
461        }
462        Ok(result)
463    }
464
465    /// Creates a new table within a specified namespace.
466    ///
467    /// Attempts to create a table defined by the `creation` parameter. The metadata
468    /// location is generated by the s3tables catalog, looks like:
469    ///
470    /// s3://{RANDOM WAREHOUSE LOCATION}/metadata/{VERSION}-{UUID}.metadata.json
471    ///
472    /// We have to get this random warehouse location after the table is created.
473    ///
474    /// This function can return an error in the following situations:
475    /// - If the location of the table is set by user, identified by a specific
476    /// `DataInvalid` variant.
477    /// - Errors from the underlying database creation process, converted using
478    /// `from_aws_sdk_error`.
479    async fn create_table(
480        &self,
481        namespace: &NamespaceIdent,
482        mut creation: TableCreation,
483    ) -> Result<Table> {
484        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
485
486        // create table
487        let create_resp: CreateTableOutput = self
488            .s3tables_client
489            .create_table()
490            .table_bucket_arn(self.config.table_bucket_arn.clone())
491            .namespace(namespace.to_url_string())
492            .format(OpenTableFormat::Iceberg)
493            .name(table_ident.name())
494            .send()
495            .await
496            .map_err(from_aws_sdk_error)?;
497
498        // prepare table location. the warehouse location is generated by s3tables catalog,
499        // which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3
500        let table_location = match &creation.location {
501            Some(_) => {
502                return Err(Error::new(
503                    ErrorKind::DataInvalid,
504                    "The location of the table is generated by s3tables catalog, can't be set by user.",
505                ));
506            }
507            None => {
508                let get_resp: GetTableOutput = self
509                    .s3tables_client
510                    .get_table()
511                    .table_bucket_arn(self.config.table_bucket_arn.clone())
512                    .namespace(namespace.to_url_string())
513                    .name(table_ident.name())
514                    .send()
515                    .await
516                    .map_err(from_aws_sdk_error)?;
517                get_resp.warehouse_location().to_string()
518            }
519        };
520
521        // write metadata to file
522        creation.location = Some(table_location.clone());
523        let metadata = TableMetadataBuilder::from_table_creation(creation)?
524            .build()?
525            .metadata;
526        let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
527        metadata.write_to(&self.file_io, &metadata_location).await?;
528
529        // update metadata location
530        let metadata_location_str = metadata_location.to_string();
531        self.s3tables_client
532            .update_table_metadata_location()
533            .table_bucket_arn(self.config.table_bucket_arn.clone())
534            .namespace(namespace.to_url_string())
535            .name(table_ident.name())
536            .metadata_location(metadata_location_str.clone())
537            .version_token(create_resp.version_token())
538            .send()
539            .await
540            .map_err(from_aws_sdk_error)?;
541
542        let table = Table::builder()
543            .identifier(table_ident)
544            .metadata_location(metadata_location_str)
545            .metadata(metadata)
546            .file_io(self.file_io.clone())
547            .build()?;
548        Ok(table)
549    }
550
551    /// Loads an existing table from the s3tables catalog.
552    ///
553    /// Retrieves the metadata location of the specified table and constructs a
554    /// `Table` object with the retrieved metadata.
555    ///
556    /// This function can return an error in the following situations:
557    /// - If the table does not have a metadata location, identified by a specific
558    /// `Unexpected` variant.
559    /// - Errors from the underlying database query process, converted using
560    /// `from_aws_sdk_error`.
561    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
562        Ok(self.load_table_with_version_token(table_ident).await?.0)
563    }
564
565    /// Not supported for S3Tables. Use `purge_table` instead.
566    ///
567    /// S3 Tables doesn't support soft delete, so dropping a table will permanently remove it from the catalog.
568    async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
569        Err(Error::new(
570            ErrorKind::FeatureUnsupported,
571            "drop_table is not supported for S3Tables; use purge_table instead",
572        ))
573    }
574
575    /// Purge a table from the S3 Tables catalog.
576    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
577        let req = self
578            .s3tables_client
579            .delete_table()
580            .table_bucket_arn(self.config.table_bucket_arn.clone())
581            .namespace(table.namespace().to_url_string())
582            .name(table.name());
583        req.send().await.map_err(from_aws_sdk_error)?;
584        Ok(())
585    }
586
587    /// Checks if a table exists within the s3tables catalog.
588    ///
589    /// Validates the table identifier by querying the s3tables catalog
590    /// to determine if the specified table exists.
591    ///
592    /// # Returns
593    /// A `Result<bool>` indicating the outcome of the check:
594    /// - `Ok(true)` if the table exists.
595    /// - `Ok(false)` if the table does not exist, identified by a specific
596    /// `IsNotFoundException` variant.
597    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
598    /// query, with the error encapsulating the issue.
599    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
600        let req = self
601            .s3tables_client
602            .get_table()
603            .table_bucket_arn(self.config.table_bucket_arn.clone())
604            .namespace(table_ident.namespace().to_url_string())
605            .name(table_ident.name());
606        match req.send().await {
607            Ok(_) => Ok(true),
608            Err(err) => {
609                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
610                    Ok(false)
611                } else {
612                    Err(from_aws_sdk_error(err))
613                }
614            }
615        }
616    }
617
618    /// Renames an existing table within the s3tables catalog.
619    ///
620    /// Validates the source and destination table identifiers and then renames
621    /// the source table to the destination table.
622    ///
623    /// This function can return an error in the following situations:
624    /// - Errors from the underlying database renaming process, converted using
625    /// `from_aws_sdk_error`.
626    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
627        let req = self
628            .s3tables_client
629            .rename_table()
630            .table_bucket_arn(self.config.table_bucket_arn.clone())
631            .namespace(src.namespace().to_url_string())
632            .name(src.name())
633            .new_namespace_name(dest.namespace().to_url_string())
634            .new_name(dest.name());
635        req.send().await.map_err(from_aws_sdk_error)?;
636        Ok(())
637    }
638
639    async fn register_table(
640        &self,
641        _table_ident: &TableIdent,
642        _metadata_location: String,
643    ) -> Result<Table> {
644        Err(Error::new(
645            ErrorKind::FeatureUnsupported,
646            "Registering a table is not supported yet",
647        ))
648    }
649
650    /// Updates an existing table within the s3tables catalog.
651    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
652        let table_ident = commit.identifier().clone();
653        let table_namespace = table_ident.namespace();
654        let (current_table, version_token) =
655            self.load_table_with_version_token(&table_ident).await?;
656
657        let staged_table = commit.apply(current_table)?;
658        let staged_metadata_location_str = staged_table.metadata_location_result()?;
659        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
660
661        staged_table
662            .metadata()
663            .write_to(staged_table.file_io(), &staged_metadata_location)
664            .await?;
665
666        let builder = self
667            .s3tables_client
668            .update_table_metadata_location()
669            .table_bucket_arn(&self.config.table_bucket_arn)
670            .namespace(table_namespace.to_url_string())
671            .name(table_ident.name())
672            .version_token(version_token)
673            .metadata_location(staged_metadata_location_str);
674
675        let _ = builder.send().await.map_err(|e| {
676            let error = e.into_service_error();
677            match error {
678                UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
679                    ErrorKind::CatalogCommitConflicts,
680                    format!("Commit conflicted for table: {table_ident}"),
681                )
682                .with_retryable(true),
683                UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
684                    ErrorKind::TableNotFound,
685                    format!("Table {table_ident} is not found"),
686                ),
687                _ => Error::new(
688                    ErrorKind::Unexpected,
689                    "Operation failed for hitting aws sdk error",
690                ),
691            }
692            .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
693        })?;
694
695        Ok(staged_table)
696    }
697}
698
699/// Format AWS SDK error into iceberg error
700pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
701where T: std::fmt::Debug {
702    Error::new(
703        ErrorKind::Unexpected,
704        format!("Operation failed for hitting aws sdk error: {error:?}"),
705    )
706}
707
708#[cfg(test)]
709mod tests {
710    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
711    use iceberg::transaction::{ApplyTransactionAction, Transaction};
712
713    use super::*;
714
715    async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
716        let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
717            Some(table_bucket_arn) => table_bucket_arn,
718            None => return Ok(None),
719        };
720
721        let config = S3TablesCatalogConfig {
722            name: None,
723            table_bucket_arn,
724            endpoint_url: None,
725            client: None,
726            props: HashMap::new(),
727        };
728
729        Ok(Some(S3TablesCatalog::new(config, None).await?))
730    }
731
732    #[tokio::test]
733    async fn test_s3tables_list_namespace() {
734        let catalog = match load_s3tables_catalog_from_env().await {
735            Ok(Some(catalog)) => catalog,
736            Ok(None) => return,
737            Err(e) => panic!("Error loading catalog: {e}"),
738        };
739
740        let namespaces = catalog.list_namespaces(None).await.unwrap();
741        assert!(!namespaces.is_empty());
742    }
743
744    #[tokio::test]
745    async fn test_s3tables_list_tables() {
746        let catalog = match load_s3tables_catalog_from_env().await {
747            Ok(Some(catalog)) => catalog,
748            Ok(None) => return,
749            Err(e) => panic!("Error loading catalog: {e}"),
750        };
751
752        let tables = catalog
753            .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
754            .await
755            .unwrap();
756        assert!(!tables.is_empty());
757    }
758
759    #[tokio::test]
760    async fn test_s3tables_load_table() {
761        let catalog = match load_s3tables_catalog_from_env().await {
762            Ok(Some(catalog)) => catalog,
763            Ok(None) => return,
764            Err(e) => panic!("Error loading catalog: {e}"),
765        };
766
767        let table = catalog
768            .load_table(&TableIdent::new(
769                NamespaceIdent::new("aws_s3_metadata".to_string()),
770                "query_storage_metadata".to_string(),
771            ))
772            .await
773            .unwrap();
774        println!("{table:?}");
775    }
776
777    #[tokio::test]
778    async fn test_s3tables_create_delete_namespace() {
779        let catalog = match load_s3tables_catalog_from_env().await {
780            Ok(Some(catalog)) => catalog,
781            Ok(None) => return,
782            Err(e) => panic!("Error loading catalog: {e}"),
783        };
784
785        let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
786        catalog
787            .create_namespace(&namespace, HashMap::new())
788            .await
789            .unwrap();
790        assert!(catalog.namespace_exists(&namespace).await.unwrap());
791        catalog.drop_namespace(&namespace).await.unwrap();
792        assert!(!catalog.namespace_exists(&namespace).await.unwrap());
793    }
794
795    #[tokio::test]
796    async fn test_s3tables_create_delete_table() {
797        let catalog = match load_s3tables_catalog_from_env().await {
798            Ok(Some(catalog)) => catalog,
799            Ok(None) => return,
800            Err(e) => panic!("Error loading catalog: {e}"),
801        };
802
803        let creation = {
804            let schema = Schema::builder()
805                .with_schema_id(0)
806                .with_fields(vec![
807                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
808                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
809                ])
810                .build()
811                .unwrap();
812            TableCreation::builder()
813                .name("test_s3tables_create_delete_table".to_string())
814                .properties(HashMap::new())
815                .schema(schema)
816                .build()
817        };
818
819        let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
820        let table_ident = TableIdent::new(
821            namespace.clone(),
822            "test_s3tables_create_delete_table".to_string(),
823        );
824        catalog.drop_namespace(&namespace).await.ok();
825        catalog.drop_table(&table_ident).await.ok();
826
827        catalog
828            .create_namespace(&namespace, HashMap::new())
829            .await
830            .unwrap();
831        catalog.create_table(&namespace, creation).await.unwrap();
832        assert!(catalog.table_exists(&table_ident).await.unwrap());
833        catalog.drop_table(&table_ident).await.unwrap();
834        assert!(!catalog.table_exists(&table_ident).await.unwrap());
835        catalog.drop_namespace(&namespace).await.unwrap();
836    }
837
838    #[tokio::test]
839    async fn test_s3tables_update_table() {
840        let catalog = match load_s3tables_catalog_from_env().await {
841            Ok(Some(catalog)) => catalog,
842            Ok(None) => return,
843            Err(e) => panic!("Error loading catalog: {e}"),
844        };
845
846        // Create a test namespace and table
847        let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
848        let table_ident =
849            TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
850
851        // Clean up any existing resources from previous test runs
852        catalog.drop_table(&table_ident).await.ok();
853        catalog.drop_namespace(&namespace).await.ok();
854
855        // Create namespace and table
856        catalog
857            .create_namespace(&namespace, HashMap::new())
858            .await
859            .unwrap();
860
861        let creation = {
862            let schema = Schema::builder()
863                .with_schema_id(0)
864                .with_fields(vec![
865                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
866                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
867                ])
868                .build()
869                .unwrap();
870            TableCreation::builder()
871                .name(table_ident.name().to_string())
872                .properties(HashMap::new())
873                .schema(schema)
874                .build()
875        };
876
877        let table = catalog.create_table(&namespace, creation).await.unwrap();
878
879        // Create a transaction to update the table
880        let tx = Transaction::new(&table);
881
882        // Store the original metadata location for comparison
883        let original_metadata_location = table.metadata_location();
884
885        // Update table properties using the transaction
886        let tx = tx
887            .update_table_properties()
888            .set("test_property".to_string(), "test_value".to_string())
889            .apply(tx)
890            .unwrap();
891
892        // Commit the transaction to the catalog
893        let updated_table = tx.commit(&catalog).await.unwrap();
894
895        // Verify the update was successful
896        assert_eq!(
897            updated_table.metadata().properties().get("test_property"),
898            Some(&"test_value".to_string())
899        );
900
901        // Verify the metadata location has been updated
902        assert_ne!(
903            updated_table.metadata_location(),
904            original_metadata_location,
905            "Metadata location should be updated after commit"
906        );
907
908        // Load the table again from the catalog to verify changes were persisted
909        let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
910
911        // Verify the reloaded table matches the updated table
912        assert_eq!(
913            reloaded_table.metadata().properties().get("test_property"),
914            Some(&"test_value".to_string())
915        );
916        assert_eq!(
917            reloaded_table.metadata_location(),
918            updated_table.metadata_location(),
919            "Reloaded table should have the same metadata location as the updated table"
920        );
921    }
922
923    #[tokio::test]
924    async fn test_builder_load_missing_bucket_arn() {
925        let builder = S3TablesCatalogBuilder::default();
926        let result = builder.load("s3tables", HashMap::new()).await;
927
928        assert!(result.is_err());
929        if let Err(err) = result {
930            assert_eq!(err.kind(), ErrorKind::DataInvalid);
931            assert_eq!(err.message(), "Table bucket ARN is required");
932        }
933    }
934
935    #[tokio::test]
936    async fn test_builder_with_endpoint_url_ok() {
937        let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");
938
939        let result = builder
940            .load(
941                "s3tables",
942                HashMap::from([
943                    (
944                        S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
945                        "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
946                    ),
947                    ("some_prop".to_string(), "some_value".to_string()),
948                ]),
949            )
950            .await;
951
952        assert!(result.is_ok());
953    }
954
955    #[tokio::test]
956    async fn test_builder_with_client_ok() {
957        use aws_config::BehaviorVersion;
958
959        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
960        let client = aws_sdk_s3tables::Client::new(&sdk_config);
961
962        let builder = S3TablesCatalogBuilder::default().with_client(client);
963        let result = builder
964            .load(
965                "s3tables",
966                HashMap::from([(
967                    S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
968                    "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
969                )]),
970            )
971            .await;
972
973        assert!(result.is_ok());
974    }
975
976    #[tokio::test]
977    async fn test_builder_with_table_bucket_arn() {
978        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
979        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
980
981        let result = builder.load("s3tables", HashMap::new()).await;
982
983        assert!(result.is_ok());
984        let catalog = result.unwrap();
985        assert_eq!(catalog.config.table_bucket_arn, test_arn);
986    }
987
988    #[tokio::test]
989    async fn test_builder_empty_table_bucket_arn_edge_cases() {
990        let mut props = HashMap::new();
991        props.insert(
992            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
993            "".to_string(),
994        );
995
996        let builder = S3TablesCatalogBuilder::default();
997        let result = builder.load("s3tables", props).await;
998
999        assert!(result.is_err());
1000        if let Err(err) = result {
1001            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1002            assert_eq!(err.message(), "Table bucket ARN is required");
1003        }
1004    }
1005
1006    #[tokio::test]
1007    async fn test_endpoint_url_property_overrides_builder_method() {
1008        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1009        let builder_endpoint = "http://localhost:4566";
1010        let property_endpoint = "http://localhost:8080";
1011
1012        let builder = S3TablesCatalogBuilder::default()
1013            .with_table_bucket_arn(test_arn)
1014            .with_endpoint_url(builder_endpoint);
1015
1016        let mut props = HashMap::new();
1017        props.insert(
1018            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1019            property_endpoint.to_string(),
1020        );
1021
1022        let result = builder.load("s3tables", props).await;
1023
1024        assert!(result.is_ok());
1025        let catalog = result.unwrap();
1026
1027        // Property value should override builder method value
1028        assert_eq!(
1029            catalog.config.endpoint_url,
1030            Some(property_endpoint.to_string())
1031        );
1032        assert_ne!(
1033            catalog.config.endpoint_url,
1034            Some(builder_endpoint.to_string())
1035        );
1036    }
1037
1038    #[tokio::test]
1039    async fn test_endpoint_url_builder_method_only() {
1040        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1041        let builder_endpoint = "http://localhost:4566";
1042
1043        let builder = S3TablesCatalogBuilder::default()
1044            .with_table_bucket_arn(test_arn)
1045            .with_endpoint_url(builder_endpoint);
1046
1047        let result = builder.load("s3tables", HashMap::new()).await;
1048
1049        assert!(result.is_ok());
1050        let catalog = result.unwrap();
1051
1052        assert_eq!(
1053            catalog.config.endpoint_url,
1054            Some(builder_endpoint.to_string())
1055        );
1056    }
1057
1058    #[tokio::test]
1059    async fn test_endpoint_url_property_only() {
1060        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1061        let property_endpoint = "http://localhost:8080";
1062
1063        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1064
1065        let mut props = HashMap::new();
1066        props.insert(
1067            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1068            property_endpoint.to_string(),
1069        );
1070
1071        let result = builder.load("s3tables", props).await;
1072
1073        assert!(result.is_ok());
1074        let catalog = result.unwrap();
1075
1076        assert_eq!(
1077            catalog.config.endpoint_url,
1078            Some(property_endpoint.to_string())
1079        );
1080    }
1081
1082    #[tokio::test]
1083    async fn test_table_bucket_arn_property_overrides_builder_method() {
1084        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1085        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1086
1087        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1088
1089        let mut props = HashMap::new();
1090        props.insert(
1091            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1092            property_arn.to_string(),
1093        );
1094
1095        let result = builder.load("s3tables", props).await;
1096
1097        assert!(result.is_ok());
1098        let catalog = result.unwrap();
1099
1100        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1101        assert_ne!(catalog.config.table_bucket_arn, builder_arn);
1102    }
1103
1104    #[tokio::test]
1105    async fn test_table_bucket_arn_builder_method_only() {
1106        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1107
1108        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1109
1110        let result = builder.load("s3tables", HashMap::new()).await;
1111
1112        assert!(result.is_ok());
1113        let catalog = result.unwrap();
1114
1115        assert_eq!(catalog.config.table_bucket_arn, builder_arn);
1116    }
1117
1118    #[tokio::test]
1119    async fn test_table_bucket_arn_property_only() {
1120        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1121
1122        let builder = S3TablesCatalogBuilder::default();
1123
1124        let mut props = HashMap::new();
1125        props.insert(
1126            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1127            property_arn.to_string(),
1128        );
1129
1130        let result = builder.load("s3tables", props).await;
1131
1132        assert!(result.is_ok());
1133        let catalog = result.unwrap();
1134
1135        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1136    }
1137
1138    #[tokio::test]
1139    async fn test_builder_empty_name_validation() {
1140        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1141        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1142
1143        let result = builder.load("", HashMap::new()).await;
1144
1145        assert!(result.is_err());
1146        if let Err(err) = result {
1147            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1148            assert_eq!(err.message(), "Catalog name cannot be empty");
1149        }
1150    }
1151
1152    #[tokio::test]
1153    async fn test_builder_whitespace_only_name_validation() {
1154        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1155        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1156
1157        let result = builder.load("   \t\n  ", HashMap::new()).await;
1158
1159        assert!(result.is_err());
1160        if let Err(err) = result {
1161            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1162            assert_eq!(err.message(), "Catalog name cannot be empty");
1163        }
1164    }
1165
1166    #[tokio::test]
1167    async fn test_builder_name_validation_with_missing_arn() {
1168        let builder = S3TablesCatalogBuilder::default();
1169
1170        let result = builder.load("", HashMap::new()).await;
1171
1172        assert!(result.is_err());
1173        if let Err(err) = result {
1174            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1175            assert_eq!(err.message(), "Catalog name cannot be empty");
1176        }
1177    }
1178}