// iceberg_catalog_s3tables/catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
25use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
26use aws_sdk_s3tables::operation::get_table::GetTableOutput;
27use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
28use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
29use aws_sdk_s3tables::types::OpenTableFormat;
30use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
31use iceberg::spec::{TableMetadata, TableMetadataBuilder};
32use iceberg::table::Table;
33use iceberg::{
34    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35    TableCommit, TableCreation, TableIdent,
36};
37use iceberg_storage_opendal::OpenDalStorageFactory;
38
39use crate::utils::create_sdk_config;
40
/// Property key for the S3Tables table bucket ARN (required; validated in `load()`).
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Property key for a custom S3Tables endpoint URL (optional; useful for local testing).
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
45
/// S3Tables catalog configuration.
#[derive(Debug)]
struct S3TablesCatalogConfig {
    /// Catalog name; `None` until set by `load()`.
    name: Option<String>,
    /// Unlike other buckets, S3Tables bucket is not a physical bucket, but a virtual bucket
    /// that is managed by s3tables. We can't directly access the bucket with path like
    /// s3://{bucket_name}/{file_path}, all the operations are done with respect of the bucket
    /// ARN.
    table_bucket_arn: String,
    /// Endpoint URL for the catalog (useful for local testing/mocking).
    endpoint_url: Option<String>,
    /// Optional pre-configured AWS SDK client for S3Tables. When set, it takes
    /// precedence over building a client from `props`/`endpoint_url`.
    client: Option<aws_sdk_s3tables::Client>,
    /// Properties for the catalog. The available properties are:
    /// - `profile_name`: The name of the AWS profile to use.
    /// - `region_name`: The AWS region to use.
    /// - `aws_access_key_id`: The AWS access key ID to use.
    /// - `aws_secret_access_key`: The AWS secret access key to use.
    /// - `aws_session_token`: The AWS session token to use.
    props: HashMap<String, String>,
}
68
/// Builder for [`S3TablesCatalog`].
#[derive(Debug)]
pub struct S3TablesCatalogBuilder {
    /// Accumulated catalog configuration, finalized and validated by `load()`.
    config: S3TablesCatalogConfig,
    /// Optional storage factory; when unset, an OpenDAL S3 factory is used.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
75
76/// Default builder for [`S3TablesCatalog`].
77impl Default for S3TablesCatalogBuilder {
78    fn default() -> Self {
79        Self {
80            config: S3TablesCatalogConfig {
81                name: None,
82                table_bucket_arn: "".to_string(),
83                endpoint_url: None,
84                client: None,
85                props: HashMap::new(),
86            },
87            storage_factory: None,
88        }
89    }
90}
91
92/// Builder methods for [`S3TablesCatalog`].
93impl S3TablesCatalogBuilder {
94    /// Configure the catalog with a custom endpoint URL (useful for local testing/mocking).
95    ///
96    /// # Behavior with Properties
97    ///
98    /// If both this method and the `endpoint_url` property are provided during catalog loading,
99    /// the property value will take precedence and overwrite the value set by this method.
100    /// This follows the general pattern where properties specified in the `load()` method
101    /// have higher priority than builder method configurations.
102    pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
103        self.config.endpoint_url = Some(endpoint_url.into());
104        self
105    }
106
107    /// Configure the catalog with a pre-built AWS SDK client.
108    pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
109        self.config.client = Some(client);
110        self
111    }
112
113    /// Configure the catalog with a table bucket ARN.
114    ///
115    /// # Behavior with Properties
116    ///
117    /// If both this method and the `table_bucket_arn` property are provided during catalog loading,
118    /// the property value will take precedence and overwrite the value set by this method.
119    /// This follows the general pattern where properties specified in the `load()` method
120    /// have higher priority than builder method configurations.
121    pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
122        self.config.table_bucket_arn = table_bucket_arn.into();
123        self
124    }
125}
126
127impl CatalogBuilder for S3TablesCatalogBuilder {
128    type C = S3TablesCatalog;
129
130    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
131        self.storage_factory = Some(storage_factory);
132        self
133    }
134
135    fn load(
136        mut self,
137        name: impl Into<String>,
138        props: HashMap<String, String>,
139    ) -> impl Future<Output = Result<Self::C>> + Send {
140        let catalog_name = name.into();
141        self.config.name = Some(catalog_name.clone());
142
143        if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
144            self.config.table_bucket_arn = props
145                .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
146                .cloned()
147                .unwrap_or_default();
148        }
149
150        if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
151            self.config.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
152        }
153
154        // Collect other remaining properties
155        self.config.props = props
156            .into_iter()
157            .filter(|(k, _)| {
158                k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
159                    && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
160            })
161            .collect();
162
163        async move {
164            if catalog_name.trim().is_empty() {
165                Err(Error::new(
166                    ErrorKind::DataInvalid,
167                    "Catalog name cannot be empty",
168                ))
169            } else if self.config.table_bucket_arn.is_empty() {
170                Err(Error::new(
171                    ErrorKind::DataInvalid,
172                    "Table bucket ARN is required",
173                ))
174            } else {
175                S3TablesCatalog::new(self.config, self.storage_factory).await
176            }
177        }
178    }
179}
180
/// S3Tables catalog implementation.
#[derive(Debug)]
pub struct S3TablesCatalog {
    /// Validated configuration (bucket ARN, endpoint, extra props).
    config: S3TablesCatalogConfig,
    /// AWS SDK client used for all S3Tables control-plane calls.
    s3tables_client: aws_sdk_s3tables::Client,
    /// FileIO used to read and write table metadata files.
    file_io: FileIO,
}
188
189impl S3TablesCatalog {
190    /// Creates a new S3Tables catalog.
191    async fn new(
192        config: S3TablesCatalogConfig,
193        storage_factory: Option<Arc<dyn StorageFactory>>,
194    ) -> Result<Self> {
195        let s3tables_client = if let Some(client) = config.client.clone() {
196            client
197        } else {
198            let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
199            aws_sdk_s3tables::Client::new(&aws_config)
200        };
201
202        // Use provided factory or default to OpenDalStorageFactory::S3
203        let factory = storage_factory.unwrap_or_else(|| {
204            Arc::new(OpenDalStorageFactory::S3 {
205                customized_credential_load: None,
206            })
207        });
208        let file_io = FileIOBuilder::new(factory)
209            .with_props(&config.props)
210            .build();
211
212        Ok(Self {
213            config,
214            s3tables_client,
215            file_io,
216        })
217    }
218
219    async fn load_table_with_version_token(
220        &self,
221        table_ident: &TableIdent,
222    ) -> Result<(Table, String)> {
223        let req = self
224            .s3tables_client
225            .get_table()
226            .table_bucket_arn(self.config.table_bucket_arn.clone())
227            .namespace(table_ident.namespace().to_url_string())
228            .name(table_ident.name());
229        let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
230
231        // when a table is created, it's possible that the metadata location is not set.
232        let metadata_location = resp.metadata_location().ok_or_else(|| {
233            Error::new(
234                ErrorKind::Unexpected,
235                format!(
236                    "Table {} does not have metadata location",
237                    table_ident.name()
238                ),
239            )
240        })?;
241        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
242
243        let table = Table::builder()
244            .identifier(table_ident.clone())
245            .metadata(metadata)
246            .metadata_location(metadata_location)
247            .file_io(self.file_io.clone())
248            .build()?;
249        Ok((table, resp.version_token))
250    }
251}
252
253#[async_trait]
254impl Catalog for S3TablesCatalog {
255    /// List namespaces from s3tables catalog.
256    ///
257    /// S3Tables doesn't support nested namespaces. If parent is provided, it will
258    /// return an empty list.
259    async fn list_namespaces(
260        &self,
261        parent: Option<&NamespaceIdent>,
262    ) -> Result<Vec<NamespaceIdent>> {
263        if parent.is_some() {
264            return Ok(vec![]);
265        }
266
267        let mut result = Vec::new();
268        let mut continuation_token = None;
269        loop {
270            let mut req = self
271                .s3tables_client
272                .list_namespaces()
273                .table_bucket_arn(self.config.table_bucket_arn.clone());
274            if let Some(token) = continuation_token {
275                req = req.continuation_token(token);
276            }
277            let resp = req.send().await.map_err(from_aws_sdk_error)?;
278            for ns in resp.namespaces() {
279                result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
280            }
281            continuation_token = resp.continuation_token().map(|s| s.to_string());
282            if continuation_token.is_none() {
283                break;
284            }
285        }
286        Ok(result)
287    }
288
289    /// Creates a new namespace with the given identifier and properties.
290    ///
291    /// Attempts to create a namespace defined by the `namespace`. The `properties`
292    /// parameter is ignored.
293    ///
294    /// The following naming rules apply to namespaces:
295    ///
296    /// - Names must be between 3 (min) and 63 (max) characters long.
297    /// - Names can consist only of lowercase letters, numbers, and underscores (_).
298    /// - Names must begin and end with a letter or number.
299    /// - Names must not contain hyphens (-) or periods (.).
300    ///
301    /// This function can return an error in the following situations:
302    ///
303    /// - Errors from the underlying database creation process, converted using
304    /// `from_aws_sdk_error`.
305    async fn create_namespace(
306        &self,
307        namespace: &NamespaceIdent,
308        _properties: HashMap<String, String>,
309    ) -> Result<Namespace> {
310        if self.namespace_exists(namespace).await? {
311            return Err(Error::new(
312                ErrorKind::NamespaceAlreadyExists,
313                format!("Namespace {namespace:?} already exists"),
314            ));
315        }
316
317        let req = self
318            .s3tables_client
319            .create_namespace()
320            .table_bucket_arn(self.config.table_bucket_arn.clone())
321            .namespace(namespace.to_url_string());
322        req.send().await.map_err(from_aws_sdk_error)?;
323        Ok(Namespace::with_properties(
324            namespace.clone(),
325            HashMap::new(),
326        ))
327    }
328
329    /// Retrieves a namespace by its identifier.
330    ///
331    /// Validates the given namespace identifier and then queries the
332    /// underlying database client to fetch the corresponding namespace data.
333    /// Constructs a `Namespace` object with the retrieved data and returns it.
334    ///
335    /// This function can return an error in any of the following situations:
336    /// - If there is an error querying the database, returned by
337    /// `from_aws_sdk_error`.
338    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
339        if !self.namespace_exists(namespace).await? {
340            return Err(Error::new(
341                ErrorKind::NamespaceNotFound,
342                format!("Namespace {namespace:?} does not exist"),
343            ));
344        }
345
346        let req = self
347            .s3tables_client
348            .get_namespace()
349            .table_bucket_arn(self.config.table_bucket_arn.clone())
350            .namespace(namespace.to_url_string());
351        let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
352        let properties = HashMap::new();
353        Ok(Namespace::with_properties(
354            NamespaceIdent::from_vec(resp.namespace().to_vec())?,
355            properties,
356        ))
357    }
358
359    /// Checks if a namespace exists within the s3tables catalog.
360    ///
361    /// Validates the namespace identifier by querying the s3tables catalog
362    /// to determine if the specified namespace exists.
363    ///
364    /// # Returns
365    /// A `Result<bool>` indicating the outcome of the check:
366    /// - `Ok(true)` if the namespace exists.
367    /// - `Ok(false)` if the namespace does not exist, identified by a specific
368    /// `IsNotFoundException` variant.
369    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
370    /// query, with the error encapsulating the issue.
371    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
372        let req = self
373            .s3tables_client
374            .get_namespace()
375            .table_bucket_arn(self.config.table_bucket_arn.clone())
376            .namespace(namespace.to_url_string());
377        match req.send().await {
378            Ok(_) => Ok(true),
379            Err(err) => {
380                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
381                    Ok(false)
382                } else {
383                    Err(from_aws_sdk_error(err))
384                }
385            }
386        }
387    }
388
389    /// Updates the properties of an existing namespace.
390    ///
391    /// S3Tables doesn't support updating namespace properties, so this function
392    /// will always return an error.
393    async fn update_namespace(
394        &self,
395        _namespace: &NamespaceIdent,
396        _properties: HashMap<String, String>,
397    ) -> Result<()> {
398        Err(Error::new(
399            ErrorKind::FeatureUnsupported,
400            "Update namespace is not supported for s3tables catalog",
401        ))
402    }
403
404    /// Drops an existing namespace from the s3tables catalog.
405    ///
406    /// Validates the namespace identifier and then deletes the corresponding
407    /// namespace from the s3tables catalog.
408    ///
409    /// This function can return an error in the following situations:
410    /// - Errors from the underlying database deletion process, converted using
411    /// `from_aws_sdk_error`.
412    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
413        if !self.namespace_exists(namespace).await? {
414            return Err(Error::new(
415                ErrorKind::NamespaceNotFound,
416                format!("Namespace {namespace:?} does not exist"),
417            ));
418        }
419
420        let req = self
421            .s3tables_client
422            .delete_namespace()
423            .table_bucket_arn(self.config.table_bucket_arn.clone())
424            .namespace(namespace.to_url_string());
425        req.send().await.map_err(from_aws_sdk_error)?;
426        Ok(())
427    }
428
429    /// Lists all tables within a given namespace.
430    ///
431    /// Retrieves all tables associated with the specified namespace and returns
432    /// their identifiers.
433    ///
434    /// This function can return an error in the following situations:
435    /// - Errors from the underlying database query process, converted using
436    /// `from_aws_sdk_error`.
437    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
438        let mut result = Vec::new();
439        let mut continuation_token = None;
440        loop {
441            let mut req = self
442                .s3tables_client
443                .list_tables()
444                .table_bucket_arn(self.config.table_bucket_arn.clone())
445                .namespace(namespace.to_url_string());
446            if let Some(token) = continuation_token {
447                req = req.continuation_token(token);
448            }
449            let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
450            for table in resp.tables() {
451                result.push(TableIdent::new(
452                    NamespaceIdent::from_vec(table.namespace().to_vec())?,
453                    table.name().to_string(),
454                ));
455            }
456            continuation_token = resp.continuation_token().map(|s| s.to_string());
457            if continuation_token.is_none() {
458                break;
459            }
460        }
461        Ok(result)
462    }
463
464    /// Creates a new table within a specified namespace.
465    ///
466    /// Attempts to create a table defined by the `creation` parameter. The metadata
467    /// location is generated by the s3tables catalog, looks like:
468    ///
469    /// s3://{RANDOM WAREHOUSE LOCATION}/metadata/{VERSION}-{UUID}.metadata.json
470    ///
471    /// We have to get this random warehouse location after the table is created.
472    ///
473    /// This function can return an error in the following situations:
474    /// - If the location of the table is set by user, identified by a specific
475    /// `DataInvalid` variant.
476    /// - Errors from the underlying database creation process, converted using
477    /// `from_aws_sdk_error`.
478    async fn create_table(
479        &self,
480        namespace: &NamespaceIdent,
481        mut creation: TableCreation,
482    ) -> Result<Table> {
483        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
484
485        // create table
486        let create_resp: CreateTableOutput = self
487            .s3tables_client
488            .create_table()
489            .table_bucket_arn(self.config.table_bucket_arn.clone())
490            .namespace(namespace.to_url_string())
491            .format(OpenTableFormat::Iceberg)
492            .name(table_ident.name())
493            .send()
494            .await
495            .map_err(from_aws_sdk_error)?;
496
497        // prepare table location. the warehouse location is generated by s3tables catalog,
498        // which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3
499        let table_location = match &creation.location {
500            Some(_) => {
501                return Err(Error::new(
502                    ErrorKind::DataInvalid,
503                    "The location of the table is generated by s3tables catalog, can't be set by user.",
504                ));
505            }
506            None => {
507                let get_resp: GetTableOutput = self
508                    .s3tables_client
509                    .get_table()
510                    .table_bucket_arn(self.config.table_bucket_arn.clone())
511                    .namespace(namespace.to_url_string())
512                    .name(table_ident.name())
513                    .send()
514                    .await
515                    .map_err(from_aws_sdk_error)?;
516                get_resp.warehouse_location().to_string()
517            }
518        };
519
520        // write metadata to file
521        creation.location = Some(table_location.clone());
522        let metadata = TableMetadataBuilder::from_table_creation(creation)?
523            .build()?
524            .metadata;
525        let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
526        metadata.write_to(&self.file_io, &metadata_location).await?;
527
528        // update metadata location
529        let metadata_location_str = metadata_location.to_string();
530        self.s3tables_client
531            .update_table_metadata_location()
532            .table_bucket_arn(self.config.table_bucket_arn.clone())
533            .namespace(namespace.to_url_string())
534            .name(table_ident.name())
535            .metadata_location(metadata_location_str.clone())
536            .version_token(create_resp.version_token())
537            .send()
538            .await
539            .map_err(from_aws_sdk_error)?;
540
541        let table = Table::builder()
542            .identifier(table_ident)
543            .metadata_location(metadata_location_str)
544            .metadata(metadata)
545            .file_io(self.file_io.clone())
546            .build()?;
547        Ok(table)
548    }
549
550    /// Loads an existing table from the s3tables catalog.
551    ///
552    /// Retrieves the metadata location of the specified table and constructs a
553    /// `Table` object with the retrieved metadata.
554    ///
555    /// This function can return an error in the following situations:
556    /// - If the table does not have a metadata location, identified by a specific
557    /// `Unexpected` variant.
558    /// - Errors from the underlying database query process, converted using
559    /// `from_aws_sdk_error`.
560    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
561        Ok(self.load_table_with_version_token(table_ident).await?.0)
562    }
563
564    /// Not supported for S3Tables. Use `purge_table` instead.
565    ///
566    /// S3 Tables doesn't support soft delete, so dropping a table will permanently remove it from the catalog.
567    async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
568        Err(Error::new(
569            ErrorKind::FeatureUnsupported,
570            "drop_table is not supported for S3Tables; use purge_table instead",
571        ))
572    }
573
574    /// Purge a table from the S3 Tables catalog.
575    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
576        let req = self
577            .s3tables_client
578            .delete_table()
579            .table_bucket_arn(self.config.table_bucket_arn.clone())
580            .namespace(table.namespace().to_url_string())
581            .name(table.name());
582        req.send().await.map_err(from_aws_sdk_error)?;
583        Ok(())
584    }
585
586    /// Checks if a table exists within the s3tables catalog.
587    ///
588    /// Validates the table identifier by querying the s3tables catalog
589    /// to determine if the specified table exists.
590    ///
591    /// # Returns
592    /// A `Result<bool>` indicating the outcome of the check:
593    /// - `Ok(true)` if the table exists.
594    /// - `Ok(false)` if the table does not exist, identified by a specific
595    /// `IsNotFoundException` variant.
596    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
597    /// query, with the error encapsulating the issue.
598    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
599        let req = self
600            .s3tables_client
601            .get_table()
602            .table_bucket_arn(self.config.table_bucket_arn.clone())
603            .namespace(table_ident.namespace().to_url_string())
604            .name(table_ident.name());
605        match req.send().await {
606            Ok(_) => Ok(true),
607            Err(err) => {
608                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
609                    Ok(false)
610                } else {
611                    Err(from_aws_sdk_error(err))
612                }
613            }
614        }
615    }
616
617    /// Renames an existing table within the s3tables catalog.
618    ///
619    /// Validates the source and destination table identifiers and then renames
620    /// the source table to the destination table.
621    ///
622    /// This function can return an error in the following situations:
623    /// - Errors from the underlying database renaming process, converted using
624    /// `from_aws_sdk_error`.
625    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
626        let req = self
627            .s3tables_client
628            .rename_table()
629            .table_bucket_arn(self.config.table_bucket_arn.clone())
630            .namespace(src.namespace().to_url_string())
631            .name(src.name())
632            .new_namespace_name(dest.namespace().to_url_string())
633            .new_name(dest.name());
634        req.send().await.map_err(from_aws_sdk_error)?;
635        Ok(())
636    }
637
638    async fn register_table(
639        &self,
640        _table_ident: &TableIdent,
641        _metadata_location: String,
642    ) -> Result<Table> {
643        Err(Error::new(
644            ErrorKind::FeatureUnsupported,
645            "Registering a table is not supported yet",
646        ))
647    }
648
649    /// Updates an existing table within the s3tables catalog.
650    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
651        let table_ident = commit.identifier().clone();
652        let table_namespace = table_ident.namespace();
653        let (current_table, version_token) =
654            self.load_table_with_version_token(&table_ident).await?;
655
656        let staged_table = commit.apply(current_table)?;
657        let staged_metadata_location_str = staged_table.metadata_location_result()?;
658        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
659
660        staged_table
661            .metadata()
662            .write_to(staged_table.file_io(), &staged_metadata_location)
663            .await?;
664
665        let builder = self
666            .s3tables_client
667            .update_table_metadata_location()
668            .table_bucket_arn(&self.config.table_bucket_arn)
669            .namespace(table_namespace.to_url_string())
670            .name(table_ident.name())
671            .version_token(version_token)
672            .metadata_location(staged_metadata_location_str);
673
674        let _ = builder.send().await.map_err(|e| {
675            let error = e.into_service_error();
676            match error {
677                UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
678                    ErrorKind::CatalogCommitConflicts,
679                    format!("Commit conflicted for table: {table_ident}"),
680                )
681                .with_retryable(true),
682                UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
683                    ErrorKind::TableNotFound,
684                    format!("Table {table_ident} is not found"),
685                ),
686                _ => Error::new(
687                    ErrorKind::Unexpected,
688                    "Operation failed for hitting aws sdk error",
689                ),
690            }
691            .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
692        })?;
693
694        Ok(staged_table)
695    }
696}
697
698/// Format AWS SDK error into iceberg error
699pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
700where T: std::fmt::Debug {
701    Error::new(
702        ErrorKind::Unexpected,
703        format!("Operation failed for hitting aws sdk error: {error:?}"),
704    )
705}
706
707#[cfg(test)]
708mod tests {
709    use futures::TryStreamExt;
710    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
711    use iceberg::transaction::{ApplyTransactionAction, Transaction};
712
713    use super::*;
714
715    async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
716        let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
717            Some(table_bucket_arn) => table_bucket_arn,
718            None => return Ok(None),
719        };
720
721        let config = S3TablesCatalogConfig {
722            name: None,
723            table_bucket_arn,
724            endpoint_url: None,
725            client: None,
726            props: HashMap::new(),
727        };
728
729        Ok(Some(S3TablesCatalog::new(config, None).await?))
730    }
731
    /// Integration test (skipped unless `TABLE_BUCKET_ARN` is set): expects
    /// the bucket to contain at least one namespace.
    #[tokio::test]
    async fn test_s3tables_list_namespace() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return, // env var not set: skip
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let namespaces = catalog.list_namespaces(None).await.unwrap();
        assert!(!namespaces.is_empty());
    }
743
    /// Integration test (skipped unless `TABLE_BUCKET_ARN` is set): expects
    /// the pre-existing `aws_s3_metadata` namespace to contain tables.
    #[tokio::test]
    async fn test_s3tables_list_tables() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return, // env var not set: skip
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let tables = catalog
            .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
            .await
            .unwrap();
        assert!(!tables.is_empty());
    }
758
    /// Integration test (skipped unless `TABLE_BUCKET_ARN` is set): loads a
    /// known pre-existing table and prints it for manual inspection.
    #[tokio::test]
    async fn test_s3tables_load_table() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return, // env var not set: skip
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let table = catalog
            .load_table(&TableIdent::new(
                NamespaceIdent::new("aws_s3_metadata".to_string()),
                "query_storage_metadata".to_string(),
            ))
            .await
            .unwrap();
        println!("{table:?}");
    }
776
    /// Integration test (skipped unless `TABLE_BUCKET_ARN` is set): round-trips
    /// a namespace through create, exists, drop, and not-exists.
    #[tokio::test]
    async fn test_s3tables_create_delete_namespace() {
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(catalog)) => catalog,
            Ok(None) => return, // env var not set: skip
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
        catalog
            .create_namespace(&namespace, HashMap::new())
            .await
            .unwrap();
        assert!(catalog.namespace_exists(&namespace).await.unwrap());
        catalog.drop_namespace(&namespace).await.unwrap();
        assert!(!catalog.namespace_exists(&namespace).await.unwrap());
    }
794
795    #[tokio::test]
796    async fn test_s3tables_create_delete_table() {
797        let catalog = match load_s3tables_catalog_from_env().await {
798            Ok(Some(catalog)) => catalog,
799            Ok(None) => return,
800            Err(e) => panic!("Error loading catalog: {e}"),
801        };
802
803        let creation = {
804            let schema = Schema::builder()
805                .with_schema_id(0)
806                .with_fields(vec![
807                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
808                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
809                ])
810                .build()
811                .unwrap();
812            TableCreation::builder()
813                .name("test_s3tables_create_delete_table".to_string())
814                .properties(HashMap::new())
815                .schema(schema)
816                .build()
817        };
818
819        let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
820        let table_ident = TableIdent::new(
821            namespace.clone(),
822            "test_s3tables_create_delete_table".to_string(),
823        );
824        catalog.drop_namespace(&namespace).await.ok();
825        catalog.drop_table(&table_ident).await.ok();
826
827        catalog
828            .create_namespace(&namespace, HashMap::new())
829            .await
830            .unwrap();
831        catalog.create_table(&namespace, creation).await.unwrap();
832        assert!(catalog.table_exists(&table_ident).await.unwrap());
833        catalog.drop_table(&table_ident).await.unwrap();
834        assert!(!catalog.table_exists(&table_ident).await.unwrap());
835        catalog.drop_namespace(&namespace).await.unwrap();
836    }
837
838    #[tokio::test]
839    async fn test_s3tables_update_table() {
840        let catalog = match load_s3tables_catalog_from_env().await {
841            Ok(Some(catalog)) => catalog,
842            Ok(None) => return,
843            Err(e) => panic!("Error loading catalog: {e}"),
844        };
845
846        // Create a test namespace and table
847        let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
848        let table_ident =
849            TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
850
851        // Clean up any existing resources from previous test runs
852        catalog.drop_table(&table_ident).await.ok();
853        catalog.drop_namespace(&namespace).await.ok();
854
855        // Create namespace and table
856        catalog
857            .create_namespace(&namespace, HashMap::new())
858            .await
859            .unwrap();
860
861        let creation = {
862            let schema = Schema::builder()
863                .with_schema_id(0)
864                .with_fields(vec![
865                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
866                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
867                ])
868                .build()
869                .unwrap();
870            TableCreation::builder()
871                .name(table_ident.name().to_string())
872                .properties(HashMap::new())
873                .schema(schema)
874                .build()
875        };
876
877        let table = catalog.create_table(&namespace, creation).await.unwrap();
878
879        // Create a transaction to update the table
880        let tx = Transaction::new(&table);
881
882        // Store the original metadata location for comparison
883        let original_metadata_location = table.metadata_location();
884
885        // Update table properties using the transaction
886        let tx = tx
887            .update_table_properties()
888            .set("test_property".to_string(), "test_value".to_string())
889            .apply(tx)
890            .unwrap();
891
892        // Commit the transaction to the catalog
893        let updated_table = tx.commit(&catalog).await.unwrap();
894
895        // Verify the update was successful
896        assert_eq!(
897            updated_table.metadata().properties().get("test_property"),
898            Some(&"test_value".to_string())
899        );
900
901        // Verify the metadata location has been updated
902        assert_ne!(
903            updated_table.metadata_location(),
904            original_metadata_location,
905            "Metadata location should be updated after commit"
906        );
907
908        // Load the table again from the catalog to verify changes were persisted
909        let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
910
911        // Verify the reloaded table matches the updated table
912        assert_eq!(
913            reloaded_table.metadata().properties().get("test_property"),
914            Some(&"test_value".to_string())
915        );
916        assert_eq!(
917            reloaded_table.metadata_location(),
918            updated_table.metadata_location(),
919            "Reloaded table should have the same metadata location as the updated table"
920        );
921    }
922
923    #[tokio::test]
924    async fn test_builder_load_missing_bucket_arn() {
925        let builder = S3TablesCatalogBuilder::default();
926        let result = builder.load("s3tables", HashMap::new()).await;
927
928        assert!(result.is_err());
929        if let Err(err) = result {
930            assert_eq!(err.kind(), ErrorKind::DataInvalid);
931            assert_eq!(err.message(), "Table bucket ARN is required");
932        }
933    }
934
935    #[tokio::test]
936    async fn test_builder_with_endpoint_url_ok() {
937        let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");
938
939        let result = builder
940            .load(
941                "s3tables",
942                HashMap::from([
943                    (
944                        S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
945                        "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
946                    ),
947                    ("some_prop".to_string(), "some_value".to_string()),
948                ]),
949            )
950            .await;
951
952        assert!(result.is_ok());
953    }
954
955    #[tokio::test]
956    async fn test_builder_with_client_ok() {
957        use aws_config::BehaviorVersion;
958
959        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
960        let client = aws_sdk_s3tables::Client::new(&sdk_config);
961
962        let builder = S3TablesCatalogBuilder::default().with_client(client);
963        let result = builder
964            .load(
965                "s3tables",
966                HashMap::from([(
967                    S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
968                    "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
969                )]),
970            )
971            .await;
972
973        assert!(result.is_ok());
974    }
975
976    #[tokio::test]
977    async fn test_builder_with_table_bucket_arn() {
978        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
979        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
980
981        let result = builder.load("s3tables", HashMap::new()).await;
982
983        assert!(result.is_ok());
984        let catalog = result.unwrap();
985        assert_eq!(catalog.config.table_bucket_arn, test_arn);
986    }
987
988    #[tokio::test]
989    async fn test_builder_empty_table_bucket_arn_edge_cases() {
990        let mut props = HashMap::new();
991        props.insert(
992            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
993            "".to_string(),
994        );
995
996        let builder = S3TablesCatalogBuilder::default();
997        let result = builder.load("s3tables", props).await;
998
999        assert!(result.is_err());
1000        if let Err(err) = result {
1001            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1002            assert_eq!(err.message(), "Table bucket ARN is required");
1003        }
1004    }
1005
1006    #[tokio::test]
1007    async fn test_endpoint_url_property_overrides_builder_method() {
1008        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1009        let builder_endpoint = "http://localhost:4566";
1010        let property_endpoint = "http://localhost:8080";
1011
1012        let builder = S3TablesCatalogBuilder::default()
1013            .with_table_bucket_arn(test_arn)
1014            .with_endpoint_url(builder_endpoint);
1015
1016        let mut props = HashMap::new();
1017        props.insert(
1018            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1019            property_endpoint.to_string(),
1020        );
1021
1022        let result = builder.load("s3tables", props).await;
1023
1024        assert!(result.is_ok());
1025        let catalog = result.unwrap();
1026
1027        // Property value should override builder method value
1028        assert_eq!(
1029            catalog.config.endpoint_url,
1030            Some(property_endpoint.to_string())
1031        );
1032        assert_ne!(
1033            catalog.config.endpoint_url,
1034            Some(builder_endpoint.to_string())
1035        );
1036    }
1037
1038    #[tokio::test]
1039    async fn test_endpoint_url_builder_method_only() {
1040        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1041        let builder_endpoint = "http://localhost:4566";
1042
1043        let builder = S3TablesCatalogBuilder::default()
1044            .with_table_bucket_arn(test_arn)
1045            .with_endpoint_url(builder_endpoint);
1046
1047        let result = builder.load("s3tables", HashMap::new()).await;
1048
1049        assert!(result.is_ok());
1050        let catalog = result.unwrap();
1051
1052        assert_eq!(
1053            catalog.config.endpoint_url,
1054            Some(builder_endpoint.to_string())
1055        );
1056    }
1057
1058    #[tokio::test]
1059    async fn test_endpoint_url_property_only() {
1060        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1061        let property_endpoint = "http://localhost:8080";
1062
1063        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1064
1065        let mut props = HashMap::new();
1066        props.insert(
1067            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1068            property_endpoint.to_string(),
1069        );
1070
1071        let result = builder.load("s3tables", props).await;
1072
1073        assert!(result.is_ok());
1074        let catalog = result.unwrap();
1075
1076        assert_eq!(
1077            catalog.config.endpoint_url,
1078            Some(property_endpoint.to_string())
1079        );
1080    }
1081
1082    #[tokio::test]
1083    async fn test_table_bucket_arn_property_overrides_builder_method() {
1084        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1085        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1086
1087        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1088
1089        let mut props = HashMap::new();
1090        props.insert(
1091            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1092            property_arn.to_string(),
1093        );
1094
1095        let result = builder.load("s3tables", props).await;
1096
1097        assert!(result.is_ok());
1098        let catalog = result.unwrap();
1099
1100        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1101        assert_ne!(catalog.config.table_bucket_arn, builder_arn);
1102    }
1103
1104    #[tokio::test]
1105    async fn test_table_bucket_arn_builder_method_only() {
1106        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1107
1108        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1109
1110        let result = builder.load("s3tables", HashMap::new()).await;
1111
1112        assert!(result.is_ok());
1113        let catalog = result.unwrap();
1114
1115        assert_eq!(catalog.config.table_bucket_arn, builder_arn);
1116    }
1117
1118    #[tokio::test]
1119    async fn test_table_bucket_arn_property_only() {
1120        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1121
1122        let builder = S3TablesCatalogBuilder::default();
1123
1124        let mut props = HashMap::new();
1125        props.insert(
1126            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1127            property_arn.to_string(),
1128        );
1129
1130        let result = builder.load("s3tables", props).await;
1131
1132        assert!(result.is_ok());
1133        let catalog = result.unwrap();
1134
1135        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1136    }
1137
1138    #[tokio::test]
1139    async fn test_builder_empty_name_validation() {
1140        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1141        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1142
1143        let result = builder.load("", HashMap::new()).await;
1144
1145        assert!(result.is_err());
1146        if let Err(err) = result {
1147            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1148            assert_eq!(err.message(), "Catalog name cannot be empty");
1149        }
1150    }
1151
1152    #[tokio::test]
1153    async fn test_builder_whitespace_only_name_validation() {
1154        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1155        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1156
1157        let result = builder.load("   \t\n  ", HashMap::new()).await;
1158
1159        assert!(result.is_err());
1160        if let Err(err) = result {
1161            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1162            assert_eq!(err.message(), "Catalog name cannot be empty");
1163        }
1164    }
1165
1166    #[tokio::test]
1167    async fn test_builder_name_validation_with_missing_arn() {
1168        let builder = S3TablesCatalogBuilder::default();
1169
1170        let result = builder.load("", HashMap::new()).await;
1171
1172        assert!(result.is_err());
1173        if let Err(err) = result {
1174            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1175            assert_eq!(err.message(), "Catalog name cannot be empty");
1176        }
1177    }
1178
    /// Verify that an S3 Table catalog can create a table, write data, load the same table, and read from it.
    #[tokio::test]
    async fn test_s3tables_create_table_write_load_table_read() {
        use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use iceberg::writer::file_writer::ParquetWriterBuilder;
        use iceberg::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
        use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};

        // Skip silently when no S3Tables integration environment is configured.
        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(c)) => c,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        // Random namespace name so concurrent/repeated runs cannot collide.
        let ns = NamespaceIdent::new(format!("test_rw_{}", uuid::Uuid::new_v4().simple()));
        catalog.create_namespace(&ns, HashMap::new()).await.unwrap();

        let table_name = String::from("table");

        // Single required int column "id" (field id 1).
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();
        let creation = TableCreation::builder()
            .name(table_name.clone())
            .schema(schema)
            .build();

        let table = catalog.create_table(&ns, creation).await.unwrap();

        // Write one row.
        // Convert the table's current Iceberg schema to an Arrow schema so the
        // record batch matches what the writer expects.
        let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
            table
                .metadata()
                .current_schema()
                .as_ref()
                .try_into()
                .unwrap(),
        );
        // A single-row batch: id = 42.
        let batch = arrow_array::RecordBatch::try_new(arrow_schema, vec![Arc::new(
            arrow_array::Int32Array::from(vec![42]),
        )])
        .unwrap();

        // Locations will be generated based on the table metadata, which will be using `s3://` for Amazon S3 Tables.
        let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
        let file_name_generator = DefaultFileNameGenerator::new(
            "test".to_string(),
            None,
            iceberg::spec::DataFileFormat::Parquet,
        );
        let parquet_writer_builder = ParquetWriterBuilder::new(
            parquet::file::properties::WriterProperties::default(),
            table.metadata().current_schema().clone(),
        );
        // Rolling writer wraps the Parquet writer and splits output files at the
        // default target size; it uses the table's own FileIO for the writes.
        let rw = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            table.file_io().clone(),
            location_generator,
            file_name_generator,
        );
        let mut writer = DataFileWriterBuilder::new(rw).build(None).await.unwrap();
        writer.write(batch.clone()).await.unwrap();
        // Closing the writer flushes and returns the data files it produced.
        let data_files = writer.close().await.unwrap();

        // Register the written data files on the table via a fast-append commit.
        let tx = Transaction::new(&table);
        let tx = tx
            .fast_append()
            .add_data_files(data_files)
            .apply(tx)
            .unwrap();
        tx.commit(&catalog).await.unwrap();

        // Reload from catalog and read back.
        let table_ident = TableIdent::new(ns.clone(), table_name.clone());
        let reloaded = catalog.load_table(&table_ident).await.unwrap();
        // Full-table scan of all columns, collected into Arrow batches.
        let batches: Vec<arrow_array::RecordBatch> = reloaded
            .scan()
            .select_all()
            .build()
            .expect("scan to be valid (snapshot exists, schema is OK)")
            .to_arrow()
            .await
            .expect("scan tasks should be OK")
            .try_collect()
            .await
            .expect("scan should complete successfully");

        assert_eq!(batches.len(), 1);
        assert_eq!(
            batches[0], batch,
            "read records should match records written earlier"
        );

        // Clean up. Best-effort (`.ok()`): table is purged before the namespace
        // is dropped, since a non-empty namespace cannot be dropped.
        catalog.purge_table(&table_ident).await.ok();
        catalog.drop_namespace(&ns).await.ok();
    }
1282}