iceberg_catalog_glue/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20
21use anyhow::anyhow;
22use async_trait::async_trait;
23use aws_sdk_glue::operation::create_table::CreateTableError;
24use aws_sdk_glue::operation::update_table::UpdateTableError;
25use aws_sdk_glue::types::TableInput;
26use iceberg::io::{
27    FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN,
28};
29use iceberg::spec::{TableMetadata, TableMetadataBuilder};
30use iceberg::table::Table;
31use iceberg::{
32    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
33    TableCommit, TableCreation, TableIdent,
34};
35
36use crate::error::{from_aws_build_error, from_aws_sdk_error};
37use crate::utils::{
38    convert_to_database, convert_to_glue_table, convert_to_namespace, create_sdk_config,
39    get_default_table_location, get_metadata_location, validate_namespace,
40};
41use crate::{
42    AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
43};
44
/// Property key for the catalog endpoint URI (optional).
///
/// The value is handed to the AWS SDK configuration and, when no explicit S3 endpoint
/// property is supplied, is also used as the S3 endpoint of the catalog's `FileIO`.
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Property key for the Glue catalog id (optional).
///
/// NOTE(review): presumably forwarded on every Glue request via `with_catalog_id!`;
/// the macro body is not visible here — confirm.
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Property key for the warehouse location (required; loading fails when empty).
///
/// The catalog's `FileIO` is created from this path, and it is used to derive default
/// table locations when a table is created without an explicit location.
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
51
/// Builder for [`GlueCatalog`].
///
/// Start from `GlueCatalogBuilder::default()` and call `CatalogBuilder::load` with a
/// catalog name and a property map; the `warehouse` property must be non-empty.
#[derive(Debug)]
pub struct GlueCatalogBuilder(GlueCatalogConfig);
55
56impl Default for GlueCatalogBuilder {
57    fn default() -> Self {
58        Self(GlueCatalogConfig {
59            name: None,
60            uri: None,
61            catalog_id: None,
62            warehouse: "".to_string(),
63            props: HashMap::new(),
64        })
65    }
66}
67
68impl CatalogBuilder for GlueCatalogBuilder {
69    type C = GlueCatalog;
70
71    fn load(
72        mut self,
73        name: impl Into<String>,
74        props: HashMap<String, String>,
75    ) -> impl Future<Output = Result<Self::C>> + Send {
76        self.0.name = Some(name.into());
77
78        if props.contains_key(GLUE_CATALOG_PROP_URI) {
79            self.0.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
80        }
81
82        if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
83            self.0.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
84        }
85
86        if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
87            self.0.warehouse = props
88                .get(GLUE_CATALOG_PROP_WAREHOUSE)
89                .cloned()
90                .unwrap_or_default();
91        }
92
93        // Collect other remaining properties
94        self.0.props = props
95            .into_iter()
96            .filter(|(k, _)| {
97                k != GLUE_CATALOG_PROP_URI
98                    && k != GLUE_CATALOG_PROP_CATALOG_ID
99                    && k != GLUE_CATALOG_PROP_WAREHOUSE
100            })
101            .collect();
102
103        async move {
104            if self.0.name.is_none() {
105                return Err(Error::new(
106                    ErrorKind::DataInvalid,
107                    "Catalog name is required",
108                ));
109            }
110            if self.0.warehouse.is_empty() {
111                return Err(Error::new(
112                    ErrorKind::DataInvalid,
113                    "Catalog warehouse is required",
114                ));
115            }
116
117            GlueCatalog::new(self.0).await
118        }
119    }
120}
121
/// Glue Catalog configuration.
///
/// `Debug` is implemented by hand so that pass-through property *values* are never
/// printed. Those values can hold AWS credentials, such as the secret access key or
/// the session token, which `GlueCatalog::new` forwards to the catalog's `FileIO`.
pub(crate) struct GlueCatalogConfig {
    /// Catalog name given to `CatalogBuilder::load`.
    name: Option<String>,
    /// Endpoint URI taken from the `uri` property, if any.
    uri: Option<String>,
    /// Glue catalog id taken from the `catalog_id` property, if any.
    catalog_id: Option<String>,
    /// Warehouse location taken from the `warehouse` property; must be non-empty.
    warehouse: String,
    /// All remaining properties, passed through verbatim.
    props: HashMap<String, String>,
}

impl Debug for GlueCatalogConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show only the property keys, sorted for stable output: values may be secrets.
        let mut prop_keys: Vec<&str> = self.props.keys().map(String::as_str).collect();
        prop_keys.sort_unstable();

        f.debug_struct("GlueCatalogConfig")
            .field("name", &self.name)
            .field("uri", &self.uri)
            .field("catalog_id", &self.catalog_id)
            .field("warehouse", &self.warehouse)
            .field("props", &prop_keys)
            .finish()
    }
}
131
/// Newtype wrapper around the AWS Glue SDK client that [`GlueCatalog`] sends all of
/// its Glue requests through (accessed as `.0`).
struct GlueClient(aws_sdk_glue::Client);
133
/// Glue Catalog
///
/// An iceberg `Catalog` backed by AWS Glue. Namespaces are Glue databases (nesting is
/// not supported), and each table is a Glue table whose parameters record the location
/// of its current iceberg metadata file.
pub struct GlueCatalog {
    // Configuration captured when the catalog was loaded.
    config: GlueCatalogConfig,
    // Glue SDK client used for every catalog request.
    client: GlueClient,
    // FileIO rooted at the warehouse; reads and writes table metadata files.
    file_io: FileIO,
}
140
141impl Debug for GlueCatalog {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct("GlueCatalog")
144            .field("config", &self.config)
145            .finish_non_exhaustive()
146    }
147}
148
149impl GlueCatalog {
150    /// Create a new glue catalog
151    async fn new(config: GlueCatalogConfig) -> Result<Self> {
152        let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
153        let mut file_io_props = config.props.clone();
154        if !file_io_props.contains_key(S3_ACCESS_KEY_ID) {
155            if let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID) {
156                file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
157            }
158        }
159        if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY) {
160            if let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY) {
161                file_io_props.insert(
162                    S3_SECRET_ACCESS_KEY.to_string(),
163                    secret_access_key.to_string(),
164                );
165            }
166        }
167        if !file_io_props.contains_key(S3_REGION) {
168            if let Some(region) = file_io_props.get(AWS_REGION_NAME) {
169                file_io_props.insert(S3_REGION.to_string(), region.to_string());
170            }
171        }
172        if !file_io_props.contains_key(S3_SESSION_TOKEN) {
173            if let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN) {
174                file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
175            }
176        }
177        if !file_io_props.contains_key(S3_ENDPOINT) {
178            if let Some(aws_endpoint) = config.uri.as_ref() {
179                file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
180            }
181        }
182
183        let client = aws_sdk_glue::Client::new(&sdk_config);
184
185        let file_io = FileIO::from_path(&config.warehouse)?
186            .with_props(file_io_props)
187            .build()?;
188
189        Ok(GlueCatalog {
190            config,
191            client: GlueClient(client),
192            file_io,
193        })
194    }
195    /// Get the catalogs `FileIO`
196    pub fn file_io(&self) -> FileIO {
197        self.file_io.clone()
198    }
199
200    /// Loads a table from the Glue Catalog along with its version_id for optimistic locking.
201    ///
202    /// # Returns
203    /// A `Result` wrapping a tuple of (`Table`, `Option<String>`) where the String is the version_id
204    /// from Glue that should be used for optimistic concurrency control when updating the table.
205    ///
206    /// # Errors
207    /// This function may return an error in several scenarios, including:
208    /// - Failure to validate the namespace.
209    /// - Failure to retrieve the table from the Glue Catalog.
210    /// - Absence of metadata location information in the table's properties.
211    /// - Issues reading or deserializing the table's metadata file.
212    async fn load_table_with_version_id(
213        &self,
214        table: &TableIdent,
215    ) -> Result<(Table, Option<String>)> {
216        let db_name = validate_namespace(table.namespace())?;
217        let table_name = table.name();
218
219        let builder = self
220            .client
221            .0
222            .get_table()
223            .database_name(&db_name)
224            .name(table_name);
225        let builder = with_catalog_id!(builder, self.config);
226
227        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
228
229        let glue_table = glue_table_output.table().ok_or_else(|| {
230            Error::new(
231                ErrorKind::TableNotFound,
232                format!(
233                    "Table object for database: {db_name} and table: {table_name} does not exist"
234                ),
235            )
236        })?;
237
238        let version_id = glue_table.version_id.clone();
239        let metadata_location = get_metadata_location(&glue_table.parameters)?;
240
241        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
242
243        let table = Table::builder()
244            .file_io(self.file_io())
245            .metadata_location(metadata_location)
246            .metadata(metadata)
247            .identifier(TableIdent::new(
248                NamespaceIdent::new(db_name),
249                table_name.to_owned(),
250            ))
251            .build()?;
252
253        Ok((table, version_id))
254    }
255}
256
257#[async_trait]
258impl Catalog for GlueCatalog {
259    /// List namespaces from glue catalog.
260    ///
261    /// Glue doesn't support nested namespaces.
262    /// We will return an empty list if parent is some.
263    async fn list_namespaces(
264        &self,
265        parent: Option<&NamespaceIdent>,
266    ) -> Result<Vec<NamespaceIdent>> {
267        if parent.is_some() {
268            return Ok(vec![]);
269        }
270
271        let mut database_list: Vec<NamespaceIdent> = Vec::new();
272        let mut next_token: Option<String> = None;
273
274        loop {
275            let builder = match &next_token {
276                Some(token) => self.client.0.get_databases().next_token(token),
277                None => self.client.0.get_databases(),
278            };
279            let builder = with_catalog_id!(builder, self.config);
280            let resp = builder.send().await.map_err(from_aws_sdk_error)?;
281
282            let dbs: Vec<NamespaceIdent> = resp
283                .database_list()
284                .iter()
285                .map(|db| NamespaceIdent::new(db.name().to_string()))
286                .collect();
287
288            database_list.extend(dbs);
289
290            next_token = resp.next_token().map(ToOwned::to_owned);
291            if next_token.is_none() {
292                break;
293            }
294        }
295
296        Ok(database_list)
297    }
298
299    /// Creates a new namespace with the given identifier and properties.
300    ///
301    /// Attempts to create a namespace defined by the `namespace`
302    /// parameter and configured with the specified `properties`.
303    ///
304    /// This function can return an error in the following situations:
305    ///
306    /// - Errors from `validate_namespace` if the namespace identifier does not
307    /// meet validation criteria.
308    /// - Errors from `convert_to_database` if the properties cannot be
309    /// successfully converted into a database configuration.
310    /// - Errors from the underlying database creation process, converted using
311    /// `from_sdk_error`.
312    async fn create_namespace(
313        &self,
314        namespace: &NamespaceIdent,
315        properties: HashMap<String, String>,
316    ) -> Result<Namespace> {
317        let db_input = convert_to_database(namespace, &properties)?;
318
319        let builder = self.client.0.create_database().database_input(db_input);
320        let builder = with_catalog_id!(builder, self.config);
321
322        builder.send().await.map_err(from_aws_sdk_error)?;
323
324        Ok(Namespace::with_properties(namespace.clone(), properties))
325    }
326
327    /// Retrieves a namespace by its identifier.
328    ///
329    /// Validates the given namespace identifier and then queries the
330    /// underlying database client to fetch the corresponding namespace data.
331    /// Constructs a `Namespace` object with the retrieved data and returns it.
332    ///
333    /// This function can return an error in any of the following situations:
334    /// - If the provided namespace identifier fails validation checks
335    /// - If there is an error querying the database, returned by
336    /// `from_sdk_error`.
337    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
338        let db_name = validate_namespace(namespace)?;
339
340        let builder = self.client.0.get_database().name(&db_name);
341        let builder = with_catalog_id!(builder, self.config);
342
343        let resp = builder.send().await.map_err(from_aws_sdk_error)?;
344
345        match resp.database() {
346            Some(db) => {
347                let namespace = convert_to_namespace(db);
348                Ok(namespace)
349            }
350            None => Err(Error::new(
351                ErrorKind::DataInvalid,
352                format!("Database with name: {db_name} does not exist"),
353            )),
354        }
355    }
356
357    /// Checks if a namespace exists within the Glue Catalog.
358    ///
359    /// Validates the namespace identifier by querying the Glue Catalog
360    /// to determine if the specified namespace (database) exists.
361    ///
362    /// # Returns
363    /// A `Result<bool>` indicating the outcome of the check:
364    /// - `Ok(true)` if the namespace exists.
365    /// - `Ok(false)` if the namespace does not exist, identified by a specific
366    /// `EntityNotFoundException` variant.
367    /// - `Err(...)` if an error occurs during validation or the Glue Catalog
368    /// query, with the error encapsulating the issue.
369    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
370        let db_name = validate_namespace(namespace)?;
371
372        let builder = self.client.0.get_database().name(&db_name);
373        let builder = with_catalog_id!(builder, self.config);
374
375        let resp = builder.send().await;
376
377        match resp {
378            Ok(_) => Ok(true),
379            Err(err) => {
380                if err
381                    .as_service_error()
382                    .map(|e| e.is_entity_not_found_exception())
383                    == Some(true)
384                {
385                    return Ok(false);
386                }
387                Err(from_aws_sdk_error(err))
388            }
389        }
390    }
391
392    /// Asynchronously updates properties of an existing namespace.
393    ///
394    /// Converts the given namespace identifier and properties into a database
395    /// representation and then attempts to update the corresponding namespace
396    /// in the Glue Catalog.
397    ///
398    /// # Returns
399    /// Returns `Ok(())` if the namespace update is successful. If the
400    /// namespace cannot be updated due to missing information or an error
401    /// during the update process, an `Err(...)` is returned.
402    async fn update_namespace(
403        &self,
404        namespace: &NamespaceIdent,
405        properties: HashMap<String, String>,
406    ) -> Result<()> {
407        let db_name = validate_namespace(namespace)?;
408        let db_input = convert_to_database(namespace, &properties)?;
409
410        let builder = self
411            .client
412            .0
413            .update_database()
414            .name(&db_name)
415            .database_input(db_input);
416        let builder = with_catalog_id!(builder, self.config);
417
418        builder.send().await.map_err(from_aws_sdk_error)?;
419
420        Ok(())
421    }
422
423    /// Asynchronously drops a namespace from the Glue Catalog.
424    ///
425    /// Checks if the namespace is empty. If it still contains tables the
426    /// namespace will not be dropped, but an error is returned instead.
427    ///
428    /// # Returns
429    /// A `Result<()>` indicating the outcome:
430    /// - `Ok(())` signifies successful namespace deletion.
431    /// - `Err(...)` signifies failure to drop the namespace due to validation
432    /// errors, connectivity issues, or Glue Catalog constraints.
433    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
434        let db_name = validate_namespace(namespace)?;
435        let table_list = self.list_tables(namespace).await?;
436
437        if !table_list.is_empty() {
438            return Err(Error::new(
439                ErrorKind::DataInvalid,
440                format!("Database with name: {} is not empty", &db_name),
441            ));
442        }
443
444        let builder = self.client.0.delete_database().name(db_name);
445        let builder = with_catalog_id!(builder, self.config);
446
447        builder.send().await.map_err(from_aws_sdk_error)?;
448
449        Ok(())
450    }
451
452    /// Asynchronously lists all tables within a specified namespace.
453    ///
454    /// # Returns
455    /// A `Result<Vec<TableIdent>>`, which is:
456    /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
457    /// representing a table within the specified namespace.
458    /// - `Err(...)` if an error occurs during namespace validation or while
459    /// querying the database.
460    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
461        let db_name = validate_namespace(namespace)?;
462
463        let mut table_list: Vec<TableIdent> = Vec::new();
464        let mut next_token: Option<String> = None;
465
466        loop {
467            let builder = match &next_token {
468                Some(token) => self
469                    .client
470                    .0
471                    .get_tables()
472                    .database_name(&db_name)
473                    .next_token(token),
474                None => self.client.0.get_tables().database_name(&db_name),
475            };
476            let builder = with_catalog_id!(builder, self.config);
477            let resp = builder.send().await.map_err(from_aws_sdk_error)?;
478
479            let tables: Vec<_> = resp
480                .table_list()
481                .iter()
482                .map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
483                .collect();
484
485            table_list.extend(tables);
486
487            next_token = resp.next_token().map(ToOwned::to_owned);
488            if next_token.is_none() {
489                break;
490            }
491        }
492
493        Ok(table_list)
494    }
495
496    /// Creates a new table within a specified namespace using the provided
497    /// table creation settings.
498    ///
499    /// # Returns
500    /// A `Result` wrapping a `Table` object representing the newly created
501    /// table.
502    ///
503    /// # Errors
504    /// This function may return an error in several cases, including invalid
505    /// namespace identifiers, failure to determine a default storage location,
506    /// issues generating or writing table metadata, and errors communicating
507    /// with the Glue Catalog.
508    async fn create_table(
509        &self,
510        namespace: &NamespaceIdent,
511        mut creation: TableCreation,
512    ) -> Result<Table> {
513        let db_name = validate_namespace(namespace)?;
514        let table_name = creation.name.clone();
515
516        let location = match &creation.location {
517            Some(location) => location.clone(),
518            None => {
519                let ns = self.get_namespace(namespace).await?;
520                let location =
521                    get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
522                creation.location = Some(location.clone());
523                location
524            }
525        };
526        let metadata = TableMetadataBuilder::from_table_creation(creation)?
527            .build()?
528            .metadata;
529        let metadata_location =
530            MetadataLocation::new_with_table_location(location.clone()).to_string();
531
532        metadata.write_to(&self.file_io, &metadata_location).await?;
533
534        let glue_table = convert_to_glue_table(
535            &table_name,
536            metadata_location.clone(),
537            &metadata,
538            metadata.properties(),
539            None,
540        )?;
541
542        let builder = self
543            .client
544            .0
545            .create_table()
546            .database_name(&db_name)
547            .table_input(glue_table);
548        let builder = with_catalog_id!(builder, self.config);
549
550        builder.send().await.map_err(from_aws_sdk_error)?;
551
552        Table::builder()
553            .file_io(self.file_io())
554            .metadata_location(metadata_location)
555            .metadata(metadata)
556            .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
557            .build()
558    }
559
560    /// Loads a table from the Glue Catalog and constructs a `Table` object
561    /// based on its metadata.
562    ///
563    /// # Returns
564    /// A `Result` wrapping a `Table` object that represents the loaded table.
565    ///
566    /// # Errors
567    /// This function may return an error in several scenarios, including:
568    /// - Failure to validate the namespace.
569    /// - Failure to retrieve the table from the Glue Catalog.
570    /// - Absence of metadata location information in the table's properties.
571    /// - Issues reading or deserializing the table's metadata file.
572    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
573        let (table, _) = self.load_table_with_version_id(table).await?;
574        Ok(table)
575    }
576
577    /// Asynchronously drops a table from the database.
578    ///
579    /// # Errors
580    /// Returns an error if:
581    /// - The namespace provided in `table` cannot be validated
582    /// or does not exist.
583    /// - The underlying database client encounters an error while
584    /// attempting to drop the table. This includes scenarios where
585    /// the table does not exist.
586    /// - Any network or communication error occurs with the database backend.
587    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
588        let db_name = validate_namespace(table.namespace())?;
589        let table_name = table.name();
590
591        let builder = self
592            .client
593            .0
594            .delete_table()
595            .database_name(&db_name)
596            .name(table_name);
597        let builder = with_catalog_id!(builder, self.config);
598
599        builder.send().await.map_err(from_aws_sdk_error)?;
600
601        Ok(())
602    }
603
604    /// Asynchronously checks the existence of a specified table
605    /// in the database.
606    ///
607    /// # Returns
608    /// - `Ok(true)` if the table exists in the database.
609    /// - `Ok(false)` if the table does not exist in the database.
610    /// - `Err(...)` if an error occurs during the process
611    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
612        let db_name = validate_namespace(table.namespace())?;
613        let table_name = table.name();
614
615        let builder = self
616            .client
617            .0
618            .get_table()
619            .database_name(&db_name)
620            .name(table_name);
621        let builder = with_catalog_id!(builder, self.config);
622
623        let resp = builder.send().await;
624
625        match resp {
626            Ok(_) => Ok(true),
627            Err(err) => {
628                if err
629                    .as_service_error()
630                    .map(|e| e.is_entity_not_found_exception())
631                    == Some(true)
632                {
633                    return Ok(false);
634                }
635                Err(from_aws_sdk_error(err))
636            }
637        }
638    }
639
640    /// Asynchronously renames a table within the database
641    /// or moves it between namespaces (databases).
642    ///
643    /// # Returns
644    /// - `Ok(())` on successful rename or move of the table.
645    /// - `Err(...)` if an error occurs during the process.
646    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
647        let src_db_name = validate_namespace(src.namespace())?;
648        let dest_db_name = validate_namespace(dest.namespace())?;
649
650        let src_table_name = src.name();
651        let dest_table_name = dest.name();
652
653        let builder = self
654            .client
655            .0
656            .get_table()
657            .database_name(&src_db_name)
658            .name(src_table_name);
659        let builder = with_catalog_id!(builder, self.config);
660
661        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
662
663        match glue_table_output.table() {
664            None => Err(Error::new(
665                ErrorKind::TableNotFound,
666                format!(
667                    "'Table' object for database: {src_db_name} and table: {src_table_name} does not exist"
668                ),
669            )),
670            Some(table) => {
671                let rename_table_input = TableInput::builder()
672                    .name(dest_table_name)
673                    .set_parameters(table.parameters.clone())
674                    .set_storage_descriptor(table.storage_descriptor.clone())
675                    .set_table_type(table.table_type.clone())
676                    .set_description(table.description.clone())
677                    .build()
678                    .map_err(from_aws_build_error)?;
679
680                let builder = self
681                    .client
682                    .0
683                    .create_table()
684                    .database_name(&dest_db_name)
685                    .table_input(rename_table_input);
686                let builder = with_catalog_id!(builder, self.config);
687
688                builder.send().await.map_err(from_aws_sdk_error)?;
689
690                let drop_src_table_result = self.drop_table(src).await;
691
692                match drop_src_table_result {
693                    Ok(_) => Ok(()),
694                    Err(_) => {
695                        let err_msg_src_table =
696                            format!("Failed to drop old table {src_db_name}.{src_table_name}.");
697
698                        let drop_dest_table_result = self.drop_table(dest).await;
699
700                        match drop_dest_table_result {
701                            Ok(_) => Err(Error::new(
702                                ErrorKind::Unexpected,
703                                format!(
704                                    "{err_msg_src_table} Rolled back table creation for {dest_db_name}.{dest_table_name}."
705                                ),
706                            )),
707                            Err(_) => Err(Error::new(
708                                ErrorKind::Unexpected,
709                                format!(
710                                    "{err_msg_src_table} Failed to roll back table creation for {dest_db_name}.{dest_table_name}. Please clean up manually."
711                                ),
712                            )),
713                        }
714                    }
715                }
716            }
717        }
718    }
719
720    /// registers an existing table into the Glue Catalog.
721    ///
722    /// Converts the provided table identifier and metadata location into a
723    /// Glue-compatible table representation, and attempts to create the
724    /// corresponding table in the Glue Catalog.
725    ///
726    /// # Returns
727    /// Returns `Ok(Table)` if the table is successfully registered and loaded.
728    /// If the registration fails due to validation issues, existing table conflicts,
729    /// metadata problems, or errors during the registration or loading process,
730    /// an `Err(...)` is returned.
731    async fn register_table(
732        &self,
733        table_ident: &TableIdent,
734        metadata_location: String,
735    ) -> Result<Table> {
736        let db_name = validate_namespace(table_ident.namespace())?;
737        let table_name = table_ident.name();
738        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
739
740        let table_input = convert_to_glue_table(
741            table_name,
742            metadata_location.clone(),
743            &metadata,
744            metadata.properties(),
745            None,
746        )?;
747
748        let builder = self
749            .client
750            .0
751            .create_table()
752            .database_name(&db_name)
753            .table_input(table_input);
754        let builder = with_catalog_id!(builder, self.config);
755
756        builder.send().await.map_err(|e| {
757            let error = e.into_service_error();
758            match error {
759                CreateTableError::EntityNotFoundException(_) => Error::new(
760                    ErrorKind::NamespaceNotFound,
761                    format!("Database {db_name} does not exist"),
762                ),
763                CreateTableError::AlreadyExistsException(_) => Error::new(
764                    ErrorKind::TableAlreadyExists,
765                    format!("Table {table_ident} already exists"),
766                ),
767                _ => Error::new(
768                    ErrorKind::Unexpected,
769                    format!("Failed to register table {table_ident} due to AWS SDK error"),
770                ),
771            }
772            .with_source(anyhow!("aws sdk error: {error:?}"))
773        })?;
774
775        Ok(Table::builder()
776            .identifier(table_ident.clone())
777            .metadata_location(metadata_location)
778            .metadata(metadata)
779            .file_io(self.file_io())
780            .build()?)
781    }
782
783    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
784        let table_ident = commit.identifier().clone();
785        let table_namespace = validate_namespace(table_ident.namespace())?;
786
787        let (current_table, current_version_id) =
788            self.load_table_with_version_id(&table_ident).await?;
789        let current_metadata_location = current_table.metadata_location_result()?.to_string();
790
791        let staged_table = commit.apply(current_table)?;
792        let staged_metadata_location = staged_table.metadata_location_result()?;
793
794        // Write new metadata
795        staged_table
796            .metadata()
797            .write_to(staged_table.file_io(), staged_metadata_location)
798            .await?;
799
800        // Persist staged table to Glue with optimistic locking
801        let mut builder = self
802            .client
803            .0
804            .update_table()
805            .database_name(table_namespace)
806            .set_skip_archive(Some(true)) // todo make this configurable
807            .table_input(convert_to_glue_table(
808                table_ident.name(),
809                staged_metadata_location.to_string(),
810                staged_table.metadata(),
811                staged_table.metadata().properties(),
812                Some(current_metadata_location),
813            )?);
814
815        // Add VersionId for optimistic locking
816        if let Some(version_id) = current_version_id {
817            builder = builder.version_id(version_id);
818        }
819
820        let builder = with_catalog_id!(builder, self.config);
821        let _ = builder.send().await.map_err(|e| {
822            let error = e.into_service_error();
823            match error {
824                UpdateTableError::EntityNotFoundException(_) => Error::new(
825                    ErrorKind::TableNotFound,
826                    format!("Table {table_ident} is not found"),
827                ),
828                UpdateTableError::ConcurrentModificationException(_) => Error::new(
829                    ErrorKind::CatalogCommitConflicts,
830                    format!("Commit failed for table: {table_ident}"),
831                )
832                .with_retryable(true),
833                _ => Error::new(
834                    ErrorKind::Unexpected,
835                    format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
836                ),
837            }
838            .with_source(anyhow!("aws sdk error: {error:?}"))
839        })?;
840
841        Ok(staged_table)
842    }
843}