iceberg_catalog_s3tables/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::future::Future;
20
21use async_trait::async_trait;
22use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
23use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
24use aws_sdk_s3tables::operation::get_table::GetTableOutput;
25use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
26use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
27use aws_sdk_s3tables::types::OpenTableFormat;
28use iceberg::io::{FileIO, FileIOBuilder};
29use iceberg::spec::{TableMetadata, TableMetadataBuilder};
30use iceberg::table::Table;
31use iceberg::{
32    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
33    TableCommit, TableCreation, TableIdent,
34};
35
36use crate::utils::create_sdk_config;
37
/// Property key for the ARN of the S3Tables table bucket.
///
/// When this key is present in the properties passed to `load()`, its value replaces
/// any ARN set through [`S3TablesCatalogBuilder::with_table_bucket_arn`].
pub const S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN: &str = "table_bucket_arn";
/// Property key for a custom S3Tables endpoint URL.
///
/// When this key is present in the properties passed to `load()`, its value replaces
/// any URL set through [`S3TablesCatalogBuilder::with_endpoint_url`].
pub const S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str = "endpoint_url";
42
/// S3Tables catalog configuration.
///
/// Assembled by [`S3TablesCatalogBuilder`] and consumed when the catalog is constructed.
#[derive(Debug)]
struct S3TablesCatalogConfig {
    /// Catalog name; populated from the `name` argument of `load()`.
    name: Option<String>,
    /// Unlike other buckets, S3Tables bucket is not a physical bucket, but a virtual bucket
    /// that is managed by s3tables. We can't directly access the bucket with path like
    /// s3://{bucket_name}/{file_path}, all the operations are done with respect of the bucket
    /// ARN.
    ///
    /// An empty string means "not provided" and is rejected by `load()`.
    table_bucket_arn: String,
    /// Endpoint URL for the catalog. It is forwarded to the AWS SDK configuration when
    /// the catalog builds its own client.
    endpoint_url: Option<String>,
    /// Optional pre-configured AWS SDK client for S3Tables. When set, it is used as-is
    /// instead of building a client from `props` and `endpoint_url`.
    client: Option<aws_sdk_s3tables::Client>,
    /// Properties for the catalog. They are used to build the AWS SDK configuration and
    /// are also handed to the S3 `FileIO` builder. The available properties are:
    /// - `profile_name`: The name of the AWS profile to use.
    /// - `region_name`: The AWS region to use.
    /// - `aws_access_key_id`: The AWS access key ID to use.
    /// - `aws_secret_access_key`: The AWS secret access key to use.
    /// - `aws_session_token`: The AWS session token to use.
    props: HashMap<String, String>,
}
65
/// Builder for [`S3TablesCatalog`].
///
/// A thin wrapper around the catalog configuration. Values set through the builder
/// methods may be overridden by the properties passed to `load()`.
#[derive(Debug)]
pub struct S3TablesCatalogBuilder(S3TablesCatalogConfig);
69
70/// Default builder for [`S3TablesCatalog`].
71impl Default for S3TablesCatalogBuilder {
72    fn default() -> Self {
73        Self(S3TablesCatalogConfig {
74            name: None,
75            table_bucket_arn: "".to_string(),
76            endpoint_url: None,
77            client: None,
78            props: HashMap::new(),
79        })
80    }
81}
82
83/// Builder methods for [`S3TablesCatalog`].
84impl S3TablesCatalogBuilder {
85    /// Configure the catalog with a custom endpoint URL (useful for local testing/mocking).
86    ///
87    /// # Behavior with Properties
88    ///
89    /// If both this method and the `endpoint_url` property are provided during catalog loading,
90    /// the property value will take precedence and overwrite the value set by this method.
91    /// This follows the general pattern where properties specified in the `load()` method
92    /// have higher priority than builder method configurations.
93    pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
94        self.0.endpoint_url = Some(endpoint_url.into());
95        self
96    }
97
98    /// Configure the catalog with a pre-built AWS SDK client.
99    pub fn with_client(mut self, client: aws_sdk_s3tables::Client) -> Self {
100        self.0.client = Some(client);
101        self
102    }
103
104    /// Configure the catalog with a table bucket ARN.
105    ///
106    /// # Behavior with Properties
107    ///
108    /// If both this method and the `table_bucket_arn` property are provided during catalog loading,
109    /// the property value will take precedence and overwrite the value set by this method.
110    /// This follows the general pattern where properties specified in the `load()` method
111    /// have higher priority than builder method configurations.
112    pub fn with_table_bucket_arn(mut self, table_bucket_arn: impl Into<String>) -> Self {
113        self.0.table_bucket_arn = table_bucket_arn.into();
114        self
115    }
116}
117
118impl CatalogBuilder for S3TablesCatalogBuilder {
119    type C = S3TablesCatalog;
120
121    fn load(
122        mut self,
123        name: impl Into<String>,
124        props: HashMap<String, String>,
125    ) -> impl Future<Output = Result<Self::C>> + Send {
126        let catalog_name = name.into();
127        self.0.name = Some(catalog_name.clone());
128
129        if props.contains_key(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN) {
130            self.0.table_bucket_arn = props
131                .get(S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN)
132                .cloned()
133                .unwrap_or_default();
134        }
135
136        if props.contains_key(S3TABLES_CATALOG_PROP_ENDPOINT_URL) {
137            self.0.endpoint_url = props.get(S3TABLES_CATALOG_PROP_ENDPOINT_URL).cloned();
138        }
139
140        // Collect other remaining properties
141        self.0.props = props
142            .into_iter()
143            .filter(|(k, _)| {
144                k != S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN
145                    && k != S3TABLES_CATALOG_PROP_ENDPOINT_URL
146            })
147            .collect();
148
149        async move {
150            if catalog_name.trim().is_empty() {
151                Err(Error::new(
152                    ErrorKind::DataInvalid,
153                    "Catalog name cannot be empty",
154                ))
155            } else if self.0.table_bucket_arn.is_empty() {
156                Err(Error::new(
157                    ErrorKind::DataInvalid,
158                    "Table bucket ARN is required",
159                ))
160            } else {
161                S3TablesCatalog::new(self.0).await
162            }
163        }
164    }
165}
166
/// S3Tables catalog implementation.
///
/// Built through [`S3TablesCatalogBuilder`].
#[derive(Debug)]
pub struct S3TablesCatalog {
    /// Configuration the catalog was built from; supplies the table bucket ARN sent
    /// with every request.
    config: S3TablesCatalogConfig,
    /// AWS SDK client used to issue the S3Tables requests.
    s3tables_client: aws_sdk_s3tables::Client,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
}
174
175impl S3TablesCatalog {
176    /// Creates a new S3Tables catalog.
177    async fn new(config: S3TablesCatalogConfig) -> Result<Self> {
178        let s3tables_client = if let Some(client) = config.client.clone() {
179            client
180        } else {
181            let aws_config = create_sdk_config(&config.props, config.endpoint_url.clone()).await;
182            aws_sdk_s3tables::Client::new(&aws_config)
183        };
184
185        let file_io = FileIOBuilder::new("s3").with_props(&config.props).build()?;
186
187        Ok(Self {
188            config,
189            s3tables_client,
190            file_io,
191        })
192    }
193
194    async fn load_table_with_version_token(
195        &self,
196        table_ident: &TableIdent,
197    ) -> Result<(Table, String)> {
198        let req = self
199            .s3tables_client
200            .get_table()
201            .table_bucket_arn(self.config.table_bucket_arn.clone())
202            .namespace(table_ident.namespace().to_url_string())
203            .name(table_ident.name());
204        let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
205
206        // when a table is created, it's possible that the metadata location is not set.
207        let metadata_location = resp.metadata_location().ok_or_else(|| {
208            Error::new(
209                ErrorKind::Unexpected,
210                format!(
211                    "Table {} does not have metadata location",
212                    table_ident.name()
213                ),
214            )
215        })?;
216        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
217
218        let table = Table::builder()
219            .identifier(table_ident.clone())
220            .metadata(metadata)
221            .metadata_location(metadata_location)
222            .file_io(self.file_io.clone())
223            .build()?;
224        Ok((table, resp.version_token))
225    }
226}
227
228#[async_trait]
229impl Catalog for S3TablesCatalog {
230    /// List namespaces from s3tables catalog.
231    ///
232    /// S3Tables doesn't support nested namespaces. If parent is provided, it will
233    /// return an empty list.
234    async fn list_namespaces(
235        &self,
236        parent: Option<&NamespaceIdent>,
237    ) -> Result<Vec<NamespaceIdent>> {
238        if parent.is_some() {
239            return Ok(vec![]);
240        }
241
242        let mut result = Vec::new();
243        let mut continuation_token = None;
244        loop {
245            let mut req = self
246                .s3tables_client
247                .list_namespaces()
248                .table_bucket_arn(self.config.table_bucket_arn.clone());
249            if let Some(token) = continuation_token {
250                req = req.continuation_token(token);
251            }
252            let resp = req.send().await.map_err(from_aws_sdk_error)?;
253            for ns in resp.namespaces() {
254                result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
255            }
256            continuation_token = resp.continuation_token().map(|s| s.to_string());
257            if continuation_token.is_none() {
258                break;
259            }
260        }
261        Ok(result)
262    }
263
264    /// Creates a new namespace with the given identifier and properties.
265    ///
266    /// Attempts to create a namespace defined by the `namespace`. The `properties`
267    /// parameter is ignored.
268    ///
269    /// The following naming rules apply to namespaces:
270    ///
271    /// - Names must be between 3 (min) and 63 (max) characters long.
272    /// - Names can consist only of lowercase letters, numbers, and underscores (_).
273    /// - Names must begin and end with a letter or number.
274    /// - Names must not contain hyphens (-) or periods (.).
275    ///
276    /// This function can return an error in the following situations:
277    ///
278    /// - Errors from the underlying database creation process, converted using
279    /// `from_aws_sdk_error`.
280    async fn create_namespace(
281        &self,
282        namespace: &NamespaceIdent,
283        _properties: HashMap<String, String>,
284    ) -> Result<Namespace> {
285        let req = self
286            .s3tables_client
287            .create_namespace()
288            .table_bucket_arn(self.config.table_bucket_arn.clone())
289            .namespace(namespace.to_url_string());
290        req.send().await.map_err(from_aws_sdk_error)?;
291        Ok(Namespace::with_properties(
292            namespace.clone(),
293            HashMap::new(),
294        ))
295    }
296
297    /// Retrieves a namespace by its identifier.
298    ///
299    /// Validates the given namespace identifier and then queries the
300    /// underlying database client to fetch the corresponding namespace data.
301    /// Constructs a `Namespace` object with the retrieved data and returns it.
302    ///
303    /// This function can return an error in any of the following situations:
304    /// - If there is an error querying the database, returned by
305    /// `from_aws_sdk_error`.
306    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
307        let req = self
308            .s3tables_client
309            .get_namespace()
310            .table_bucket_arn(self.config.table_bucket_arn.clone())
311            .namespace(namespace.to_url_string());
312        let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?;
313        let properties = HashMap::new();
314        Ok(Namespace::with_properties(
315            NamespaceIdent::from_vec(resp.namespace().to_vec())?,
316            properties,
317        ))
318    }
319
320    /// Checks if a namespace exists within the s3tables catalog.
321    ///
322    /// Validates the namespace identifier by querying the s3tables catalog
323    /// to determine if the specified namespace exists.
324    ///
325    /// # Returns
326    /// A `Result<bool>` indicating the outcome of the check:
327    /// - `Ok(true)` if the namespace exists.
328    /// - `Ok(false)` if the namespace does not exist, identified by a specific
329    /// `IsNotFoundException` variant.
330    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
331    /// query, with the error encapsulating the issue.
332    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
333        let req = self
334            .s3tables_client
335            .get_namespace()
336            .table_bucket_arn(self.config.table_bucket_arn.clone())
337            .namespace(namespace.to_url_string());
338        match req.send().await {
339            Ok(_) => Ok(true),
340            Err(err) => {
341                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
342                    Ok(false)
343                } else {
344                    Err(from_aws_sdk_error(err))
345                }
346            }
347        }
348    }
349
350    /// Updates the properties of an existing namespace.
351    ///
352    /// S3Tables doesn't support updating namespace properties, so this function
353    /// will always return an error.
354    async fn update_namespace(
355        &self,
356        _namespace: &NamespaceIdent,
357        _properties: HashMap<String, String>,
358    ) -> Result<()> {
359        Err(Error::new(
360            ErrorKind::FeatureUnsupported,
361            "Update namespace is not supported for s3tables catalog",
362        ))
363    }
364
365    /// Drops an existing namespace from the s3tables catalog.
366    ///
367    /// Validates the namespace identifier and then deletes the corresponding
368    /// namespace from the s3tables catalog.
369    ///
370    /// This function can return an error in the following situations:
371    /// - Errors from the underlying database deletion process, converted using
372    /// `from_aws_sdk_error`.
373    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
374        let req = self
375            .s3tables_client
376            .delete_namespace()
377            .table_bucket_arn(self.config.table_bucket_arn.clone())
378            .namespace(namespace.to_url_string());
379        req.send().await.map_err(from_aws_sdk_error)?;
380        Ok(())
381    }
382
383    /// Lists all tables within a given namespace.
384    ///
385    /// Retrieves all tables associated with the specified namespace and returns
386    /// their identifiers.
387    ///
388    /// This function can return an error in the following situations:
389    /// - Errors from the underlying database query process, converted using
390    /// `from_aws_sdk_error`.
391    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
392        let mut result = Vec::new();
393        let mut continuation_token = None;
394        loop {
395            let mut req = self
396                .s3tables_client
397                .list_tables()
398                .table_bucket_arn(self.config.table_bucket_arn.clone())
399                .namespace(namespace.to_url_string());
400            if let Some(token) = continuation_token {
401                req = req.continuation_token(token);
402            }
403            let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?;
404            for table in resp.tables() {
405                result.push(TableIdent::new(
406                    NamespaceIdent::from_vec(table.namespace().to_vec())?,
407                    table.name().to_string(),
408                ));
409            }
410            continuation_token = resp.continuation_token().map(|s| s.to_string());
411            if continuation_token.is_none() {
412                break;
413            }
414        }
415        Ok(result)
416    }
417
418    /// Creates a new table within a specified namespace.
419    ///
420    /// Attempts to create a table defined by the `creation` parameter. The metadata
421    /// location is generated by the s3tables catalog, looks like:
422    ///
423    /// s3://{RANDOM WAREHOUSE LOCATION}/metadata/{VERSION}-{UUID}.metadata.json
424    ///
425    /// We have to get this random warehouse location after the table is created.
426    ///
427    /// This function can return an error in the following situations:
428    /// - If the location of the table is set by user, identified by a specific
429    /// `DataInvalid` variant.
430    /// - Errors from the underlying database creation process, converted using
431    /// `from_aws_sdk_error`.
432    async fn create_table(
433        &self,
434        namespace: &NamespaceIdent,
435        mut creation: TableCreation,
436    ) -> Result<Table> {
437        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
438
439        // create table
440        let create_resp: CreateTableOutput = self
441            .s3tables_client
442            .create_table()
443            .table_bucket_arn(self.config.table_bucket_arn.clone())
444            .namespace(namespace.to_url_string())
445            .format(OpenTableFormat::Iceberg)
446            .name(table_ident.name())
447            .send()
448            .await
449            .map_err(from_aws_sdk_error)?;
450
451        // prepare metadata location. the warehouse location is generated by s3tables catalog,
452        // which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3
453        let metadata_location = match &creation.location {
454            Some(_) => {
455                return Err(Error::new(
456                    ErrorKind::DataInvalid,
457                    "The location of the table is generated by s3tables catalog, can't be set by user.",
458                ));
459            }
460            None => {
461                let get_resp: GetTableOutput = self
462                    .s3tables_client
463                    .get_table()
464                    .table_bucket_arn(self.config.table_bucket_arn.clone())
465                    .namespace(namespace.to_url_string())
466                    .name(table_ident.name())
467                    .send()
468                    .await
469                    .map_err(from_aws_sdk_error)?;
470                let warehouse_location = get_resp.warehouse_location().to_string();
471                MetadataLocation::new_with_table_location(warehouse_location).to_string()
472            }
473        };
474
475        // write metadata to file
476        creation.location = Some(metadata_location.clone());
477        let metadata = TableMetadataBuilder::from_table_creation(creation)?
478            .build()?
479            .metadata;
480        metadata.write_to(&self.file_io, &metadata_location).await?;
481
482        // update metadata location
483        self.s3tables_client
484            .update_table_metadata_location()
485            .table_bucket_arn(self.config.table_bucket_arn.clone())
486            .namespace(namespace.to_url_string())
487            .name(table_ident.name())
488            .metadata_location(metadata_location.clone())
489            .version_token(create_resp.version_token())
490            .send()
491            .await
492            .map_err(from_aws_sdk_error)?;
493
494        let table = Table::builder()
495            .identifier(table_ident)
496            .metadata_location(metadata_location)
497            .metadata(metadata)
498            .file_io(self.file_io.clone())
499            .build()?;
500        Ok(table)
501    }
502
503    /// Loads an existing table from the s3tables catalog.
504    ///
505    /// Retrieves the metadata location of the specified table and constructs a
506    /// `Table` object with the retrieved metadata.
507    ///
508    /// This function can return an error in the following situations:
509    /// - If the table does not have a metadata location, identified by a specific
510    /// `Unexpected` variant.
511    /// - Errors from the underlying database query process, converted using
512    /// `from_aws_sdk_error`.
513    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
514        Ok(self.load_table_with_version_token(table_ident).await?.0)
515    }
516
517    /// Drops an existing table from the s3tables catalog.
518    ///
519    /// Validates the table identifier and then deletes the corresponding
520    /// table from the s3tables catalog.
521    ///
522    /// This function can return an error in the following situations:
523    /// - Errors from the underlying database deletion process, converted using
524    /// `from_aws_sdk_error`.
525    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
526        let req = self
527            .s3tables_client
528            .delete_table()
529            .table_bucket_arn(self.config.table_bucket_arn.clone())
530            .namespace(table.namespace().to_url_string())
531            .name(table.name());
532        req.send().await.map_err(from_aws_sdk_error)?;
533        Ok(())
534    }
535
536    /// Checks if a table exists within the s3tables catalog.
537    ///
538    /// Validates the table identifier by querying the s3tables catalog
539    /// to determine if the specified table exists.
540    ///
541    /// # Returns
542    /// A `Result<bool>` indicating the outcome of the check:
543    /// - `Ok(true)` if the table exists.
544    /// - `Ok(false)` if the table does not exist, identified by a specific
545    /// `IsNotFoundException` variant.
546    /// - `Err(...)` if an error occurs during validation or the s3tables catalog
547    /// query, with the error encapsulating the issue.
548    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
549        let req = self
550            .s3tables_client
551            .get_table()
552            .table_bucket_arn(self.config.table_bucket_arn.clone())
553            .namespace(table_ident.namespace().to_url_string())
554            .name(table_ident.name());
555        match req.send().await {
556            Ok(_) => Ok(true),
557            Err(err) => {
558                if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
559                    Ok(false)
560                } else {
561                    Err(from_aws_sdk_error(err))
562                }
563            }
564        }
565    }
566
567    /// Renames an existing table within the s3tables catalog.
568    ///
569    /// Validates the source and destination table identifiers and then renames
570    /// the source table to the destination table.
571    ///
572    /// This function can return an error in the following situations:
573    /// - Errors from the underlying database renaming process, converted using
574    /// `from_aws_sdk_error`.
575    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
576        let req = self
577            .s3tables_client
578            .rename_table()
579            .table_bucket_arn(self.config.table_bucket_arn.clone())
580            .namespace(src.namespace().to_url_string())
581            .name(src.name())
582            .new_namespace_name(dest.namespace().to_url_string())
583            .new_name(dest.name());
584        req.send().await.map_err(from_aws_sdk_error)?;
585        Ok(())
586    }
587
588    async fn register_table(
589        &self,
590        _table_ident: &TableIdent,
591        _metadata_location: String,
592    ) -> Result<Table> {
593        Err(Error::new(
594            ErrorKind::FeatureUnsupported,
595            "Registering a table is not supported yet",
596        ))
597    }
598
599    /// Updates an existing table within the s3tables catalog.
600    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
601        let table_ident = commit.identifier().clone();
602        let table_namespace = table_ident.namespace();
603        let (current_table, version_token) =
604            self.load_table_with_version_token(&table_ident).await?;
605
606        let staged_table = commit.apply(current_table)?;
607        let staged_metadata_location = staged_table.metadata_location_result()?;
608
609        staged_table
610            .metadata()
611            .write_to(staged_table.file_io(), staged_metadata_location)
612            .await?;
613
614        let builder = self
615            .s3tables_client
616            .update_table_metadata_location()
617            .table_bucket_arn(&self.config.table_bucket_arn)
618            .namespace(table_namespace.to_url_string())
619            .name(table_ident.name())
620            .version_token(version_token)
621            .metadata_location(staged_metadata_location);
622
623        let _ = builder.send().await.map_err(|e| {
624            let error = e.into_service_error();
625            match error {
626                UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
627                    ErrorKind::CatalogCommitConflicts,
628                    format!("Commit conflicted for table: {table_ident}"),
629                )
630                .with_retryable(true),
631                UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
632                    ErrorKind::TableNotFound,
633                    format!("Table {table_ident} is not found"),
634                ),
635                _ => Error::new(
636                    ErrorKind::Unexpected,
637                    "Operation failed for hitting aws sdk error",
638                ),
639            }
640            .with_source(anyhow::Error::msg(format!("aws sdk error: {error:?}")))
641        })?;
642
643        Ok(staged_table)
644    }
645}
646
647/// Format AWS SDK error into iceberg error
648pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
649where T: std::fmt::Debug {
650    Error::new(
651        ErrorKind::Unexpected,
652        format!("Operation failed for hitting aws sdk error: {error:?}"),
653    )
654}
655
656#[cfg(test)]
657mod tests {
658    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
659    use iceberg::transaction::{ApplyTransactionAction, Transaction};
660
661    use super::*;
662
663    async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
664        let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
665            Some(table_bucket_arn) => table_bucket_arn,
666            None => return Ok(None),
667        };
668
669        let config = S3TablesCatalogConfig {
670            name: None,
671            table_bucket_arn,
672            endpoint_url: None,
673            client: None,
674            props: HashMap::new(),
675        };
676
677        Ok(Some(S3TablesCatalog::new(config).await?))
678    }
679
680    #[tokio::test]
681    async fn test_s3tables_list_namespace() {
682        let catalog = match load_s3tables_catalog_from_env().await {
683            Ok(Some(catalog)) => catalog,
684            Ok(None) => return,
685            Err(e) => panic!("Error loading catalog: {e}"),
686        };
687
688        let namespaces = catalog.list_namespaces(None).await.unwrap();
689        assert!(!namespaces.is_empty());
690    }
691
692    #[tokio::test]
693    async fn test_s3tables_list_tables() {
694        let catalog = match load_s3tables_catalog_from_env().await {
695            Ok(Some(catalog)) => catalog,
696            Ok(None) => return,
697            Err(e) => panic!("Error loading catalog: {e}"),
698        };
699
700        let tables = catalog
701            .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
702            .await
703            .unwrap();
704        assert!(!tables.is_empty());
705    }
706
707    #[tokio::test]
708    async fn test_s3tables_load_table() {
709        let catalog = match load_s3tables_catalog_from_env().await {
710            Ok(Some(catalog)) => catalog,
711            Ok(None) => return,
712            Err(e) => panic!("Error loading catalog: {e}"),
713        };
714
715        let table = catalog
716            .load_table(&TableIdent::new(
717                NamespaceIdent::new("aws_s3_metadata".to_string()),
718                "query_storage_metadata".to_string(),
719            ))
720            .await
721            .unwrap();
722        println!("{table:?}");
723    }
724
725    #[tokio::test]
726    async fn test_s3tables_create_delete_namespace() {
727        let catalog = match load_s3tables_catalog_from_env().await {
728            Ok(Some(catalog)) => catalog,
729            Ok(None) => return,
730            Err(e) => panic!("Error loading catalog: {e}"),
731        };
732
733        let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
734        catalog
735            .create_namespace(&namespace, HashMap::new())
736            .await
737            .unwrap();
738        assert!(catalog.namespace_exists(&namespace).await.unwrap());
739        catalog.drop_namespace(&namespace).await.unwrap();
740        assert!(!catalog.namespace_exists(&namespace).await.unwrap());
741    }
742
743    #[tokio::test]
744    async fn test_s3tables_create_delete_table() {
745        let catalog = match load_s3tables_catalog_from_env().await {
746            Ok(Some(catalog)) => catalog,
747            Ok(None) => return,
748            Err(e) => panic!("Error loading catalog: {e}"),
749        };
750
751        let creation = {
752            let schema = Schema::builder()
753                .with_schema_id(0)
754                .with_fields(vec![
755                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
756                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
757                ])
758                .build()
759                .unwrap();
760            TableCreation::builder()
761                .name("test_s3tables_create_delete_table".to_string())
762                .properties(HashMap::new())
763                .schema(schema)
764                .build()
765        };
766
767        let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
768        let table_ident = TableIdent::new(
769            namespace.clone(),
770            "test_s3tables_create_delete_table".to_string(),
771        );
772        catalog.drop_namespace(&namespace).await.ok();
773        catalog.drop_table(&table_ident).await.ok();
774
775        catalog
776            .create_namespace(&namespace, HashMap::new())
777            .await
778            .unwrap();
779        catalog.create_table(&namespace, creation).await.unwrap();
780        assert!(catalog.table_exists(&table_ident).await.unwrap());
781        catalog.drop_table(&table_ident).await.unwrap();
782        assert!(!catalog.table_exists(&table_ident).await.unwrap());
783        catalog.drop_namespace(&namespace).await.unwrap();
784    }
785
786    #[tokio::test]
787    async fn test_s3tables_update_table() {
788        let catalog = match load_s3tables_catalog_from_env().await {
789            Ok(Some(catalog)) => catalog,
790            Ok(None) => return,
791            Err(e) => panic!("Error loading catalog: {e}"),
792        };
793
794        // Create a test namespace and table
795        let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
796        let table_ident =
797            TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
798
799        // Clean up any existing resources from previous test runs
800        catalog.drop_table(&table_ident).await.ok();
801        catalog.drop_namespace(&namespace).await.ok();
802
803        // Create namespace and table
804        catalog
805            .create_namespace(&namespace, HashMap::new())
806            .await
807            .unwrap();
808
809        let creation = {
810            let schema = Schema::builder()
811                .with_schema_id(0)
812                .with_fields(vec![
813                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
814                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
815                ])
816                .build()
817                .unwrap();
818            TableCreation::builder()
819                .name(table_ident.name().to_string())
820                .properties(HashMap::new())
821                .schema(schema)
822                .build()
823        };
824
825        let table = catalog.create_table(&namespace, creation).await.unwrap();
826
827        // Create a transaction to update the table
828        let tx = Transaction::new(&table);
829
830        // Store the original metadata location for comparison
831        let original_metadata_location = table.metadata_location();
832
833        // Update table properties using the transaction
834        let tx = tx
835            .update_table_properties()
836            .set("test_property".to_string(), "test_value".to_string())
837            .apply(tx)
838            .unwrap();
839
840        // Commit the transaction to the catalog
841        let updated_table = tx.commit(&catalog).await.unwrap();
842
843        // Verify the update was successful
844        assert_eq!(
845            updated_table.metadata().properties().get("test_property"),
846            Some(&"test_value".to_string())
847        );
848
849        // Verify the metadata location has been updated
850        assert_ne!(
851            updated_table.metadata_location(),
852            original_metadata_location,
853            "Metadata location should be updated after commit"
854        );
855
856        // Load the table again from the catalog to verify changes were persisted
857        let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
858
859        // Verify the reloaded table matches the updated table
860        assert_eq!(
861            reloaded_table.metadata().properties().get("test_property"),
862            Some(&"test_value".to_string())
863        );
864        assert_eq!(
865            reloaded_table.metadata_location(),
866            updated_table.metadata_location(),
867            "Reloaded table should have the same metadata location as the updated table"
868        );
869    }
870
871    #[tokio::test]
872    async fn test_builder_load_missing_bucket_arn() {
873        let builder = S3TablesCatalogBuilder::default();
874        let result = builder.load("s3tables", HashMap::new()).await;
875
876        assert!(result.is_err());
877        if let Err(err) = result {
878            assert_eq!(err.kind(), ErrorKind::DataInvalid);
879            assert_eq!(err.message(), "Table bucket ARN is required");
880        }
881    }
882
883    #[tokio::test]
884    async fn test_builder_with_endpoint_url_ok() {
885        let builder = S3TablesCatalogBuilder::default().with_endpoint_url("http://localhost:4566");
886
887        let result = builder
888            .load(
889                "s3tables",
890                HashMap::from([
891                    (
892                        S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
893                        "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
894                    ),
895                    ("some_prop".to_string(), "some_value".to_string()),
896                ]),
897            )
898            .await;
899
900        assert!(result.is_ok());
901    }
902
903    #[tokio::test]
904    async fn test_builder_with_client_ok() {
905        use aws_config::BehaviorVersion;
906
907        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
908        let client = aws_sdk_s3tables::Client::new(&sdk_config);
909
910        let builder = S3TablesCatalogBuilder::default().with_client(client);
911        let result = builder
912            .load(
913                "s3tables",
914                HashMap::from([(
915                    S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
916                    "arn:aws:s3tables:us-east-1:123456789012:bucket/test".to_string(),
917                )]),
918            )
919            .await;
920
921        assert!(result.is_ok());
922    }
923
924    #[tokio::test]
925    async fn test_builder_with_table_bucket_arn() {
926        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
927        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
928
929        let result = builder.load("s3tables", HashMap::new()).await;
930
931        assert!(result.is_ok());
932        let catalog = result.unwrap();
933        assert_eq!(catalog.config.table_bucket_arn, test_arn);
934    }
935
936    #[tokio::test]
937    async fn test_builder_empty_table_bucket_arn_edge_cases() {
938        let mut props = HashMap::new();
939        props.insert(
940            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
941            "".to_string(),
942        );
943
944        let builder = S3TablesCatalogBuilder::default();
945        let result = builder.load("s3tables", props).await;
946
947        assert!(result.is_err());
948        if let Err(err) = result {
949            assert_eq!(err.kind(), ErrorKind::DataInvalid);
950            assert_eq!(err.message(), "Table bucket ARN is required");
951        }
952    }
953
954    #[tokio::test]
955    async fn test_endpoint_url_property_overrides_builder_method() {
956        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
957        let builder_endpoint = "http://localhost:4566";
958        let property_endpoint = "http://localhost:8080";
959
960        let builder = S3TablesCatalogBuilder::default()
961            .with_table_bucket_arn(test_arn)
962            .with_endpoint_url(builder_endpoint);
963
964        let mut props = HashMap::new();
965        props.insert(
966            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
967            property_endpoint.to_string(),
968        );
969
970        let result = builder.load("s3tables", props).await;
971
972        assert!(result.is_ok());
973        let catalog = result.unwrap();
974
975        // Property value should override builder method value
976        assert_eq!(
977            catalog.config.endpoint_url,
978            Some(property_endpoint.to_string())
979        );
980        assert_ne!(
981            catalog.config.endpoint_url,
982            Some(builder_endpoint.to_string())
983        );
984    }
985
986    #[tokio::test]
987    async fn test_endpoint_url_builder_method_only() {
988        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
989        let builder_endpoint = "http://localhost:4566";
990
991        let builder = S3TablesCatalogBuilder::default()
992            .with_table_bucket_arn(test_arn)
993            .with_endpoint_url(builder_endpoint);
994
995        let result = builder.load("s3tables", HashMap::new()).await;
996
997        assert!(result.is_ok());
998        let catalog = result.unwrap();
999
1000        assert_eq!(
1001            catalog.config.endpoint_url,
1002            Some(builder_endpoint.to_string())
1003        );
1004    }
1005
1006    #[tokio::test]
1007    async fn test_endpoint_url_property_only() {
1008        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1009        let property_endpoint = "http://localhost:8080";
1010
1011        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1012
1013        let mut props = HashMap::new();
1014        props.insert(
1015            S3TABLES_CATALOG_PROP_ENDPOINT_URL.to_string(),
1016            property_endpoint.to_string(),
1017        );
1018
1019        let result = builder.load("s3tables", props).await;
1020
1021        assert!(result.is_ok());
1022        let catalog = result.unwrap();
1023
1024        assert_eq!(
1025            catalog.config.endpoint_url,
1026            Some(property_endpoint.to_string())
1027        );
1028    }
1029
1030    #[tokio::test]
1031    async fn test_table_bucket_arn_property_overrides_builder_method() {
1032        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1033        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1034
1035        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1036
1037        let mut props = HashMap::new();
1038        props.insert(
1039            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1040            property_arn.to_string(),
1041        );
1042
1043        let result = builder.load("s3tables", props).await;
1044
1045        assert!(result.is_ok());
1046        let catalog = result.unwrap();
1047
1048        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1049        assert_ne!(catalog.config.table_bucket_arn, builder_arn);
1050    }
1051
1052    #[tokio::test]
1053    async fn test_table_bucket_arn_builder_method_only() {
1054        let builder_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/builder-bucket";
1055
1056        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(builder_arn);
1057
1058        let result = builder.load("s3tables", HashMap::new()).await;
1059
1060        assert!(result.is_ok());
1061        let catalog = result.unwrap();
1062
1063        assert_eq!(catalog.config.table_bucket_arn, builder_arn);
1064    }
1065
1066    #[tokio::test]
1067    async fn test_table_bucket_arn_property_only() {
1068        let property_arn = "arn:aws:s3tables:us-east-1:987654321098:bucket/property-bucket";
1069
1070        let builder = S3TablesCatalogBuilder::default();
1071
1072        let mut props = HashMap::new();
1073        props.insert(
1074            S3TABLES_CATALOG_PROP_TABLE_BUCKET_ARN.to_string(),
1075            property_arn.to_string(),
1076        );
1077
1078        let result = builder.load("s3tables", props).await;
1079
1080        assert!(result.is_ok());
1081        let catalog = result.unwrap();
1082
1083        assert_eq!(catalog.config.table_bucket_arn, property_arn);
1084    }
1085
1086    #[tokio::test]
1087    async fn test_builder_empty_name_validation() {
1088        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1089        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1090
1091        let result = builder.load("", HashMap::new()).await;
1092
1093        assert!(result.is_err());
1094        if let Err(err) = result {
1095            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1096            assert_eq!(err.message(), "Catalog name cannot be empty");
1097        }
1098    }
1099
1100    #[tokio::test]
1101    async fn test_builder_whitespace_only_name_validation() {
1102        let test_arn = "arn:aws:s3tables:us-west-2:123456789012:bucket/test-bucket";
1103        let builder = S3TablesCatalogBuilder::default().with_table_bucket_arn(test_arn);
1104
1105        let result = builder.load("   \t\n  ", HashMap::new()).await;
1106
1107        assert!(result.is_err());
1108        if let Err(err) = result {
1109            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1110            assert_eq!(err.message(), "Catalog name cannot be empty");
1111        }
1112    }
1113
1114    #[tokio::test]
1115    async fn test_builder_name_validation_with_missing_arn() {
1116        let builder = S3TablesCatalogBuilder::default();
1117
1118        let result = builder.load("", HashMap::new()).await;
1119
1120        assert!(result.is_err());
1121        if let Err(err) = result {
1122            assert_eq!(err.kind(), ErrorKind::DataInvalid);
1123            assert_eq!(err.message(), "Catalog name cannot be empty");
1124        }
1125    }
1126}