iceberg_catalog_glue/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use aws_sdk_glue::operation::create_table::CreateTableError;
26use aws_sdk_glue::operation::update_table::UpdateTableError;
27use aws_sdk_glue::types::TableInput;
28use iceberg::io::{
29    FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
30    S3_SESSION_TOKEN, StorageFactory,
31};
32use iceberg::spec::{TableMetadata, TableMetadataBuilder};
33use iceberg::table::Table;
34use iceberg::{
35    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
36    Runtime, TableCommit, TableCreation, TableIdent,
37};
38use iceberg_storage_opendal::OpenDalStorageFactory;
39
40use crate::error::{from_aws_build_error, from_aws_sdk_error};
41use crate::utils::{
42    convert_to_database, convert_to_glue_table, convert_to_namespace, create_sdk_config,
43    get_default_table_location, get_metadata_location, validate_namespace,
44};
45use crate::{
46    AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
47};
48
/// Glue catalog URI property key.
///
/// When set, the value is passed to `create_sdk_config`. It is also used as the S3 endpoint
/// of the catalog's `FileIO` unless an explicit S3 endpoint property is provided.
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Glue catalog id property key.
///
/// Stored in the catalog configuration that `with_catalog_id!` is applied with on every
/// Glue request (NOTE(review): presumably forwarded as the request's catalog id — confirm).
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Glue catalog warehouse location property key.
///
/// Required: loading a catalog without a non-empty warehouse fails. The value is the base
/// used when a default table location has to be derived.
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
55
/// Builder for [`GlueCatalog`].
///
/// Catalog properties are supplied through [`CatalogBuilder::load`]. A storage factory and a
/// runtime can optionally be injected beforehand via `with_storage_factory` / `with_runtime`.
#[derive(Debug)]
pub struct GlueCatalogBuilder {
    /// Configuration filled in from the name and properties passed to `load`.
    config: GlueCatalogConfig,
    /// Storage factory for the catalog's `FileIO`; when `None`, an
    /// `OpenDalStorageFactory::S3` is used.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Runtime for the catalog; when `None`, `load` falls back to `Runtime::try_current()`.
    runtime: Option<Runtime>,
}
63
64impl Default for GlueCatalogBuilder {
65    fn default() -> Self {
66        Self {
67            config: GlueCatalogConfig {
68                name: None,
69                uri: None,
70                catalog_id: None,
71                warehouse: "".to_string(),
72                props: HashMap::new(),
73            },
74            storage_factory: None,
75            runtime: None,
76        }
77    }
78}
79
80impl CatalogBuilder for GlueCatalogBuilder {
81    type C = GlueCatalog;
82
83    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
84        self.storage_factory = Some(storage_factory);
85        self
86    }
87
88    fn with_runtime(mut self, runtime: Runtime) -> Self {
89        self.runtime = Some(runtime);
90        self
91    }
92
93    fn load(
94        mut self,
95        name: impl Into<String>,
96        props: HashMap<String, String>,
97    ) -> impl Future<Output = Result<Self::C>> + Send {
98        self.config.name = Some(name.into());
99
100        if props.contains_key(GLUE_CATALOG_PROP_URI) {
101            self.config.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
102        }
103
104        if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
105            self.config.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
106        }
107
108        if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
109            self.config.warehouse = props
110                .get(GLUE_CATALOG_PROP_WAREHOUSE)
111                .cloned()
112                .unwrap_or_default();
113        }
114
115        // Collect other remaining properties
116        self.config.props = props
117            .into_iter()
118            .filter(|(k, _)| {
119                k != GLUE_CATALOG_PROP_URI
120                    && k != GLUE_CATALOG_PROP_CATALOG_ID
121                    && k != GLUE_CATALOG_PROP_WAREHOUSE
122            })
123            .collect();
124
125        async move {
126            if self.config.name.is_none() {
127                return Err(Error::new(
128                    ErrorKind::DataInvalid,
129                    "Catalog name is required",
130                ));
131            }
132            if self.config.warehouse.is_empty() {
133                return Err(Error::new(
134                    ErrorKind::DataInvalid,
135                    "Catalog warehouse is required",
136                ));
137            }
138
139            let runtime = match self.runtime {
140                Some(rt) => rt,
141                None => Runtime::try_current()?,
142            };
143            GlueCatalog::new(self.config, self.storage_factory, runtime).await
144        }
145    }
146}
147
#[derive(Debug)]
/// Glue Catalog configuration.
///
/// Filled in by [`GlueCatalogBuilder`] from the name and properties passed to `load`.
pub(crate) struct GlueCatalogConfig {
    /// Catalog name; the builder sets it before the catalog is created.
    name: Option<String>,
    /// Value of the `uri` property. Passed to `create_sdk_config` and, unless an explicit
    /// S3 endpoint property exists, also used as the `FileIO` S3 endpoint.
    uri: Option<String>,
    /// Value of the `catalog_id` property, applied to Glue requests through
    /// `with_catalog_id!` (NOTE(review): presumably only when `Some` — confirm in the macro).
    catalog_id: Option<String>,
    /// Value of the `warehouse` property; the builder rejects an empty warehouse. Used as
    /// the base for default table locations.
    warehouse: String,
    /// All remaining properties; used to build the AWS SDK config and as the starting
    /// point for the `FileIO` properties.
    props: HashMap<String, String>,
}
157
/// Newtype wrapper around the AWS SDK Glue client used by [`GlueCatalog`] for every
/// Glue API call.
struct GlueClient(aws_sdk_glue::Client);
159
/// Glue Catalog
///
/// An Iceberg [`Catalog`] backed by the AWS Glue Data Catalog. Namespaces map to Glue
/// databases, and tables map to Glue tables whose parameters record the location of the
/// current Iceberg metadata file.
pub struct GlueCatalog {
    /// Configuration captured by the builder.
    config: GlueCatalogConfig,
    /// Client used for all Glue requests.
    client: GlueClient,
    /// `FileIO` used to read and write table metadata; clones are handed to loaded tables.
    file_io: FileIO,
    /// Runtime handed to every [`Table`] built by this catalog.
    runtime: Runtime,
}
167
168impl Debug for GlueCatalog {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        f.debug_struct("GlueCatalog")
171            .field("config", &self.config)
172            .finish_non_exhaustive()
173    }
174}
175
176impl GlueCatalog {
177    /// Create a new glue catalog
178    async fn new(
179        config: GlueCatalogConfig,
180        storage_factory: Option<Arc<dyn StorageFactory>>,
181        runtime: Runtime,
182    ) -> Result<Self> {
183        let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
184        let mut file_io_props = config.props.clone();
185        if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
186            && let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID)
187        {
188            file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
189        }
190        if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY)
191            && let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY)
192        {
193            file_io_props.insert(
194                S3_SECRET_ACCESS_KEY.to_string(),
195                secret_access_key.to_string(),
196            );
197        }
198        if !file_io_props.contains_key(S3_REGION)
199            && let Some(region) = file_io_props.get(AWS_REGION_NAME)
200        {
201            file_io_props.insert(S3_REGION.to_string(), region.to_string());
202        }
203        if !file_io_props.contains_key(S3_SESSION_TOKEN)
204            && let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN)
205        {
206            file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
207        }
208        if !file_io_props.contains_key(S3_ENDPOINT)
209            && let Some(aws_endpoint) = config.uri.as_ref()
210        {
211            file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
212        }
213
214        let client = aws_sdk_glue::Client::new(&sdk_config);
215
216        // Use provided factory or default to OpenDalStorageFactory::S3
217        let factory = storage_factory.unwrap_or_else(|| {
218            Arc::new(OpenDalStorageFactory::S3 {
219                customized_credential_load: None,
220            })
221        });
222        let file_io = FileIOBuilder::new(factory)
223            .with_props(file_io_props)
224            .build();
225
226        Ok(GlueCatalog {
227            config,
228            client: GlueClient(client),
229            file_io,
230            runtime,
231        })
232    }
233    /// Get the catalogs `FileIO`
234    pub fn file_io(&self) -> FileIO {
235        self.file_io.clone()
236    }
237
238    /// Loads a table from the Glue Catalog along with its version_id for optimistic locking.
239    ///
240    /// # Returns
241    /// A `Result` wrapping a tuple of (`Table`, `Option<String>`) where the String is the version_id
242    /// from Glue that should be used for optimistic concurrency control when updating the table.
243    ///
244    /// # Errors
245    /// This function may return an error in several scenarios, including:
246    /// - Failure to validate the namespace.
247    /// - Failure to retrieve the table from the Glue Catalog.
248    /// - Absence of metadata location information in the table's properties.
249    /// - Issues reading or deserializing the table's metadata file.
250    async fn load_table_with_version_id(
251        &self,
252        table: &TableIdent,
253    ) -> Result<(Table, Option<String>)> {
254        let db_name = validate_namespace(table.namespace())?;
255        let table_name = table.name();
256
257        let builder = self
258            .client
259            .0
260            .get_table()
261            .database_name(&db_name)
262            .name(table_name);
263        let builder = with_catalog_id!(builder, self.config);
264
265        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
266
267        let glue_table = glue_table_output.table().ok_or_else(|| {
268            Error::new(
269                ErrorKind::TableNotFound,
270                format!(
271                    "Table object for database: {db_name} and table: {table_name} does not exist"
272                ),
273            )
274        })?;
275
276        let version_id = glue_table.version_id.clone();
277        let metadata_location = get_metadata_location(&glue_table.parameters)?;
278
279        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
280
281        let table = Table::builder()
282            .file_io(self.file_io())
283            .metadata_location(metadata_location)
284            .metadata(metadata)
285            .identifier(TableIdent::new(
286                NamespaceIdent::new(db_name),
287                table_name.to_owned(),
288            ))
289            .runtime(self.runtime.clone())
290            .build()?;
291
292        Ok((table, version_id))
293    }
294}
295
296#[async_trait]
297impl Catalog for GlueCatalog {
298    /// List namespaces from glue catalog.
299    ///
300    /// Glue doesn't support nested namespaces.
301    /// We will return an empty list if parent is some.
302    async fn list_namespaces(
303        &self,
304        parent: Option<&NamespaceIdent>,
305    ) -> Result<Vec<NamespaceIdent>> {
306        if parent.is_some() {
307            return Ok(vec![]);
308        }
309
310        let mut database_list: Vec<NamespaceIdent> = Vec::new();
311        let mut next_token: Option<String> = None;
312
313        loop {
314            let builder = match &next_token {
315                Some(token) => self.client.0.get_databases().next_token(token),
316                None => self.client.0.get_databases(),
317            };
318            let builder = with_catalog_id!(builder, self.config);
319            let resp = builder.send().await.map_err(from_aws_sdk_error)?;
320
321            let dbs: Vec<NamespaceIdent> = resp
322                .database_list()
323                .iter()
324                .map(|db| NamespaceIdent::new(db.name().to_string()))
325                .collect();
326
327            database_list.extend(dbs);
328
329            next_token = resp.next_token().map(ToOwned::to_owned);
330            if next_token.is_none() {
331                break;
332            }
333        }
334
335        Ok(database_list)
336    }
337
338    /// Creates a new namespace with the given identifier and properties.
339    ///
340    /// Attempts to create a namespace defined by the `namespace`
341    /// parameter and configured with the specified `properties`.
342    ///
343    /// This function can return an error in the following situations:
344    ///
345    /// - Errors from `validate_namespace` if the namespace identifier does not
346    /// meet validation criteria.
347    /// - Errors from `convert_to_database` if the properties cannot be
348    /// successfully converted into a database configuration.
349    /// - Errors from the underlying database creation process, converted using
350    /// `from_sdk_error`.
351    async fn create_namespace(
352        &self,
353        namespace: &NamespaceIdent,
354        properties: HashMap<String, String>,
355    ) -> Result<Namespace> {
356        if self.namespace_exists(namespace).await? {
357            return Err(Error::new(
358                ErrorKind::NamespaceAlreadyExists,
359                format!("Namespace {namespace:?} already exists"),
360            ));
361        }
362
363        let db_input = convert_to_database(namespace, &properties)?;
364
365        let builder = self.client.0.create_database().database_input(db_input);
366        let builder = with_catalog_id!(builder, self.config);
367
368        builder.send().await.map_err(from_aws_sdk_error)?;
369
370        Ok(Namespace::with_properties(namespace.clone(), properties))
371    }
372
373    /// Retrieves a namespace by its identifier.
374    ///
375    /// Validates the given namespace identifier and then queries the
376    /// underlying database client to fetch the corresponding namespace data.
377    /// Constructs a `Namespace` object with the retrieved data and returns it.
378    ///
379    /// This function can return an error in any of the following situations:
380    /// - If the provided namespace identifier fails validation checks
381    /// - If there is an error querying the database, returned by
382    /// `from_sdk_error`.
383    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
384        let db_name = validate_namespace(namespace)?;
385
386        let builder = self.client.0.get_database().name(&db_name);
387        let builder = with_catalog_id!(builder, self.config);
388
389        let resp = builder.send().await.map_err(|err| {
390            if err
391                .as_service_error()
392                .map(|e| e.is_entity_not_found_exception())
393                == Some(true)
394            {
395                return Error::new(
396                    ErrorKind::NamespaceNotFound,
397                    format!("Namespace {namespace:?} does not exist"),
398                );
399            }
400            from_aws_sdk_error(err)
401        })?;
402
403        match resp.database() {
404            Some(db) => {
405                let namespace = convert_to_namespace(db);
406                Ok(namespace)
407            }
408            None => Err(Error::new(
409                ErrorKind::NamespaceNotFound,
410                format!("Database with name: {db_name} does not exist"),
411            )),
412        }
413    }
414
415    /// Checks if a namespace exists within the Glue Catalog.
416    ///
417    /// Validates the namespace identifier by querying the Glue Catalog
418    /// to determine if the specified namespace (database) exists.
419    ///
420    /// # Returns
421    /// A `Result<bool>` indicating the outcome of the check:
422    /// - `Ok(true)` if the namespace exists.
423    /// - `Ok(false)` if the namespace does not exist, identified by a specific
424    /// `EntityNotFoundException` variant.
425    /// - `Err(...)` if an error occurs during validation or the Glue Catalog
426    /// query, with the error encapsulating the issue.
427    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
428        let db_name = validate_namespace(namespace)?;
429
430        let builder = self.client.0.get_database().name(&db_name);
431        let builder = with_catalog_id!(builder, self.config);
432
433        let resp = builder.send().await;
434
435        match resp {
436            Ok(_) => Ok(true),
437            Err(err) => {
438                if err
439                    .as_service_error()
440                    .map(|e| e.is_entity_not_found_exception())
441                    == Some(true)
442                {
443                    return Ok(false);
444                }
445                Err(from_aws_sdk_error(err))
446            }
447        }
448    }
449
450    /// Asynchronously updates properties of an existing namespace.
451    ///
452    /// Converts the given namespace identifier and properties into a database
453    /// representation and then attempts to update the corresponding namespace
454    /// in the Glue Catalog.
455    ///
456    /// # Returns
457    /// Returns `Ok(())` if the namespace update is successful. If the
458    /// namespace cannot be updated due to missing information or an error
459    /// during the update process, an `Err(...)` is returned.
460    async fn update_namespace(
461        &self,
462        namespace: &NamespaceIdent,
463        properties: HashMap<String, String>,
464    ) -> Result<()> {
465        if !self.namespace_exists(namespace).await? {
466            return Err(Error::new(
467                ErrorKind::NamespaceNotFound,
468                format!("Namespace {namespace:?} does not exist"),
469            ));
470        }
471
472        let db_name = validate_namespace(namespace)?;
473        let db_input = convert_to_database(namespace, &properties)?;
474
475        let builder = self
476            .client
477            .0
478            .update_database()
479            .name(&db_name)
480            .database_input(db_input);
481        let builder = with_catalog_id!(builder, self.config);
482
483        builder.send().await.map_err(from_aws_sdk_error)?;
484
485        Ok(())
486    }
487
488    /// Asynchronously drops a namespace from the Glue Catalog.
489    ///
490    /// Checks if the namespace is empty. If it still contains tables the
491    /// namespace will not be dropped, but an error is returned instead.
492    ///
493    /// # Returns
494    /// A `Result<()>` indicating the outcome:
495    /// - `Ok(())` signifies successful namespace deletion.
496    /// - `Err(...)` signifies failure to drop the namespace due to validation
497    /// errors, connectivity issues, or Glue Catalog constraints.
498    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
499        if !self.namespace_exists(namespace).await? {
500            return Err(Error::new(
501                ErrorKind::NamespaceNotFound,
502                format!("Namespace {namespace:?} does not exist"),
503            ));
504        }
505
506        let db_name = validate_namespace(namespace)?;
507        let table_list = self.list_tables(namespace).await?;
508
509        if !table_list.is_empty() {
510            return Err(Error::new(
511                ErrorKind::DataInvalid,
512                format!("Database with name: {} is not empty", &db_name),
513            ));
514        }
515
516        let builder = self.client.0.delete_database().name(db_name);
517        let builder = with_catalog_id!(builder, self.config);
518
519        builder.send().await.map_err(from_aws_sdk_error)?;
520
521        Ok(())
522    }
523
524    /// Asynchronously lists all tables within a specified namespace.
525    ///
526    /// # Returns
527    /// A `Result<Vec<TableIdent>>`, which is:
528    /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
529    /// representing a table within the specified namespace.
530    /// - `Err(...)` if an error occurs during namespace validation or while
531    /// querying the database.
532    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
533        let db_name = validate_namespace(namespace)?;
534
535        let mut table_list: Vec<TableIdent> = Vec::new();
536        let mut next_token: Option<String> = None;
537
538        loop {
539            let builder = match &next_token {
540                Some(token) => self
541                    .client
542                    .0
543                    .get_tables()
544                    .database_name(&db_name)
545                    .next_token(token),
546                None => self.client.0.get_tables().database_name(&db_name),
547            };
548            let builder = with_catalog_id!(builder, self.config);
549            let resp = builder.send().await.map_err(from_aws_sdk_error)?;
550
551            let tables: Vec<_> = resp
552                .table_list()
553                .iter()
554                .map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
555                .collect();
556
557            table_list.extend(tables);
558
559            next_token = resp.next_token().map(ToOwned::to_owned);
560            if next_token.is_none() {
561                break;
562            }
563        }
564
565        Ok(table_list)
566    }
567
568    /// Creates a new table within a specified namespace using the provided
569    /// table creation settings.
570    ///
571    /// # Returns
572    /// A `Result` wrapping a `Table` object representing the newly created
573    /// table.
574    ///
575    /// # Errors
576    /// This function may return an error in several cases, including invalid
577    /// namespace identifiers, failure to determine a default storage location,
578    /// issues generating or writing table metadata, and errors communicating
579    /// with the Glue Catalog.
580    async fn create_table(
581        &self,
582        namespace: &NamespaceIdent,
583        mut creation: TableCreation,
584    ) -> Result<Table> {
585        let db_name = validate_namespace(namespace)?;
586        let table_name = creation.name.clone();
587
588        let location = match &creation.location {
589            Some(location) => location.clone(),
590            None => {
591                let ns = self.get_namespace(namespace).await?;
592                let location =
593                    get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
594                creation.location = Some(location.clone());
595                location
596            }
597        };
598        let metadata = TableMetadataBuilder::from_table_creation(creation)?
599            .build()?
600            .metadata;
601        let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
602
603        metadata.write_to(&self.file_io, &metadata_location).await?;
604
605        let metadata_location_str = metadata_location.to_string();
606        let glue_table = convert_to_glue_table(
607            &table_name,
608            metadata_location_str.clone(),
609            &metadata,
610            metadata.properties(),
611            None,
612        )?;
613
614        let builder = self
615            .client
616            .0
617            .create_table()
618            .database_name(&db_name)
619            .table_input(glue_table);
620        let builder = with_catalog_id!(builder, self.config);
621
622        builder.send().await.map_err(from_aws_sdk_error)?;
623
624        Table::builder()
625            .file_io(self.file_io())
626            .metadata_location(metadata_location_str)
627            .metadata(metadata)
628            .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
629            .runtime(self.runtime.clone())
630            .build()
631    }
632
633    /// Loads a table from the Glue Catalog and constructs a `Table` object
634    /// based on its metadata.
635    ///
636    /// # Returns
637    /// A `Result` wrapping a `Table` object that represents the loaded table.
638    ///
639    /// # Errors
640    /// This function may return an error in several scenarios, including:
641    /// - Failure to validate the namespace.
642    /// - Failure to retrieve the table from the Glue Catalog.
643    /// - Absence of metadata location information in the table's properties.
644    /// - Issues reading or deserializing the table's metadata file.
645    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
646        let (table, _) = self.load_table_with_version_id(table).await?;
647        Ok(table)
648    }
649
650    /// Asynchronously drops a table from the database.
651    ///
652    /// # Errors
653    /// Returns an error if:
654    /// - The namespace provided in `table` cannot be validated
655    /// or does not exist.
656    /// - The underlying database client encounters an error while
657    /// attempting to drop the table. This includes scenarios where
658    /// the table does not exist.
659    /// - Any network or communication error occurs with the database backend.
660    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
661        let db_name = validate_namespace(table.namespace())?;
662        let table_name = table.name();
663
664        let builder = self
665            .client
666            .0
667            .delete_table()
668            .database_name(&db_name)
669            .name(table_name);
670        let builder = with_catalog_id!(builder, self.config);
671
672        builder.send().await.map_err(from_aws_sdk_error)?;
673
674        Ok(())
675    }
676
677    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
678        let table_info = self.load_table(table).await?;
679        self.drop_table(table).await?;
680        iceberg::drop_table_data(
681            table_info.file_io(),
682            table_info.metadata(),
683            table_info.metadata_location(),
684        )
685        .await
686    }
687
688    /// Asynchronously checks the existence of a specified table
689    /// in the database.
690    ///
691    /// # Returns
692    /// - `Ok(true)` if the table exists in the database.
693    /// - `Ok(false)` if the table does not exist in the database.
694    /// - `Err(...)` if an error occurs during the process
695    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
696        let db_name = validate_namespace(table.namespace())?;
697        let table_name = table.name();
698
699        let builder = self
700            .client
701            .0
702            .get_table()
703            .database_name(&db_name)
704            .name(table_name);
705        let builder = with_catalog_id!(builder, self.config);
706
707        let resp = builder.send().await;
708
709        match resp {
710            Ok(_) => Ok(true),
711            Err(err) => {
712                if err
713                    .as_service_error()
714                    .map(|e| e.is_entity_not_found_exception())
715                    == Some(true)
716                {
717                    return Ok(false);
718                }
719                Err(from_aws_sdk_error(err))
720            }
721        }
722    }
723
724    /// Asynchronously renames a table within the database
725    /// or moves it between namespaces (databases).
726    ///
727    /// # Returns
728    /// - `Ok(())` on successful rename or move of the table.
729    /// - `Err(...)` if an error occurs during the process.
730    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
731        let src_db_name = validate_namespace(src.namespace())?;
732        let dest_db_name = validate_namespace(dest.namespace())?;
733
734        let src_table_name = src.name();
735        let dest_table_name = dest.name();
736
737        let builder = self
738            .client
739            .0
740            .get_table()
741            .database_name(&src_db_name)
742            .name(src_table_name);
743        let builder = with_catalog_id!(builder, self.config);
744
745        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
746
747        match glue_table_output.table() {
748            None => Err(Error::new(
749                ErrorKind::TableNotFound,
750                format!(
751                    "'Table' object for database: {src_db_name} and table: {src_table_name} does not exist"
752                ),
753            )),
754            Some(table) => {
755                let rename_table_input = TableInput::builder()
756                    .name(dest_table_name)
757                    .set_parameters(table.parameters.clone())
758                    .set_storage_descriptor(table.storage_descriptor.clone())
759                    .set_table_type(table.table_type.clone())
760                    .set_description(table.description.clone())
761                    .build()
762                    .map_err(from_aws_build_error)?;
763
764                let builder = self
765                    .client
766                    .0
767                    .create_table()
768                    .database_name(&dest_db_name)
769                    .table_input(rename_table_input);
770                let builder = with_catalog_id!(builder, self.config);
771
772                builder.send().await.map_err(from_aws_sdk_error)?;
773
774                let drop_src_table_result = self.drop_table(src).await;
775
776                match drop_src_table_result {
777                    Ok(_) => Ok(()),
778                    Err(_) => {
779                        let err_msg_src_table =
780                            format!("Failed to drop old table {src_db_name}.{src_table_name}.");
781
782                        let drop_dest_table_result = self.drop_table(dest).await;
783
784                        match drop_dest_table_result {
785                            Ok(_) => Err(Error::new(
786                                ErrorKind::Unexpected,
787                                format!(
788                                    "{err_msg_src_table} Rolled back table creation for {dest_db_name}.{dest_table_name}."
789                                ),
790                            )),
791                            Err(_) => Err(Error::new(
792                                ErrorKind::Unexpected,
793                                format!(
794                                    "{err_msg_src_table} Failed to roll back table creation for {dest_db_name}.{dest_table_name}. Please clean up manually."
795                                ),
796                            )),
797                        }
798                    }
799                }
800            }
801        }
802    }
803
804    /// registers an existing table into the Glue Catalog.
805    ///
806    /// Converts the provided table identifier and metadata location into a
807    /// Glue-compatible table representation, and attempts to create the
808    /// corresponding table in the Glue Catalog.
809    ///
810    /// # Returns
811    /// Returns `Ok(Table)` if the table is successfully registered and loaded.
812    /// If the registration fails due to validation issues, existing table conflicts,
813    /// metadata problems, or errors during the registration or loading process,
814    /// an `Err(...)` is returned.
815    async fn register_table(
816        &self,
817        table_ident: &TableIdent,
818        metadata_location: String,
819    ) -> Result<Table> {
820        let db_name = validate_namespace(table_ident.namespace())?;
821        let table_name = table_ident.name();
822        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
823
824        let table_input = convert_to_glue_table(
825            table_name,
826            metadata_location.clone(),
827            &metadata,
828            metadata.properties(),
829            None,
830        )?;
831
832        let builder = self
833            .client
834            .0
835            .create_table()
836            .database_name(&db_name)
837            .table_input(table_input);
838        let builder = with_catalog_id!(builder, self.config);
839
840        builder.send().await.map_err(|e| {
841            let error = e.into_service_error();
842            match error {
843                CreateTableError::EntityNotFoundException(_) => Error::new(
844                    ErrorKind::NamespaceNotFound,
845                    format!("Database {db_name} does not exist"),
846                ),
847                CreateTableError::AlreadyExistsException(_) => Error::new(
848                    ErrorKind::TableAlreadyExists,
849                    format!("Table {table_ident} already exists"),
850                ),
851                _ => Error::new(
852                    ErrorKind::Unexpected,
853                    format!("Failed to register table {table_ident} due to AWS SDK error"),
854                ),
855            }
856            .with_source(anyhow!("aws sdk error: {error:?}"))
857        })?;
858
859        Ok(Table::builder()
860            .identifier(table_ident.clone())
861            .metadata_location(metadata_location)
862            .metadata(metadata)
863            .file_io(self.file_io())
864            .runtime(self.runtime.clone())
865            .build()?)
866    }
867
868    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
869        let table_ident = commit.identifier().clone();
870        let table_namespace = validate_namespace(table_ident.namespace())?;
871
872        let (current_table, current_version_id) =
873            self.load_table_with_version_id(&table_ident).await?;
874        let current_metadata_location = current_table.metadata_location_result()?.to_string();
875
876        let staged_table = commit.apply(current_table)?;
877        let staged_metadata_location_str = staged_table.metadata_location_result()?;
878        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
879
880        // Write new metadata
881        staged_table
882            .metadata()
883            .write_to(staged_table.file_io(), &staged_metadata_location)
884            .await?;
885
886        // Persist staged table to Glue with optimistic locking
887        let mut builder = self
888            .client
889            .0
890            .update_table()
891            .database_name(table_namespace)
892            .set_skip_archive(Some(true)) // todo make this configurable
893            .table_input(convert_to_glue_table(
894                table_ident.name(),
895                staged_metadata_location.to_string(),
896                staged_table.metadata(),
897                staged_table.metadata().properties(),
898                Some(current_metadata_location),
899            )?);
900
901        // Add VersionId for optimistic locking
902        if let Some(version_id) = current_version_id {
903            builder = builder.version_id(version_id);
904        }
905
906        let builder = with_catalog_id!(builder, self.config);
907        let _ = builder.send().await.map_err(|e| {
908            let error = e.into_service_error();
909            match error {
910                UpdateTableError::EntityNotFoundException(_) => Error::new(
911                    ErrorKind::TableNotFound,
912                    format!("Table {table_ident} is not found"),
913                ),
914                UpdateTableError::ConcurrentModificationException(_) => Error::new(
915                    ErrorKind::CatalogCommitConflicts,
916                    format!("Commit failed for table: {table_ident}"),
917                )
918                .with_retryable(true),
919                _ => Error::new(
920                    ErrorKind::Unexpected,
921                    format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
922                ),
923            }
924            .with_source(anyhow!("aws sdk error: {error:?}"))
925        })?;
926
927        Ok(staged_table)
928    }
929}