iceberg_catalog_glue/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use aws_sdk_glue::operation::create_table::CreateTableError;
26use aws_sdk_glue::operation::update_table::UpdateTableError;
27use aws_sdk_glue::types::TableInput;
28use iceberg::io::{
29    FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
30    S3_SESSION_TOKEN, StorageFactory,
31};
32use iceberg::spec::{TableMetadata, TableMetadataBuilder};
33use iceberg::table::Table;
34use iceberg::{
35    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
36    TableCommit, TableCreation, TableIdent,
37};
38use iceberg_storage_opendal::OpenDalStorageFactory;
39
40use crate::error::{from_aws_build_error, from_aws_sdk_error};
41use crate::utils::{
42    convert_to_database, convert_to_glue_table, convert_to_namespace, create_sdk_config,
43    get_default_table_location, get_metadata_location, validate_namespace,
44};
45use crate::{
46    AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
47};
48
/// Property key for the optional endpoint URI of the Glue catalog.
///
/// The value is handed to the AWS SDK config builder and, unless an explicit
/// S3 endpoint is configured, also used as the S3 endpoint for `FileIO`.
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Property key for the optional Glue catalog id sent with Glue requests.
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Property key for the (required) warehouse root used to derive default
/// table locations.
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
55
56/// Builder for [`GlueCatalog`].
57#[derive(Debug)]
58pub struct GlueCatalogBuilder {
59    config: GlueCatalogConfig,
60    storage_factory: Option<Arc<dyn StorageFactory>>,
61}
62
63impl Default for GlueCatalogBuilder {
64    fn default() -> Self {
65        Self {
66            config: GlueCatalogConfig {
67                name: None,
68                uri: None,
69                catalog_id: None,
70                warehouse: "".to_string(),
71                props: HashMap::new(),
72            },
73            storage_factory: None,
74        }
75    }
76}
77
78impl CatalogBuilder for GlueCatalogBuilder {
79    type C = GlueCatalog;
80
81    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
82        self.storage_factory = Some(storage_factory);
83        self
84    }
85
86    fn load(
87        mut self,
88        name: impl Into<String>,
89        props: HashMap<String, String>,
90    ) -> impl Future<Output = Result<Self::C>> + Send {
91        self.config.name = Some(name.into());
92
93        if props.contains_key(GLUE_CATALOG_PROP_URI) {
94            self.config.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
95        }
96
97        if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
98            self.config.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
99        }
100
101        if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
102            self.config.warehouse = props
103                .get(GLUE_CATALOG_PROP_WAREHOUSE)
104                .cloned()
105                .unwrap_or_default();
106        }
107
108        // Collect other remaining properties
109        self.config.props = props
110            .into_iter()
111            .filter(|(k, _)| {
112                k != GLUE_CATALOG_PROP_URI
113                    && k != GLUE_CATALOG_PROP_CATALOG_ID
114                    && k != GLUE_CATALOG_PROP_WAREHOUSE
115            })
116            .collect();
117
118        async move {
119            if self.config.name.is_none() {
120                return Err(Error::new(
121                    ErrorKind::DataInvalid,
122                    "Catalog name is required",
123                ));
124            }
125            if self.config.warehouse.is_empty() {
126                return Err(Error::new(
127                    ErrorKind::DataInvalid,
128                    "Catalog warehouse is required",
129                ));
130            }
131
132            GlueCatalog::new(self.config, self.storage_factory).await
133        }
134    }
135}
136
/// Glue Catalog configuration
///
/// `Debug` is implemented by hand instead of derived: `props` carries the
/// AWS credentials passed to the catalog (secret access key, session token,
/// ...), and a derived impl would print them verbatim whenever the config,
/// the builder or the catalog is logged with `{:?}`.
pub(crate) struct GlueCatalogConfig {
    /// Catalog name supplied to `GlueCatalogBuilder::load`.
    name: Option<String>,
    /// Optional endpoint URI, passed to `create_sdk_config` and used as the
    /// default S3 endpoint for `FileIO`.
    uri: Option<String>,
    /// Optional Glue catalog id, applied to requests via `with_catalog_id!`.
    catalog_id: Option<String>,
    /// Warehouse root used to derive default table locations.
    warehouse: String,
    /// All remaining catalog properties (AWS credentials, FileIO settings, ...).
    props: HashMap<String, String>,
}

/// Returns `true` when a property key looks like it carries a credential.
///
/// The check is a case-insensitive substring match on `secret`, `token` and
/// `password`. For example, keys named `aws_secret_access_key` or
/// `s3.session-token` are treated as sensitive.
fn is_sensitive_prop_key(key: &str) -> bool {
    let key = key.to_ascii_lowercase();
    ["secret", "token", "password"]
        .into_iter()
        .any(|marker| key.contains(marker))
}

impl Debug for GlueCatalogConfig {
    /// Formats like a derived `Debug`, except that the values of
    /// credential-like properties are replaced with `<redacted>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let redacted_props: HashMap<&str, &str> = self
            .props
            .iter()
            .map(|(key, value)| {
                let shown = if is_sensitive_prop_key(key) {
                    "<redacted>"
                } else {
                    value.as_str()
                };
                (key.as_str(), shown)
            })
            .collect();

        f.debug_struct("GlueCatalogConfig")
            .field("name", &self.name)
            .field("uri", &self.uri)
            .field("catalog_id", &self.catalog_id)
            .field("warehouse", &self.warehouse)
            .field("props", &redacted_props)
            .finish()
    }
}
146
147struct GlueClient(aws_sdk_glue::Client);
148
149/// Glue Catalog
150pub struct GlueCatalog {
151    config: GlueCatalogConfig,
152    client: GlueClient,
153    file_io: FileIO,
154}
155
156impl Debug for GlueCatalog {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("GlueCatalog")
159            .field("config", &self.config)
160            .finish_non_exhaustive()
161    }
162}
163
164impl GlueCatalog {
165    /// Create a new glue catalog
166    async fn new(
167        config: GlueCatalogConfig,
168        storage_factory: Option<Arc<dyn StorageFactory>>,
169    ) -> Result<Self> {
170        let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
171        let mut file_io_props = config.props.clone();
172        if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
173            && let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID)
174        {
175            file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
176        }
177        if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY)
178            && let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY)
179        {
180            file_io_props.insert(
181                S3_SECRET_ACCESS_KEY.to_string(),
182                secret_access_key.to_string(),
183            );
184        }
185        if !file_io_props.contains_key(S3_REGION)
186            && let Some(region) = file_io_props.get(AWS_REGION_NAME)
187        {
188            file_io_props.insert(S3_REGION.to_string(), region.to_string());
189        }
190        if !file_io_props.contains_key(S3_SESSION_TOKEN)
191            && let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN)
192        {
193            file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
194        }
195        if !file_io_props.contains_key(S3_ENDPOINT)
196            && let Some(aws_endpoint) = config.uri.as_ref()
197        {
198            file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
199        }
200
201        let client = aws_sdk_glue::Client::new(&sdk_config);
202
203        // Use provided factory or default to OpenDalStorageFactory::S3
204        let factory = storage_factory.unwrap_or_else(|| {
205            Arc::new(OpenDalStorageFactory::S3 {
206                customized_credential_load: None,
207            })
208        });
209        let file_io = FileIOBuilder::new(factory)
210            .with_props(file_io_props)
211            .build();
212
213        Ok(GlueCatalog {
214            config,
215            client: GlueClient(client),
216            file_io,
217        })
218    }
219    /// Get the catalogs `FileIO`
220    pub fn file_io(&self) -> FileIO {
221        self.file_io.clone()
222    }
223
224    /// Loads a table from the Glue Catalog along with its version_id for optimistic locking.
225    ///
226    /// # Returns
227    /// A `Result` wrapping a tuple of (`Table`, `Option<String>`) where the String is the version_id
228    /// from Glue that should be used for optimistic concurrency control when updating the table.
229    ///
230    /// # Errors
231    /// This function may return an error in several scenarios, including:
232    /// - Failure to validate the namespace.
233    /// - Failure to retrieve the table from the Glue Catalog.
234    /// - Absence of metadata location information in the table's properties.
235    /// - Issues reading or deserializing the table's metadata file.
236    async fn load_table_with_version_id(
237        &self,
238        table: &TableIdent,
239    ) -> Result<(Table, Option<String>)> {
240        let db_name = validate_namespace(table.namespace())?;
241        let table_name = table.name();
242
243        let builder = self
244            .client
245            .0
246            .get_table()
247            .database_name(&db_name)
248            .name(table_name);
249        let builder = with_catalog_id!(builder, self.config);
250
251        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
252
253        let glue_table = glue_table_output.table().ok_or_else(|| {
254            Error::new(
255                ErrorKind::TableNotFound,
256                format!(
257                    "Table object for database: {db_name} and table: {table_name} does not exist"
258                ),
259            )
260        })?;
261
262        let version_id = glue_table.version_id.clone();
263        let metadata_location = get_metadata_location(&glue_table.parameters)?;
264
265        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
266
267        let table = Table::builder()
268            .file_io(self.file_io())
269            .metadata_location(metadata_location)
270            .metadata(metadata)
271            .identifier(TableIdent::new(
272                NamespaceIdent::new(db_name),
273                table_name.to_owned(),
274            ))
275            .build()?;
276
277        Ok((table, version_id))
278    }
279}
280
281#[async_trait]
282impl Catalog for GlueCatalog {
283    /// List namespaces from glue catalog.
284    ///
285    /// Glue doesn't support nested namespaces.
286    /// We will return an empty list if parent is some.
287    async fn list_namespaces(
288        &self,
289        parent: Option<&NamespaceIdent>,
290    ) -> Result<Vec<NamespaceIdent>> {
291        if parent.is_some() {
292            return Ok(vec![]);
293        }
294
295        let mut database_list: Vec<NamespaceIdent> = Vec::new();
296        let mut next_token: Option<String> = None;
297
298        loop {
299            let builder = match &next_token {
300                Some(token) => self.client.0.get_databases().next_token(token),
301                None => self.client.0.get_databases(),
302            };
303            let builder = with_catalog_id!(builder, self.config);
304            let resp = builder.send().await.map_err(from_aws_sdk_error)?;
305
306            let dbs: Vec<NamespaceIdent> = resp
307                .database_list()
308                .iter()
309                .map(|db| NamespaceIdent::new(db.name().to_string()))
310                .collect();
311
312            database_list.extend(dbs);
313
314            next_token = resp.next_token().map(ToOwned::to_owned);
315            if next_token.is_none() {
316                break;
317            }
318        }
319
320        Ok(database_list)
321    }
322
323    /// Creates a new namespace with the given identifier and properties.
324    ///
325    /// Attempts to create a namespace defined by the `namespace`
326    /// parameter and configured with the specified `properties`.
327    ///
328    /// This function can return an error in the following situations:
329    ///
330    /// - Errors from `validate_namespace` if the namespace identifier does not
331    /// meet validation criteria.
332    /// - Errors from `convert_to_database` if the properties cannot be
333    /// successfully converted into a database configuration.
334    /// - Errors from the underlying database creation process, converted using
335    /// `from_sdk_error`.
336    async fn create_namespace(
337        &self,
338        namespace: &NamespaceIdent,
339        properties: HashMap<String, String>,
340    ) -> Result<Namespace> {
341        if self.namespace_exists(namespace).await? {
342            return Err(Error::new(
343                ErrorKind::NamespaceAlreadyExists,
344                format!("Namespace {namespace:?} already exists"),
345            ));
346        }
347
348        let db_input = convert_to_database(namespace, &properties)?;
349
350        let builder = self.client.0.create_database().database_input(db_input);
351        let builder = with_catalog_id!(builder, self.config);
352
353        builder.send().await.map_err(from_aws_sdk_error)?;
354
355        Ok(Namespace::with_properties(namespace.clone(), properties))
356    }
357
358    /// Retrieves a namespace by its identifier.
359    ///
360    /// Validates the given namespace identifier and then queries the
361    /// underlying database client to fetch the corresponding namespace data.
362    /// Constructs a `Namespace` object with the retrieved data and returns it.
363    ///
364    /// This function can return an error in any of the following situations:
365    /// - If the provided namespace identifier fails validation checks
366    /// - If there is an error querying the database, returned by
367    /// `from_sdk_error`.
368    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
369        let db_name = validate_namespace(namespace)?;
370
371        let builder = self.client.0.get_database().name(&db_name);
372        let builder = with_catalog_id!(builder, self.config);
373
374        let resp = builder.send().await.map_err(|err| {
375            if err
376                .as_service_error()
377                .map(|e| e.is_entity_not_found_exception())
378                == Some(true)
379            {
380                return Error::new(
381                    ErrorKind::NamespaceNotFound,
382                    format!("Namespace {namespace:?} does not exist"),
383                );
384            }
385            from_aws_sdk_error(err)
386        })?;
387
388        match resp.database() {
389            Some(db) => {
390                let namespace = convert_to_namespace(db);
391                Ok(namespace)
392            }
393            None => Err(Error::new(
394                ErrorKind::NamespaceNotFound,
395                format!("Database with name: {db_name} does not exist"),
396            )),
397        }
398    }
399
400    /// Checks if a namespace exists within the Glue Catalog.
401    ///
402    /// Validates the namespace identifier by querying the Glue Catalog
403    /// to determine if the specified namespace (database) exists.
404    ///
405    /// # Returns
406    /// A `Result<bool>` indicating the outcome of the check:
407    /// - `Ok(true)` if the namespace exists.
408    /// - `Ok(false)` if the namespace does not exist, identified by a specific
409    /// `EntityNotFoundException` variant.
410    /// - `Err(...)` if an error occurs during validation or the Glue Catalog
411    /// query, with the error encapsulating the issue.
412    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
413        let db_name = validate_namespace(namespace)?;
414
415        let builder = self.client.0.get_database().name(&db_name);
416        let builder = with_catalog_id!(builder, self.config);
417
418        let resp = builder.send().await;
419
420        match resp {
421            Ok(_) => Ok(true),
422            Err(err) => {
423                if err
424                    .as_service_error()
425                    .map(|e| e.is_entity_not_found_exception())
426                    == Some(true)
427                {
428                    return Ok(false);
429                }
430                Err(from_aws_sdk_error(err))
431            }
432        }
433    }
434
435    /// Asynchronously updates properties of an existing namespace.
436    ///
437    /// Converts the given namespace identifier and properties into a database
438    /// representation and then attempts to update the corresponding namespace
439    /// in the Glue Catalog.
440    ///
441    /// # Returns
442    /// Returns `Ok(())` if the namespace update is successful. If the
443    /// namespace cannot be updated due to missing information or an error
444    /// during the update process, an `Err(...)` is returned.
445    async fn update_namespace(
446        &self,
447        namespace: &NamespaceIdent,
448        properties: HashMap<String, String>,
449    ) -> Result<()> {
450        if !self.namespace_exists(namespace).await? {
451            return Err(Error::new(
452                ErrorKind::NamespaceNotFound,
453                format!("Namespace {namespace:?} does not exist"),
454            ));
455        }
456
457        let db_name = validate_namespace(namespace)?;
458        let db_input = convert_to_database(namespace, &properties)?;
459
460        let builder = self
461            .client
462            .0
463            .update_database()
464            .name(&db_name)
465            .database_input(db_input);
466        let builder = with_catalog_id!(builder, self.config);
467
468        builder.send().await.map_err(from_aws_sdk_error)?;
469
470        Ok(())
471    }
472
473    /// Asynchronously drops a namespace from the Glue Catalog.
474    ///
475    /// Checks if the namespace is empty. If it still contains tables the
476    /// namespace will not be dropped, but an error is returned instead.
477    ///
478    /// # Returns
479    /// A `Result<()>` indicating the outcome:
480    /// - `Ok(())` signifies successful namespace deletion.
481    /// - `Err(...)` signifies failure to drop the namespace due to validation
482    /// errors, connectivity issues, or Glue Catalog constraints.
483    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
484        if !self.namespace_exists(namespace).await? {
485            return Err(Error::new(
486                ErrorKind::NamespaceNotFound,
487                format!("Namespace {namespace:?} does not exist"),
488            ));
489        }
490
491        let db_name = validate_namespace(namespace)?;
492        let table_list = self.list_tables(namespace).await?;
493
494        if !table_list.is_empty() {
495            return Err(Error::new(
496                ErrorKind::DataInvalid,
497                format!("Database with name: {} is not empty", &db_name),
498            ));
499        }
500
501        let builder = self.client.0.delete_database().name(db_name);
502        let builder = with_catalog_id!(builder, self.config);
503
504        builder.send().await.map_err(from_aws_sdk_error)?;
505
506        Ok(())
507    }
508
509    /// Asynchronously lists all tables within a specified namespace.
510    ///
511    /// # Returns
512    /// A `Result<Vec<TableIdent>>`, which is:
513    /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
514    /// representing a table within the specified namespace.
515    /// - `Err(...)` if an error occurs during namespace validation or while
516    /// querying the database.
517    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
518        let db_name = validate_namespace(namespace)?;
519
520        let mut table_list: Vec<TableIdent> = Vec::new();
521        let mut next_token: Option<String> = None;
522
523        loop {
524            let builder = match &next_token {
525                Some(token) => self
526                    .client
527                    .0
528                    .get_tables()
529                    .database_name(&db_name)
530                    .next_token(token),
531                None => self.client.0.get_tables().database_name(&db_name),
532            };
533            let builder = with_catalog_id!(builder, self.config);
534            let resp = builder.send().await.map_err(from_aws_sdk_error)?;
535
536            let tables: Vec<_> = resp
537                .table_list()
538                .iter()
539                .map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
540                .collect();
541
542            table_list.extend(tables);
543
544            next_token = resp.next_token().map(ToOwned::to_owned);
545            if next_token.is_none() {
546                break;
547            }
548        }
549
550        Ok(table_list)
551    }
552
553    /// Creates a new table within a specified namespace using the provided
554    /// table creation settings.
555    ///
556    /// # Returns
557    /// A `Result` wrapping a `Table` object representing the newly created
558    /// table.
559    ///
560    /// # Errors
561    /// This function may return an error in several cases, including invalid
562    /// namespace identifiers, failure to determine a default storage location,
563    /// issues generating or writing table metadata, and errors communicating
564    /// with the Glue Catalog.
565    async fn create_table(
566        &self,
567        namespace: &NamespaceIdent,
568        mut creation: TableCreation,
569    ) -> Result<Table> {
570        let db_name = validate_namespace(namespace)?;
571        let table_name = creation.name.clone();
572
573        let location = match &creation.location {
574            Some(location) => location.clone(),
575            None => {
576                let ns = self.get_namespace(namespace).await?;
577                let location =
578                    get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
579                creation.location = Some(location.clone());
580                location
581            }
582        };
583        let metadata = TableMetadataBuilder::from_table_creation(creation)?
584            .build()?
585            .metadata;
586        let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
587
588        metadata.write_to(&self.file_io, &metadata_location).await?;
589
590        let metadata_location_str = metadata_location.to_string();
591        let glue_table = convert_to_glue_table(
592            &table_name,
593            metadata_location_str.clone(),
594            &metadata,
595            metadata.properties(),
596            None,
597        )?;
598
599        let builder = self
600            .client
601            .0
602            .create_table()
603            .database_name(&db_name)
604            .table_input(glue_table);
605        let builder = with_catalog_id!(builder, self.config);
606
607        builder.send().await.map_err(from_aws_sdk_error)?;
608
609        Table::builder()
610            .file_io(self.file_io())
611            .metadata_location(metadata_location_str)
612            .metadata(metadata)
613            .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
614            .build()
615    }
616
617    /// Loads a table from the Glue Catalog and constructs a `Table` object
618    /// based on its metadata.
619    ///
620    /// # Returns
621    /// A `Result` wrapping a `Table` object that represents the loaded table.
622    ///
623    /// # Errors
624    /// This function may return an error in several scenarios, including:
625    /// - Failure to validate the namespace.
626    /// - Failure to retrieve the table from the Glue Catalog.
627    /// - Absence of metadata location information in the table's properties.
628    /// - Issues reading or deserializing the table's metadata file.
629    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
630        let (table, _) = self.load_table_with_version_id(table).await?;
631        Ok(table)
632    }
633
634    /// Asynchronously drops a table from the database.
635    ///
636    /// # Errors
637    /// Returns an error if:
638    /// - The namespace provided in `table` cannot be validated
639    /// or does not exist.
640    /// - The underlying database client encounters an error while
641    /// attempting to drop the table. This includes scenarios where
642    /// the table does not exist.
643    /// - Any network or communication error occurs with the database backend.
644    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
645        let db_name = validate_namespace(table.namespace())?;
646        let table_name = table.name();
647
648        let builder = self
649            .client
650            .0
651            .delete_table()
652            .database_name(&db_name)
653            .name(table_name);
654        let builder = with_catalog_id!(builder, self.config);
655
656        builder.send().await.map_err(from_aws_sdk_error)?;
657
658        Ok(())
659    }
660
661    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
662        let table_info = self.load_table(table).await?;
663        self.drop_table(table).await?;
664        iceberg::drop_table_data(
665            table_info.file_io(),
666            table_info.metadata(),
667            table_info.metadata_location(),
668        )
669        .await
670    }
671
672    /// Asynchronously checks the existence of a specified table
673    /// in the database.
674    ///
675    /// # Returns
676    /// - `Ok(true)` if the table exists in the database.
677    /// - `Ok(false)` if the table does not exist in the database.
678    /// - `Err(...)` if an error occurs during the process
679    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
680        let db_name = validate_namespace(table.namespace())?;
681        let table_name = table.name();
682
683        let builder = self
684            .client
685            .0
686            .get_table()
687            .database_name(&db_name)
688            .name(table_name);
689        let builder = with_catalog_id!(builder, self.config);
690
691        let resp = builder.send().await;
692
693        match resp {
694            Ok(_) => Ok(true),
695            Err(err) => {
696                if err
697                    .as_service_error()
698                    .map(|e| e.is_entity_not_found_exception())
699                    == Some(true)
700                {
701                    return Ok(false);
702                }
703                Err(from_aws_sdk_error(err))
704            }
705        }
706    }
707
708    /// Asynchronously renames a table within the database
709    /// or moves it between namespaces (databases).
710    ///
711    /// # Returns
712    /// - `Ok(())` on successful rename or move of the table.
713    /// - `Err(...)` if an error occurs during the process.
714    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
715        let src_db_name = validate_namespace(src.namespace())?;
716        let dest_db_name = validate_namespace(dest.namespace())?;
717
718        let src_table_name = src.name();
719        let dest_table_name = dest.name();
720
721        let builder = self
722            .client
723            .0
724            .get_table()
725            .database_name(&src_db_name)
726            .name(src_table_name);
727        let builder = with_catalog_id!(builder, self.config);
728
729        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
730
731        match glue_table_output.table() {
732            None => Err(Error::new(
733                ErrorKind::TableNotFound,
734                format!(
735                    "'Table' object for database: {src_db_name} and table: {src_table_name} does not exist"
736                ),
737            )),
738            Some(table) => {
739                let rename_table_input = TableInput::builder()
740                    .name(dest_table_name)
741                    .set_parameters(table.parameters.clone())
742                    .set_storage_descriptor(table.storage_descriptor.clone())
743                    .set_table_type(table.table_type.clone())
744                    .set_description(table.description.clone())
745                    .build()
746                    .map_err(from_aws_build_error)?;
747
748                let builder = self
749                    .client
750                    .0
751                    .create_table()
752                    .database_name(&dest_db_name)
753                    .table_input(rename_table_input);
754                let builder = with_catalog_id!(builder, self.config);
755
756                builder.send().await.map_err(from_aws_sdk_error)?;
757
758                let drop_src_table_result = self.drop_table(src).await;
759
760                match drop_src_table_result {
761                    Ok(_) => Ok(()),
762                    Err(_) => {
763                        let err_msg_src_table =
764                            format!("Failed to drop old table {src_db_name}.{src_table_name}.");
765
766                        let drop_dest_table_result = self.drop_table(dest).await;
767
768                        match drop_dest_table_result {
769                            Ok(_) => Err(Error::new(
770                                ErrorKind::Unexpected,
771                                format!(
772                                    "{err_msg_src_table} Rolled back table creation for {dest_db_name}.{dest_table_name}."
773                                ),
774                            )),
775                            Err(_) => Err(Error::new(
776                                ErrorKind::Unexpected,
777                                format!(
778                                    "{err_msg_src_table} Failed to roll back table creation for {dest_db_name}.{dest_table_name}. Please clean up manually."
779                                ),
780                            )),
781                        }
782                    }
783                }
784            }
785        }
786    }
787
788    /// registers an existing table into the Glue Catalog.
789    ///
790    /// Converts the provided table identifier and metadata location into a
791    /// Glue-compatible table representation, and attempts to create the
792    /// corresponding table in the Glue Catalog.
793    ///
794    /// # Returns
795    /// Returns `Ok(Table)` if the table is successfully registered and loaded.
796    /// If the registration fails due to validation issues, existing table conflicts,
797    /// metadata problems, or errors during the registration or loading process,
798    /// an `Err(...)` is returned.
799    async fn register_table(
800        &self,
801        table_ident: &TableIdent,
802        metadata_location: String,
803    ) -> Result<Table> {
804        let db_name = validate_namespace(table_ident.namespace())?;
805        let table_name = table_ident.name();
806        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
807
808        let table_input = convert_to_glue_table(
809            table_name,
810            metadata_location.clone(),
811            &metadata,
812            metadata.properties(),
813            None,
814        )?;
815
816        let builder = self
817            .client
818            .0
819            .create_table()
820            .database_name(&db_name)
821            .table_input(table_input);
822        let builder = with_catalog_id!(builder, self.config);
823
824        builder.send().await.map_err(|e| {
825            let error = e.into_service_error();
826            match error {
827                CreateTableError::EntityNotFoundException(_) => Error::new(
828                    ErrorKind::NamespaceNotFound,
829                    format!("Database {db_name} does not exist"),
830                ),
831                CreateTableError::AlreadyExistsException(_) => Error::new(
832                    ErrorKind::TableAlreadyExists,
833                    format!("Table {table_ident} already exists"),
834                ),
835                _ => Error::new(
836                    ErrorKind::Unexpected,
837                    format!("Failed to register table {table_ident} due to AWS SDK error"),
838                ),
839            }
840            .with_source(anyhow!("aws sdk error: {error:?}"))
841        })?;
842
843        Ok(Table::builder()
844            .identifier(table_ident.clone())
845            .metadata_location(metadata_location)
846            .metadata(metadata)
847            .file_io(self.file_io())
848            .build()?)
849    }
850
851    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
852        let table_ident = commit.identifier().clone();
853        let table_namespace = validate_namespace(table_ident.namespace())?;
854
855        let (current_table, current_version_id) =
856            self.load_table_with_version_id(&table_ident).await?;
857        let current_metadata_location = current_table.metadata_location_result()?.to_string();
858
859        let staged_table = commit.apply(current_table)?;
860        let staged_metadata_location_str = staged_table.metadata_location_result()?;
861        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
862
863        // Write new metadata
864        staged_table
865            .metadata()
866            .write_to(staged_table.file_io(), &staged_metadata_location)
867            .await?;
868
869        // Persist staged table to Glue with optimistic locking
870        let mut builder = self
871            .client
872            .0
873            .update_table()
874            .database_name(table_namespace)
875            .set_skip_archive(Some(true)) // todo make this configurable
876            .table_input(convert_to_glue_table(
877                table_ident.name(),
878                staged_metadata_location.to_string(),
879                staged_table.metadata(),
880                staged_table.metadata().properties(),
881                Some(current_metadata_location),
882            )?);
883
884        // Add VersionId for optimistic locking
885        if let Some(version_id) = current_version_id {
886            builder = builder.version_id(version_id);
887        }
888
889        let builder = with_catalog_id!(builder, self.config);
890        let _ = builder.send().await.map_err(|e| {
891            let error = e.into_service_error();
892            match error {
893                UpdateTableError::EntityNotFoundException(_) => Error::new(
894                    ErrorKind::TableNotFound,
895                    format!("Table {table_ident} is not found"),
896                ),
897                UpdateTableError::ConcurrentModificationException(_) => Error::new(
898                    ErrorKind::CatalogCommitConflicts,
899                    format!("Commit failed for table: {table_ident}"),
900                )
901                .with_retryable(true),
902                _ => Error::new(
903                    ErrorKind::Unexpected,
904                    format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
905                ),
906            }
907            .with_source(anyhow!("aws sdk error: {error:?}"))
908        })?;
909
910        Ok(staged_table)
911    }
912}