iceberg_catalog_s3tables/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
25use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
26use aws_sdk_s3tables::operation::get_table::GetTableOutput;
27use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
28use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
29use aws_sdk_s3tables::types::OpenTableFormat;
30use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
31use iceberg::spec::{TableMetadata, TableMetadataBuilder};
32use iceberg::table::Table;
33use iceberg::{
34    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35    Runtime, TableCommit, TableCreation, TableIdent,
36};
37use iceberg_storage_opendal::OpenDalStorageFactory;
38
39use crate::utils::create_sdk_config;
40
/// Property key for the ARN of the S3 Tables table bucket that backs the catalog.
///
/// When present in the properties passed to `CatalogBuilder::load`, it overrides any value
/// set through [`S3TablesCatalogBuilder::with_table_bucket_arn`].
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Property key for a custom S3 Tables endpoint URL.
///
/// When present in the properties passed to `CatalogBuilder::load`, it overrides any value
/// set through [`S3TablesCatalogBuilder::with_endpoint_url`].
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
45
/// S3Tables catalog configuration.
///
/// Populated by [`S3TablesCatalogBuilder`]. The builder methods set initial values, and
/// `CatalogBuilder::load` then overrides them from its properties.
#[derive(Debug)]
struct S3TablesCatalogConfig {
    /// Catalog name. Set by `load`; `None` before that.
    name: Option<String>,
    /// ARN of the table bucket the catalog operates on. An empty string means "not set",
    /// and `load` rejects it.
    ///
    /// Unlike other buckets, an S3Tables bucket is not a physical bucket but a virtual bucket
    /// managed by s3tables. It cannot be accessed directly with a path like
    /// s3://{bucket_name}/{file_path}; all operations are addressed by the bucket ARN.
    table_bucket_arn: String,
    /// Optional endpoint URL override for the S3 Tables client. Only used when no
    /// pre-built `client` is supplied.
    endpoint_url: Option<String>,
    /// Optional pre-configured AWS SDK client for S3Tables. When set, it is used as-is
    /// instead of building a client from `endpoint_url` and `props`.
    client: Option<aws_sdk_s3tables::Client>,
    /// Remaining catalog properties, i.e. everything except `table_bucket_arn` and
    /// `endpoint_url`. They are passed to `FileIOBuilder::with_props`. When no pre-built
    /// `client` is set, they are also passed to `create_sdk_config`.
    ///
    /// Keys recognised by `create_sdk_config` are expected to include the following
    /// (defined in `crate::utils`; confirm there):
    /// - `profile_name`: The name of the AWS profile to use.
    /// - `region_name`: The AWS region to use.
    /// - `aws_access_key_id`: The AWS access key ID to use.
    /// - `aws_secret_access_key`: The AWS secret access key to use.
    /// - `aws_session_token`: The AWS session token to use.
    props: HashMap<String, String>,
}
68
/// Builder for [`S3TablesCatalog`].
///
/// Collects the catalog configuration plus optional storage-factory and runtime overrides.
/// The catalog itself is constructed by `CatalogBuilder::load`.
#[derive(Debug)]
pub struct S3TablesCatalogBuilder {
    /// Configuration accumulated by the builder methods and by `load`.
    config: S3TablesCatalogConfig,
    /// Storage factory for the catalog's `FileIO`. When `None`, an OpenDAL S3 factory is used.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Runtime for the catalog. When `None`, `Runtime::try_current()` is used at load time.
    runtime: Option<Runtime>,
}
76
77/// Default builder for [`S3TablesCatalog`].
78impl Default for S3TablesCatalogBuilder {
79    fn default() -> Self {
80        Self {
81            config: S3TablesCatalogConfig {
82                name: None,
83                table_bucket_arn: "".to_string(),
84                endpoint_url: None,
85                client: None,
86                props: HashMap::new(),
87            },
88            storage_factory: None,
89            runtime: None,
90        }
91    }
92}
93
94/// Builder methods for [`S3TablesCatalog`].
95impl S3TablesCatalogBuilder {
96    /// Configure the catalog with a custom endpoint URL (useful for local testing/mocking).
97    ///
98    /// # Behavior with Properties
99    ///
100    /// If both this method and the `endpoint_url` property are provided during catalog loading,
101    /// the property value will take precedence and overwrite the value set by this method.
102    /// This follows the general pattern where properties specified in the `load()` method
103    /// have higher priority than builder method configurations.
104    pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
105        self.config.endpoint_url = Some(endpoint_url.into());
106        self
107    }
108
109    /// Configure the catalog with a pre-built AWS SDK client.
110    pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
111        self.config.client = Some(client);
112        self
113    }
114
115    /// Configure the catalog with a table bucket ARN.
116    ///
117    /// # Behavior with Properties
118    ///
119    /// If both this method and the `table_bucket_arn` property are provided during catalog loading,
120    /// the property value will take precedence and overwrite the value set by this method.
121    /// This follows the general pattern where properties specified in the `load()` method
122    /// have higher priority than builder method configurations.
123    pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
124        self.config.table_bucket_arn = table_bucket_arn.into();
125        self
126    }
127}
128
129impl CatalogBuilder for S3TablesCatalogBuilder {
130    type C = S3TablesCatalog;
131
132    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
133        self.storage_factory = Some(storage_factory);
134        self
135    }
136
137    fn with_runtime(mut self, runtime: Runtime) -> Self {
138        self.runtime = Some(runtime);
139        self
140    }
141
142    fn load(
143        mut self,
144        name: impl Into<String>,
145        props: HashMap<String, String>,
146    ) -> impl Future<Output = Result<Self::C>> + Send {
147        let catalog_name = name.into();
148        self.config.name = Some(catalog_name.clone());
149
150        if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
151            self.config.table_bucket_arn = props
152                .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
153                .cloned()
154                .unwrap_or_default();
155        }
156
157        if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
158            self.config.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
159        }
160
161        // Collect other remaining properties
162        self.config.props = props
163            .into_iter()
164            .filter(|(k, _)| {
165                k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
166                    && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
167            })
168            .collect();
169
170        async move {
171            if catalog_name.trim().is_empty() {
172                Err(Error::new(
173                    ErrorKind::DataInvalid,
174                    "Catalog name cannot be empty",
175                ))
176            } else if self.config.table_bucket_arn.is_empty() {
177                Err(Error::new(
178                    ErrorKind::DataInvalid,
179                    "Table bucket ARN is required",
180                ))
181            } else {
182                let runtime = match self.runtime {
183                    Some(rt) => rt,
184                    None => Runtime::try_current()?,
185                };
186                S3TablesCatalog::new(self.config, self.storage_factory, runtime).await
187            }
188        }
189    }
190}
191
/// S3Tables catalog implementation.
///
/// Issues catalog operations against the S3 Tables service through an AWS SDK client.
/// Iceberg metadata files are read and written through `file_io`.
#[derive(Debug)]
pub struct S3TablesCatalog {
    /// Configuration the catalog was built with (bucket ARN, endpoint, properties).
    config: S3TablesCatalogConfig,
    /// Client used for every S3 Tables API call.
    s3tables_client: aws_sdk_s3tables::Client,
    /// FileIO used to read and write table metadata files; also handed to each built `Table`.
    file_io: FileIO,
    /// Runtime handed to each `Table` this catalog builds.
    runtime: Runtime,
}
200
201impl S3TablesCatalog {
202    /// Creates a new S3Tables catalog.
203    async fn new(
204        config: S3TablesCatalogConfig,
205        storage_factory: Option<Arc<dyn StorageFactory>>,
206        runtime: Runtime,
207    ) -> Result<Self> {
208        let s3tables_client = if let Some(client) = config.client.clone() {
209            client
210        } else {
211            let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
212            aws_sdk_s3tables::Client::new(&aws_config)
213        };
214
215        // Use provided factory or default to OpenDalStorageFactory::S3
216        let factory = storage_factory.unwrap_or_else(|| {
217            Arc::new(OpenDalStorageFactory::S3 {
218                customized_credential_load: None,
219            })
220        });
221        let file_io = FileIOBuilder::new(factory)
222            .with_props(&config.props)
223            .build();
224
225        Ok(Self {
226            config,
227            s3tables_client,
228            file_io,
229            runtime,
230        })
231    }
232
233    async fn load_table_with_version_token(
234        &self,
235        table_ident: &TableIdent,
236    ) -> Result<(Table, String)> {
237        let req = self
238            .s3tables_client
239            .get_table()
240            .table_bucket_arn(self.config.table_bucket_arn.clone())
241            .namespace(table_ident.namespace().to_url_string())
242            .name(table_ident.name());
243        let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
244
245        // when a table is created, it's possible that the metadata location is not set.
246        let metadata_location = resp.metadata_location().ok_or_else(|| {
247            Error::new(
248                ErrorKind::Unexpected,
249                format!(
250                    "Table {} does not have metadata location",
251                    table_ident.name()
252                ),
253            )
254        })?;
255        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
256
257        let table = Table::builder()
258            .identifier(table_ident.clone())
259            .metadata(metadata)
260            .metadata_location(metadata_location)
261            .file_io(self.file_io.clone())
262            .runtime(self.runtime.clone())
263            .build()?;
264        Ok((table, resp.version_token))
265    }
266}
267
268#[async_trait]
269impl Catalog for S3TablesCatalog {
270    /// List namespaces from s3tables catalog.
271    ///
272    /// S3Tables doesn't support nested namespaces. If parent is provided, it will
273    /// return an empty list.
274    async fn list_namespaces(
275        &self,
276        parent: Option<&NamespaceIdent>,
277    ) -> Result<Vec<NamespaceIdent>> {
278        if parent.is_some() {
279            return Ok(vec![]);
280        }
281
282        let mut result = Vec::new();
283        let mut continuation_token = None;
284        loop {
285            let mut req = self
286                .s3tables_client
287                .list_namespaces()
288                .table_bucket_arn(self.config.table_bucket_arn.clone());
289            if let Some(token) = continuation_token {
290                req = req.continuation_token(token);
291            }
292            let resp = req.send().await.map_err(from_aws_sdk_error)?;
293            for ns in resp.namespaces() {
294                result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
295            }
296            continuation_token = resp.continuation_token().map(|s| s.to_string());
297            if continuation_token.is_none() {
298                break;
299            }
300        }
301        Ok(result)
302    }
303
304    /// Creates a new namespace with the given identifier and properties.
305    ///
306    /// Attempts to create a namespace defined by the `namespace`. The `properties`
307    /// parameter is ignored.
308    ///
309    /// The following naming rules apply to namespaces:
310    ///
311    /// - Names must be between 3 (min) and 63 (max) characters long.
312    /// - Names can consist only of lowercase letters, numbers, and underscores (_).
313    /// - Names must begin and end with a letter or number.
314    /// - Names must not contain hyphens (-) or periods (.).
315    ///
316    /// This function can return an error in the following situations:
317    ///
318    /// - Errors from the underlying database creation process, converted using
319    /// `from_aws_sdk_error`.
320    async fn create_namespace(
321        &self,
322        namespace: &NamespaceIdent,
323        _properties: HashMap<String, String>,
324    ) -> Result<Namespace> {
325        if self.namespace_exists(namespace).await? {
326            return Err(Error::new(
327                ErrorKind::NamespaceAlreadyExists,
328                format!("Namespace {namespace:?} already exists"),
329            ));
330        }
331
332        let req = self
333            .s3tables_client
334            .create_namespace()
335            .table_bucket_arn(self.config.table_bucket_arn.clone())
336            .namespace(namespace.to_url_string());
337        req.send().await.map_err(from_aws_sdk_error)?;
338        Ok(Namespace::with_properties(
339            namespace.clone(),
340            HashMap::new(),
341        ))
342    }
343
344    /// Retrieves a namespace by its identifier.
345    ///
346    /// Validates the given namespace identifier and then queries the
347    /// underlying database client to fetch the corresponding namespace data.
348    /// Constructs a `Namespace` object with the retrieved data and returns it.
349    ///
350    /// This function can return an error in any of the following situations:
351    /// - If there is an error querying the database, returned by
352    /// `from_aws_sdk_error`.
353    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
354        if !self.namespace_exists(namespace).await? {
355            return Err(Error::new(
356                ErrorKind::NamespaceNotFound,
357                format!("Namespace {namespace:?} does not exist"),
358            ));
359        }
360
361        let req = self
362            .s3tables_client
363            .get_namespace()
364            .table_bucket_arn(self.config.table_bucket_arn.clone())
365            .namespace(namespace.to_url_string());
366        let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
367        let properties = HashMap::new();
368        Ok(Namespace::with_properties(
369            NamespaceIdent::from_vec(resp.namespace().to_vec())?,
370            properties,
371        ))
372    }
373
374    /// Checks if a namespace exists within the s3tables catalog.
375    ///
376    /// Validates the namespace identifier by querying the s3tables catalog
377    /// to determine if the specified namespace exists.
378    ///
379    /// # Returns
380    /// A `Result<bool>` indicating the outcome of the check:
381    /// - `Ok(true)` if the namespace exists.
382    /// - `Ok(false)` if the namespace does not exist, identified by a specific
383    /// `IsNotFoundException` variant.
384    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
385    /// query, with the error encapsulating the issue.
386    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
387        let req = self
388            .s3tables_client
389            .get_namespace()
390            .table_bucket_arn(self.config.table_bucket_arn.clone())
391            .namespace(namespace.to_url_string());
392        match req.send().await {
393            Ok(_) => Ok(true),
394            Err(err) => {
395                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
396                    Ok(false)
397                } else {
398                    Err(from_aws_sdk_error(err))
399                }
400            }
401        }
402    }
403
404    /// Updates the properties of an existing namespace.
405    ///
406    /// S3Tables doesn't support updating namespace properties, so this function
407    /// will always return an error.
408    async fn update_namespace(
409        &self,
410        _namespace: &NamespaceIdent,
411        _properties: HashMap<String, String>,
412    ) -> Result<()> {
413        Err(Error::new(
414            ErrorKind::FeatureUnsupported,
415            "Update namespace is not supported for s3tables catalog",
416        ))
417    }
418
419    /// Drops an existing namespace from the s3tables catalog.
420    ///
421    /// Validates the namespace identifier and then deletes the corresponding
422    /// namespace from the s3tables catalog.
423    ///
424    /// This function can return an error in the following situations:
425    /// - Errors from the underlying database deletion process, converted using
426    /// `from_aws_sdk_error`.
427    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
428        if !self.namespace_exists(namespace).await? {
429            return Err(Error::new(
430                ErrorKind::NamespaceNotFound,
431                format!("Namespace {namespace:?} does not exist"),
432            ));
433        }
434
435        let req = self
436            .s3tables_client
437            .delete_namespace()
438            .table_bucket_arn(self.config.table_bucket_arn.clone())
439            .namespace(namespace.to_url_string());
440        req.send().await.map_err(from_aws_sdk_error)?;
441        Ok(())
442    }
443
444    /// Lists all tables within a given namespace.
445    ///
446    /// Retrieves all tables associated with the specified namespace and returns
447    /// their identifiers.
448    ///
449    /// This function can return an error in the following situations:
450    /// - Errors from the underlying database query process, converted using
451    /// `from_aws_sdk_error`.
452    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
453        let mut result = Vec::new();
454        let mut continuation_token = None;
455        loop {
456            let mut req = self
457                .s3tables_client
458                .list_tables()
459                .table_bucket_arn(self.config.table_bucket_arn.clone())
460                .namespace(namespace.to_url_string());
461            if let Some(token) = continuation_token {
462                req = req.continuation_token(token);
463            }
464            let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
465            for table in resp.tables() {
466                result.push(TableIdent::new(
467                    NamespaceIdent::from_vec(table.namespace().to_vec())?,
468                    table.name().to_string(),
469                ));
470            }
471            continuation_token = resp.continuation_token().map(|s| s.to_string());
472            if continuation_token.is_none() {
473                break;
474            }
475        }
476        Ok(result)
477    }
478
479    /// Creates a new table within a specified namespace.
480    ///
481    /// Attempts to create a table defined by the `creation` parameter. The metadata
482    /// location is generated by the s3tables catalog, looks like:
483    ///
484    /// s3://{RANDOM WAREHOUSE LOCATION}/metadata/{VERSION}-{UUID}.metadata.json
485    ///
486    /// We have to get this random warehouse location after the table is created.
487    ///
488    /// This function can return an error in the following situations:
489    /// - If the location of the table is set by user, identified by a specific
490    /// `DataInvalid` variant.
491    /// - Errors from the underlying database creation process, converted using
492    /// `from_aws_sdk_error`.
493    async fn create_table(
494        &self,
495        namespace: &NamespaceIdent,
496        mut creation: TableCreation,
497    ) -> Result<Table> {
498        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
499
500        // create table
501        let create_resp: CreateTableOutput = self
502            .s3tables_client
503            .create_table()
504            .table_bucket_arn(self.config.table_bucket_arn.clone())
505            .namespace(namespace.to_url_string())
506            .format(OpenTableFormat::Iceberg)
507            .name(table_ident.name())
508            .send()
509            .await
510            .map_err(from_aws_sdk_error)?;
511
512        // prepare table location. the warehouse location is generated by s3tables catalog,
513        // which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3
514        let table_location = match &creation.location {
515            Some(_) => {
516                return Err(Error::new(
517                    ErrorKind::DataInvalid,
518                    "The location of the table is generated by s3tables catalog, can't be set by user.",
519                ));
520            }
521            None => {
522                let get_resp: GetTableOutput = self
523                    .s3tables_client
524                    .get_table()
525                    .table_bucket_arn(self.config.table_bucket_arn.clone())
526                    .namespace(namespace.to_url_string())
527                    .name(table_ident.name())
528                    .send()
529                    .await
530                    .map_err(from_aws_sdk_error)?;
531                get_resp.warehouse_location().to_string()
532            }
533        };
534
535        // write metadata to file
536        creation.location = Some(table_location.clone());
537        let metadata = TableMetadataBuilder::from_table_creation(creation)?
538            .build()?
539            .metadata;
540        let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
541        metadata.write_to(&self.file_io, &metadata_location).await?;
542
543        // update metadata location
544        let metadata_location_str = metadata_location.to_string();
545        self.s3tables_client
546            .update_table_metadata_location()
547            .table_bucket_arn(self.config.table_bucket_arn.clone())
548            .namespace(namespace.to_url_string())
549            .name(table_ident.name())
550            .metadata_location(metadata_location_str.clone())
551            .version_token(create_resp.version_token())
552            .send()
553            .await
554            .map_err(from_aws_sdk_error)?;
555
556        let table = Table::builder()
557            .identifier(table_ident)
558            .metadata_location(metadata_location_str)
559            .metadata(metadata)
560            .file_io(self.file_io.clone())
561            .runtime(self.runtime.clone())
562            .build()?;
563        Ok(table)
564    }
565
566    /// Loads an existing table from the s3tables catalog.
567    ///
568    /// Retrieves the metadata location of the specified table and constructs a
569    /// `Table` object with the retrieved metadata.
570    ///
571    /// This function can return an error in the following situations:
572    /// - If the table does not have a metadata location, identified by a specific
573    /// `Unexpected` variant.
574    /// - Errors from the underlying database query process, converted using
575    /// `from_aws_sdk_error`.
576    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
577        Ok(self.load_table_with_version_token(table_ident).await?.0)
578    }
579
580    /// Not supported for S3Tables. Use `purge_table` instead.
581    ///
582    /// S3 Tables doesn't support soft delete, so dropping a table will permanently remove it from the catalog.
583    async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
584        Err(Error::new(
585            ErrorKind::FeatureUnsupported,
586            "drop_table is not supported for S3Tables; use purge_table instead",
587        ))
588    }
589
590    /// Purge a table from the S3 Tables catalog.
591    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
592        let req = self
593            .s3tables_client
594            .delete_table()
595            .table_bucket_arn(self.config.table_bucket_arn.clone())
596            .namespace(table.namespace().to_url_string())
597            .name(table.name());
598        req.send().await.map_err(from_aws_sdk_error)?;
599        Ok(())
600    }
601
602    /// Checks if a table exists within the s3tables catalog.
603    ///
604    /// Validates the table identifier by querying the s3tables catalog
605    /// to determine if the specified table exists.
606    ///
607    /// # Returns
608    /// A `Result<bool>` indicating the outcome of the check:
609    /// - `Ok(true)` if the table exists.
610    /// - `Ok(false)` if the table does not exist, identified by a specific
611    /// `IsNotFoundException` variant.
612    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
613    /// query, with the error encapsulating the issue.
614    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
615        let req = self
616            .s3tables_client
617            .get_table()
618            .table_bucket_arn(self.config.table_bucket_arn.clone())
619            .namespace(table_ident.namespace().to_url_string())
620            .name(table_ident.name());
621        match req.send().await {
622            Ok(_) => Ok(true),
623            Err(err) => {
624                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
625                    Ok(false)
626                } else {
627                    Err(from_aws_sdk_error(err))
628                }
629            }
630        }
631    }
632
633    /// Renames an existing table within the s3tables catalog.
634    ///
635    /// Validates the source and destination table identifiers and then renames
636    /// the source table to the destination table.
637    ///
638    /// This function can return an error in the following situations:
639    /// - Errors from the underlying database renaming process, converted using
640    /// `from_aws_sdk_error`.
641    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
642        let req = self
643            .s3tables_client
644            .rename_table()
645            .table_bucket_arn(self.config.table_bucket_arn.clone())
646            .namespace(src.namespace().to_url_string())
647            .name(src.name())
648            .new_namespace_name(dest.namespace().to_url_string())
649            .new_name(dest.name());
650        req.send().await.map_err(from_aws_sdk_error)?;
651        Ok(())
652    }
653
654    async fn register_table(
655        &self,
656        _table_ident: &TableIdent,
657        _metadata_location: String,
658    ) -> Result<Table> {
659        Err(Error::new(
660            ErrorKind::FeatureUnsupported,
661            "Registering a table is not supported yet",
662        ))
663    }
664
665    /// Updates an existing table within the s3tables catalog.
666    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
667        let table_ident = commit.identifier().clone();
668        let table_namespace = table_ident.namespace();
669        let (current_table, version_token) =
670            self.load_table_with_version_token(&table_ident).await?;
671
672        let staged_table = commit.apply(current_table)?;
673        let staged_metadata_location_str = staged_table.metadata_location_result()?;
674        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
675
676        staged_table
677            .metadata()
678            .write_to(staged_table.file_io(), &staged_metadata_location)
679            .await?;
680
681        let builder = self
682            .s3tables_client
683            .update_table_metadata_location()
684            .table_bucket_arn(&self.config.table_bucket_arn)
685            .namespace(table_namespace.to_url_string())
686            .name(table_ident.name())
687            .version_token(version_token)
688            .metadata_location(staged_metadata_location_str);
689
690        let _ = builder.send().await.map_err(|e| {
691            let error = e.into_service_error();
692            match error {
693                UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
694                    ErrorKind::CatalogCommitConflicts,
695                    format!("Commit conflicted for table: {table_ident}"),
696                )
697                .with_retryable(true),
698                UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
699                    ErrorKind::TableNotFound,
700                    format!("Table {table_ident} is not found"),
701                ),
702                _ => Error::new(
703                    ErrorKind::Unexpected,
704                    "Operation failed for hitting aws sdk error",
705                ),
706            }
707            .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
708        })?;
709
710        Ok(staged_table)
711    }
712}
713
714/// Format AWS SDK error into iceberg error
715pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
716where T: std::fmt::Debug {
717    Error::new(
718        ErrorKind::Unexpected,
719        format!("Operation failed for hitting aws sdk error: {error:?}"),
720    )
721}
722
723#[cfg(test)]
724mod tests {
725    use futures::TryStreamExt;
726    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
727    use iceberg::transaction::{ApplyTransactionAction, Transaction};
728
729    use super::*;
730
731    async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
732        let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
733            Some(table_bucket_arn) => table_bucket_arn,
734            None => return Ok(None),
735        };
736
737        let config = S3TablesCatalogConfig {
738            name: None,
739            table_bucket_arn,
740            endpoint_url: None,
741            client: None,
742            props: HashMap::new(),
743        };
744
745        Ok(Some(
746            S3TablesCatalog::new(config, None, Runtime::current()).await?,
747        ))
748    }
749
750    #[tokio::test]
751    async fn test_s3tables_list_namespace() {
752        let catalog = match load_s3tables_catalog_from_env().await {
753            Ok(Some(catalog)) => catalog,
754            Ok(None) => return,
755            Err(e) => panic!("Error loading catalog: {e}"),
756        };
757
758        let namespaces = catalog.list_namespaces(None).await.unwrap();
759        assert!(!namespaces.is_empty());
760    }
761
762    #[tokio::test]
763    async fn test_s3tables_list_tables() {
764        let catalog = match load_s3tables_catalog_from_env().await {
765            Ok(Some(catalog)) => catalog,
766            Ok(None) => return,
767            Err(e) => panic!("Error loading catalog: {e}"),
768        };
769
770        let tables = catalog
771            .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
772            .await
773            .unwrap();
774        assert!(!tables.is_empty());
775    }
776
777    #[tokio::test]
778    async fn test_s3tables_load_table() {
779        let catalog = match load_s3tables_catalog_from_env().await {
780            Ok(Some(catalog)) => catalog,
781            Ok(None) => return,
782            Err(e) => panic!("Error loading catalog: {e}"),
783        };
784
785        let table = catalog
786            .load_table(&TableIdent::new(
787                NamespaceIdent::new("aws_s3_metadata".to_string()),
788                "query_storage_metadata".to_string(),
789            ))
790            .await
791            .unwrap();
792        println!("{table:?}");
793    }
794
795    #[tokio::test]
796    async fn test_s3tables_create_delete_namespace() {
797        let catalog = match load_s3tables_catalog_from_env().await {
798            Ok(Some(catalog)) => catalog,
799            Ok(None) => return,
800            Err(e) => panic!("Error loading catalog: {e}"),
801        };
802
803        let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
804        catalog
805            .create_namespace(&namespace, HashMap::new())
806            .await
807            .unwrap();
808        assert!(catalog.namespace_exists(&namespace).await.unwrap());
809        catalog.drop_namespace(&namespace).await.unwrap();
810        assert!(!catalog.namespace_exists(&namespace).await.unwrap());
811    }
812
813    #[tokio::test]
814    async fn test_s3tables_create_delete_table() {
815        let catalog = match load_s3tables_catalog_from_env().await {
816            Ok(Some(catalog)) => catalog,
817            Ok(None) => return,
818            Err(e) => panic!("Error loading catalog: {e}"),
819        };
820
821        let creation = {
822            let schema = Schema::builder()
823                .with_schema_id(0)
824                .with_fields(vec![
825                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
826                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
827                ])
828                .build()
829                .unwrap();
830            TableCreation::builder()
831                .name("test_s3tables_create_delete_table".to_string())
832                .properties(HashMap::new())
833                .schema(schema)
834                .build()
835        };
836
837        let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
838        let table_ident = TableIdent::new(
839            namespace.clone(),
840            "test_s3tables_create_delete_table".to_string(),
841        );
842        catalog.drop_namespace(&namespace).await.ok();
843        catalog.drop_table(&table_ident).await.ok();
844
845        catalog
846            .create_namespace(&namespace, HashMap::new())
847            .await
848            .unwrap();
849        catalog.create_table(&namespace, creation).await.unwrap();
850        assert!(catalog.table_exists(&table_ident).await.unwrap());
851        catalog.drop_table(&table_ident).await.unwrap();
852        assert!(!catalog.table_exists(&table_ident).await.unwrap());
853        catalog.drop_namespace(&namespace).await.unwrap();
854    }
855
856    #[tokio::test]
857    async fn test_s3tables_update_table() {
858        let catalog = match load_s3tables_catalog_from_env().await {
859            Ok(Some(catalog)) => catalog,
860            Ok(None) => return,
861            Err(e) => panic!("Error loading catalog: {e}"),
862        };
863
864        // Create a test namespace and table
865        let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
866        let table_ident =
867            TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
868
869        // Clean up any existing resources from previous test runs
870        catalog.drop_table(&table_ident).await.ok();
871        catalog.drop_namespace(&namespace).await.ok();
872
873        // Create namespace and table
874        catalog
875            .create_namespace(&namespace, HashMap::new())
876            .await
877            .unwrap();
878
879        let creation = {
880            let schema = Schema::builder()
881                .with_schema_id(0)
882                .with_fields(vec![
883                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
884                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
885                ])
886                .build()
887                .unwrap();
888            TableCreation::builder()
889                .name(table_ident.name().to_string())
890                .properties(HashMap::new())
891                .schema(schema)
892                .build()
893        };
894
895        let table = catalog.create_table(&namespace, creation).await.unwrap();
896
897        // Create a transaction to update the table
898        let tx = Transaction::new(&table);
899
900        // Store the original metadata location for comparison
901        let original_metadata_location = table.metadata_location();
902
903        // Update table properties using the transaction
904        let tx = tx
905            .update_table_properties()
906            .set("test_property".to_string(), "test_value".to_string())
907            .apply(tx)
908            .unwrap();
909
910        // Commit the transaction to the catalog
911        let updated_table = tx.commit(&catalog).await.unwrap();
912
913        // Verify the update was successful
914        assert_eq!(
915            updated_table.metadata().properties().get("test_property"),
916            Some(&"test_value".to_string())
917        );
918
919        // Verify the metadata location has been updated
920        assert_ne!(
921            updated_table.metadata_location(),
922            original_metadata_location,
923            "Metadata location should be updated after commit"
924        );
925
926        // Load the table again from the catalog to verify changes were persisted
927        let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
928
929        // Verify the reloaded table matches the updated table
930        assert_eq!(
931            reloaded_table.metadata().properties().get("test_property"),
932            Some(&"test_value".to_string())
933        );
934        assert_eq!(
935            reloaded_table.metadata_location(),
936            updated_table.metadata_location(),
937            "Reloaded table should have the same metadata location as the updated table"
938        );
939    }
940
941    #[tokio::test]
942    async fn test_builder_load_missing_bucket_arn() {
943        let builder = S3TablesCatalogBuilder::default();
944        let result = builder.load("s3tables", HashMap::new()).await;
945
946        assert!(result.is_err());
947        if let Err(err) = result {
948            assert_eq!(err.kind(), ErrorKind::DataInvalid);
949            assert_eq!(err.message(), "Table bucket ARN is required");
950        }
951    }
952
953    #[tokio::test]
954    async fn test_builder_with_endpoint_url_ok() {
955        let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");
956
957        let result = builder
958            .load(
959                "s3tables",
960                HashMap::from([
961                    (
962                        S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
963                        "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
964                    ),
965                    ("some_prop".to_string(), "some_value".to_string()),
966                ]),
967            )
968            .await;
969
970        assert!(result.is_ok());
971    }
972
973    #[tokio::test]
974    async fn test_builder_with_client_ok() {
975        use aws_config::BehaviorVersion;
976
977        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
978        let client = aws_sdk_s3tables::Client::new(&sdk_config);
979
980        let builder = S3TablesCatalogBuilder::default().with_client(client);
981        let result = builder
982            .load(
983                "s3tables",
984                HashMap::from([(
985                    S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
986                    "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
987                )]),
988            )
989            .await;
990
991        assert!(result.is_ok());
992    }
993
994    #[tokio::test]
995    async fn test_builder_with_table_bucket_arn() {
996        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
997        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
998
999        let result = builder.load("s3tables", HashMap::new()).await;
1000
1001        assert!(result.is_ok());
1002        let catalog = result.unwrap();
1003        assert_eq!(catalog.config.table_bucket_arn, test_arn);
1004    }
1005
1006    #[tokio::test]
1007    async fn test_builder_empty_table_bucket_arn_edge_cases() {
1008        let mut props = HashMap::new();
1009        props.insert(
1010            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1011            "".to_string(),
1012        );
1013
1014        let builder = S3TablesCatalogBuilder::default();
1015        let result = builder.load("s3tables", props).await;
1016
1017        assert!(result.is_err());
1018        if let Err(err) = result {
1019            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1020            assert_eq!(err.message(), "Table bucket ARN is required");
1021        }
1022    }
1023
1024    #[tokio::test]
1025    async fn test_endpoint_url_property_overrides_builder_method() {
1026        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1027        let builder_endpoint = "http://localhost:4566";
1028        let property_endpoint = "http://localhost:8080";
1029
1030        let builder = S3TablesCatalogBuilder::default()
1031            .with_table_bucket_arn(test_arn)
1032            .with_endpoint_url(builder_endpoint);
1033
1034        let mut props = HashMap::new();
1035        props.insert(
1036            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1037            property_endpoint.to_string(),
1038        );
1039
1040        let result = builder.load("s3tables", props).await;
1041
1042        assert!(result.is_ok());
1043        let catalog = result.unwrap();
1044
1045        // Property value should override builder method value
1046        assert_eq!(
1047            catalog.config.endpoint_url,
1048            Some(property_endpoint.to_string())
1049        );
1050        assert_ne!(
1051            catalog.config.endpoint_url,
1052            Some(builder_endpoint.to_string())
1053        );
1054    }
1055
1056    #[tokio::test]
1057    async fn test_endpoint_url_builder_method_only() {
1058        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1059        let builder_endpoint = "http://localhost:4566";
1060
1061        let builder = S3TablesCatalogBuilder::default()
1062            .with_table_bucket_arn(test_arn)
1063            .with_endpoint_url(builder_endpoint);
1064
1065        let result = builder.load("s3tables", HashMap::new()).await;
1066
1067        assert!(result.is_ok());
1068        let catalog = result.unwrap();
1069
1070        assert_eq!(
1071            catalog.config.endpoint_url,
1072            Some(builder_endpoint.to_string())
1073        );
1074    }
1075
1076    #[tokio::test]
1077    async fn test_endpoint_url_property_only() {
1078        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1079        let property_endpoint = "http://localhost:8080";
1080
1081        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1082
1083        let mut props = HashMap::new();
1084        props.insert(
1085            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1086            property_endpoint.to_string(),
1087        );
1088
1089        let result = builder.load("s3tables", props).await;
1090
1091        assert!(result.is_ok());
1092        let catalog = result.unwrap();
1093
1094        assert_eq!(
1095            catalog.config.endpoint_url,
1096            Some(property_endpoint.to_string())
1097        );
1098    }
1099
1100    #[tokio::test]
1101    async fn test_table_bucket_arn_property_overrides_builder_method() {
1102        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1103        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1104
1105        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1106
1107        let mut props = HashMap::new();
1108        props.insert(
1109            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1110            property_arn.to_string(),
1111        );
1112
1113        let result = builder.load("s3tables", props).await;
1114
1115        assert!(result.is_ok());
1116        let catalog = result.unwrap();
1117
1118        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1119        assert_ne!(catalog.config.table_bucket_arn, builder_arn);
1120    }
1121
1122    #[tokio::test]
1123    async fn test_table_bucket_arn_builder_method_only() {
1124        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1125
1126        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1127
1128        let result = builder.load("s3tables", HashMap::new()).await;
1129
1130        assert!(result.is_ok());
1131        let catalog = result.unwrap();
1132
1133        assert_eq!(catalog.config.table_bucket_arn, builder_arn);
1134    }
1135
1136    #[tokio::test]
1137    async fn test_table_bucket_arn_property_only() {
1138        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1139
1140        let builder = S3TablesCatalogBuilder::default();
1141
1142        let mut props = HashMap::new();
1143        props.insert(
1144            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1145            property_arn.to_string(),
1146        );
1147
1148        let result = builder.load("s3tables", props).await;
1149
1150        assert!(result.is_ok());
1151        let catalog = result.unwrap();
1152
1153        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1154    }
1155
1156    #[tokio::test]
1157    async fn test_builder_empty_name_validation() {
1158        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1159        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1160
1161        let result = builder.load("", HashMap::new()).await;
1162
1163        assert!(result.is_err());
1164        if let Err(err) = result {
1165            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1166            assert_eq!(err.message(), "Catalog name cannot be empty");
1167        }
1168    }
1169
1170    #[tokio::test]
1171    async fn test_builder_whitespace_only_name_validation() {
1172        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1173        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1174
1175        let result = builder.load("   \t\n  ", HashMap::new()).await;
1176
1177        assert!(result.is_err());
1178        if let Err(err) = result {
1179            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1180            assert_eq!(err.message(), "Catalog name cannot be empty");
1181        }
1182    }
1183
1184    #[tokio::test]
1185    async fn test_builder_name_validation_with_missing_arn() {
1186        let builder = S3TablesCatalogBuilder::default();
1187
1188        let result = builder.load("", HashMap::new()).await;
1189
1190        assert!(result.is_err());
1191        if let Err(err) = result {
1192            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1193            assert_eq!(err.message(), "Catalog name cannot be empty");
1194        }
1195    }
1196
    /// Verify that an S3 Table catalog can create a table, write data, load the same table, and read from it.
    ///
    /// Returns early (without failing) when `load_s3tables_catalog_from_env` yields
    /// `Ok(None)`, i.e. when `TABLE_BUCKET_ARN` is not set.
    #[tokio::test]
    async fn test_s3tables_create_table_write_load_table_read() {
        use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
        use iceberg::writer::file_writer::ParquetWriterBuilder;
        use iceberg::writer::file_writer::location_generator::{
            DefaultFileNameGenerator, DefaultLocationGenerator,
        };
        use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
        use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};

        let catalog = match load_s3tables_catalog_from_env().await {
            Ok(Some(c)) => c,
            Ok(None) => return,
            Err(e) => panic!("Error loading catalog: {e}"),
        };

        // A UUID-suffixed namespace keeps repeated or concurrent runs from colliding.
        let ns = NamespaceIdent::new(format!("test_rw_{}", uuid::Uuid::new_v4().simple()));
        catalog.create_namespace(&ns, HashMap::new()).await.unwrap();

        let table_name = String::from("table");

        // Single required `id: int` column with field id 1.
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();
        let creation = TableCreation::builder()
            .name(table_name.clone())
            .schema(schema)
            .build();

        let table = catalog.create_table(&ns, creation).await.unwrap();

        // Write one row.
        // The Arrow schema is converted from the table's current Iceberg schema so the
        // batch built below matches the table's column layout.
        let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
            table
                .metadata()
                .current_schema()
                .as_ref()
                .try_into()
                .unwrap(),
        );
        let batch = arrow_array::RecordBatch::try_new(arrow_schema, vec![Arc::new(
            arrow_array::Int32Array::from(vec![42]),
        )])
        .unwrap();

        // Locations will be generated based on the table metadata, which will be using `s3://` for Amazon S3 Tables.
        let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
        let file_name_generator = DefaultFileNameGenerator::new(
            "test".to_string(),
            None,
            iceberg::spec::DataFileFormat::Parquet,
        );
        let parquet_writer_builder = ParquetWriterBuilder::new(
            parquet::file::properties::WriterProperties::default(),
            table.metadata().current_schema().clone(),
        );
        // Writes go through the table's own `FileIO`, the same one used to read back later.
        let rw = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            table.file_io().clone(),
            location_generator,
            file_name_generator,
        );
        let mut writer = DataFileWriterBuilder::new(rw).build(None).await.unwrap();
        // `batch` is cloned because the original is compared against the read-back data below.
        writer.write(batch.clone()).await.unwrap();
        let data_files = writer.close().await.unwrap();

        // Register the written data files with a fast-append transaction and commit it
        // through the catalog; the later scan needs the resulting snapshot.
        let tx = Transaction::new(&table);
        let tx = tx
            .fast_append()
            .add_data_files(data_files)
            .apply(tx)
            .unwrap();
        tx.commit(&catalog).await.unwrap();

        // Reload from catalog and read back.
        let table_ident = TableIdent::new(ns.clone(), table_name.clone());
        let reloaded = catalog.load_table(&table_ident).await.unwrap();
        let batches: Vec<arrow_array::RecordBatch> = reloaded
            .scan()
            .select_all()
            .build()
            .expect("scan to be valid (snapshot exists, schema is OK)")
            .to_arrow()
            .await
            .expect("scan tasks should be OK")
            .try_collect()
            .await
            .expect("scan should complete successfully");

        // Exactly one batch, identical to the one written above, is expected.
        assert_eq!(batches.len(), 1);
        assert_eq!(
            batches[0], batch,
            "read records should match records written earlier"
        );

        // Clean up (best effort: errors are ignored). This is only reached when every
        // step above succeeded.
        catalog.purge_table(&table_ident).await.ok();
        catalog.drop_namespace(&ns).await.ok();
    }
1300}