iceberg_catalog_s3tables/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
25use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
26use aws_sdk_s3tables::operation::get_table::GetTableOutput;
27use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
28use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
29use aws_sdk_s3tables::types::OpenTableFormat;
30use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
31use iceberg::spec::{TableMetadata, TableMetadataBuilder};
32use iceberg::table::Table;
33use iceberg::{
34    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35    TableCommit, TableCreation, TableIdent,
36};
37use iceberg_storage_opendal::OpenDalStorageFactory;
38
39use crate::utils::create_sdk_config;
40
/// Property key under which `load()` looks up the table bucket ARN.
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Property key under which `load()` looks up a custom S3Tables endpoint URL.
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
45
46/// S3Tables catalog configuration.
47#[derive(Debug)]
48struct S3TablesCatalogConfig {
49    /// Catalog name.
50    name: Option<String>,
51    /// Unlike other buckets, S3Tables bucket is not a physical bucket, but a virtual bucket
52    /// that is managed by s3tables. We can't directly access the bucket with path like
53    /// s3://{bucket_name}/{file_path}, all the operations are done with respect of the bucket
54    /// ARN.
55    table_bucket_arn: String,
56    /// Endpoint URL for the catalog.
57    endpoint_url: Option<String>,
58    /// Optional pre-configured AWS SDK client for S3Tables.
59    client: Option<aws_sdk_s3tables::Client>,
60    /// Properties for the catalog. The available properties are:
61    /// - `profile_name`: The name of the AWS profile to use.
62    /// - `region_name`: The AWS region to use.
63    /// - `aws_access_key_id`: The AWS access key ID to use.
64    /// - `aws_secret_access_key`: The AWS secret access key to use.
65    /// - `aws_session_token`: The AWS session token to use.
66    props: HashMap<String, String>,
67}
68
69/// Builder for [`S3TablesCatalog`].
70#[derive(Debug)]
71pub struct S3TablesCatalogBuilder {
72    config: S3TablesCatalogConfig,
73    storage_factory: Option<Arc<dyn StorageFactory>>,
74}
75
76/// Default builder for [`S3TablesCatalog`].
77impl Default for S3TablesCatalogBuilder {
78    fn default() -> Self {
79        Self {
80            config: S3TablesCatalogConfig {
81                name: None,
82                table_bucket_arn: "".to_string(),
83                endpoint_url: None,
84                client: None,
85                props: HashMap::new(),
86            },
87            storage_factory: None,
88        }
89    }
90}
91
92/// Builder methods for [`S3TablesCatalog`].
93impl S3TablesCatalogBuilder {
94    /// Configure the catalog with a custom endpoint URL (useful for local testing/mocking).
95    ///
96    /// # Behavior with Properties
97    ///
98    /// If both this method and the `endpoint_url` property are provided during catalog loading,
99    /// the property value will take precedence and overwrite the value set by this method.
100    /// This follows the general pattern where properties specified in the `load()` method
101    /// have higher priority than builder method configurations.
102    pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
103        self.config.endpoint_url = Some(endpoint_url.into());
104        self
105    }
106
107    /// Configure the catalog with a pre-built AWS SDK client.
108    pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
109        self.config.client = Some(client);
110        self
111    }
112
113    /// Configure the catalog with a table bucket ARN.
114    ///
115    /// # Behavior with Properties
116    ///
117    /// If both this method and the `table_bucket_arn` property are provided during catalog loading,
118    /// the property value will take precedence and overwrite the value set by this method.
119    /// This follows the general pattern where properties specified in the `load()` method
120    /// have higher priority than builder method configurations.
121    pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
122        self.config.table_bucket_arn = table_bucket_arn.into();
123        self
124    }
125}
126
127impl CatalogBuilder for S3TablesCatalogBuilder {
128    type C = S3TablesCatalog;
129
130    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
131        self.storage_factory = Some(storage_factory);
132        self
133    }
134
135    fn load(
136        mut self,
137        name: impl Into<String>,
138        props: HashMap<String, String>,
139    ) -> impl Future<Output = Result<Self::C>> + Send {
140        let catalog_name = name.into();
141        self.config.name = Some(catalog_name.clone());
142
143        if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
144            self.config.table_bucket_arn = props
145                .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
146                .cloned()
147                .unwrap_or_default();
148        }
149
150        if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
151            self.config.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
152        }
153
154        // Collect other remaining properties
155        self.config.props = props
156            .into_iter()
157            .filter(|(k, _)| {
158                k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
159                    && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
160            })
161            .collect();
162
163        async move {
164            if catalog_name.trim().is_empty() {
165                Err(Error::new(
166                    ErrorKind::DataInvalid,
167                    "Catalog name cannot be empty",
168                ))
169            } else if self.config.table_bucket_arn.is_empty() {
170                Err(Error::new(
171                    ErrorKind::DataInvalid,
172                    "Table bucket ARN is required",
173                ))
174            } else {
175                S3TablesCatalog::new(self.config, self.storage_factory).await
176            }
177        }
178    }
179}
180
181/// S3Tables catalog implementation.
182#[derive(Debug)]
183pub struct S3TablesCatalog {
184    config: S3TablesCatalogConfig,
185    s3tables_client: aws_sdk_s3tables::Client,
186    file_io: FileIO,
187}
188
189impl S3TablesCatalog {
190    /// Creates a new S3Tables catalog.
191    async fn new(
192        config: S3TablesCatalogConfig,
193        storage_factory: Option<Arc<dyn StorageFactory>>,
194    ) -> Result<Self> {
195        let s3tables_client = if let Some(client) = config.client.clone() {
196            client
197        } else {
198            let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
199            aws_sdk_s3tables::Client::new(&aws_config)
200        };
201
202        // Use provided factory or default to OpenDalStorageFactory::S3
203        let factory = storage_factory.unwrap_or_else(|| {
204            Arc::new(OpenDalStorageFactory::S3 {
205                configured_scheme: "s3a".to_string(),
206                customized_credential_load: None,
207            })
208        });
209        let file_io = FileIOBuilder::new(factory)
210            .with_props(&config.props)
211            .build();
212
213        Ok(Self {
214            config,
215            s3tables_client,
216            file_io,
217        })
218    }
219
220    async fn load_table_with_version_token(
221        &self,
222        table_ident: &TableIdent,
223    ) -> Result<(Table, String)> {
224        let req = self
225            .s3tables_client
226            .get_table()
227            .table_bucket_arn(self.config.table_bucket_arn.clone())
228            .namespace(table_ident.namespace().to_url_string())
229            .name(table_ident.name());
230        let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
231
232        // when a table is created, it's possible that the metadata location is not set.
233        let metadata_location = resp.metadata_location().ok_or_else(|| {
234            Error::new(
235                ErrorKind::Unexpected,
236                format!(
237                    "Table {} does not have metadata location",
238                    table_ident.name()
239                ),
240            )
241        })?;
242        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
243
244        let table = Table::builder()
245            .identifier(table_ident.clone())
246            .metadata(metadata)
247            .metadata_location(metadata_location)
248            .file_io(self.file_io.clone())
249            .build()?;
250        Ok((table, resp.version_token))
251    }
252}
253
254#[async_trait]
255impl Catalog for S3TablesCatalog {
256    /// List namespaces from s3tables catalog.
257    ///
258    /// S3Tables doesn't support nested namespaces. If parent is provided, it will
259    /// return an empty list.
260    async fn list_namespaces(
261        &self,
262        parent: Option<&NamespaceIdent>,
263    ) -> Result<Vec<NamespaceIdent>> {
264        if parent.is_some() {
265            return Ok(vec![]);
266        }
267
268        let mut result = Vec::new();
269        let mut continuation_token = None;
270        loop {
271            let mut req = self
272                .s3tables_client
273                .list_namespaces()
274                .table_bucket_arn(self.config.table_bucket_arn.clone());
275            if let Some(token) = continuation_token {
276                req = req.continuation_token(token);
277            }
278            let resp = req.send().await.map_err(from_aws_sdk_error)?;
279            for ns in resp.namespaces() {
280                result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
281            }
282            continuation_token = resp.continuation_token().map(|s| s.to_string());
283            if continuation_token.is_none() {
284                break;
285            }
286        }
287        Ok(result)
288    }
289
290    /// Creates a new namespace with the given identifier and properties.
291    ///
292    /// Attempts to create a namespace defined by the `namespace`. The `properties`
293    /// parameter is ignored.
294    ///
295    /// The following naming rules apply to namespaces:
296    ///
297    /// - Names must be between 3 (min) and 63 (max) characters long.
298    /// - Names can consist only of lowercase letters, numbers, and underscores (_).
299    /// - Names must begin and end with a letter or number.
300    /// - Names must not contain hyphens (-) or periods (.).
301    ///
302    /// This function can return an error in the following situations:
303    ///
304    /// - Errors from the underlying database creation process, converted using
305    /// `from_aws_sdk_error`.
306    async fn create_namespace(
307        &self,
308        namespace: &NamespaceIdent,
309        _properties: HashMap<String, String>,
310    ) -> Result<Namespace> {
311        if self.namespace_exists(namespace).await? {
312            return Err(Error::new(
313                ErrorKind::NamespaceAlreadyExists,
314                format!("Namespace {namespace:?} already exists"),
315            ));
316        }
317
318        let req = self
319            .s3tables_client
320            .create_namespace()
321            .table_bucket_arn(self.config.table_bucket_arn.clone())
322            .namespace(namespace.to_url_string());
323        req.send().await.map_err(from_aws_sdk_error)?;
324        Ok(Namespace::with_properties(
325            namespace.clone(),
326            HashMap::new(),
327        ))
328    }
329
330    /// Retrieves a namespace by its identifier.
331    ///
332    /// Validates the given namespace identifier and then queries the
333    /// underlying database client to fetch the corresponding namespace data.
334    /// Constructs a `Namespace` object with the retrieved data and returns it.
335    ///
336    /// This function can return an error in any of the following situations:
337    /// - If there is an error querying the database, returned by
338    /// `from_aws_sdk_error`.
339    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
340        if !self.namespace_exists(namespace).await? {
341            return Err(Error::new(
342                ErrorKind::NamespaceNotFound,
343                format!("Namespace {namespace:?} does not exist"),
344            ));
345        }
346
347        let req = self
348            .s3tables_client
349            .get_namespace()
350            .table_bucket_arn(self.config.table_bucket_arn.clone())
351            .namespace(namespace.to_url_string());
352        let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
353        let properties = HashMap::new();
354        Ok(Namespace::with_properties(
355            NamespaceIdent::from_vec(resp.namespace().to_vec())?,
356            properties,
357        ))
358    }
359
360    /// Checks if a namespace exists within the s3tables catalog.
361    ///
362    /// Validates the namespace identifier by querying the s3tables catalog
363    /// to determine if the specified namespace exists.
364    ///
365    /// # Returns
366    /// A `Result<bool>` indicating the outcome of the check:
367    /// - `Ok(true)` if the namespace exists.
368    /// - `Ok(false)` if the namespace does not exist, identified by a specific
369    /// `IsNotFoundException` variant.
370    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
371    /// query, with the error encapsulating the issue.
372    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
373        let req = self
374            .s3tables_client
375            .get_namespace()
376            .table_bucket_arn(self.config.table_bucket_arn.clone())
377            .namespace(namespace.to_url_string());
378        match req.send().await {
379            Ok(_) => Ok(true),
380            Err(err) => {
381                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
382                    Ok(false)
383                } else {
384                    Err(from_aws_sdk_error(err))
385                }
386            }
387        }
388    }
389
390    /// Updates the properties of an existing namespace.
391    ///
392    /// S3Tables doesn't support updating namespace properties, so this function
393    /// will always return an error.
394    async fn update_namespace(
395        &self,
396        _namespace: &NamespaceIdent,
397        _properties: HashMap<String, String>,
398    ) -> Result<()> {
399        Err(Error::new(
400            ErrorKind::FeatureUnsupported,
401            "Update namespace is not supported for s3tables catalog",
402        ))
403    }
404
405    /// Drops an existing namespace from the s3tables catalog.
406    ///
407    /// Validates the namespace identifier and then deletes the corresponding
408    /// namespace from the s3tables catalog.
409    ///
410    /// This function can return an error in the following situations:
411    /// - Errors from the underlying database deletion process, converted using
412    /// `from_aws_sdk_error`.
413    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
414        if !self.namespace_exists(namespace).await? {
415            return Err(Error::new(
416                ErrorKind::NamespaceNotFound,
417                format!("Namespace {namespace:?} does not exist"),
418            ));
419        }
420
421        let req = self
422            .s3tables_client
423            .delete_namespace()
424            .table_bucket_arn(self.config.table_bucket_arn.clone())
425            .namespace(namespace.to_url_string());
426        req.send().await.map_err(from_aws_sdk_error)?;
427        Ok(())
428    }
429
430    /// Lists all tables within a given namespace.
431    ///
432    /// Retrieves all tables associated with the specified namespace and returns
433    /// their identifiers.
434    ///
435    /// This function can return an error in the following situations:
436    /// - Errors from the underlying database query process, converted using
437    /// `from_aws_sdk_error`.
438    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
439        let mut result = Vec::new();
440        let mut continuation_token = None;
441        loop {
442            let mut req = self
443                .s3tables_client
444                .list_tables()
445                .table_bucket_arn(self.config.table_bucket_arn.clone())
446                .namespace(namespace.to_url_string());
447            if let Some(token) = continuation_token {
448                req = req.continuation_token(token);
449            }
450            let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
451            for table in resp.tables() {
452                result.push(TableIdent::new(
453                    NamespaceIdent::from_vec(table.namespace().to_vec())?,
454                    table.name().to_string(),
455                ));
456            }
457            continuation_token = resp.continuation_token().map(|s| s.to_string());
458            if continuation_token.is_none() {
459                break;
460            }
461        }
462        Ok(result)
463    }
464
465    /// Creates a new table within a specified namespace.
466    ///
467    /// Attempts to create a table defined by the `creation` parameter. The metadata
468    /// location is generated by the s3tables catalog, looks like:
469    ///
470    /// s3://{RANDOM WAREHOUSE LOCATION}/metadata/{VERSION}-{UUID}.metadata.json
471    ///
472    /// We have to get this random warehouse location after the table is created.
473    ///
474    /// This function can return an error in the following situations:
475    /// - If the location of the table is set by user, identified by a specific
476    /// `DataInvalid` variant.
477    /// - Errors from the underlying database creation process, converted using
478    /// `from_aws_sdk_error`.
479    async fn create_table(
480        &self,
481        namespace: &NamespaceIdent,
482        mut creation: TableCreation,
483    ) -> Result<Table> {
484        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
485
486        // create table
487        let create_resp: CreateTableOutput = self
488            .s3tables_client
489            .create_table()
490            .table_bucket_arn(self.config.table_bucket_arn.clone())
491            .namespace(namespace.to_url_string())
492            .format(OpenTableFormat::Iceberg)
493            .name(table_ident.name())
494            .send()
495            .await
496            .map_err(from_aws_sdk_error)?;
497
498        // prepare table location. the warehouse location is generated by s3tables catalog,
499        // which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3
500        let table_location = match &creation.location {
501            Some(_) => {
502                return Err(Error::new(
503                    ErrorKind::DataInvalid,
504                    "The location of the table is generated by s3tables catalog, can't be set by user.",
505                ));
506            }
507            None => {
508                let get_resp: GetTableOutput = self
509                    .s3tables_client
510                    .get_table()
511                    .table_bucket_arn(self.config.table_bucket_arn.clone())
512                    .namespace(namespace.to_url_string())
513                    .name(table_ident.name())
514                    .send()
515                    .await
516                    .map_err(from_aws_sdk_error)?;
517                get_resp.warehouse_location().to_string()
518            }
519        };
520
521        // write metadata to file
522        creation.location = Some(table_location.clone());
523        let metadata = TableMetadataBuilder::from_table_creation(creation)?
524            .build()?
525            .metadata;
526        let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
527        metadata.write_to(&self.file_io, &metadata_location).await?;
528
529        // update metadata location
530        let metadata_location_str = metadata_location.to_string();
531        self.s3tables_client
532            .update_table_metadata_location()
533            .table_bucket_arn(self.config.table_bucket_arn.clone())
534            .namespace(namespace.to_url_string())
535            .name(table_ident.name())
536            .metadata_location(metadata_location_str.clone())
537            .version_token(create_resp.version_token())
538            .send()
539            .await
540            .map_err(from_aws_sdk_error)?;
541
542        let table = Table::builder()
543            .identifier(table_ident)
544            .metadata_location(metadata_location_str)
545            .metadata(metadata)
546            .file_io(self.file_io.clone())
547            .build()?;
548        Ok(table)
549    }
550
551    /// Loads an existing table from the s3tables catalog.
552    ///
553    /// Retrieves the metadata location of the specified table and constructs a
554    /// `Table` object with the retrieved metadata.
555    ///
556    /// This function can return an error in the following situations:
557    /// - If the table does not have a metadata location, identified by a specific
558    /// `Unexpected` variant.
559    /// - Errors from the underlying database query process, converted using
560    /// `from_aws_sdk_error`.
561    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
562        Ok(self.load_table_with_version_token(table_ident).await?.0)
563    }
564
565    /// Drops an existing table from the s3tables catalog.
566    ///
567    /// Validates the table identifier and then deletes the corresponding
568    /// table from the s3tables catalog.
569    ///
570    /// This function can return an error in the following situations:
571    /// - Errors from the underlying database deletion process, converted using
572    /// `from_aws_sdk_error`.
573    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
574        let req = self
575            .s3tables_client
576            .delete_table()
577            .table_bucket_arn(self.config.table_bucket_arn.clone())
578            .namespace(table.namespace().to_url_string())
579            .name(table.name());
580        req.send().await.map_err(from_aws_sdk_error)?;
581        Ok(())
582    }
583
584    /// Checks if a table exists within the s3tables catalog.
585    ///
586    /// Validates the table identifier by querying the s3tables catalog
587    /// to determine if the specified table exists.
588    ///
589    /// # Returns
590    /// A `Result<bool>` indicating the outcome of the check:
591    /// - `Ok(true)` if the table exists.
592    /// - `Ok(false)` if the table does not exist, identified by a specific
593    /// `IsNotFoundException` variant.
594    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
595    /// query, with the error encapsulating the issue.
596    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
597        let req = self
598            .s3tables_client
599            .get_table()
600            .table_bucket_arn(self.config.table_bucket_arn.clone())
601            .namespace(table_ident.namespace().to_url_string())
602            .name(table_ident.name());
603        match req.send().await {
604            Ok(_) => Ok(true),
605            Err(err) => {
606                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
607                    Ok(false)
608                } else {
609                    Err(from_aws_sdk_error(err))
610                }
611            }
612        }
613    }
614
615    /// Renames an existing table within the s3tables catalog.
616    ///
617    /// Validates the source and destination table identifiers and then renames
618    /// the source table to the destination table.
619    ///
620    /// This function can return an error in the following situations:
621    /// - Errors from the underlying database renaming process, converted using
622    /// `from_aws_sdk_error`.
623    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
624        let req = self
625            .s3tables_client
626            .rename_table()
627            .table_bucket_arn(self.config.table_bucket_arn.clone())
628            .namespace(src.namespace().to_url_string())
629            .name(src.name())
630            .new_namespace_name(dest.namespace().to_url_string())
631            .new_name(dest.name());
632        req.send().await.map_err(from_aws_sdk_error)?;
633        Ok(())
634    }
635
636    async fn register_table(
637        &self,
638        _table_ident: &TableIdent,
639        _metadata_location: String,
640    ) -> Result<Table> {
641        Err(Error::new(
642            ErrorKind::FeatureUnsupported,
643            "Registering a table is not supported yet",
644        ))
645    }
646
647    /// Updates an existing table within the s3tables catalog.
648    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
649        let table_ident = commit.identifier().clone();
650        let table_namespace = table_ident.namespace();
651        let (current_table, version_token) =
652            self.load_table_with_version_token(&table_ident).await?;
653
654        let staged_table = commit.apply(current_table)?;
655        let staged_metadata_location_str = staged_table.metadata_location_result()?;
656        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
657
658        staged_table
659            .metadata()
660            .write_to(staged_table.file_io(), &staged_metadata_location)
661            .await?;
662
663        let builder = self
664            .s3tables_client
665            .update_table_metadata_location()
666            .table_bucket_arn(&self.config.table_bucket_arn)
667            .namespace(table_namespace.to_url_string())
668            .name(table_ident.name())
669            .version_token(version_token)
670            .metadata_location(staged_metadata_location_str);
671
672        let _ = builder.send().await.map_err(|e| {
673            let error = e.into_service_error();
674            match error {
675                UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
676                    ErrorKind::CatalogCommitConflicts,
677                    format!("Commit conflicted for table: {table_ident}"),
678                )
679                .with_retryable(true),
680                UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
681                    ErrorKind::TableNotFound,
682                    format!("Table {table_ident} is not found"),
683                ),
684                _ => Error::new(
685                    ErrorKind::Unexpected,
686                    "Operation failed for hitting aws sdk error",
687                ),
688            }
689            .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
690        })?;
691
692        Ok(staged_table)
693    }
694}
695
696/// Format AWS SDK error into iceberg error
697pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
698where T: std::fmt::Debug {
699    Error::new(
700        ErrorKind::Unexpected,
701        format!("Operation failed for hitting aws sdk error: {error:?}"),
702    )
703}
704
705#[cfg(test)]
706mod tests {
707    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
708    use iceberg::transaction::{ApplyTransactionAction, Transaction};
709
710    use super::*;
711
712    async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
713        let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
714            Some(table_bucket_arn) => table_bucket_arn,
715            None => return Ok(None),
716        };
717
718        let config = S3TablesCatalogConfig {
719            name: None,
720            table_bucket_arn,
721            endpoint_url: None,
722            client: None,
723            props: HashMap::new(),
724        };
725
726        Ok(Some(S3TablesCatalog::new(config, None).await?))
727    }
728
729    #[tokio::test]
730    async fn test_s3tables_list_namespace() {
731        let catalog = match load_s3tables_catalog_from_env().await {
732            Ok(Some(catalog)) => catalog,
733            Ok(None) => return,
734            Err(e) => panic!("Error loading catalog: {e}"),
735        };
736
737        let namespaces = catalog.list_namespaces(None).await.unwrap();
738        assert!(!namespaces.is_empty());
739    }
740
741    #[tokio::test]
742    async fn test_s3tables_list_tables() {
743        let catalog = match load_s3tables_catalog_from_env().await {
744            Ok(Some(catalog)) => catalog,
745            Ok(None) => return,
746            Err(e) => panic!("Error loading catalog: {e}"),
747        };
748
749        let tables = catalog
750            .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
751            .await
752            .unwrap();
753        assert!(!tables.is_empty());
754    }
755
756    #[tokio::test]
757    async fn test_s3tables_load_table() {
758        let catalog = match load_s3tables_catalog_from_env().await {
759            Ok(Some(catalog)) => catalog,
760            Ok(None) => return,
761            Err(e) => panic!("Error loading catalog: {e}"),
762        };
763
764        let table = catalog
765            .load_table(&TableIdent::new(
766                NamespaceIdent::new("aws_s3_metadata".to_string()),
767                "query_storage_metadata".to_string(),
768            ))
769            .await
770            .unwrap();
771        println!("{table:?}");
772    }
773
774    #[tokio::test]
775    async fn test_s3tables_create_delete_namespace() {
776        let catalog = match load_s3tables_catalog_from_env().await {
777            Ok(Some(catalog)) => catalog,
778            Ok(None) => return,
779            Err(e) => panic!("Error loading catalog: {e}"),
780        };
781
782        let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
783        catalog
784            .create_namespace(&namespace, HashMap::new())
785            .await
786            .unwrap();
787        assert!(catalog.namespace_exists(&namespace).await.unwrap());
788        catalog.drop_namespace(&namespace).await.unwrap();
789        assert!(!catalog.namespace_exists(&namespace).await.unwrap());
790    }
791
792    #[tokio::test]
793    async fn test_s3tables_create_delete_table() {
794        let catalog = match load_s3tables_catalog_from_env().await {
795            Ok(Some(catalog)) => catalog,
796            Ok(None) => return,
797            Err(e) => panic!("Error loading catalog: {e}"),
798        };
799
800        let creation = {
801            let schema = Schema::builder()
802                .with_schema_id(0)
803                .with_fields(vec![
804                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
805                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
806                ])
807                .build()
808                .unwrap();
809            TableCreation::builder()
810                .name("test_s3tables_create_delete_table".to_string())
811                .properties(HashMap::new())
812                .schema(schema)
813                .build()
814        };
815
816        let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
817        let table_ident = TableIdent::new(
818            namespace.clone(),
819            "test_s3tables_create_delete_table".to_string(),
820        );
821        catalog.drop_namespace(&namespace).await.ok();
822        catalog.drop_table(&table_ident).await.ok();
823
824        catalog
825            .create_namespace(&namespace, HashMap::new())
826            .await
827            .unwrap();
828        catalog.create_table(&namespace, creation).await.unwrap();
829        assert!(catalog.table_exists(&table_ident).await.unwrap());
830        catalog.drop_table(&table_ident).await.unwrap();
831        assert!(!catalog.table_exists(&table_ident).await.unwrap());
832        catalog.drop_namespace(&namespace).await.unwrap();
833    }
834
835    #[tokio::test]
836    async fn test_s3tables_update_table() {
837        let catalog = match load_s3tables_catalog_from_env().await {
838            Ok(Some(catalog)) => catalog,
839            Ok(None) => return,
840            Err(e) => panic!("Error loading catalog: {e}"),
841        };
842
843        // Create a test namespace and table
844        let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
845        let table_ident =
846            TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
847
848        // Clean up any existing resources from previous test runs
849        catalog.drop_table(&table_ident).await.ok();
850        catalog.drop_namespace(&namespace).await.ok();
851
852        // Create namespace and table
853        catalog
854            .create_namespace(&namespace, HashMap::new())
855            .await
856            .unwrap();
857
858        let creation = {
859            let schema = Schema::builder()
860                .with_schema_id(0)
861                .with_fields(vec![
862                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
863                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
864                ])
865                .build()
866                .unwrap();
867            TableCreation::builder()
868                .name(table_ident.name().to_string())
869                .properties(HashMap::new())
870                .schema(schema)
871                .build()
872        };
873
874        let table = catalog.create_table(&namespace, creation).await.unwrap();
875
876        // Create a transaction to update the table
877        let tx = Transaction::new(&table);
878
879        // Store the original metadata location for comparison
880        let original_metadata_location = table.metadata_location();
881
882        // Update table properties using the transaction
883        let tx = tx
884            .update_table_properties()
885            .set("test_property".to_string(), "test_value".to_string())
886            .apply(tx)
887            .unwrap();
888
889        // Commit the transaction to the catalog
890        let updated_table = tx.commit(&catalog).await.unwrap();
891
892        // Verify the update was successful
893        assert_eq!(
894            updated_table.metadata().properties().get("test_property"),
895            Some(&"test_value".to_string())
896        );
897
898        // Verify the metadata location has been updated
899        assert_ne!(
900            updated_table.metadata_location(),
901            original_metadata_location,
902            "Metadata location should be updated after commit"
903        );
904
905        // Load the table again from the catalog to verify changes were persisted
906        let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
907
908        // Verify the reloaded table matches the updated table
909        assert_eq!(
910            reloaded_table.metadata().properties().get("test_property"),
911            Some(&"test_value".to_string())
912        );
913        assert_eq!(
914            reloaded_table.metadata_location(),
915            updated_table.metadata_location(),
916            "Reloaded table should have the same metadata location as the updated table"
917        );
918    }
919
920    #[tokio::test]
921    async fn test_builder_load_missing_bucket_arn() {
922        let builder = S3TablesCatalogBuilder::default();
923        let result = builder.load("s3tables", HashMap::new()).await;
924
925        assert!(result.is_err());
926        if let Err(err) = result {
927            assert_eq!(err.kind(), ErrorKind::DataInvalid);
928            assert_eq!(err.message(), "Table bucket ARN is required");
929        }
930    }
931
932    #[tokio::test]
933    async fn test_builder_with_endpoint_url_ok() {
934        let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");
935
936        let result = builder
937            .load(
938                "s3tables",
939                HashMap::from([
940                    (
941                        S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
942                        "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
943                    ),
944                    ("some_prop".to_string(), "some_value".to_string()),
945                ]),
946            )
947            .await;
948
949        assert!(result.is_ok());
950    }
951
952    #[tokio::test]
953    async fn test_builder_with_client_ok() {
954        use aws_config::BehaviorVersion;
955
956        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
957        let client = aws_sdk_s3tables::Client::new(&sdk_config);
958
959        let builder = S3TablesCatalogBuilder::default().with_client(client);
960        let result = builder
961            .load(
962                "s3tables",
963                HashMap::from([(
964                    S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
965                    "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
966                )]),
967            )
968            .await;
969
970        assert!(result.is_ok());
971    }
972
973    #[tokio::test]
974    async fn test_builder_with_table_bucket_arn() {
975        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
976        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
977
978        let result = builder.load("s3tables", HashMap::new()).await;
979
980        assert!(result.is_ok());
981        let catalog = result.unwrap();
982        assert_eq!(catalog.config.table_bucket_arn, test_arn);
983    }
984
985    #[tokio::test]
986    async fn test_builder_empty_table_bucket_arn_edge_cases() {
987        let mut props = HashMap::new();
988        props.insert(
989            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
990            "".to_string(),
991        );
992
993        let builder = S3TablesCatalogBuilder::default();
994        let result = builder.load("s3tables", props).await;
995
996        assert!(result.is_err());
997        if let Err(err) = result {
998            assert_eq!(err.kind(), ErrorKind::DataInvalid);
999            assert_eq!(err.message(), "Table bucket ARN is required");
1000        }
1001    }
1002
1003    #[tokio::test]
1004    async fn test_endpoint_url_property_overrides_builder_method() {
1005        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1006        let builder_endpoint = "http://localhost:4566";
1007        let property_endpoint = "http://localhost:8080";
1008
1009        let builder = S3TablesCatalogBuilder::default()
1010            .with_table_bucket_arn(test_arn)
1011            .with_endpoint_url(builder_endpoint);
1012
1013        let mut props = HashMap::new();
1014        props.insert(
1015            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1016            property_endpoint.to_string(),
1017        );
1018
1019        let result = builder.load("s3tables", props).await;
1020
1021        assert!(result.is_ok());
1022        let catalog = result.unwrap();
1023
1024        // Property value should override builder method value
1025        assert_eq!(
1026            catalog.config.endpoint_url,
1027            Some(property_endpoint.to_string())
1028        );
1029        assert_ne!(
1030            catalog.config.endpoint_url,
1031            Some(builder_endpoint.to_string())
1032        );
1033    }
1034
1035    #[tokio::test]
1036    async fn test_endpoint_url_builder_method_only() {
1037        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1038        let builder_endpoint = "http://localhost:4566";
1039
1040        let builder = S3TablesCatalogBuilder::default()
1041            .with_table_bucket_arn(test_arn)
1042            .with_endpoint_url(builder_endpoint);
1043
1044        let result = builder.load("s3tables", HashMap::new()).await;
1045
1046        assert!(result.is_ok());
1047        let catalog = result.unwrap();
1048
1049        assert_eq!(
1050            catalog.config.endpoint_url,
1051            Some(builder_endpoint.to_string())
1052        );
1053    }
1054
1055    #[tokio::test]
1056    async fn test_endpoint_url_property_only() {
1057        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1058        let property_endpoint = "http://localhost:8080";
1059
1060        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1061
1062        let mut props = HashMap::new();
1063        props.insert(
1064            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1065            property_endpoint.to_string(),
1066        );
1067
1068        let result = builder.load("s3tables", props).await;
1069
1070        assert!(result.is_ok());
1071        let catalog = result.unwrap();
1072
1073        assert_eq!(
1074            catalog.config.endpoint_url,
1075            Some(property_endpoint.to_string())
1076        );
1077    }
1078
1079    #[tokio::test]
1080    async fn test_table_bucket_arn_property_overrides_builder_method() {
1081        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1082        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1083
1084        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1085
1086        let mut props = HashMap::new();
1087        props.insert(
1088            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1089            property_arn.to_string(),
1090        );
1091
1092        let result = builder.load("s3tables", props).await;
1093
1094        assert!(result.is_ok());
1095        let catalog = result.unwrap();
1096
1097        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1098        assert_ne!(catalog.config.table_bucket_arn, builder_arn);
1099    }
1100
1101    #[tokio::test]
1102    async fn test_table_bucket_arn_builder_method_only() {
1103        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1104
1105        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1106
1107        let result = builder.load("s3tables", HashMap::new()).await;
1108
1109        assert!(result.is_ok());
1110        let catalog = result.unwrap();
1111
1112        assert_eq!(catalog.config.table_bucket_arn, builder_arn);
1113    }
1114
1115    #[tokio::test]
1116    async fn test_table_bucket_arn_property_only() {
1117        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1118
1119        let builder = S3TablesCatalogBuilder::default();
1120
1121        let mut props = HashMap::new();
1122        props.insert(
1123            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1124            property_arn.to_string(),
1125        );
1126
1127        let result = builder.load("s3tables", props).await;
1128
1129        assert!(result.is_ok());
1130        let catalog = result.unwrap();
1131
1132        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1133    }
1134
1135    #[tokio::test]
1136    async fn test_builder_empty_name_validation() {
1137        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1138        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1139
1140        let result = builder.load("", HashMap::new()).await;
1141
1142        assert!(result.is_err());
1143        if let Err(err) = result {
1144            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1145            assert_eq!(err.message(), "Catalog name cannot be empty");
1146        }
1147    }
1148
1149    #[tokio::test]
1150    async fn test_builder_whitespace_only_name_validation() {
1151        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1152        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1153
1154        let result = builder.load("   \t\n  ", HashMap::new()).await;
1155
1156        assert!(result.is_err());
1157        if let Err(err) = result {
1158            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1159            assert_eq!(err.message(), "Catalog name cannot be empty");
1160        }
1161    }
1162
1163    #[tokio::test]
1164    async fn test_builder_name_validation_with_missing_arn() {
1165        let builder = S3TablesCatalogBuilder::default();
1166
1167        let result = builder.load("", HashMap::new()).await;
1168
1169        assert!(result.is_err());
1170        if let Err(err) = result {
1171            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1172            assert_eq!(err.message(), "Catalog name cannot be empty");
1173        }
1174    }
1175}