iceberg_catalog_glue/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use aws_sdk_glue::operation::create_table::CreateTableError;
26use aws_sdk_glue::operation::update_table::UpdateTableError;
27use aws_sdk_glue::types::TableInput;
28use iceberg::io::{
29    FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
30    S3_SESSION_TOKEN, StorageFactory,
31};
32use iceberg::spec::{TableMetadata, TableMetadataBuilder};
33use iceberg::table::Table;
34use iceberg::{
35    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
36    TableCommit, TableCreation, TableIdent,
37};
38use iceberg_storage_opendal::OpenDalStorageFactory;
39
40use crate::error::{from_aws_build_error, from_aws_sdk_error};
41use crate::utils::{
42    convert_to_database, convert_to_glue_table, convert_to_namespace, create_sdk_config,
43    get_default_table_location, get_metadata_location, validate_namespace,
44};
45use crate::{
46    AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
47};
48
/// Property key for the Glue catalog URI.
///
/// When present, the value is passed to the AWS SDK configuration helper and
/// is also used as the S3 endpoint for file IO unless an S3 endpoint is
/// configured explicitly.
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Property key for the Glue catalog id.
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Property key for the Glue catalog warehouse location.
///
/// This property is required when loading the catalog and is used when
/// deriving the default location of a table created without one.
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
55
/// Builder for [`GlueCatalog`].
///
/// Holds the catalog configuration and an optional custom storage factory;
/// the catalog itself is constructed by [`CatalogBuilder::load`].
#[derive(Debug)]
pub struct GlueCatalogBuilder {
    // Configuration assembled from the name and properties given to `load`.
    config: GlueCatalogConfig,
    // Optional storage factory override; `None` means the catalog picks its
    // own default when it is constructed.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
62
63impl Default for GlueCatalogBuilder {
64    fn default() -> Self {
65        Self {
66            config: GlueCatalogConfig {
67                name: None,
68                uri: None,
69                catalog_id: None,
70                warehouse: "".to_string(),
71                props: HashMap::new(),
72            },
73            storage_factory: None,
74        }
75    }
76}
77
78impl CatalogBuilder for GlueCatalogBuilder {
79    type C = GlueCatalog;
80
81    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
82        self.storage_factory = Some(storage_factory);
83        self
84    }
85
86    fn load(
87        mut self,
88        name: impl Into<String>,
89        props: HashMap<String, String>,
90    ) -> impl Future<Output = Result<Self::C>> + Send {
91        self.config.name = Some(name.into());
92
93        if props.contains_key(GLUE_CATALOG_PROP_URI) {
94            self.config.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
95        }
96
97        if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
98            self.config.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
99        }
100
101        if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
102            self.config.warehouse = props
103                .get(GLUE_CATALOG_PROP_WAREHOUSE)
104                .cloned()
105                .unwrap_or_default();
106        }
107
108        // Collect other remaining properties
109        self.config.props = props
110            .into_iter()
111            .filter(|(k, _)| {
112                k != GLUE_CATALOG_PROP_URI
113                    && k != GLUE_CATALOG_PROP_CATALOG_ID
114                    && k != GLUE_CATALOG_PROP_WAREHOUSE
115            })
116            .collect();
117
118        async move {
119            if self.config.name.is_none() {
120                return Err(Error::new(
121                    ErrorKind::DataInvalid,
122                    "Catalog name is required",
123                ));
124            }
125            if self.config.warehouse.is_empty() {
126                return Err(Error::new(
127                    ErrorKind::DataInvalid,
128                    "Catalog warehouse is required",
129                ));
130            }
131
132            GlueCatalog::new(self.config, self.storage_factory).await
133        }
134    }
135}
136
/// Glue Catalog configuration
#[derive(Debug)]
pub(crate) struct GlueCatalogConfig {
    /// Catalog name supplied when the catalog is loaded.
    name: Option<String>,
    /// Optional URI taken from the `uri` property.
    uri: Option<String>,
    /// Optional Glue catalog id taken from the `catalog_id` property.
    catalog_id: Option<String>,
    /// Warehouse location taken from the `warehouse` property.
    warehouse: String,
    /// All remaining properties that have no dedicated field.
    props: HashMap<String, String>,
}
146
/// Newtype wrapper around the AWS SDK Glue client used by [`GlueCatalog`].
struct GlueClient(aws_sdk_glue::Client);
148
/// Glue Catalog
///
/// Implements the `Catalog` trait on top of AWS Glue: namespaces map to Glue
/// databases and tables to Glue tables, while table metadata files are read
/// and written through the catalog's `FileIO`.
pub struct GlueCatalog {
    // Catalog configuration (name, URI, catalog id, warehouse, properties).
    config: GlueCatalogConfig,
    // Client used for every Glue API call.
    client: GlueClient,
    // File IO used to read and write table metadata files.
    file_io: FileIO,
}
155
156impl Debug for GlueCatalog {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("GlueCatalog")
159            .field("config", &self.config)
160            .finish_non_exhaustive()
161    }
162}
163
164impl GlueCatalog {
165    /// Create a new glue catalog
166    async fn new(
167        config: GlueCatalogConfig,
168        storage_factory: Option<Arc<dyn StorageFactory>>,
169    ) -> Result<Self> {
170        let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
171        let mut file_io_props = config.props.clone();
172        if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
173            && let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID)
174        {
175            file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
176        }
177        if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY)
178            && let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY)
179        {
180            file_io_props.insert(
181                S3_SECRET_ACCESS_KEY.to_string(),
182                secret_access_key.to_string(),
183            );
184        }
185        if !file_io_props.contains_key(S3_REGION)
186            && let Some(region) = file_io_props.get(AWS_REGION_NAME)
187        {
188            file_io_props.insert(S3_REGION.to_string(), region.to_string());
189        }
190        if !file_io_props.contains_key(S3_SESSION_TOKEN)
191            && let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN)
192        {
193            file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
194        }
195        if !file_io_props.contains_key(S3_ENDPOINT)
196            && let Some(aws_endpoint) = config.uri.as_ref()
197        {
198            file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
199        }
200
201        let client = aws_sdk_glue::Client::new(&sdk_config);
202
203        // Use provided factory or default to OpenDalStorageFactory::S3
204        let factory = storage_factory.unwrap_or_else(|| {
205            Arc::new(OpenDalStorageFactory::S3 {
206                configured_scheme: "s3a".to_string(),
207                customized_credential_load: None,
208            })
209        });
210        let file_io = FileIOBuilder::new(factory)
211            .with_props(file_io_props)
212            .build();
213
214        Ok(GlueCatalog {
215            config,
216            client: GlueClient(client),
217            file_io,
218        })
219    }
220    /// Get the catalogs `FileIO`
221    pub fn file_io(&self) -> FileIO {
222        self.file_io.clone()
223    }
224
225    /// Loads a table from the Glue Catalog along with its version_id for optimistic locking.
226    ///
227    /// # Returns
228    /// A `Result` wrapping a tuple of (`Table`, `Option<String>`) where the String is the version_id
229    /// from Glue that should be used for optimistic concurrency control when updating the table.
230    ///
231    /// # Errors
232    /// This function may return an error in several scenarios, including:
233    /// - Failure to validate the namespace.
234    /// - Failure to retrieve the table from the Glue Catalog.
235    /// - Absence of metadata location information in the table's properties.
236    /// - Issues reading or deserializing the table's metadata file.
237    async fn load_table_with_version_id(
238        &self,
239        table: &TableIdent,
240    ) -> Result<(Table, Option<String>)> {
241        let db_name = validate_namespace(table.namespace())?;
242        let table_name = table.name();
243
244        let builder = self
245            .client
246            .0
247            .get_table()
248            .database_name(&db_name)
249            .name(table_name);
250        let builder = with_catalog_id!(builder, self.config);
251
252        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
253
254        let glue_table = glue_table_output.table().ok_or_else(|| {
255            Error::new(
256                ErrorKind::TableNotFound,
257                format!(
258                    "Table object for database: {db_name} and table: {table_name} does not exist"
259                ),
260            )
261        })?;
262
263        let version_id = glue_table.version_id.clone();
264        let metadata_location = get_metadata_location(&glue_table.parameters)?;
265
266        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
267
268        let table = Table::builder()
269            .file_io(self.file_io())
270            .metadata_location(metadata_location)
271            .metadata(metadata)
272            .identifier(TableIdent::new(
273                NamespaceIdent::new(db_name),
274                table_name.to_owned(),
275            ))
276            .build()?;
277
278        Ok((table, version_id))
279    }
280}
281
282#[async_trait]
283impl Catalog for GlueCatalog {
284    /// List namespaces from glue catalog.
285    ///
286    /// Glue doesn't support nested namespaces.
287    /// We will return an empty list if parent is some.
288    async fn list_namespaces(
289        &self,
290        parent: Option<&NamespaceIdent>,
291    ) -> Result<Vec<NamespaceIdent>> {
292        if parent.is_some() {
293            return Ok(vec![]);
294        }
295
296        let mut database_list: Vec<NamespaceIdent> = Vec::new();
297        let mut next_token: Option<String> = None;
298
299        loop {
300            let builder = match &next_token {
301                Some(token) => self.client.0.get_databases().next_token(token),
302                None => self.client.0.get_databases(),
303            };
304            let builder = with_catalog_id!(builder, self.config);
305            let resp = builder.send().await.map_err(from_aws_sdk_error)?;
306
307            let dbs: Vec<NamespaceIdent> = resp
308                .database_list()
309                .iter()
310                .map(|db| NamespaceIdent::new(db.name().to_string()))
311                .collect();
312
313            database_list.extend(dbs);
314
315            next_token = resp.next_token().map(ToOwned::to_owned);
316            if next_token.is_none() {
317                break;
318            }
319        }
320
321        Ok(database_list)
322    }
323
324    /// Creates a new namespace with the given identifier and properties.
325    ///
326    /// Attempts to create a namespace defined by the `namespace`
327    /// parameter and configured with the specified `properties`.
328    ///
329    /// This function can return an error in the following situations:
330    ///
331    /// - Errors from `validate_namespace` if the namespace identifier does not
332    /// meet validation criteria.
333    /// - Errors from `convert_to_database` if the properties cannot be
334    /// successfully converted into a database configuration.
335    /// - Errors from the underlying database creation process, converted using
336    /// `from_sdk_error`.
337    async fn create_namespace(
338        &self,
339        namespace: &NamespaceIdent,
340        properties: HashMap<String, String>,
341    ) -> Result<Namespace> {
342        if self.namespace_exists(namespace).await? {
343            return Err(Error::new(
344                ErrorKind::NamespaceAlreadyExists,
345                format!("Namespace {namespace:?} already exists"),
346            ));
347        }
348
349        let db_input = convert_to_database(namespace, &properties)?;
350
351        let builder = self.client.0.create_database().database_input(db_input);
352        let builder = with_catalog_id!(builder, self.config);
353
354        builder.send().await.map_err(from_aws_sdk_error)?;
355
356        Ok(Namespace::with_properties(namespace.clone(), properties))
357    }
358
359    /// Retrieves a namespace by its identifier.
360    ///
361    /// Validates the given namespace identifier and then queries the
362    /// underlying database client to fetch the corresponding namespace data.
363    /// Constructs a `Namespace` object with the retrieved data and returns it.
364    ///
365    /// This function can return an error in any of the following situations:
366    /// - If the provided namespace identifier fails validation checks
367    /// - If there is an error querying the database, returned by
368    /// `from_sdk_error`.
369    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
370        let db_name = validate_namespace(namespace)?;
371
372        let builder = self.client.0.get_database().name(&db_name);
373        let builder = with_catalog_id!(builder, self.config);
374
375        let resp = builder.send().await.map_err(|err| {
376            if err
377                .as_service_error()
378                .map(|e| e.is_entity_not_found_exception())
379                == Some(true)
380            {
381                return Error::new(
382                    ErrorKind::NamespaceNotFound,
383                    format!("Namespace {namespace:?} does not exist"),
384                );
385            }
386            from_aws_sdk_error(err)
387        })?;
388
389        match resp.database() {
390            Some(db) => {
391                let namespace = convert_to_namespace(db);
392                Ok(namespace)
393            }
394            None => Err(Error::new(
395                ErrorKind::NamespaceNotFound,
396                format!("Database with name: {db_name} does not exist"),
397            )),
398        }
399    }
400
401    /// Checks if a namespace exists within the Glue Catalog.
402    ///
403    /// Validates the namespace identifier by querying the Glue Catalog
404    /// to determine if the specified namespace (database) exists.
405    ///
406    /// # Returns
407    /// A `Result<bool>` indicating the outcome of the check:
408    /// - `Ok(true)` if the namespace exists.
409    /// - `Ok(false)` if the namespace does not exist, identified by a specific
410    /// `EntityNotFoundException` variant.
411    /// - `Err(...)` if an error occurs during validation or the Glue Catalog
412    /// query, with the error encapsulating the issue.
413    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
414        let db_name = validate_namespace(namespace)?;
415
416        let builder = self.client.0.get_database().name(&db_name);
417        let builder = with_catalog_id!(builder, self.config);
418
419        let resp = builder.send().await;
420
421        match resp {
422            Ok(_) => Ok(true),
423            Err(err) => {
424                if err
425                    .as_service_error()
426                    .map(|e| e.is_entity_not_found_exception())
427                    == Some(true)
428                {
429                    return Ok(false);
430                }
431                Err(from_aws_sdk_error(err))
432            }
433        }
434    }
435
436    /// Asynchronously updates properties of an existing namespace.
437    ///
438    /// Converts the given namespace identifier and properties into a database
439    /// representation and then attempts to update the corresponding namespace
440    /// in the Glue Catalog.
441    ///
442    /// # Returns
443    /// Returns `Ok(())` if the namespace update is successful. If the
444    /// namespace cannot be updated due to missing information or an error
445    /// during the update process, an `Err(...)` is returned.
446    async fn update_namespace(
447        &self,
448        namespace: &NamespaceIdent,
449        properties: HashMap<String, String>,
450    ) -> Result<()> {
451        if !self.namespace_exists(namespace).await? {
452            return Err(Error::new(
453                ErrorKind::NamespaceNotFound,
454                format!("Namespace {namespace:?} does not exist"),
455            ));
456        }
457
458        let db_name = validate_namespace(namespace)?;
459        let db_input = convert_to_database(namespace, &properties)?;
460
461        let builder = self
462            .client
463            .0
464            .update_database()
465            .name(&db_name)
466            .database_input(db_input);
467        let builder = with_catalog_id!(builder, self.config);
468
469        builder.send().await.map_err(from_aws_sdk_error)?;
470
471        Ok(())
472    }
473
474    /// Asynchronously drops a namespace from the Glue Catalog.
475    ///
476    /// Checks if the namespace is empty. If it still contains tables the
477    /// namespace will not be dropped, but an error is returned instead.
478    ///
479    /// # Returns
480    /// A `Result<()>` indicating the outcome:
481    /// - `Ok(())` signifies successful namespace deletion.
482    /// - `Err(...)` signifies failure to drop the namespace due to validation
483    /// errors, connectivity issues, or Glue Catalog constraints.
484    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
485        if !self.namespace_exists(namespace).await? {
486            return Err(Error::new(
487                ErrorKind::NamespaceNotFound,
488                format!("Namespace {namespace:?} does not exist"),
489            ));
490        }
491
492        let db_name = validate_namespace(namespace)?;
493        let table_list = self.list_tables(namespace).await?;
494
495        if !table_list.is_empty() {
496            return Err(Error::new(
497                ErrorKind::DataInvalid,
498                format!("Database with name: {} is not empty", &db_name),
499            ));
500        }
501
502        let builder = self.client.0.delete_database().name(db_name);
503        let builder = with_catalog_id!(builder, self.config);
504
505        builder.send().await.map_err(from_aws_sdk_error)?;
506
507        Ok(())
508    }
509
510    /// Asynchronously lists all tables within a specified namespace.
511    ///
512    /// # Returns
513    /// A `Result<Vec<TableIdent>>`, which is:
514    /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
515    /// representing a table within the specified namespace.
516    /// - `Err(...)` if an error occurs during namespace validation or while
517    /// querying the database.
518    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
519        let db_name = validate_namespace(namespace)?;
520
521        let mut table_list: Vec<TableIdent> = Vec::new();
522        let mut next_token: Option<String> = None;
523
524        loop {
525            let builder = match &next_token {
526                Some(token) => self
527                    .client
528                    .0
529                    .get_tables()
530                    .database_name(&db_name)
531                    .next_token(token),
532                None => self.client.0.get_tables().database_name(&db_name),
533            };
534            let builder = with_catalog_id!(builder, self.config);
535            let resp = builder.send().await.map_err(from_aws_sdk_error)?;
536
537            let tables: Vec<_> = resp
538                .table_list()
539                .iter()
540                .map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
541                .collect();
542
543            table_list.extend(tables);
544
545            next_token = resp.next_token().map(ToOwned::to_owned);
546            if next_token.is_none() {
547                break;
548            }
549        }
550
551        Ok(table_list)
552    }
553
554    /// Creates a new table within a specified namespace using the provided
555    /// table creation settings.
556    ///
557    /// # Returns
558    /// A `Result` wrapping a `Table` object representing the newly created
559    /// table.
560    ///
561    /// # Errors
562    /// This function may return an error in several cases, including invalid
563    /// namespace identifiers, failure to determine a default storage location,
564    /// issues generating or writing table metadata, and errors communicating
565    /// with the Glue Catalog.
566    async fn create_table(
567        &self,
568        namespace: &NamespaceIdent,
569        mut creation: TableCreation,
570    ) -> Result<Table> {
571        let db_name = validate_namespace(namespace)?;
572        let table_name = creation.name.clone();
573
574        let location = match &creation.location {
575            Some(location) => location.clone(),
576            None => {
577                let ns = self.get_namespace(namespace).await?;
578                let location =
579                    get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
580                creation.location = Some(location.clone());
581                location
582            }
583        };
584        let metadata = TableMetadataBuilder::from_table_creation(creation)?
585            .build()?
586            .metadata;
587        let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
588
589        metadata.write_to(&self.file_io, &metadata_location).await?;
590
591        let metadata_location_str = metadata_location.to_string();
592        let glue_table = convert_to_glue_table(
593            &table_name,
594            metadata_location_str.clone(),
595            &metadata,
596            metadata.properties(),
597            None,
598        )?;
599
600        let builder = self
601            .client
602            .0
603            .create_table()
604            .database_name(&db_name)
605            .table_input(glue_table);
606        let builder = with_catalog_id!(builder, self.config);
607
608        builder.send().await.map_err(from_aws_sdk_error)?;
609
610        Table::builder()
611            .file_io(self.file_io())
612            .metadata_location(metadata_location_str)
613            .metadata(metadata)
614            .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
615            .build()
616    }
617
618    /// Loads a table from the Glue Catalog and constructs a `Table` object
619    /// based on its metadata.
620    ///
621    /// # Returns
622    /// A `Result` wrapping a `Table` object that represents the loaded table.
623    ///
624    /// # Errors
625    /// This function may return an error in several scenarios, including:
626    /// - Failure to validate the namespace.
627    /// - Failure to retrieve the table from the Glue Catalog.
628    /// - Absence of metadata location information in the table's properties.
629    /// - Issues reading or deserializing the table's metadata file.
630    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
631        let (table, _) = self.load_table_with_version_id(table).await?;
632        Ok(table)
633    }
634
635    /// Asynchronously drops a table from the database.
636    ///
637    /// # Errors
638    /// Returns an error if:
639    /// - The namespace provided in `table` cannot be validated
640    /// or does not exist.
641    /// - The underlying database client encounters an error while
642    /// attempting to drop the table. This includes scenarios where
643    /// the table does not exist.
644    /// - Any network or communication error occurs with the database backend.
645    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
646        let db_name = validate_namespace(table.namespace())?;
647        let table_name = table.name();
648
649        let builder = self
650            .client
651            .0
652            .delete_table()
653            .database_name(&db_name)
654            .name(table_name);
655        let builder = with_catalog_id!(builder, self.config);
656
657        builder.send().await.map_err(from_aws_sdk_error)?;
658
659        Ok(())
660    }
661
662    /// Asynchronously checks the existence of a specified table
663    /// in the database.
664    ///
665    /// # Returns
666    /// - `Ok(true)` if the table exists in the database.
667    /// - `Ok(false)` if the table does not exist in the database.
668    /// - `Err(...)` if an error occurs during the process
669    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
670        let db_name = validate_namespace(table.namespace())?;
671        let table_name = table.name();
672
673        let builder = self
674            .client
675            .0
676            .get_table()
677            .database_name(&db_name)
678            .name(table_name);
679        let builder = with_catalog_id!(builder, self.config);
680
681        let resp = builder.send().await;
682
683        match resp {
684            Ok(_) => Ok(true),
685            Err(err) => {
686                if err
687                    .as_service_error()
688                    .map(|e| e.is_entity_not_found_exception())
689                    == Some(true)
690                {
691                    return Ok(false);
692                }
693                Err(from_aws_sdk_error(err))
694            }
695        }
696    }
697
698    /// Asynchronously renames a table within the database
699    /// or moves it between namespaces (databases).
700    ///
701    /// # Returns
702    /// - `Ok(())` on successful rename or move of the table.
703    /// - `Err(...)` if an error occurs during the process.
704    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
705        let src_db_name = validate_namespace(src.namespace())?;
706        let dest_db_name = validate_namespace(dest.namespace())?;
707
708        let src_table_name = src.name();
709        let dest_table_name = dest.name();
710
711        let builder = self
712            .client
713            .0
714            .get_table()
715            .database_name(&src_db_name)
716            .name(src_table_name);
717        let builder = with_catalog_id!(builder, self.config);
718
719        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
720
721        match glue_table_output.table() {
722            None => Err(Error::new(
723                ErrorKind::TableNotFound,
724                format!(
725                    "'Table' object for database: {src_db_name} and table: {src_table_name} does not exist"
726                ),
727            )),
728            Some(table) => {
729                let rename_table_input = TableInput::builder()
730                    .name(dest_table_name)
731                    .set_parameters(table.parameters.clone())
732                    .set_storage_descriptor(table.storage_descriptor.clone())
733                    .set_table_type(table.table_type.clone())
734                    .set_description(table.description.clone())
735                    .build()
736                    .map_err(from_aws_build_error)?;
737
738                let builder = self
739                    .client
740                    .0
741                    .create_table()
742                    .database_name(&dest_db_name)
743                    .table_input(rename_table_input);
744                let builder = with_catalog_id!(builder, self.config);
745
746                builder.send().await.map_err(from_aws_sdk_error)?;
747
748                let drop_src_table_result = self.drop_table(src).await;
749
750                match drop_src_table_result {
751                    Ok(_) => Ok(()),
752                    Err(_) => {
753                        let err_msg_src_table =
754                            format!("Failed to drop old table {src_db_name}.{src_table_name}.");
755
756                        let drop_dest_table_result = self.drop_table(dest).await;
757
758                        match drop_dest_table_result {
759                            Ok(_) => Err(Error::new(
760                                ErrorKind::Unexpected,
761                                format!(
762                                    "{err_msg_src_table} Rolled back table creation for {dest_db_name}.{dest_table_name}."
763                                ),
764                            )),
765                            Err(_) => Err(Error::new(
766                                ErrorKind::Unexpected,
767                                format!(
768                                    "{err_msg_src_table} Failed to roll back table creation for {dest_db_name}.{dest_table_name}. Please clean up manually."
769                                ),
770                            )),
771                        }
772                    }
773                }
774            }
775        }
776    }
777
778    /// registers an existing table into the Glue Catalog.
779    ///
780    /// Converts the provided table identifier and metadata location into a
781    /// Glue-compatible table representation, and attempts to create the
782    /// corresponding table in the Glue Catalog.
783    ///
784    /// # Returns
785    /// Returns `Ok(Table)` if the table is successfully registered and loaded.
786    /// If the registration fails due to validation issues, existing table conflicts,
787    /// metadata problems, or errors during the registration or loading process,
788    /// an `Err(...)` is returned.
789    async fn register_table(
790        &self,
791        table_ident: &TableIdent,
792        metadata_location: String,
793    ) -> Result<Table> {
794        let db_name = validate_namespace(table_ident.namespace())?;
795        let table_name = table_ident.name();
796        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
797
798        let table_input = convert_to_glue_table(
799            table_name,
800            metadata_location.clone(),
801            &metadata,
802            metadata.properties(),
803            None,
804        )?;
805
806        let builder = self
807            .client
808            .0
809            .create_table()
810            .database_name(&db_name)
811            .table_input(table_input);
812        let builder = with_catalog_id!(builder, self.config);
813
814        builder.send().await.map_err(|e| {
815            let error = e.into_service_error();
816            match error {
817                CreateTableError::EntityNotFoundException(_) => Error::new(
818                    ErrorKind::NamespaceNotFound,
819                    format!("Database {db_name} does not exist"),
820                ),
821                CreateTableError::AlreadyExistsException(_) => Error::new(
822                    ErrorKind::TableAlreadyExists,
823                    format!("Table {table_ident} already exists"),
824                ),
825                _ => Error::new(
826                    ErrorKind::Unexpected,
827                    format!("Failed to register table {table_ident} due to AWS SDK error"),
828                ),
829            }
830            .with_source(anyhow!("aws sdk error: {error:?}"))
831        })?;
832
833        Ok(Table::builder()
834            .identifier(table_ident.clone())
835            .metadata_location(metadata_location)
836            .metadata(metadata)
837            .file_io(self.file_io())
838            .build()?)
839    }
840
841    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
842        let table_ident = commit.identifier().clone();
843        let table_namespace = validate_namespace(table_ident.namespace())?;
844
845        let (current_table, current_version_id) =
846            self.load_table_with_version_id(&table_ident).await?;
847        let current_metadata_location = current_table.metadata_location_result()?.to_string();
848
849        let staged_table = commit.apply(current_table)?;
850        let staged_metadata_location_str = staged_table.metadata_location_result()?;
851        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
852
853        // Write new metadata
854        staged_table
855            .metadata()
856            .write_to(staged_table.file_io(), &staged_metadata_location)
857            .await?;
858
859        // Persist staged table to Glue with optimistic locking
860        let mut builder = self
861            .client
862            .0
863            .update_table()
864            .database_name(table_namespace)
865            .set_skip_archive(Some(true)) // todo make this configurable
866            .table_input(convert_to_glue_table(
867                table_ident.name(),
868                staged_metadata_location.to_string(),
869                staged_table.metadata(),
870                staged_table.metadata().properties(),
871                Some(current_metadata_location),
872            )?);
873
874        // Add VersionId for optimistic locking
875        if let Some(version_id) = current_version_id {
876            builder = builder.version_id(version_id);
877        }
878
879        let builder = with_catalog_id!(builder, self.config);
880        let _ = builder.send().await.map_err(|e| {
881            let error = e.into_service_error();
882            match error {
883                UpdateTableError::EntityNotFoundException(_) => Error::new(
884                    ErrorKind::TableNotFound,
885                    format!("Table {table_ident} is not found"),
886                ),
887                UpdateTableError::ConcurrentModificationException(_) => Error::new(
888                    ErrorKind::CatalogCommitConflicts,
889                    format!("Commit failed for table: {table_ident}"),
890                )
891                .with_retryable(true),
892                _ => Error::new(
893                    ErrorKind::Unexpected,
894                    format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
895                ),
896            }
897            .with_source(anyhow!("aws sdk error: {error:?}"))
898        })?;
899
900        Ok(staged_table)
901    }
902}