Skip to main content

iceberg_catalog_s3tables/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::future::Future;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
25use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
26use aws_sdk_s3tables::operation::get_table::{GetTableError, GetTableOutput};
27use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
28use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
29use aws_sdk_s3tables::types::OpenTableFormat;
30use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
31use iceberg::spec::{TableMetadata, TableMetadataBuilder};
32use iceberg::table::Table;
33use iceberg::{
34    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
35    Runtime, TableCommit, TableCreation, TableIdent,
36};
37use iceberg_storage_opendal::OpenDalStorageFactory;
38
39use crate::utils::create_sdk_config;
40
/// Property key under which the ARN of the S3Tables table bucket is supplied.
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &'static str = "table_bucket_arn";
/// Property key under which a custom S3Tables endpoint URL is supplied.
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &'static str = "endpoint_url";
45
46/// S3Tables catalog configuration.
47#[derive(Debug)]
48struct S3TablesCatalogConfig {
49    /// Catalog name.
50    name: Option<String>,
51    /// Unlike other buckets, S3Tables bucket is not a physical bucket, but a virtual bucket
52    /// that is managed by s3tables. We can't directly access the bucket with path like
53    /// s3://{bucket_name}/{file_path}, all the operations are done with respect of the bucket
54    /// ARN.
55    table_bucket_arn: String,
56    /// Endpoint URL for the catalog.
57    endpoint_url: Option<String>,
58    /// Optional pre-configured AWS SDK client for S3Tables.
59    client: Option<aws_sdk_s3tables::Client>,
60    /// Properties for the catalog. The available properties are:
61    /// - `profile_name`: The name of the AWS profile to use.
62    /// - `region_name`: The AWS region to use.
63    /// - `aws_access_key_id`: The AWS access key ID to use.
64    /// - `aws_secret_access_key`: The AWS secret access key to use.
65    /// - `aws_session_token`: The AWS session token to use.
66    props: HashMap<String, String>,
67}
68
69/// Builder for [`S3TablesCatalog`].
70#[derive(Debug)]
71pub struct S3TablesCatalogBuilder {
72    config: S3TablesCatalogConfig,
73    storage_factory: Option<Arc<dyn StorageFactory>>,
74    runtime: Option<Runtime>,
75}
76
77/// Default builder for [`S3TablesCatalog`].
78impl Default for S3TablesCatalogBuilder {
79    fn default() -> Self {
80        Self {
81            config: S3TablesCatalogConfig {
82                name: None,
83                table_bucket_arn: "".to_string(),
84                endpoint_url: None,
85                client: None,
86                props: HashMap::new(),
87            },
88            storage_factory: None,
89            runtime: None,
90        }
91    }
92}
93
94/// Builder methods for [`S3TablesCatalog`].
95impl S3TablesCatalogBuilder {
96    /// Configure the catalog with a custom endpoint URL (useful for local testing/mocking).
97    ///
98    /// # Behavior with Properties
99    ///
100    /// If both this method and the `endpoint_url` property are provided during catalog loading,
101    /// the property value will take precedence and overwrite the value set by this method.
102    /// This follows the general pattern where properties specified in the `load()` method
103    /// have higher priority than builder method configurations.
104    pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
105        self.config.endpoint_url = Some(endpoint_url.into());
106        self
107    }
108
109    /// Configure the catalog with a pre-built AWS SDK client.
110    pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
111        self.config.client = Some(client);
112        self
113    }
114
115    /// Configure the catalog with a table bucket ARN.
116    ///
117    /// # Behavior with Properties
118    ///
119    /// If both this method and the `table_bucket_arn` property are provided during catalog loading,
120    /// the property value will take precedence and overwrite the value set by this method.
121    /// This follows the general pattern where properties specified in the `load()` method
122    /// have higher priority than builder method configurations.
123    pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
124        self.config.table_bucket_arn = table_bucket_arn.into();
125        self
126    }
127}
128
129impl CatalogBuilder for S3TablesCatalogBuilder {
130    type C = S3TablesCatalog;
131
132    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
133        self.storage_factory = Some(storage_factory);
134        self
135    }
136
137    fn with_runtime(mut self, runtime: Runtime) -> Self {
138        self.runtime = Some(runtime);
139        self
140    }
141
142    fn load(
143        mut self,
144        name: impl Into<String>,
145        props: HashMap<String, String>,
146    ) -> impl Future<Output = Result<Self::C>> + Send {
147        let catalog_name = name.into();
148        self.config.name = Some(catalog_name.clone());
149
150        if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
151            self.config.table_bucket_arn = props
152                .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
153                .cloned()
154                .unwrap_or_default();
155        }
156
157        if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
158            self.config.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
159        }
160
161        // Collect other remaining properties
162        self.config.props = props
163            .into_iter()
164            .filter(|(k, _)| {
165                k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
166                    && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
167            })
168            .collect();
169
170        async move {
171            if catalog_name.trim().is_empty() {
172                Err(Error::new(
173                    ErrorKind::DataInvalid,
174                    "Catalog name cannot be empty",
175                ))
176            } else if self.config.table_bucket_arn.is_empty() {
177                Err(Error::new(
178                    ErrorKind::DataInvalid,
179                    "Table bucket ARN is required",
180                ))
181            } else {
182                let runtime = match self.runtime {
183                    Some(rt) => rt,
184                    None => Runtime::try_current()?,
185                };
186                S3TablesCatalog::new(self.config, self.storage_factory, runtime).await
187            }
188        }
189    }
190}
191
192/// S3Tables catalog implementation.
193#[derive(Debug)]
194pub struct S3TablesCatalog {
195    config: S3TablesCatalogConfig,
196    s3tables_client: aws_sdk_s3tables::Client,
197    file_io: FileIO,
198    runtime: Runtime,
199}
200
201impl S3TablesCatalog {
202    /// Creates a new S3Tables catalog.
203    async fn new(
204        config: S3TablesCatalogConfig,
205        storage_factory: Option<Arc<dyn StorageFactory>>,
206        runtime: Runtime,
207    ) -> Result<Self> {
208        let s3tables_client = if let Some(client) = config.client.clone() {
209            client
210        } else {
211            let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
212            aws_sdk_s3tables::Client::new(&aws_config)
213        };
214
215        // Use provided factory or default to OpenDalStorageFactory::S3
216        let factory = storage_factory.unwrap_or_else(|| {
217            Arc::new(OpenDalStorageFactory::S3 {
218                customized_credential_load: None,
219            })
220        });
221        let file_io = FileIOBuilder::new(factory)
222            .with_props(&config.props)
223            .build();
224
225        Ok(Self {
226            config,
227            s3tables_client,
228            file_io,
229            runtime,
230        })
231    }
232
233    async fn load_table_with_version_token(
234        &self,
235        table_ident: &TableIdent,
236    ) -> Result<(Table, String)> {
237        let req = self
238            .s3tables_client
239            .get_table()
240            .table_bucket_arn(self.config.table_bucket_arn.clone())
241            .namespace(table_ident.namespace().to_url_string())
242            .name(table_ident.name());
243
244        let resp = req.send().await.map_err(|err| {
245            if err
246                .as_service_error()
247                .is_some_and(GetTableError::is_not_found_exception)
248            {
249                // S3 Tables GetTable API only reports that the resource was not found, and does not distinguish between namespaces and tables.
250                // https://docs.aws.amazon.com/AmazonS3/latest/API/API_s3Buckets_GetTable.html#API_s3Buckets_GetTable_Errors
251                Error::new(
252                    ErrorKind::TableNotFound,
253                    format!("Table {table_ident} is not found, either because the namespace or table did not exist"),
254                )
255                .with_source(err)
256            } else {
257                from_aws_sdk_error(err)
258            }
259        })?;
260
261        // when a table is created, it's possible that the metadata location is not set.
262        let metadata_location = resp.metadata_location().ok_or_else(|| {
263            Error::new(
264                ErrorKind::Unexpected,
265                format!(
266                    "Table {} does not have metadata location",
267                    table_ident.name()
268                ),
269            )
270        })?;
271        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
272
273        let table = Table::builder()
274            .identifier(table_ident.clone())
275            .metadata(metadata)
276            .metadata_location(metadata_location)
277            .file_io(self.file_io.clone())
278            .runtime(self.runtime.clone())
279            .build()?;
280        Ok((table, resp.version_token))
281    }
282}
283
284#[async_trait]
285impl Catalog for S3TablesCatalog {
286    /// List namespaces from s3tables catalog.
287    ///
288    /// S3Tables doesn't support nested namespaces. If parent is provided, it will
289    /// return an empty list.
290    async fn list_namespaces(
291        &self,
292        parent: Option<&NamespaceIdent>,
293    ) -> Result<Vec<NamespaceIdent>> {
294        if parent.is_some() {
295            return Ok(vec![]);
296        }
297
298        let mut result = Vec::new();
299        let mut continuation_token = None;
300        loop {
301            let mut req = self
302                .s3tables_client
303                .list_namespaces()
304                .table_bucket_arn(self.config.table_bucket_arn.clone());
305            if let Some(token) = continuation_token {
306                req = req.continuation_token(token);
307            }
308            let resp = req.send().await.map_err(from_aws_sdk_error)?;
309            for ns in resp.namespaces() {
310                result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
311            }
312            continuation_token = resp.continuation_token().map(|s| s.to_string());
313            if continuation_token.is_none() {
314                break;
315            }
316        }
317        Ok(result)
318    }
319
320    /// Creates a new namespace with the given identifier and properties.
321    ///
322    /// Attempts to create a namespace defined by the `namespace`. The `properties`
323    /// parameter is ignored.
324    ///
325    /// The following naming rules apply to namespaces:
326    ///
327    /// - Names must be between 3 (min) and 63 (max) characters long.
328    /// - Names can consist only of lowercase letters, numbers, and underscores (_).
329    /// - Names must begin and end with a letter or number.
330    /// - Names must not contain hyphens (-) or periods (.).
331    ///
332    /// This function can return an error in the following situations:
333    ///
334    /// - Errors from the underlying database creation process, converted using
335    /// `from_aws_sdk_error`.
336    async fn create_namespace(
337        &self,
338        namespace: &NamespaceIdent,
339        _properties: HashMap<String, String>,
340    ) -> Result<Namespace> {
341        if self.namespace_exists(namespace).await? {
342            return Err(Error::new(
343                ErrorKind::NamespaceAlreadyExists,
344                format!("Namespace {namespace:?} already exists"),
345            ));
346        }
347
348        let req = self
349            .s3tables_client
350            .create_namespace()
351            .table_bucket_arn(self.config.table_bucket_arn.clone())
352            .namespace(namespace.to_url_string());
353        req.send().await.map_err(from_aws_sdk_error)?;
354        Ok(Namespace::with_properties(
355            namespace.clone(),
356            HashMap::new(),
357        ))
358    }
359
360    /// Retrieves a namespace by its identifier.
361    ///
362    /// Validates the given namespace identifier and then queries the
363    /// underlying database client to fetch the corresponding namespace data.
364    /// Constructs a `Namespace` object with the retrieved data and returns it.
365    ///
366    /// This function can return an error in any of the following situations:
367    /// - If there is an error querying the database, returned by
368    /// `from_aws_sdk_error`.
369    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
370        if !self.namespace_exists(namespace).await? {
371            return Err(Error::new(
372                ErrorKind::NamespaceNotFound,
373                format!("Namespace {namespace:?} does not exist"),
374            ));
375        }
376
377        let req = self
378            .s3tables_client
379            .get_namespace()
380            .table_bucket_arn(self.config.table_bucket_arn.clone())
381            .namespace(namespace.to_url_string());
382        let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
383        let properties = HashMap::new();
384        Ok(Namespace::with_properties(
385            NamespaceIdent::from_vec(resp.namespace().to_vec())?,
386            properties,
387        ))
388    }
389
390    /// Checks if a namespace exists within the s3tables catalog.
391    ///
392    /// Validates the namespace identifier by querying the s3tables catalog
393    /// to determine if the specified namespace exists.
394    ///
395    /// # Returns
396    /// A `Result<bool>` indicating the outcome of the check:
397    /// - `Ok(true)` if the namespace exists.
398    /// - `Ok(false)` if the namespace does not exist, identified by a specific
399    /// `IsNotFoundException` variant.
400    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
401    /// query, with the error encapsulating the issue.
402    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
403        let req = self
404            .s3tables_client
405            .get_namespace()
406            .table_bucket_arn(self.config.table_bucket_arn.clone())
407            .namespace(namespace.to_url_string());
408        match req.send().await {
409            Ok(_) => Ok(true),
410            Err(err) => {
411                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
412                    Ok(false)
413                } else {
414                    Err(from_aws_sdk_error(err))
415                }
416            }
417        }
418    }
419
420    /// Updates the properties of an existing namespace.
421    ///
422    /// S3Tables doesn't support updating namespace properties, so this function
423    /// will always return an error.
424    async fn update_namespace(
425        &self,
426        _namespace: &NamespaceIdent,
427        _properties: HashMap<String, String>,
428    ) -> Result<()> {
429        Err(Error::new(
430            ErrorKind::FeatureUnsupported,
431            "Update namespace is not supported for s3tables catalog",
432        ))
433    }
434
435    /// Drops an existing namespace from the s3tables catalog.
436    ///
437    /// Validates the namespace identifier and then deletes the corresponding
438    /// namespace from the s3tables catalog.
439    ///
440    /// This function can return an error in the following situations:
441    /// - Errors from the underlying database deletion process, converted using
442    /// `from_aws_sdk_error`.
443    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
444        if !self.namespace_exists(namespace).await? {
445            return Err(Error::new(
446                ErrorKind::NamespaceNotFound,
447                format!("Namespace {namespace:?} does not exist"),
448            ));
449        }
450
451        let req = self
452            .s3tables_client
453            .delete_namespace()
454            .table_bucket_arn(self.config.table_bucket_arn.clone())
455            .namespace(namespace.to_url_string());
456        req.send().await.map_err(from_aws_sdk_error)?;
457        Ok(())
458    }
459
460    /// Lists all tables within a given namespace.
461    ///
462    /// Retrieves all tables associated with the specified namespace and returns
463    /// their identifiers.
464    ///
465    /// This function can return an error in the following situations:
466    /// - Errors from the underlying database query process, converted using
467    /// `from_aws_sdk_error`.
468    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
469        let mut result = Vec::new();
470        let mut continuation_token = None;
471        loop {
472            let mut req = self
473                .s3tables_client
474                .list_tables()
475                .table_bucket_arn(self.config.table_bucket_arn.clone())
476                .namespace(namespace.to_url_string());
477            if let Some(token) = continuation_token {
478                req = req.continuation_token(token);
479            }
480            let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
481            for table in resp.tables() {
482                result.push(TableIdent::new(
483                    NamespaceIdent::from_vec(table.namespace().to_vec())?,
484                    table.name().to_string(),
485                ));
486            }
487            continuation_token = resp.continuation_token().map(|s| s.to_string());
488            if continuation_token.is_none() {
489                break;
490            }
491        }
492        Ok(result)
493    }
494
495    /// Creates a new table within a specified namespace.
496    ///
497    /// Attempts to create a table defined by the `creation` parameter. The metadata
498    /// location is generated by the s3tables catalog, looks like:
499    ///
500    /// s3://{RANDOM WAREHOUSE LOCATION}/metadata/{VERSION}-{UUID}.metadata.json
501    ///
502    /// We have to get this random warehouse location after the table is created.
503    ///
504    /// This function can return an error in the following situations:
505    /// - If the location of the table is set by user, identified by a specific
506    /// `DataInvalid` variant.
507    /// - Errors from the underlying database creation process, converted using
508    /// `from_aws_sdk_error`.
509    async fn create_table(
510        &self,
511        namespace: &NamespaceIdent,
512        mut creation: TableCreation,
513    ) -> Result<Table> {
514        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
515
516        // create table
517        let create_resp: CreateTableOutput = self
518            .s3tables_client
519            .create_table()
520            .table_bucket_arn(self.config.table_bucket_arn.clone())
521            .namespace(namespace.to_url_string())
522            .format(OpenTableFormat::Iceberg)
523            .name(table_ident.name())
524            .send()
525            .await
526            .map_err(from_aws_sdk_error)?;
527
528        // prepare table location. the warehouse location is generated by s3tables catalog,
529        // which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3
530        let table_location = match &creation.location {
531            Some(_) => {
532                return Err(Error::new(
533                    ErrorKind::DataInvalid,
534                    "The location of the table is generated by s3tables catalog, can't be set by user.",
535                ));
536            }
537            None => {
538                let get_resp: GetTableOutput = self
539                    .s3tables_client
540                    .get_table()
541                    .table_bucket_arn(self.config.table_bucket_arn.clone())
542                    .namespace(namespace.to_url_string())
543                    .name(table_ident.name())
544                    .send()
545                    .await
546                    .map_err(from_aws_sdk_error)?;
547                get_resp.warehouse_location().to_string()
548            }
549        };
550
551        // write metadata to file
552        creation.location = Some(table_location.clone());
553        let metadata = TableMetadataBuilder::from_table_creation(creation)?
554            .build()?
555            .metadata;
556        let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
557        metadata.write_to(&self.file_io, &metadata_location).await?;
558
559        // update metadata location
560        let metadata_location_str = metadata_location.to_string();
561        self.s3tables_client
562            .update_table_metadata_location()
563            .table_bucket_arn(self.config.table_bucket_arn.clone())
564            .namespace(namespace.to_url_string())
565            .name(table_ident.name())
566            .metadata_location(metadata_location_str.clone())
567            .version_token(create_resp.version_token())
568            .send()
569            .await
570            .map_err(from_aws_sdk_error)?;
571
572        let table = Table::builder()
573            .identifier(table_ident)
574            .metadata_location(metadata_location_str)
575            .metadata(metadata)
576            .file_io(self.file_io.clone())
577            .runtime(self.runtime.clone())
578            .build()?;
579        Ok(table)
580    }
581
582    /// Loads an existing table from the s3tables catalog.
583    ///
584    /// Retrieves the metadata location of the specified table and constructs a
585    /// `Table` object with the retrieved metadata.
586    ///
587    /// This function can return an error in the following situations:
588    /// - If the table does not have a metadata location, identified by a specific
589    /// `Unexpected` variant.
590    /// - Errors from the underlying database query process, converted using
591    /// `from_aws_sdk_error`.
592    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
593        Ok(self.load_table_with_version_token(table_ident).await?.0)
594    }
595
596    /// Not supported for S3Tables. Use `purge_table` instead.
597    ///
598    /// S3 Tables doesn't support soft delete, so dropping a table will permanently remove it from the catalog.
599    async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
600        Err(Error::new(
601            ErrorKind::FeatureUnsupported,
602            "drop_table is not supported for S3Tables; use purge_table instead",
603        ))
604    }
605
606    /// Purge a table from the S3 Tables catalog.
607    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
608        let req = self
609            .s3tables_client
610            .delete_table()
611            .table_bucket_arn(self.config.table_bucket_arn.clone())
612            .namespace(table.namespace().to_url_string())
613            .name(table.name());
614        req.send().await.map_err(from_aws_sdk_error)?;
615        Ok(())
616    }
617
618    /// Checks if a table exists within the s3tables catalog.
619    ///
620    /// Validates the table identifier by querying the s3tables catalog
621    /// to determine if the specified table exists.
622    ///
623    /// # Returns
624    /// A `Result<bool>` indicating the outcome of the check:
625    /// - `Ok(true)` if the table exists.
626    /// - `Ok(false)` if the table does not exist, identified by a specific
627    /// `IsNotFoundException` variant.
628    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
629    /// query, with the error encapsulating the issue.
630    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
631        let req = self
632            .s3tables_client
633            .get_table()
634            .table_bucket_arn(self.config.table_bucket_arn.clone())
635            .namespace(table_ident.namespace().to_url_string())
636            .name(table_ident.name());
637        match req.send().await {
638            Ok(_) => Ok(true),
639            Err(err) => {
640                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
641                    Ok(false)
642                } else {
643                    Err(from_aws_sdk_error(err))
644                }
645            }
646        }
647    }
648
649    /// Renames an existing table within the s3tables catalog.
650    ///
651    /// Validates the source and destination table identifiers and then renames
652    /// the source table to the destination table.
653    ///
654    /// This function can return an error in the following situations:
655    /// - Errors from the underlying database renaming process, converted using
656    /// `from_aws_sdk_error`.
657    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
658        let req = self
659            .s3tables_client
660            .rename_table()
661            .table_bucket_arn(self.config.table_bucket_arn.clone())
662            .namespace(src.namespace().to_url_string())
663            .name(src.name())
664            .new_namespace_name(dest.namespace().to_url_string())
665            .new_name(dest.name());
666        req.send().await.map_err(from_aws_sdk_error)?;
667        Ok(())
668    }
669
670    async fn register_table(
671        &self,
672        _table_ident: &TableIdent,
673        _metadata_location: String,
674    ) -> Result<Table> {
675        Err(Error::new(
676            ErrorKind::FeatureUnsupported,
677            "Registering a table is not supported yet",
678        ))
679    }
680
681    /// Updates an existing table within the s3tables catalog.
682    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
683        let table_ident = commit.identifier().clone();
684        let table_namespace = table_ident.namespace();
685        let (current_table, version_token) =
686            self.load_table_with_version_token(&table_ident).await?;
687
688        let staged_table = commit.apply(current_table)?;
689        let staged_metadata_location_str = staged_table.metadata_location_result()?;
690        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
691
692        staged_table
693            .metadata()
694            .write_to(staged_table.file_io(), &staged_metadata_location)
695            .await?;
696
697        let builder = self
698            .s3tables_client
699            .update_table_metadata_location()
700            .table_bucket_arn(&self.config.table_bucket_arn)
701            .namespace(table_namespace.to_url_string())
702            .name(table_ident.name())
703            .version_token(version_token)
704            .metadata_location(staged_metadata_location_str);
705
706        let _ = builder.send().await.map_err(|e| {
707            let error = e.into_service_error();
708            match error {
709                UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
710                    ErrorKind::CatalogCommitConflicts,
711                    format!("Commit conflicted for table: {table_ident}"),
712                )
713                .with_retryable(true),
714                UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
715                    ErrorKind::TableNotFound,
716                    format!("Table {table_ident} is not found"),
717                ),
718                _ => Error::new(
719                    ErrorKind::Unexpected,
720                    "Operation failed for hitting aws sdk error",
721                ),
722            }
723            .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
724        })?;
725
726        Ok(staged_table)
727    }
728}
729
730/// Format AWS SDK error into iceberg error
731pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
732where T: std::fmt::Debug {
733    Error::new(
734        ErrorKind::Unexpected,
735        format!("Operation failed for hitting aws sdk error: {error:?}"),
736    )
737}
738
739#[cfg(test)]
740mod tests {
741    use futures::TryStreamExt;
742    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
743    use iceberg::transaction::{ApplyTransactionAction, Transaction};
744
745    use super::*;
746
747    async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
748        let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
749            Some(table_bucket_arn) => table_bucket_arn,
750            None => return Ok(None),
751        };
752
753        let config = S3TablesCatalogConfig {
754            name: None,
755            table_bucket_arn,
756            endpoint_url: None,
757            client: None,
758            props: HashMap::new(),
759        };
760
761        Ok(Some(
762            S3TablesCatalog::new(config, None, Runtime::current()).await?,
763        ))
764    }
765
766    #[tokio::test]
767    async fn test_s3tables_list_namespace() {
768        let catalog = match load_s3tables_catalog_from_env().await {
769            Ok(Some(catalog)) => catalog,
770            Ok(None) => return,
771            Err(e) => panic!("Error loading catalog: {e}"),
772        };
773
774        let namespaces = catalog.list_namespaces(None).await.unwrap();
775        assert!(!namespaces.is_empty());
776    }
777
778    #[tokio::test]
779    async fn test_s3tables_list_tables() {
780        let catalog = match load_s3tables_catalog_from_env().await {
781            Ok(Some(catalog)) => catalog,
782            Ok(None) => return,
783            Err(e) => panic!("Error loading catalog: {e}"),
784        };
785
786        let tables = catalog
787            .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
788            .await
789            .unwrap();
790        assert!(!tables.is_empty());
791    }
792
793    #[tokio::test]
794    async fn test_s3tables_load_table() {
795        let catalog = match load_s3tables_catalog_from_env().await {
796            Ok(Some(catalog)) => catalog,
797            Ok(None) => return,
798            Err(e) => panic!("Error loading catalog: {e}"),
799        };
800
801        let _table = catalog
802            .load_table(&TableIdent::new(
803                NamespaceIdent::new("aws_s3_metadata".to_string()),
804                "query_storage_metadata".to_string(),
805            ))
806            .await
807            .expect("table that exists should be loaded");
808
809        let load_table_err = catalog
810            .load_table(&TableIdent::new(
811                NamespaceIdent::new("not_a_namespace".to_string()),
812                "not_a_table_name".to_string(),
813            ))
814            .await
815            .expect_err("loading a table that does not exist should fail");
816        assert_eq!(
817            load_table_err.kind(),
818            ErrorKind::TableNotFound,
819            "must return table not found error for non-existent table"
820        );
821    }
822
823    #[tokio::test]
824    async fn test_s3tables_create_delete_namespace() {
825        let catalog = match load_s3tables_catalog_from_env().await {
826            Ok(Some(catalog)) => catalog,
827            Ok(None) => return,
828            Err(e) => panic!("Error loading catalog: {e}"),
829        };
830
831        let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
832        catalog
833            .create_namespace(&namespace, HashMap::new())
834            .await
835            .unwrap();
836        assert!(catalog.namespace_exists(&namespace).await.unwrap());
837        catalog.drop_namespace(&namespace).await.unwrap();
838        assert!(!catalog.namespace_exists(&namespace).await.unwrap());
839    }
840
841    #[tokio::test]
842    async fn test_s3tables_create_delete_table() {
843        let catalog = match load_s3tables_catalog_from_env().await {
844            Ok(Some(catalog)) => catalog,
845            Ok(None) => return,
846            Err(e) => panic!("Error loading catalog: {e}"),
847        };
848
849        let creation = {
850            let schema = Schema::builder()
851                .with_schema_id(0)
852                .with_fields(vec![
853                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
854                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
855                ])
856                .build()
857                .unwrap();
858            TableCreation::builder()
859                .name("test_s3tables_create_delete_table".to_string())
860                .properties(HashMap::new())
861                .schema(schema)
862                .build()
863        };
864
865        let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
866        let table_ident = TableIdent::new(
867            namespace.clone(),
868            "test_s3tables_create_delete_table".to_string(),
869        );
870        catalog.drop_namespace(&namespace).await.ok();
871        catalog.drop_table(&table_ident).await.ok();
872
873        catalog
874            .create_namespace(&namespace, HashMap::new())
875            .await
876            .unwrap();
877        catalog.create_table(&namespace, creation).await.unwrap();
878        assert!(catalog.table_exists(&table_ident).await.unwrap());
879        catalog.drop_table(&table_ident).await.unwrap();
880        assert!(!catalog.table_exists(&table_ident).await.unwrap());
881        catalog.drop_namespace(&namespace).await.unwrap();
882    }
883
884    #[tokio::test]
885    async fn test_s3tables_update_table() {
886        let catalog = match load_s3tables_catalog_from_env().await {
887            Ok(Some(catalog)) => catalog,
888            Ok(None) => return,
889            Err(e) => panic!("Error loading catalog: {e}"),
890        };
891
892        // Create a test namespace and table
893        let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
894        let table_ident =
895            TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
896
897        // Clean up any existing resources from previous test runs
898        catalog.drop_table(&table_ident).await.ok();
899        catalog.drop_namespace(&namespace).await.ok();
900
901        // Create namespace and table
902        catalog
903            .create_namespace(&namespace, HashMap::new())
904            .await
905            .unwrap();
906
907        let creation = {
908            let schema = Schema::builder()
909                .with_schema_id(0)
910                .with_fields(vec![
911                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
912                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
913                ])
914                .build()
915                .unwrap();
916            TableCreation::builder()
917                .name(table_ident.name().to_string())
918                .properties(HashMap::new())
919                .schema(schema)
920                .build()
921        };
922
923        let table = catalog.create_table(&namespace, creation).await.unwrap();
924
925        // Create a transaction to update the table
926        let tx = Transaction::new(&table);
927
928        // Store the original metadata location for comparison
929        let original_metadata_location = table.metadata_location();
930
931        // Update table properties using the transaction
932        let tx = tx
933            .update_table_properties()
934            .set("test_property".to_string(), "test_value".to_string())
935            .apply(tx)
936            .unwrap();
937
938        // Commit the transaction to the catalog
939        let updated_table = tx.commit(&catalog).await.unwrap();
940
941        // Verify the update was successful
942        assert_eq!(
943            updated_table.metadata().properties().get("test_property"),
944            Some(&"test_value".to_string())
945        );
946
947        // Verify the metadata location has been updated
948        assert_ne!(
949            updated_table.metadata_location(),
950            original_metadata_location,
951            "Metadata location should be updated after commit"
952        );
953
954        // Load the table again from the catalog to verify changes were persisted
955        let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
956
957        // Verify the reloaded table matches the updated table
958        assert_eq!(
959            reloaded_table.metadata().properties().get("test_property"),
960            Some(&"test_value".to_string())
961        );
962        assert_eq!(
963            reloaded_table.metadata_location(),
964            updated_table.metadata_location(),
965            "Reloaded table should have the same metadata location as the updated table"
966        );
967    }
968
969    #[tokio::test]
970    async fn test_builder_load_missing_bucket_arn() {
971        let builder = S3TablesCatalogBuilder::default();
972        let result = builder.load("s3tables", HashMap::new()).await;
973
974        assert!(result.is_err());
975        if let Err(err) = result {
976            assert_eq!(err.kind(), ErrorKind::DataInvalid);
977            assert_eq!(err.message(), "Table bucket ARN is required");
978        }
979    }
980
981    #[tokio::test]
982    async fn test_builder_with_endpoint_url_ok() {
983        let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");
984
985        let result = builder
986            .load(
987                "s3tables",
988                HashMap::from([
989                    (
990                        S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
991                        "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
992                    ),
993                    ("some_prop".to_string(), "some_value".to_string()),
994                ]),
995            )
996            .await;
997
998        assert!(result.is_ok());
999    }
1000
1001    #[tokio::test]
1002    async fn test_builder_with_client_ok() {
1003        use aws_config::BehaviorVersion;
1004
1005        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
1006        let client = aws_sdk_s3tables::Client::new(&sdk_config);
1007
1008        let builder = S3TablesCatalogBuilder::default().with_client(client);
1009        let result = builder
1010            .load(
1011                "s3tables",
1012                HashMap::from([(
1013                    S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1014                    "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
1015                )]),
1016            )
1017            .await;
1018
1019        assert!(result.is_ok());
1020    }
1021
1022    #[tokio::test]
1023    async fn test_builder_with_table_bucket_arn() {
1024        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1025        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1026
1027        let result = builder.load("s3tables", HashMap::new()).await;
1028
1029        assert!(result.is_ok());
1030        let catalog = result.unwrap();
1031        assert_eq!(catalog.config.table_bucket_arn, test_arn);
1032    }
1033
1034    #[tokio::test]
1035    async fn test_builder_empty_table_bucket_arn_edge_cases() {
1036        let mut props = HashMap::new();
1037        props.insert(
1038            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1039            "".to_string(),
1040        );
1041
1042        let builder = S3TablesCatalogBuilder::default();
1043        let result = builder.load("s3tables", props).await;
1044
1045        assert!(result.is_err());
1046        if let Err(err) = result {
1047            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1048            assert_eq!(err.message(), "Table bucket ARN is required");
1049        }
1050    }
1051
1052    #[tokio::test]
1053    async fn test_endpoint_url_property_overrides_builder_method() {
1054        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1055        let builder_endpoint = "http://localhost:4566";
1056        let property_endpoint = "http://localhost:8080";
1057
1058        let builder = S3TablesCatalogBuilder::default()
1059            .with_table_bucket_arn(test_arn)
1060            .with_endpoint_url(builder_endpoint);
1061
1062        let mut props = HashMap::new();
1063        props.insert(
1064            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1065            property_endpoint.to_string(),
1066        );
1067
1068        let result = builder.load("s3tables", props).await;
1069
1070        assert!(result.is_ok());
1071        let catalog = result.unwrap();
1072
1073        // Property value should override builder method value
1074        assert_eq!(
1075            catalog.config.endpoint_url,
1076            Some(property_endpoint.to_string())
1077        );
1078        assert_ne!(
1079            catalog.config.endpoint_url,
1080            Some(builder_endpoint.to_string())
1081        );
1082    }
1083
1084    #[tokio::test]
1085    async fn test_endpoint_url_builder_method_only() {
1086        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1087        let builder_endpoint = "http://localhost:4566";
1088
1089        let builder = S3TablesCatalogBuilder::default()
1090            .with_table_bucket_arn(test_arn)
1091            .with_endpoint_url(builder_endpoint);
1092
1093        let result = builder.load("s3tables", HashMap::new()).await;
1094
1095        assert!(result.is_ok());
1096        let catalog = result.unwrap();
1097
1098        assert_eq!(
1099            catalog.config.endpoint_url,
1100            Some(builder_endpoint.to_string())
1101        );
1102    }
1103
1104    #[tokio::test]
1105    async fn test_endpoint_url_property_only() {
1106        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1107        let property_endpoint = "http://localhost:8080";
1108
1109        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1110
1111        let mut props = HashMap::new();
1112        props.insert(
1113            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1114            property_endpoint.to_string(),
1115        );
1116
1117        let result = builder.load("s3tables", props).await;
1118
1119        assert!(result.is_ok());
1120        let catalog = result.unwrap();
1121
1122        assert_eq!(
1123            catalog.config.endpoint_url,
1124            Some(property_endpoint.to_string())
1125        );
1126    }
1127
1128    #[tokio::test]
1129    async fn test_table_bucket_arn_property_overrides_builder_method() {
1130        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1131        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1132
1133        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1134
1135        let mut props = HashMap::new();
1136        props.insert(
1137            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1138            property_arn.to_string(),
1139        );
1140
1141        let result = builder.load("s3tables", props).await;
1142
1143        assert!(result.is_ok());
1144        let catalog = result.unwrap();
1145
1146        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1147        assert_ne!(catalog.config.table_bucket_arn, builder_arn);
1148    }
1149
1150    #[tokio::test]
1151    async fn test_table_bucket_arn_builder_method_only() {
1152        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1153
1154        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1155
1156        let result = builder.load("s3tables", HashMap::new()).await;
1157
1158        assert!(result.is_ok());
1159        let catalog = result.unwrap();
1160
1161        assert_eq!(catalog.config.table_bucket_arn, builder_arn);
1162    }
1163
1164    #[tokio::test]
1165    async fn test_table_bucket_arn_property_only() {
1166        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1167
1168        let builder = S3TablesCatalogBuilder::default();
1169
1170        let mut props = HashMap::new();
1171        props.insert(
1172            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1173            property_arn.to_string(),
1174        );
1175
1176        let result = builder.load("s3tables", props).await;
1177
1178        assert!(result.is_ok());
1179        let catalog = result.unwrap();
1180
1181        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1182    }
1183
1184    #[tokio::test]
1185    async fn test_builder_empty_name_validation() {
1186        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1187        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1188
1189        let result = builder.load("", HashMap::new()).await;
1190
1191        assert!(result.is_err());
1192        if let Err(err) = result {
1193            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1194            assert_eq!(err.message(), "Catalog name cannot be empty");
1195        }
1196    }
1197
1198    #[tokio::test]
1199    async fn test_builder_whitespace_only_name_validation() {
1200        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1201        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1202
1203        let result = builder.load("   \t\n  ", HashMap::new()).await;
1204
1205        assert!(result.is_err());
1206        if let Err(err) = result {
1207            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1208            assert_eq!(err.message(), "Catalog name cannot be empty");
1209        }
1210    }
1211
1212    #[tokio::test]
1213    async fn test_builder_name_validation_with_missing_arn() {
1214        let builder = S3TablesCatalogBuilder::default();
1215
1216        let result = builder.load("", HashMap::new()).await;
1217
1218        assert!(result.is_err());
1219        if let Err(err) = result {
1220            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1221            assert_eq!(err.message(), "Catalog name cannot be empty");
1222        }
1223    }
1224
1225    /// Verify that an S3 Table catalog can create a table, write data, load the same table, and read from it.
1226    #[tokio::test]
1227    async fn test_s3tables_create_table_write_load_table_read() {
1228        use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
1229        use iceberg::writer::file_writer::ParquetWriterBuilder;
1230        use iceberg::writer::file_writer::location_generator::{
1231            DefaultFileNameGenerator, DefaultLocationGenerator,
1232        };
1233        use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
1234        use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
1235
1236        let catalog = match load_s3tables_catalog_from_env().await {
1237            Ok(Some(c)) => c,
1238            Ok(None) => return,
1239            Err(e) => panic!("Error loading catalog: {e}"),
1240        };
1241
1242        let ns = NamespaceIdent::new(format!("test_rw_{}", uuid::Uuid::new_v4().simple()));
1243        catalog.create_namespace(&ns, HashMap::new()).await.unwrap();
1244
1245        let table_name = String::from("table");
1246
1247        let schema = Schema::builder()
1248            .with_fields(vec![
1249                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
1250            ])
1251            .build()
1252            .unwrap();
1253        let creation = TableCreation::builder()
1254            .name(table_name.clone())
1255            .schema(schema)
1256            .build();
1257
1258        let table = catalog.create_table(&ns, creation).await.unwrap();
1259
1260        // Write one row.
1261        let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
1262            table
1263                .metadata()
1264                .current_schema()
1265                .as_ref()
1266                .try_into()
1267                .unwrap(),
1268        );
1269        let batch = arrow_array::RecordBatch::try_new(arrow_schema, vec![Arc::new(
1270            arrow_array::Int32Array::from(vec![42]),
1271        )])
1272        .unwrap();
1273
1274        // Locations will be generated based on the table metadata, which will be using `s3://` for Amazon S3 Tables.
1275        let location_generator = DefaultLocationGenerator::new(table.metadata()).unwrap();
1276        let file_name_generator = DefaultFileNameGenerator::new(
1277            "test".to_string(),
1278            None,
1279            iceberg::spec::DataFileFormat::Parquet,
1280        );
1281        let parquet_writer_builder = ParquetWriterBuilder::new(
1282            parquet::file::properties::WriterProperties::default(),
1283            table.metadata().current_schema().clone(),
1284        );
1285        let rw = RollingFileWriterBuilder::new_with_default_file_size(
1286            parquet_writer_builder,
1287            table.file_io().clone(),
1288            location_generator,
1289            file_name_generator,
1290        );
1291        let mut writer = DataFileWriterBuilder::new(rw).build(None).await.unwrap();
1292        writer.write(batch.clone()).await.unwrap();
1293        let data_files = writer.close().await.unwrap();
1294
1295        let tx = Transaction::new(&table);
1296        let tx = tx
1297            .fast_append()
1298            .add_data_files(data_files)
1299            .apply(tx)
1300            .unwrap();
1301        tx.commit(&catalog).await.unwrap();
1302
1303        // Reload from catalog and read back.
1304        let table_ident = TableIdent::new(ns.clone(), table_name.clone());
1305        let reloaded = catalog.load_table(&table_ident).await.unwrap();
1306        let batches: Vec<arrow_array::RecordBatch> = reloaded
1307            .scan()
1308            .select_all()
1309            .build()
1310            .expect("scan to be valid (snapshot exists, schema is OK)")
1311            .to_arrow()
1312            .await
1313            .expect("scan tasks should be OK")
1314            .try_collect()
1315            .await
1316            .expect("scan should complete successfully");
1317
1318        assert_eq!(batches.len(), 1);
1319        assert_eq!(
1320            batches[0], batch,
1321            "read records should match records written earlier"
1322        );
1323
1324        // Clean up.
1325        catalog.purge_table(&table_ident).await.ok();
1326        catalog.drop_namespace(&ns).await.ok();
1327    }
1328}