iceberg_catalog_glue/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use aws_sdk_glue::operation::create_table::CreateTableError;
26use aws_sdk_glue::operation::update_table::UpdateTableError;
27use aws_sdk_glue::types::TableInput;
28use iceberg::io::{
29    FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
30    S3_SESSION_TOKEN, StorageFactory,
31};
32use iceberg::spec::{TableMetadata, TableMetadataBuilder};
33use iceberg::table::Table;
34use iceberg::{
35    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
36    Runtime, TableCommit, TableCreation, TableIdent,
37};
38use iceberg_storage_opendal::OpenDalStorageFactory;
39
40use crate::error::{from_aws_build_error, from_aws_sdk_error};
41use crate::utils::{
42    convert_to_database, convert_to_glue_table, convert_to_namespace, create_sdk_config,
43    get_default_table_location, get_metadata_location, is_iceberg_table, validate_namespace,
44};
45use crate::{
46    AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
47};
48
/// Property key for the Glue endpoint URI.
///
/// When present it is handed to `create_sdk_config`, and it is also used as the
/// `S3_ENDPOINT` of the catalog `FileIO` unless that key was set explicitly.
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Property key for the Glue catalog id.
///
/// Stored in the catalog configuration and applied to Glue requests through the
/// `with_catalog_id!` macro.
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Property key for the warehouse root location.
///
/// Required: loading the catalog fails when it is missing or empty. It is used
/// to derive default table locations when a table is created without one.
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
55
/// Builder for [`GlueCatalog`].
///
/// An optional storage factory and runtime can be supplied first; the catalog
/// is then created by `CatalogBuilder::load` from a name and a property map.
#[derive(Debug)]
pub struct GlueCatalogBuilder {
    /// Catalog configuration, filled in by `load`.
    config: GlueCatalogConfig,
    /// Storage backend for the catalog `FileIO`; `None` falls back to the
    /// OpenDAL S3 factory when the catalog is created.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Runtime attached to loaded tables; `None` means `Runtime::try_current()`
    /// is used when `load` runs.
    runtime: Option<Runtime>,
}
63
64impl Default for GlueCatalogBuilder {
65    fn default() -> Self {
66        Self {
67            config: GlueCatalogConfig {
68                name: None,
69                uri: None,
70                catalog_id: None,
71                warehouse: "".to_string(),
72                props: HashMap::new(),
73            },
74            storage_factory: None,
75            runtime: None,
76        }
77    }
78}
79
80impl CatalogBuilder for GlueCatalogBuilder {
81    type C = GlueCatalog;
82
83    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
84        self.storage_factory = Some(storage_factory);
85        self
86    }
87
88    fn with_runtime(mut self, runtime: Runtime) -> Self {
89        self.runtime = Some(runtime);
90        self
91    }
92
93    fn load(
94        mut self,
95        name: impl Into<String>,
96        props: HashMap<String, String>,
97    ) -> impl Future<Output = Result<Self::C>> + Send {
98        self.config.name = Some(name.into());
99
100        if props.contains_key(GLUE_CATALOG_PROP_URI) {
101            self.config.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
102        }
103
104        if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
105            self.config.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
106        }
107
108        if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
109            self.config.warehouse = props
110                .get(GLUE_CATALOG_PROP_WAREHOUSE)
111                .cloned()
112                .unwrap_or_default();
113        }
114
115        // Collect other remaining properties
116        self.config.props = props
117            .into_iter()
118            .filter(|(k, _)| {
119                k != GLUE_CATALOG_PROP_URI
120                    && k != GLUE_CATALOG_PROP_CATALOG_ID
121                    && k != GLUE_CATALOG_PROP_WAREHOUSE
122            })
123            .collect();
124
125        async move {
126            if self.config.name.is_none() {
127                return Err(Error::new(
128                    ErrorKind::DataInvalid,
129                    "Catalog name is required",
130                ));
131            }
132            if self.config.warehouse.is_empty() {
133                return Err(Error::new(
134                    ErrorKind::DataInvalid,
135                    "Catalog warehouse is required",
136                ));
137            }
138
139            let runtime = match self.runtime {
140                Some(rt) => rt,
141                None => Runtime::try_current()?,
142            };
143            GlueCatalog::new(self.config, self.storage_factory, runtime).await
144        }
145    }
146}
147
/// Glue catalog configuration, resolved by [`GlueCatalogBuilder`].
///
/// `Debug` is implemented by hand rather than derived. `props` may carry AWS
/// credentials (access key id, secret access key, session token), so only the
/// property keys are printed and every value is shown as `<redacted>`.
pub(crate) struct GlueCatalogConfig {
    /// Catalog name given to `CatalogBuilder::load`.
    name: Option<String>,
    /// Optional Glue endpoint URI (`GLUE_CATALOG_PROP_URI`).
    uri: Option<String>,
    /// Optional Glue catalog id (`GLUE_CATALOG_PROP_CATALOG_ID`).
    catalog_id: Option<String>,
    /// Warehouse root location (`GLUE_CATALOG_PROP_WAREHOUSE`).
    warehouse: String,
    /// All remaining properties, used for the AWS SDK config and `FileIO`.
    props: HashMap<String, String>,
}

impl Debug for GlueCatalogConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Never print property values: they may contain secrets. A BTreeMap
        // gives a stable, sorted rendering of the keys.
        let redacted_props: std::collections::BTreeMap<&str, &str> = self
            .props
            .keys()
            .map(|key| (key.as_str(), "<redacted>"))
            .collect();

        f.debug_struct("GlueCatalogConfig")
            .field("name", &self.name)
            .field("uri", &self.uri)
            .field("catalog_id", &self.catalog_id)
            .field("warehouse", &self.warehouse)
            .field("props", &redacted_props)
            .finish()
    }
}
157
/// Newtype wrapping the AWS SDK Glue client used by [`GlueCatalog`].
struct GlueClient(aws_sdk_glue::Client);
159
/// Iceberg catalog backed by AWS Glue.
///
/// Namespaces map to Glue databases. Tables map to Glue tables whose
/// parameters record the location of the table's current metadata file.
pub struct GlueCatalog {
    /// Configuration resolved by [`GlueCatalogBuilder`].
    config: GlueCatalogConfig,
    /// AWS SDK Glue client.
    client: GlueClient,
    /// `FileIO` used to read and write table metadata.
    file_io: FileIO,
    /// Runtime attached to every `Table` returned by this catalog.
    runtime: Runtime,
}
167
168impl Debug for GlueCatalog {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        f.debug_struct("GlueCatalog")
171            .field("config", &self.config)
172            .finish_non_exhaustive()
173    }
174}
175
176impl GlueCatalog {
177    /// Create a new glue catalog
178    async fn new(
179        config: GlueCatalogConfig,
180        storage_factory: Option<Arc<dyn StorageFactory>>,
181        runtime: Runtime,
182    ) -> Result<Self> {
183        let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
184        let mut file_io_props = config.props.clone();
185        if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
186            && let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID)
187        {
188            file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
189        }
190        if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY)
191            && let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY)
192        {
193            file_io_props.insert(
194                S3_SECRET_ACCESS_KEY.to_string(),
195                secret_access_key.to_string(),
196            );
197        }
198        if !file_io_props.contains_key(S3_REGION)
199            && let Some(region) = file_io_props.get(AWS_REGION_NAME)
200        {
201            file_io_props.insert(S3_REGION.to_string(), region.to_string());
202        }
203        if !file_io_props.contains_key(S3_SESSION_TOKEN)
204            && let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN)
205        {
206            file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
207        }
208        if !file_io_props.contains_key(S3_ENDPOINT)
209            && let Some(aws_endpoint) = config.uri.as_ref()
210        {
211            file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
212        }
213
214        let client = aws_sdk_glue::Client::new(&sdk_config);
215
216        // Use provided factory or default to OpenDalStorageFactory::S3
217        let factory = storage_factory.unwrap_or_else(|| {
218            Arc::new(OpenDalStorageFactory::S3 {
219                customized_credential_load: None,
220            })
221        });
222        let file_io = FileIOBuilder::new(factory)
223            .with_props(file_io_props)
224            .build();
225
226        Ok(GlueCatalog {
227            config,
228            client: GlueClient(client),
229            file_io,
230            runtime,
231        })
232    }
233    /// Get the catalogs `FileIO`
234    pub fn file_io(&self) -> FileIO {
235        self.file_io.clone()
236    }
237
238    /// Loads a table from the Glue Catalog along with its version_id for optimistic locking.
239    ///
240    /// # Returns
241    /// A `Result` wrapping a tuple of (`Table`, `Option<String>`) where the String is the version_id
242    /// from Glue that should be used for optimistic concurrency control when updating the table.
243    ///
244    /// # Errors
245    /// This function may return an error in several scenarios, including:
246    /// - Failure to validate the namespace.
247    /// - Failure to retrieve the table from the Glue Catalog.
248    /// - Absence of metadata location information in the table's properties.
249    /// - Issues reading or deserializing the table's metadata file.
250    async fn load_table_with_version_id(
251        &self,
252        table: &TableIdent,
253    ) -> Result<(Table, Option<String>)> {
254        let db_name = validate_namespace(table.namespace())?;
255        let table_name = table.name();
256
257        let builder = self
258            .client
259            .0
260            .get_table()
261            .database_name(&db_name)
262            .name(table_name);
263        let builder = with_catalog_id!(builder, self.config);
264
265        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
266
267        let glue_table = glue_table_output.table().ok_or_else(|| {
268            Error::new(
269                ErrorKind::TableNotFound,
270                format!(
271                    "Table object for database: {db_name} and table: {table_name} does not exist"
272                ),
273            )
274        })?;
275
276        let version_id = glue_table.version_id.clone();
277        let metadata_location = get_metadata_location(&glue_table.parameters)?;
278
279        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
280
281        let table = Table::builder()
282            .file_io(self.file_io())
283            .metadata_location(metadata_location)
284            .metadata(metadata)
285            .identifier(TableIdent::new(
286                NamespaceIdent::new(db_name),
287                table_name.to_owned(),
288            ))
289            .runtime(self.runtime.clone())
290            .build()?;
291
292        Ok((table, version_id))
293    }
294}
295
296#[async_trait]
297impl Catalog for GlueCatalog {
298    /// List namespaces from glue catalog.
299    ///
300    /// Glue doesn't support nested namespaces.
301    /// We will return an empty list if parent is some.
302    async fn list_namespaces(
303        &self,
304        parent: Option<&NamespaceIdent>,
305    ) -> Result<Vec<NamespaceIdent>> {
306        if parent.is_some() {
307            return Ok(vec![]);
308        }
309
310        let mut database_list: Vec<NamespaceIdent> = Vec::new();
311        let mut next_token: Option<String> = None;
312
313        loop {
314            let builder = match &next_token {
315                Some(token) => self.client.0.get_databases().next_token(token),
316                None => self.client.0.get_databases(),
317            };
318            let builder = with_catalog_id!(builder, self.config);
319            let resp = builder.send().await.map_err(from_aws_sdk_error)?;
320
321            let dbs: Vec<NamespaceIdent> = resp
322                .database_list()
323                .iter()
324                .map(|db| NamespaceIdent::new(db.name().to_string()))
325                .collect();
326
327            database_list.extend(dbs);
328
329            next_token = resp.next_token().map(ToOwned::to_owned);
330            if next_token.is_none() {
331                break;
332            }
333        }
334
335        Ok(database_list)
336    }
337
338    /// Creates a new namespace with the given identifier and properties.
339    ///
340    /// Attempts to create a namespace defined by the `namespace`
341    /// parameter and configured with the specified `properties`.
342    ///
343    /// This function can return an error in the following situations:
344    ///
345    /// - Errors from `validate_namespace` if the namespace identifier does not
346    /// meet validation criteria.
347    /// - Errors from `convert_to_database` if the properties cannot be
348    /// successfully converted into a database configuration.
349    /// - Errors from the underlying database creation process, converted using
350    /// `from_sdk_error`.
351    async fn create_namespace(
352        &self,
353        namespace: &NamespaceIdent,
354        properties: HashMap<String, String>,
355    ) -> Result<Namespace> {
356        if self.namespace_exists(namespace).await? {
357            return Err(Error::new(
358                ErrorKind::NamespaceAlreadyExists,
359                format!("Namespace {namespace:?} already exists"),
360            ));
361        }
362
363        let db_input = convert_to_database(namespace, &properties)?;
364
365        let builder = self.client.0.create_database().database_input(db_input);
366        let builder = with_catalog_id!(builder, self.config);
367
368        builder.send().await.map_err(from_aws_sdk_error)?;
369
370        Ok(Namespace::with_properties(namespace.clone(), properties))
371    }
372
373    /// Retrieves a namespace by its identifier.
374    ///
375    /// Validates the given namespace identifier and then queries the
376    /// underlying database client to fetch the corresponding namespace data.
377    /// Constructs a `Namespace` object with the retrieved data and returns it.
378    ///
379    /// This function can return an error in any of the following situations:
380    /// - If the provided namespace identifier fails validation checks
381    /// - If there is an error querying the database, returned by
382    /// `from_sdk_error`.
383    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
384        let db_name = validate_namespace(namespace)?;
385
386        let builder = self.client.0.get_database().name(&db_name);
387        let builder = with_catalog_id!(builder, self.config);
388
389        let resp = builder.send().await.map_err(|err| {
390            if err
391                .as_service_error()
392                .map(|e| e.is_entity_not_found_exception())
393                == Some(true)
394            {
395                return Error::new(
396                    ErrorKind::NamespaceNotFound,
397                    format!("Namespace {namespace:?} does not exist"),
398                );
399            }
400            from_aws_sdk_error(err)
401        })?;
402
403        match resp.database() {
404            Some(db) => {
405                let namespace = convert_to_namespace(db);
406                Ok(namespace)
407            }
408            None => Err(Error::new(
409                ErrorKind::NamespaceNotFound,
410                format!("Database with name: {db_name} does not exist"),
411            )),
412        }
413    }
414
415    /// Checks if a namespace exists within the Glue Catalog.
416    ///
417    /// Validates the namespace identifier by querying the Glue Catalog
418    /// to determine if the specified namespace (database) exists.
419    ///
420    /// # Returns
421    /// A `Result<bool>` indicating the outcome of the check:
422    /// - `Ok(true)` if the namespace exists.
423    /// - `Ok(false)` if the namespace does not exist, identified by a specific
424    /// `EntityNotFoundException` variant.
425    /// - `Err(...)` if an error occurs during validation or the Glue Catalog
426    /// query, with the error encapsulating the issue.
427    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
428        let db_name = validate_namespace(namespace)?;
429
430        let builder = self.client.0.get_database().name(&db_name);
431        let builder = with_catalog_id!(builder, self.config);
432
433        let resp = builder.send().await;
434
435        match resp {
436            Ok(_) => Ok(true),
437            Err(err) => {
438                if err
439                    .as_service_error()
440                    .map(|e| e.is_entity_not_found_exception())
441                    == Some(true)
442                {
443                    return Ok(false);
444                }
445                Err(from_aws_sdk_error(err))
446            }
447        }
448    }
449
450    /// Asynchronously updates properties of an existing namespace.
451    ///
452    /// Converts the given namespace identifier and properties into a database
453    /// representation and then attempts to update the corresponding namespace
454    /// in the Glue Catalog.
455    ///
456    /// # Returns
457    /// Returns `Ok(())` if the namespace update is successful. If the
458    /// namespace cannot be updated due to missing information or an error
459    /// during the update process, an `Err(...)` is returned.
460    async fn update_namespace(
461        &self,
462        namespace: &NamespaceIdent,
463        properties: HashMap<String, String>,
464    ) -> Result<()> {
465        if !self.namespace_exists(namespace).await? {
466            return Err(Error::new(
467                ErrorKind::NamespaceNotFound,
468                format!("Namespace {namespace:?} does not exist"),
469            ));
470        }
471
472        let db_name = validate_namespace(namespace)?;
473        let db_input = convert_to_database(namespace, &properties)?;
474
475        let builder = self
476            .client
477            .0
478            .update_database()
479            .name(&db_name)
480            .database_input(db_input);
481        let builder = with_catalog_id!(builder, self.config);
482
483        builder.send().await.map_err(from_aws_sdk_error)?;
484
485        Ok(())
486    }
487
488    /// Asynchronously drops a namespace from the Glue Catalog.
489    ///
490    /// Checks if the namespace is empty. If it still contains tables the
491    /// namespace will not be dropped, but an error is returned instead.
492    ///
493    /// # Returns
494    /// A `Result<()>` indicating the outcome:
495    /// - `Ok(())` signifies successful namespace deletion.
496    /// - `Err(...)` signifies failure to drop the namespace due to validation
497    /// errors, connectivity issues, or Glue Catalog constraints.
498    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
499        if !self.namespace_exists(namespace).await? {
500            return Err(Error::new(
501                ErrorKind::NamespaceNotFound,
502                format!("Namespace {namespace:?} does not exist"),
503            ));
504        }
505
506        let db_name = validate_namespace(namespace)?;
507
508        // Check for ANY Glue table in the database, not just Iceberg tables.
509        // Glue's `delete_database` will fail if any table (Iceberg or not) is
510        // still present, and `list_tables` only returns Iceberg tables, so we
511        // query Glue directly here.
512        let builder = self
513            .client
514            .0
515            .get_tables()
516            .database_name(&db_name)
517            .max_results(1);
518        let builder = with_catalog_id!(builder, self.config);
519        let resp = builder.send().await.map_err(from_aws_sdk_error)?;
520
521        if !resp.table_list().is_empty() {
522            return Err(Error::new(
523                ErrorKind::DataInvalid,
524                format!("Database with name: {} is not empty", &db_name),
525            ));
526        }
527
528        let builder = self.client.0.delete_database().name(db_name);
529        let builder = with_catalog_id!(builder, self.config);
530
531        builder.send().await.map_err(from_aws_sdk_error)?;
532
533        Ok(())
534    }
535
536    /// Asynchronously lists all Iceberg tables within a specified namespace.
537    ///
538    /// Glue databases may contain a mix of Iceberg and non-Iceberg tables
539    /// (e.g. plain Hive tables). Only tables whose `table_type` parameter is
540    /// set to `ICEBERG` (case-insensitive) are returned
541    ///
542    /// # Returns
543    /// A `Result<Vec<TableIdent>>`, which is:
544    /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
545    /// representing an Iceberg table within the specified namespace.
546    /// - `Err(...)` if an error occurs during namespace validation or while
547    /// querying the database.
548    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
549        let db_name = validate_namespace(namespace)?;
550
551        let mut table_list: Vec<TableIdent> = Vec::new();
552        let mut next_token: Option<String> = None;
553
554        loop {
555            let builder = match &next_token {
556                Some(token) => self
557                    .client
558                    .0
559                    .get_tables()
560                    .database_name(&db_name)
561                    .next_token(token),
562                None => self.client.0.get_tables().database_name(&db_name),
563            };
564            let builder = with_catalog_id!(builder, self.config);
565            let resp = builder.send().await.map_err(from_aws_sdk_error)?;
566
567            let tables: Vec<_> = resp
568                .table_list()
569                .iter()
570                .filter(|tbl| is_iceberg_table(&tbl.parameters))
571                .map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
572                .collect();
573
574            table_list.extend(tables);
575
576            next_token = resp.next_token().map(ToOwned::to_owned);
577            if next_token.is_none() {
578                break;
579            }
580        }
581
582        Ok(table_list)
583    }
584
585    /// Creates a new table within a specified namespace using the provided
586    /// table creation settings.
587    ///
588    /// # Returns
589    /// A `Result` wrapping a `Table` object representing the newly created
590    /// table.
591    ///
592    /// # Errors
593    /// This function may return an error in several cases, including invalid
594    /// namespace identifiers, failure to determine a default storage location,
595    /// issues generating or writing table metadata, and errors communicating
596    /// with the Glue Catalog.
597    async fn create_table(
598        &self,
599        namespace: &NamespaceIdent,
600        mut creation: TableCreation,
601    ) -> Result<Table> {
602        let db_name = validate_namespace(namespace)?;
603        let table_name = creation.name.clone();
604
605        let location = match &creation.location {
606            Some(location) => location.clone(),
607            None => {
608                let ns = self.get_namespace(namespace).await?;
609                let location =
610                    get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
611                creation.location = Some(location.clone());
612                location
613            }
614        };
615        let metadata = TableMetadataBuilder::from_table_creation(creation)?
616            .build()?
617            .metadata;
618        let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
619
620        metadata.write_to(&self.file_io, &metadata_location).await?;
621
622        let metadata_location_str = metadata_location.to_string();
623        let glue_table = convert_to_glue_table(
624            &table_name,
625            metadata_location_str.clone(),
626            &metadata,
627            metadata.properties(),
628            None,
629        )?;
630
631        let builder = self
632            .client
633            .0
634            .create_table()
635            .database_name(&db_name)
636            .table_input(glue_table);
637        let builder = with_catalog_id!(builder, self.config);
638
639        builder.send().await.map_err(from_aws_sdk_error)?;
640
641        Table::builder()
642            .file_io(self.file_io())
643            .metadata_location(metadata_location_str)
644            .metadata(metadata)
645            .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
646            .runtime(self.runtime.clone())
647            .build()
648    }
649
650    /// Loads a table from the Glue Catalog and constructs a `Table` object
651    /// based on its metadata.
652    ///
653    /// # Returns
654    /// A `Result` wrapping a `Table` object that represents the loaded table.
655    ///
656    /// # Errors
657    /// This function may return an error in several scenarios, including:
658    /// - Failure to validate the namespace.
659    /// - Failure to retrieve the table from the Glue Catalog.
660    /// - Absence of metadata location information in the table's properties.
661    /// - Issues reading or deserializing the table's metadata file.
662    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
663        let (table, _) = self.load_table_with_version_id(table).await?;
664        Ok(table)
665    }
666
667    /// Asynchronously drops a table from the database.
668    ///
669    /// # Errors
670    /// Returns an error if:
671    /// - The namespace provided in `table` cannot be validated
672    /// or does not exist.
673    /// - The underlying database client encounters an error while
674    /// attempting to drop the table. This includes scenarios where
675    /// the table does not exist.
676    /// - Any network or communication error occurs with the database backend.
677    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
678        let db_name = validate_namespace(table.namespace())?;
679        let table_name = table.name();
680
681        let builder = self
682            .client
683            .0
684            .delete_table()
685            .database_name(&db_name)
686            .name(table_name);
687        let builder = with_catalog_id!(builder, self.config);
688
689        builder.send().await.map_err(from_aws_sdk_error)?;
690
691        Ok(())
692    }
693
694    async fn purge_table(&self, table: &TableIdent) -> Result<()> {
695        let table_info = self.load_table(table).await?;
696        self.drop_table(table).await?;
697        iceberg::drop_table_data(&table_info).await
698    }
699
700    /// Asynchronously checks the existence of a specified table
701    /// in the database.
702    ///
703    /// # Returns
704    /// - `Ok(true)` if the table exists in the database.
705    /// - `Ok(false)` if the table does not exist in the database.
706    /// - `Err(...)` if an error occurs during the process
707    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
708        let db_name = validate_namespace(table.namespace())?;
709        let table_name = table.name();
710
711        let builder = self
712            .client
713            .0
714            .get_table()
715            .database_name(&db_name)
716            .name(table_name);
717        let builder = with_catalog_id!(builder, self.config);
718
719        let resp = builder.send().await;
720
721        match resp {
722            Ok(_) => Ok(true),
723            Err(err) => {
724                if err
725                    .as_service_error()
726                    .map(|e| e.is_entity_not_found_exception())
727                    == Some(true)
728                {
729                    return Ok(false);
730                }
731                Err(from_aws_sdk_error(err))
732            }
733        }
734    }
735
736    /// Asynchronously renames a table within the database
737    /// or moves it between namespaces (databases).
738    ///
739    /// # Returns
740    /// - `Ok(())` on successful rename or move of the table.
741    /// - `Err(...)` if an error occurs during the process.
742    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
743        let src_db_name = validate_namespace(src.namespace())?;
744        let dest_db_name = validate_namespace(dest.namespace())?;
745
746        let src_table_name = src.name();
747        let dest_table_name = dest.name();
748
749        let builder = self
750            .client
751            .0
752            .get_table()
753            .database_name(&src_db_name)
754            .name(src_table_name);
755        let builder = with_catalog_id!(builder, self.config);
756
757        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
758
759        match glue_table_output.table() {
760            None => Err(Error::new(
761                ErrorKind::TableNotFound,
762                format!(
763                    "'Table' object for database: {src_db_name} and table: {src_table_name} does not exist"
764                ),
765            )),
766            Some(table) => {
767                let rename_table_input = TableInput::builder()
768                    .name(dest_table_name)
769                    .set_parameters(table.parameters.clone())
770                    .set_storage_descriptor(table.storage_descriptor.clone())
771                    .set_table_type(table.table_type.clone())
772                    .set_description(table.description.clone())
773                    .build()
774                    .map_err(from_aws_build_error)?;
775
776                let builder = self
777                    .client
778                    .0
779                    .create_table()
780                    .database_name(&dest_db_name)
781                    .table_input(rename_table_input);
782                let builder = with_catalog_id!(builder, self.config);
783
784                builder.send().await.map_err(from_aws_sdk_error)?;
785
786                let drop_src_table_result = self.drop_table(src).await;
787
788                match drop_src_table_result {
789                    Ok(_) => Ok(()),
790                    Err(_) => {
791                        let err_msg_src_table =
792                            format!("Failed to drop old table {src_db_name}.{src_table_name}.");
793
794                        let drop_dest_table_result = self.drop_table(dest).await;
795
796                        match drop_dest_table_result {
797                            Ok(_) => Err(Error::new(
798                                ErrorKind::Unexpected,
799                                format!(
800                                    "{err_msg_src_table} Rolled back table creation for {dest_db_name}.{dest_table_name}."
801                                ),
802                            )),
803                            Err(_) => Err(Error::new(
804                                ErrorKind::Unexpected,
805                                format!(
806                                    "{err_msg_src_table} Failed to roll back table creation for {dest_db_name}.{dest_table_name}. Please clean up manually."
807                                ),
808                            )),
809                        }
810                    }
811                }
812            }
813        }
814    }
815
816    /// registers an existing table into the Glue Catalog.
817    ///
818    /// Converts the provided table identifier and metadata location into a
819    /// Glue-compatible table representation, and attempts to create the
820    /// corresponding table in the Glue Catalog.
821    ///
822    /// # Returns
823    /// Returns `Ok(Table)` if the table is successfully registered and loaded.
824    /// If the registration fails due to validation issues, existing table conflicts,
825    /// metadata problems, or errors during the registration or loading process,
826    /// an `Err(...)` is returned.
827    async fn register_table(
828        &self,
829        table_ident: &TableIdent,
830        metadata_location: String,
831    ) -> Result<Table> {
832        let db_name = validate_namespace(table_ident.namespace())?;
833        let table_name = table_ident.name();
834        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
835
836        let table_input = convert_to_glue_table(
837            table_name,
838            metadata_location.clone(),
839            &metadata,
840            metadata.properties(),
841            None,
842        )?;
843
844        let builder = self
845            .client
846            .0
847            .create_table()
848            .database_name(&db_name)
849            .table_input(table_input);
850        let builder = with_catalog_id!(builder, self.config);
851
852        builder.send().await.map_err(|e| {
853            let error = e.into_service_error();
854            match error {
855                CreateTableError::EntityNotFoundException(_) => Error::new(
856                    ErrorKind::NamespaceNotFound,
857                    format!("Database {db_name} does not exist"),
858                ),
859                CreateTableError::AlreadyExistsException(_) => Error::new(
860                    ErrorKind::TableAlreadyExists,
861                    format!("Table {table_ident} already exists"),
862                ),
863                _ => Error::new(
864                    ErrorKind::Unexpected,
865                    format!("Failed to register table {table_ident} due to AWS SDK error"),
866                ),
867            }
868            .with_source(anyhow!("aws sdk error: {error:?}"))
869        })?;
870
871        Ok(Table::builder()
872            .identifier(table_ident.clone())
873            .metadata_location(metadata_location)
874            .metadata(metadata)
875            .file_io(self.file_io())
876            .runtime(self.runtime.clone())
877            .build()?)
878    }
879
880    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
881        let table_ident = commit.identifier().clone();
882        let table_namespace = validate_namespace(table_ident.namespace())?;
883
884        let (current_table, current_version_id) =
885            self.load_table_with_version_id(&table_ident).await?;
886        let current_metadata_location = current_table.metadata_location_result()?.to_string();
887
888        let staged_table = commit.apply(current_table)?;
889        let staged_metadata_location_str = staged_table.metadata_location_result()?;
890        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
891
892        // Write new metadata
893        staged_table
894            .metadata()
895            .write_to(staged_table.file_io(), &staged_metadata_location)
896            .await?;
897
898        // Persist staged table to Glue with optimistic locking
899        let mut builder = self
900            .client
901            .0
902            .update_table()
903            .database_name(table_namespace)
904            .set_skip_archive(Some(true)) // todo make this configurable
905            .table_input(convert_to_glue_table(
906                table_ident.name(),
907                staged_metadata_location.to_string(),
908                staged_table.metadata(),
909                staged_table.metadata().properties(),
910                Some(current_metadata_location),
911            )?);
912
913        // Add VersionId for optimistic locking
914        if let Some(version_id) = current_version_id {
915            builder = builder.version_id(version_id);
916        }
917
918        let builder = with_catalog_id!(builder, self.config);
919        let _ = builder.send().await.map_err(|e| {
920            let error = e.into_service_error();
921            match error {
922                UpdateTableError::EntityNotFoundException(_) => Error::new(
923                    ErrorKind::TableNotFound,
924                    format!("Table {table_ident} is not found"),
925                ),
926                UpdateTableError::ConcurrentModificationException(_) => Error::new(
927                    ErrorKind::CatalogCommitConflicts,
928                    format!("Commit failed for table: {table_ident}"),
929                )
930                .with_retryable(true),
931                _ => Error::new(
932                    ErrorKind::Unexpected,
933                    format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
934                ),
935            }
936            .with_source(anyhow!("aws sdk error: {error:?}"))
937        })?;
938
939        Ok(staged_table)
940    }
941}