1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use aws_sdk_glue::operation::create_table::CreateTableError;
26use aws_sdk_glue::operation::update_table::UpdateTableError;
27use aws_sdk_glue::types::TableInput;
28use iceberg::io::{
29 FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
30 S3_SESSION_TOKEN, StorageFactory,
31};
32use iceberg::spec::{TableMetadata, TableMetadataBuilder};
33use iceberg::table::Table;
34use iceberg::{
35 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
36 TableCommit, TableCreation, TableIdent,
37};
38use iceberg_storage_opendal::OpenDalStorageFactory;
39
40use crate::error::{from_aws_build_error, from_aws_sdk_error};
41use crate::utils::{
42 convert_to_database, convert_to_glue_table, convert_to_namespace, create_sdk_config,
43 get_default_table_location, get_metadata_location, validate_namespace,
44};
45use crate::{
46 AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
47};
48
/// Property key whose value `GlueCatalogBuilder::load` stores as the catalog `uri`.
///
/// NOTE(review): the value is later handed to `create_sdk_config` and used as the
/// default S3 endpoint, so it is presumably an endpoint override URL — confirm.
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Property key whose value `GlueCatalogBuilder::load` stores as the Glue catalog id.
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Property key whose value `GlueCatalogBuilder::load` stores as the warehouse location.
///
/// Loading fails with `ErrorKind::DataInvalid` when the resulting warehouse is empty.
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
55
/// Builder for [`GlueCatalog`], driven through the [`CatalogBuilder`] trait.
#[derive(Debug)]
pub struct GlueCatalogBuilder {
    /// Configuration accumulated by the builder and handed to the catalog on `load`.
    config: GlueCatalogConfig,
    /// Optional storage backend factory; when `None`, the catalog constructor
    /// falls back to an OpenDAL S3 factory.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
62
63impl Default for GlueCatalogBuilder {
64 fn default() -> Self {
65 Self {
66 config: GlueCatalogConfig {
67 name: None,
68 uri: None,
69 catalog_id: None,
70 warehouse: "".to_string(),
71 props: HashMap::new(),
72 },
73 storage_factory: None,
74 }
75 }
76}
77
78impl CatalogBuilder for GlueCatalogBuilder {
79 type C = GlueCatalog;
80
81 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
82 self.storage_factory = Some(storage_factory);
83 self
84 }
85
86 fn load(
87 mut self,
88 name: impl Into<String>,
89 props: HashMap<String, String>,
90 ) -> impl Future<Output = Result<Self::C>> + Send {
91 self.config.name = Some(name.into());
92
93 if props.contains_key(GLUE_CATALOG_PROP_URI) {
94 self.config.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
95 }
96
97 if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
98 self.config.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
99 }
100
101 if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
102 self.config.warehouse = props
103 .get(GLUE_CATALOG_PROP_WAREHOUSE)
104 .cloned()
105 .unwrap_or_default();
106 }
107
108 self.config.props = props
110 .into_iter()
111 .filter(|(k, _)| {
112 k != GLUE_CATALOG_PROP_URI
113 && k != GLUE_CATALOG_PROP_CATALOG_ID
114 && k != GLUE_CATALOG_PROP_WAREHOUSE
115 })
116 .collect();
117
118 async move {
119 if self.config.name.is_none() {
120 return Err(Error::new(
121 ErrorKind::DataInvalid,
122 "Catalog name is required",
123 ));
124 }
125 if self.config.warehouse.is_empty() {
126 return Err(Error::new(
127 ErrorKind::DataInvalid,
128 "Catalog warehouse is required",
129 ));
130 }
131
132 GlueCatalog::new(self.config, self.storage_factory).await
133 }
134 }
135}
136
/// Configuration values backing a [`GlueCatalog`].
#[derive(Debug)]
pub(crate) struct GlueCatalogConfig {
    /// Catalog name supplied to `GlueCatalogBuilder::load`.
    name: Option<String>,
    /// Value of the `uri` property, if given; passed to `create_sdk_config` and
    /// used as the default S3 endpoint when none is configured.
    uri: Option<String>,
    /// Value of the `catalog_id` property, if given.
    catalog_id: Option<String>,
    /// Value of the `warehouse` property; used when deriving a default table location.
    warehouse: String,
    /// All remaining properties that were not lifted into dedicated fields.
    props: HashMap<String, String>,
}
146
/// Newtype wrapper around the AWS SDK Glue client.
struct GlueClient(aws_sdk_glue::Client);
148
/// Iceberg [`Catalog`] implementation backed by AWS Glue.
pub struct GlueCatalog {
    /// Configuration the catalog was built with.
    config: GlueCatalogConfig,
    /// Glue client used for all database and table calls.
    client: GlueClient,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
}
155
156impl Debug for GlueCatalog {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("GlueCatalog")
159 .field("config", &self.config)
160 .finish_non_exhaustive()
161 }
162}
163
164impl GlueCatalog {
165 async fn new(
167 config: GlueCatalogConfig,
168 storage_factory: Option<Arc<dyn StorageFactory>>,
169 ) -> Result<Self> {
170 let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
171 let mut file_io_props = config.props.clone();
172 if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
173 && let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID)
174 {
175 file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
176 }
177 if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY)
178 && let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY)
179 {
180 file_io_props.insert(
181 S3_SECRET_ACCESS_KEY.to_string(),
182 secret_access_key.to_string(),
183 );
184 }
185 if !file_io_props.contains_key(S3_REGION)
186 && let Some(region) = file_io_props.get(AWS_REGION_NAME)
187 {
188 file_io_props.insert(S3_REGION.to_string(), region.to_string());
189 }
190 if !file_io_props.contains_key(S3_SESSION_TOKEN)
191 && let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN)
192 {
193 file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
194 }
195 if !file_io_props.contains_key(S3_ENDPOINT)
196 && let Some(aws_endpoint) = config.uri.as_ref()
197 {
198 file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
199 }
200
201 let client = aws_sdk_glue::Client::new(&sdk_config);
202
203 let factory = storage_factory.unwrap_or_else(|| {
205 Arc::new(OpenDalStorageFactory::S3 {
206 configured_scheme: "s3a".to_string(),
207 customized_credential_load: None,
208 })
209 });
210 let file_io = FileIOBuilder::new(factory)
211 .with_props(file_io_props)
212 .build();
213
214 Ok(GlueCatalog {
215 config,
216 client: GlueClient(client),
217 file_io,
218 })
219 }
220 pub fn file_io(&self) -> FileIO {
222 self.file_io.clone()
223 }
224
225 async fn load_table_with_version_id(
238 &self,
239 table: &TableIdent,
240 ) -> Result<(Table, Option<String>)> {
241 let db_name = validate_namespace(table.namespace())?;
242 let table_name = table.name();
243
244 let builder = self
245 .client
246 .0
247 .get_table()
248 .database_name(&db_name)
249 .name(table_name);
250 let builder = with_catalog_id!(builder, self.config);
251
252 let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
253
254 let glue_table = glue_table_output.table().ok_or_else(|| {
255 Error::new(
256 ErrorKind::TableNotFound,
257 format!(
258 "Table object for database: {db_name} and table: {table_name} does not exist"
259 ),
260 )
261 })?;
262
263 let version_id = glue_table.version_id.clone();
264 let metadata_location = get_metadata_location(&glue_table.parameters)?;
265
266 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
267
268 let table = Table::builder()
269 .file_io(self.file_io())
270 .metadata_location(metadata_location)
271 .metadata(metadata)
272 .identifier(TableIdent::new(
273 NamespaceIdent::new(db_name),
274 table_name.to_owned(),
275 ))
276 .build()?;
277
278 Ok((table, version_id))
279 }
280}
281
282#[async_trait]
283impl Catalog for GlueCatalog {
284 async fn list_namespaces(
289 &self,
290 parent: Option<&NamespaceIdent>,
291 ) -> Result<Vec<NamespaceIdent>> {
292 if parent.is_some() {
293 return Ok(vec![]);
294 }
295
296 let mut database_list: Vec<NamespaceIdent> = Vec::new();
297 let mut next_token: Option<String> = None;
298
299 loop {
300 let builder = match &next_token {
301 Some(token) => self.client.0.get_databases().next_token(token),
302 None => self.client.0.get_databases(),
303 };
304 let builder = with_catalog_id!(builder, self.config);
305 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
306
307 let dbs: Vec<NamespaceIdent> = resp
308 .database_list()
309 .iter()
310 .map(|db| NamespaceIdent::new(db.name().to_string()))
311 .collect();
312
313 database_list.extend(dbs);
314
315 next_token = resp.next_token().map(ToOwned::to_owned);
316 if next_token.is_none() {
317 break;
318 }
319 }
320
321 Ok(database_list)
322 }
323
324 async fn create_namespace(
338 &self,
339 namespace: &NamespaceIdent,
340 properties: HashMap<String, String>,
341 ) -> Result<Namespace> {
342 if self.namespace_exists(namespace).await? {
343 return Err(Error::new(
344 ErrorKind::NamespaceAlreadyExists,
345 format!("Namespace {namespace:?} already exists"),
346 ));
347 }
348
349 let db_input = convert_to_database(namespace, &properties)?;
350
351 let builder = self.client.0.create_database().database_input(db_input);
352 let builder = with_catalog_id!(builder, self.config);
353
354 builder.send().await.map_err(from_aws_sdk_error)?;
355
356 Ok(Namespace::with_properties(namespace.clone(), properties))
357 }
358
359 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
370 let db_name = validate_namespace(namespace)?;
371
372 let builder = self.client.0.get_database().name(&db_name);
373 let builder = with_catalog_id!(builder, self.config);
374
375 let resp = builder.send().await.map_err(|err| {
376 if err
377 .as_service_error()
378 .map(|e| e.is_entity_not_found_exception())
379 == Some(true)
380 {
381 return Error::new(
382 ErrorKind::NamespaceNotFound,
383 format!("Namespace {namespace:?} does not exist"),
384 );
385 }
386 from_aws_sdk_error(err)
387 })?;
388
389 match resp.database() {
390 Some(db) => {
391 let namespace = convert_to_namespace(db);
392 Ok(namespace)
393 }
394 None => Err(Error::new(
395 ErrorKind::NamespaceNotFound,
396 format!("Database with name: {db_name} does not exist"),
397 )),
398 }
399 }
400
401 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
414 let db_name = validate_namespace(namespace)?;
415
416 let builder = self.client.0.get_database().name(&db_name);
417 let builder = with_catalog_id!(builder, self.config);
418
419 let resp = builder.send().await;
420
421 match resp {
422 Ok(_) => Ok(true),
423 Err(err) => {
424 if err
425 .as_service_error()
426 .map(|e| e.is_entity_not_found_exception())
427 == Some(true)
428 {
429 return Ok(false);
430 }
431 Err(from_aws_sdk_error(err))
432 }
433 }
434 }
435
436 async fn update_namespace(
447 &self,
448 namespace: &NamespaceIdent,
449 properties: HashMap<String, String>,
450 ) -> Result<()> {
451 if !self.namespace_exists(namespace).await? {
452 return Err(Error::new(
453 ErrorKind::NamespaceNotFound,
454 format!("Namespace {namespace:?} does not exist"),
455 ));
456 }
457
458 let db_name = validate_namespace(namespace)?;
459 let db_input = convert_to_database(namespace, &properties)?;
460
461 let builder = self
462 .client
463 .0
464 .update_database()
465 .name(&db_name)
466 .database_input(db_input);
467 let builder = with_catalog_id!(builder, self.config);
468
469 builder.send().await.map_err(from_aws_sdk_error)?;
470
471 Ok(())
472 }
473
474 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
485 if !self.namespace_exists(namespace).await? {
486 return Err(Error::new(
487 ErrorKind::NamespaceNotFound,
488 format!("Namespace {namespace:?} does not exist"),
489 ));
490 }
491
492 let db_name = validate_namespace(namespace)?;
493 let table_list = self.list_tables(namespace).await?;
494
495 if !table_list.is_empty() {
496 return Err(Error::new(
497 ErrorKind::DataInvalid,
498 format!("Database with name: {} is not empty", &db_name),
499 ));
500 }
501
502 let builder = self.client.0.delete_database().name(db_name);
503 let builder = with_catalog_id!(builder, self.config);
504
505 builder.send().await.map_err(from_aws_sdk_error)?;
506
507 Ok(())
508 }
509
510 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
519 let db_name = validate_namespace(namespace)?;
520
521 let mut table_list: Vec<TableIdent> = Vec::new();
522 let mut next_token: Option<String> = None;
523
524 loop {
525 let builder = match &next_token {
526 Some(token) => self
527 .client
528 .0
529 .get_tables()
530 .database_name(&db_name)
531 .next_token(token),
532 None => self.client.0.get_tables().database_name(&db_name),
533 };
534 let builder = with_catalog_id!(builder, self.config);
535 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
536
537 let tables: Vec<_> = resp
538 .table_list()
539 .iter()
540 .map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
541 .collect();
542
543 table_list.extend(tables);
544
545 next_token = resp.next_token().map(ToOwned::to_owned);
546 if next_token.is_none() {
547 break;
548 }
549 }
550
551 Ok(table_list)
552 }
553
554 async fn create_table(
567 &self,
568 namespace: &NamespaceIdent,
569 mut creation: TableCreation,
570 ) -> Result<Table> {
571 let db_name = validate_namespace(namespace)?;
572 let table_name = creation.name.clone();
573
574 let location = match &creation.location {
575 Some(location) => location.clone(),
576 None => {
577 let ns = self.get_namespace(namespace).await?;
578 let location =
579 get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
580 creation.location = Some(location.clone());
581 location
582 }
583 };
584 let metadata = TableMetadataBuilder::from_table_creation(creation)?
585 .build()?
586 .metadata;
587 let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
588
589 metadata.write_to(&self.file_io, &metadata_location).await?;
590
591 let metadata_location_str = metadata_location.to_string();
592 let glue_table = convert_to_glue_table(
593 &table_name,
594 metadata_location_str.clone(),
595 &metadata,
596 metadata.properties(),
597 None,
598 )?;
599
600 let builder = self
601 .client
602 .0
603 .create_table()
604 .database_name(&db_name)
605 .table_input(glue_table);
606 let builder = with_catalog_id!(builder, self.config);
607
608 builder.send().await.map_err(from_aws_sdk_error)?;
609
610 Table::builder()
611 .file_io(self.file_io())
612 .metadata_location(metadata_location_str)
613 .metadata(metadata)
614 .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
615 .build()
616 }
617
618 async fn load_table(&self, table: &TableIdent) -> Result<Table> {
631 let (table, _) = self.load_table_with_version_id(table).await?;
632 Ok(table)
633 }
634
635 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
646 let db_name = validate_namespace(table.namespace())?;
647 let table_name = table.name();
648
649 let builder = self
650 .client
651 .0
652 .delete_table()
653 .database_name(&db_name)
654 .name(table_name);
655 let builder = with_catalog_id!(builder, self.config);
656
657 builder.send().await.map_err(from_aws_sdk_error)?;
658
659 Ok(())
660 }
661
662 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
670 let db_name = validate_namespace(table.namespace())?;
671 let table_name = table.name();
672
673 let builder = self
674 .client
675 .0
676 .get_table()
677 .database_name(&db_name)
678 .name(table_name);
679 let builder = with_catalog_id!(builder, self.config);
680
681 let resp = builder.send().await;
682
683 match resp {
684 Ok(_) => Ok(true),
685 Err(err) => {
686 if err
687 .as_service_error()
688 .map(|e| e.is_entity_not_found_exception())
689 == Some(true)
690 {
691 return Ok(false);
692 }
693 Err(from_aws_sdk_error(err))
694 }
695 }
696 }
697
698 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
705 let src_db_name = validate_namespace(src.namespace())?;
706 let dest_db_name = validate_namespace(dest.namespace())?;
707
708 let src_table_name = src.name();
709 let dest_table_name = dest.name();
710
711 let builder = self
712 .client
713 .0
714 .get_table()
715 .database_name(&src_db_name)
716 .name(src_table_name);
717 let builder = with_catalog_id!(builder, self.config);
718
719 let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
720
721 match glue_table_output.table() {
722 None => Err(Error::new(
723 ErrorKind::TableNotFound,
724 format!(
725 "'Table' object for database: {src_db_name} and table: {src_table_name} does not exist"
726 ),
727 )),
728 Some(table) => {
729 let rename_table_input = TableInput::builder()
730 .name(dest_table_name)
731 .set_parameters(table.parameters.clone())
732 .set_storage_descriptor(table.storage_descriptor.clone())
733 .set_table_type(table.table_type.clone())
734 .set_description(table.description.clone())
735 .build()
736 .map_err(from_aws_build_error)?;
737
738 let builder = self
739 .client
740 .0
741 .create_table()
742 .database_name(&dest_db_name)
743 .table_input(rename_table_input);
744 let builder = with_catalog_id!(builder, self.config);
745
746 builder.send().await.map_err(from_aws_sdk_error)?;
747
748 let drop_src_table_result = self.drop_table(src).await;
749
750 match drop_src_table_result {
751 Ok(_) => Ok(()),
752 Err(_) => {
753 let err_msg_src_table =
754 format!("Failed to drop old table {src_db_name}.{src_table_name}.");
755
756 let drop_dest_table_result = self.drop_table(dest).await;
757
758 match drop_dest_table_result {
759 Ok(_) => Err(Error::new(
760 ErrorKind::Unexpected,
761 format!(
762 "{err_msg_src_table} Rolled back table creation for {dest_db_name}.{dest_table_name}."
763 ),
764 )),
765 Err(_) => Err(Error::new(
766 ErrorKind::Unexpected,
767 format!(
768 "{err_msg_src_table} Failed to roll back table creation for {dest_db_name}.{dest_table_name}. Please clean up manually."
769 ),
770 )),
771 }
772 }
773 }
774 }
775 }
776 }
777
778 async fn register_table(
790 &self,
791 table_ident: &TableIdent,
792 metadata_location: String,
793 ) -> Result<Table> {
794 let db_name = validate_namespace(table_ident.namespace())?;
795 let table_name = table_ident.name();
796 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
797
798 let table_input = convert_to_glue_table(
799 table_name,
800 metadata_location.clone(),
801 &metadata,
802 metadata.properties(),
803 None,
804 )?;
805
806 let builder = self
807 .client
808 .0
809 .create_table()
810 .database_name(&db_name)
811 .table_input(table_input);
812 let builder = with_catalog_id!(builder, self.config);
813
814 builder.send().await.map_err(|e| {
815 let error = e.into_service_error();
816 match error {
817 CreateTableError::EntityNotFoundException(_) => Error::new(
818 ErrorKind::NamespaceNotFound,
819 format!("Database {db_name} does not exist"),
820 ),
821 CreateTableError::AlreadyExistsException(_) => Error::new(
822 ErrorKind::TableAlreadyExists,
823 format!("Table {table_ident} already exists"),
824 ),
825 _ => Error::new(
826 ErrorKind::Unexpected,
827 format!("Failed to register table {table_ident} due to AWS SDK error"),
828 ),
829 }
830 .with_source(anyhow!("aws sdk error: {error:?}"))
831 })?;
832
833 Ok(Table::builder()
834 .identifier(table_ident.clone())
835 .metadata_location(metadata_location)
836 .metadata(metadata)
837 .file_io(self.file_io())
838 .build()?)
839 }
840
841 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
842 let table_ident = commit.identifier().clone();
843 let table_namespace = validate_namespace(table_ident.namespace())?;
844
845 let (current_table, current_version_id) =
846 self.load_table_with_version_id(&table_ident).await?;
847 let current_metadata_location = current_table.metadata_location_result()?.to_string();
848
849 let staged_table = commit.apply(current_table)?;
850 let staged_metadata_location_str = staged_table.metadata_location_result()?;
851 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
852
853 staged_table
855 .metadata()
856 .write_to(staged_table.file_io(), &staged_metadata_location)
857 .await?;
858
859 let mut builder = self
861 .client
862 .0
863 .update_table()
864 .database_name(table_namespace)
865 .set_skip_archive(Some(true)) .table_input(convert_to_glue_table(
867 table_ident.name(),
868 staged_metadata_location.to_string(),
869 staged_table.metadata(),
870 staged_table.metadata().properties(),
871 Some(current_metadata_location),
872 )?);
873
874 if let Some(version_id) = current_version_id {
876 builder = builder.version_id(version_id);
877 }
878
879 let builder = with_catalog_id!(builder, self.config);
880 let _ = builder.send().await.map_err(|e| {
881 let error = e.into_service_error();
882 match error {
883 UpdateTableError::EntityNotFoundException(_) => Error::new(
884 ErrorKind::TableNotFound,
885 format!("Table {table_ident} is not found"),
886 ),
887 UpdateTableError::ConcurrentModificationException(_) => Error::new(
888 ErrorKind::CatalogCommitConflicts,
889 format!("Commit failed for table: {table_ident}"),
890 )
891 .with_retryable(true),
892 _ => Error::new(
893 ErrorKind::Unexpected,
894 format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
895 ),
896 }
897 .with_source(anyhow!("aws sdk error: {error:?}"))
898 })?;
899
900 Ok(staged_table)
901 }
902}