1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use aws_sdk_glue::operation::create_table::CreateTableError;
26use aws_sdk_glue::operation::update_table::UpdateTableError;
27use aws_sdk_glue::types::TableInput;
28use iceberg::io::{
29 FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
30 S3_SESSION_TOKEN, StorageFactory,
31};
32use iceberg::spec::{TableMetadata, TableMetadataBuilder};
33use iceberg::table::Table;
34use iceberg::{
35 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
36 TableCommit, TableCreation, TableIdent,
37};
38use iceberg_storage_opendal::OpenDalStorageFactory;
39
40use crate::error::{from_aws_build_error, from_aws_sdk_error};
41use crate::utils::{
42 convert_to_database, convert_to_glue_table, convert_to_namespace, create_sdk_config,
43 get_default_table_location, get_metadata_location, validate_namespace,
44};
45use crate::{
46 AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
47};
48
/// Property key read by `GlueCatalogBuilder::load` for the optional endpoint URI.
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Property key read by `GlueCatalogBuilder::load` for the optional Glue catalog id.
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Property key read by `GlueCatalogBuilder::load` for the warehouse location.
/// Loading fails if the resulting warehouse string is empty.
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
55
/// Builder that accumulates the settings needed to construct a [`GlueCatalog`].
///
/// The configuration is filled in by the `CatalogBuilder::load` implementation,
/// which also performs the final validation before the catalog is created.
#[derive(Debug)]
pub struct GlueCatalogBuilder {
    /// Configuration collected so far (name, uri, catalog id, warehouse, extra props).
    config: GlueCatalogConfig,
    /// Optional storage backend; when `None`, `GlueCatalog::new` falls back to
    /// the OpenDAL S3 storage factory.
    storage_factory: Option<Arc<dyn StorageFactory>>,
}
62
63impl Default for GlueCatalogBuilder {
64 fn default() -> Self {
65 Self {
66 config: GlueCatalogConfig {
67 name: None,
68 uri: None,
69 catalog_id: None,
70 warehouse: "".to_string(),
71 props: HashMap::new(),
72 },
73 storage_factory: None,
74 }
75 }
76}
77
78impl CatalogBuilder for GlueCatalogBuilder {
79 type C = GlueCatalog;
80
81 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
82 self.storage_factory = Some(storage_factory);
83 self
84 }
85
86 fn load(
87 mut self,
88 name: impl Into<String>,
89 props: HashMap<String, String>,
90 ) -> impl Future<Output = Result<Self::C>> + Send {
91 self.config.name = Some(name.into());
92
93 if props.contains_key(GLUE_CATALOG_PROP_URI) {
94 self.config.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
95 }
96
97 if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
98 self.config.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
99 }
100
101 if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
102 self.config.warehouse = props
103 .get(GLUE_CATALOG_PROP_WAREHOUSE)
104 .cloned()
105 .unwrap_or_default();
106 }
107
108 self.config.props = props
110 .into_iter()
111 .filter(|(k, _)| {
112 k != GLUE_CATALOG_PROP_URI
113 && k != GLUE_CATALOG_PROP_CATALOG_ID
114 && k != GLUE_CATALOG_PROP_WAREHOUSE
115 })
116 .collect();
117
118 async move {
119 if self.config.name.is_none() {
120 return Err(Error::new(
121 ErrorKind::DataInvalid,
122 "Catalog name is required",
123 ));
124 }
125 if self.config.warehouse.is_empty() {
126 return Err(Error::new(
127 ErrorKind::DataInvalid,
128 "Catalog warehouse is required",
129 ));
130 }
131
132 GlueCatalog::new(self.config, self.storage_factory).await
133 }
134 }
135}
136
/// Settings used to construct and operate a `GlueCatalog`.
#[derive(Debug)]
pub(crate) struct GlueCatalogConfig {
    /// Catalog name; set by `GlueCatalogBuilder::load`.
    name: Option<String>,
    /// Optional endpoint URI. It is passed to `create_sdk_config` and, when no
    /// S3 endpoint property is given, also used as the S3 endpoint for file IO.
    uri: Option<String>,
    /// Optional catalog id, applied to requests through `with_catalog_id!`.
    catalog_id: Option<String>,
    /// Warehouse location; used when deriving a default table location.
    warehouse: String,
    /// Remaining properties, forwarded to the SDK config and to file IO.
    props: HashMap<String, String>,
}
146
/// Newtype wrapper around the AWS Glue SDK client; the inner client is reached via `.0`.
struct GlueClient(aws_sdk_glue::Client);
148
/// Iceberg catalog backed by AWS Glue.
pub struct GlueCatalog {
    /// Catalog configuration (also the only field shown by the `Debug` impl).
    config: GlueCatalogConfig,
    /// Client used for every Glue request.
    client: GlueClient,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
}
155
156impl Debug for GlueCatalog {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("GlueCatalog")
159 .field("config", &self.config)
160 .finish_non_exhaustive()
161 }
162}
163
164impl GlueCatalog {
165 async fn new(
167 config: GlueCatalogConfig,
168 storage_factory: Option<Arc<dyn StorageFactory>>,
169 ) -> Result<Self> {
170 let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
171 let mut file_io_props = config.props.clone();
172 if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
173 && let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID)
174 {
175 file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
176 }
177 if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY)
178 && let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY)
179 {
180 file_io_props.insert(
181 S3_SECRET_ACCESS_KEY.to_string(),
182 secret_access_key.to_string(),
183 );
184 }
185 if !file_io_props.contains_key(S3_REGION)
186 && let Some(region) = file_io_props.get(AWS_REGION_NAME)
187 {
188 file_io_props.insert(S3_REGION.to_string(), region.to_string());
189 }
190 if !file_io_props.contains_key(S3_SESSION_TOKEN)
191 && let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN)
192 {
193 file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
194 }
195 if !file_io_props.contains_key(S3_ENDPOINT)
196 && let Some(aws_endpoint) = config.uri.as_ref()
197 {
198 file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
199 }
200
201 let client = aws_sdk_glue::Client::new(&sdk_config);
202
203 let factory = storage_factory.unwrap_or_else(|| {
205 Arc::new(OpenDalStorageFactory::S3 {
206 customized_credential_load: None,
207 })
208 });
209 let file_io = FileIOBuilder::new(factory)
210 .with_props(file_io_props)
211 .build();
212
213 Ok(GlueCatalog {
214 config,
215 client: GlueClient(client),
216 file_io,
217 })
218 }
219 pub fn file_io(&self) -> FileIO {
221 self.file_io.clone()
222 }
223
224 async fn load_table_with_version_id(
237 &self,
238 table: &TableIdent,
239 ) -> Result<(Table, Option<String>)> {
240 let db_name = validate_namespace(table.namespace())?;
241 let table_name = table.name();
242
243 let builder = self
244 .client
245 .0
246 .get_table()
247 .database_name(&db_name)
248 .name(table_name);
249 let builder = with_catalog_id!(builder, self.config);
250
251 let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
252
253 let glue_table = glue_table_output.table().ok_or_else(|| {
254 Error::new(
255 ErrorKind::TableNotFound,
256 format!(
257 "Table object for database: {db_name} and table: {table_name} does not exist"
258 ),
259 )
260 })?;
261
262 let version_id = glue_table.version_id.clone();
263 let metadata_location = get_metadata_location(&glue_table.parameters)?;
264
265 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
266
267 let table = Table::builder()
268 .file_io(self.file_io())
269 .metadata_location(metadata_location)
270 .metadata(metadata)
271 .identifier(TableIdent::new(
272 NamespaceIdent::new(db_name),
273 table_name.to_owned(),
274 ))
275 .build()?;
276
277 Ok((table, version_id))
278 }
279}
280
281#[async_trait]
282impl Catalog for GlueCatalog {
283 async fn list_namespaces(
288 &self,
289 parent: Option<&NamespaceIdent>,
290 ) -> Result<Vec<NamespaceIdent>> {
291 if parent.is_some() {
292 return Ok(vec![]);
293 }
294
295 let mut database_list: Vec<NamespaceIdent> = Vec::new();
296 let mut next_token: Option<String> = None;
297
298 loop {
299 let builder = match &next_token {
300 Some(token) => self.client.0.get_databases().next_token(token),
301 None => self.client.0.get_databases(),
302 };
303 let builder = with_catalog_id!(builder, self.config);
304 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
305
306 let dbs: Vec<NamespaceIdent> = resp
307 .database_list()
308 .iter()
309 .map(|db| NamespaceIdent::new(db.name().to_string()))
310 .collect();
311
312 database_list.extend(dbs);
313
314 next_token = resp.next_token().map(ToOwned::to_owned);
315 if next_token.is_none() {
316 break;
317 }
318 }
319
320 Ok(database_list)
321 }
322
323 async fn create_namespace(
337 &self,
338 namespace: &NamespaceIdent,
339 properties: HashMap<String, String>,
340 ) -> Result<Namespace> {
341 if self.namespace_exists(namespace).await? {
342 return Err(Error::new(
343 ErrorKind::NamespaceAlreadyExists,
344 format!("Namespace {namespace:?} already exists"),
345 ));
346 }
347
348 let db_input = convert_to_database(namespace, &properties)?;
349
350 let builder = self.client.0.create_database().database_input(db_input);
351 let builder = with_catalog_id!(builder, self.config);
352
353 builder.send().await.map_err(from_aws_sdk_error)?;
354
355 Ok(Namespace::with_properties(namespace.clone(), properties))
356 }
357
358 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
369 let db_name = validate_namespace(namespace)?;
370
371 let builder = self.client.0.get_database().name(&db_name);
372 let builder = with_catalog_id!(builder, self.config);
373
374 let resp = builder.send().await.map_err(|err| {
375 if err
376 .as_service_error()
377 .map(|e| e.is_entity_not_found_exception())
378 == Some(true)
379 {
380 return Error::new(
381 ErrorKind::NamespaceNotFound,
382 format!("Namespace {namespace:?} does not exist"),
383 );
384 }
385 from_aws_sdk_error(err)
386 })?;
387
388 match resp.database() {
389 Some(db) => {
390 let namespace = convert_to_namespace(db);
391 Ok(namespace)
392 }
393 None => Err(Error::new(
394 ErrorKind::NamespaceNotFound,
395 format!("Database with name: {db_name} does not exist"),
396 )),
397 }
398 }
399
400 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
413 let db_name = validate_namespace(namespace)?;
414
415 let builder = self.client.0.get_database().name(&db_name);
416 let builder = with_catalog_id!(builder, self.config);
417
418 let resp = builder.send().await;
419
420 match resp {
421 Ok(_) => Ok(true),
422 Err(err) => {
423 if err
424 .as_service_error()
425 .map(|e| e.is_entity_not_found_exception())
426 == Some(true)
427 {
428 return Ok(false);
429 }
430 Err(from_aws_sdk_error(err))
431 }
432 }
433 }
434
435 async fn update_namespace(
446 &self,
447 namespace: &NamespaceIdent,
448 properties: HashMap<String, String>,
449 ) -> Result<()> {
450 if !self.namespace_exists(namespace).await? {
451 return Err(Error::new(
452 ErrorKind::NamespaceNotFound,
453 format!("Namespace {namespace:?} does not exist"),
454 ));
455 }
456
457 let db_name = validate_namespace(namespace)?;
458 let db_input = convert_to_database(namespace, &properties)?;
459
460 let builder = self
461 .client
462 .0
463 .update_database()
464 .name(&db_name)
465 .database_input(db_input);
466 let builder = with_catalog_id!(builder, self.config);
467
468 builder.send().await.map_err(from_aws_sdk_error)?;
469
470 Ok(())
471 }
472
473 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
484 if !self.namespace_exists(namespace).await? {
485 return Err(Error::new(
486 ErrorKind::NamespaceNotFound,
487 format!("Namespace {namespace:?} does not exist"),
488 ));
489 }
490
491 let db_name = validate_namespace(namespace)?;
492 let table_list = self.list_tables(namespace).await?;
493
494 if !table_list.is_empty() {
495 return Err(Error::new(
496 ErrorKind::DataInvalid,
497 format!("Database with name: {} is not empty", &db_name),
498 ));
499 }
500
501 let builder = self.client.0.delete_database().name(db_name);
502 let builder = with_catalog_id!(builder, self.config);
503
504 builder.send().await.map_err(from_aws_sdk_error)?;
505
506 Ok(())
507 }
508
509 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
518 let db_name = validate_namespace(namespace)?;
519
520 let mut table_list: Vec<TableIdent> = Vec::new();
521 let mut next_token: Option<String> = None;
522
523 loop {
524 let builder = match &next_token {
525 Some(token) => self
526 .client
527 .0
528 .get_tables()
529 .database_name(&db_name)
530 .next_token(token),
531 None => self.client.0.get_tables().database_name(&db_name),
532 };
533 let builder = with_catalog_id!(builder, self.config);
534 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
535
536 let tables: Vec<_> = resp
537 .table_list()
538 .iter()
539 .map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
540 .collect();
541
542 table_list.extend(tables);
543
544 next_token = resp.next_token().map(ToOwned::to_owned);
545 if next_token.is_none() {
546 break;
547 }
548 }
549
550 Ok(table_list)
551 }
552
553 async fn create_table(
566 &self,
567 namespace: &NamespaceIdent,
568 mut creation: TableCreation,
569 ) -> Result<Table> {
570 let db_name = validate_namespace(namespace)?;
571 let table_name = creation.name.clone();
572
573 let location = match &creation.location {
574 Some(location) => location.clone(),
575 None => {
576 let ns = self.get_namespace(namespace).await?;
577 let location =
578 get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
579 creation.location = Some(location.clone());
580 location
581 }
582 };
583 let metadata = TableMetadataBuilder::from_table_creation(creation)?
584 .build()?
585 .metadata;
586 let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
587
588 metadata.write_to(&self.file_io, &metadata_location).await?;
589
590 let metadata_location_str = metadata_location.to_string();
591 let glue_table = convert_to_glue_table(
592 &table_name,
593 metadata_location_str.clone(),
594 &metadata,
595 metadata.properties(),
596 None,
597 )?;
598
599 let builder = self
600 .client
601 .0
602 .create_table()
603 .database_name(&db_name)
604 .table_input(glue_table);
605 let builder = with_catalog_id!(builder, self.config);
606
607 builder.send().await.map_err(from_aws_sdk_error)?;
608
609 Table::builder()
610 .file_io(self.file_io())
611 .metadata_location(metadata_location_str)
612 .metadata(metadata)
613 .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
614 .build()
615 }
616
617 async fn load_table(&self, table: &TableIdent) -> Result<Table> {
630 let (table, _) = self.load_table_with_version_id(table).await?;
631 Ok(table)
632 }
633
634 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
645 let db_name = validate_namespace(table.namespace())?;
646 let table_name = table.name();
647
648 let builder = self
649 .client
650 .0
651 .delete_table()
652 .database_name(&db_name)
653 .name(table_name);
654 let builder = with_catalog_id!(builder, self.config);
655
656 builder.send().await.map_err(from_aws_sdk_error)?;
657
658 Ok(())
659 }
660
661 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
662 let table_info = self.load_table(table).await?;
663 self.drop_table(table).await?;
664 iceberg::drop_table_data(
665 table_info.file_io(),
666 table_info.metadata(),
667 table_info.metadata_location(),
668 )
669 .await
670 }
671
672 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
680 let db_name = validate_namespace(table.namespace())?;
681 let table_name = table.name();
682
683 let builder = self
684 .client
685 .0
686 .get_table()
687 .database_name(&db_name)
688 .name(table_name);
689 let builder = with_catalog_id!(builder, self.config);
690
691 let resp = builder.send().await;
692
693 match resp {
694 Ok(_) => Ok(true),
695 Err(err) => {
696 if err
697 .as_service_error()
698 .map(|e| e.is_entity_not_found_exception())
699 == Some(true)
700 {
701 return Ok(false);
702 }
703 Err(from_aws_sdk_error(err))
704 }
705 }
706 }
707
708 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
715 let src_db_name = validate_namespace(src.namespace())?;
716 let dest_db_name = validate_namespace(dest.namespace())?;
717
718 let src_table_name = src.name();
719 let dest_table_name = dest.name();
720
721 let builder = self
722 .client
723 .0
724 .get_table()
725 .database_name(&src_db_name)
726 .name(src_table_name);
727 let builder = with_catalog_id!(builder, self.config);
728
729 let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
730
731 match glue_table_output.table() {
732 None => Err(Error::new(
733 ErrorKind::TableNotFound,
734 format!(
735 "'Table' object for database: {src_db_name} and table: {src_table_name} does not exist"
736 ),
737 )),
738 Some(table) => {
739 let rename_table_input = TableInput::builder()
740 .name(dest_table_name)
741 .set_parameters(table.parameters.clone())
742 .set_storage_descriptor(table.storage_descriptor.clone())
743 .set_table_type(table.table_type.clone())
744 .set_description(table.description.clone())
745 .build()
746 .map_err(from_aws_build_error)?;
747
748 let builder = self
749 .client
750 .0
751 .create_table()
752 .database_name(&dest_db_name)
753 .table_input(rename_table_input);
754 let builder = with_catalog_id!(builder, self.config);
755
756 builder.send().await.map_err(from_aws_sdk_error)?;
757
758 let drop_src_table_result = self.drop_table(src).await;
759
760 match drop_src_table_result {
761 Ok(_) => Ok(()),
762 Err(_) => {
763 let err_msg_src_table =
764 format!("Failed to drop old table {src_db_name}.{src_table_name}.");
765
766 let drop_dest_table_result = self.drop_table(dest).await;
767
768 match drop_dest_table_result {
769 Ok(_) => Err(Error::new(
770 ErrorKind::Unexpected,
771 format!(
772 "{err_msg_src_table} Rolled back table creation for {dest_db_name}.{dest_table_name}."
773 ),
774 )),
775 Err(_) => Err(Error::new(
776 ErrorKind::Unexpected,
777 format!(
778 "{err_msg_src_table} Failed to roll back table creation for {dest_db_name}.{dest_table_name}. Please clean up manually."
779 ),
780 )),
781 }
782 }
783 }
784 }
785 }
786 }
787
788 async fn register_table(
800 &self,
801 table_ident: &TableIdent,
802 metadata_location: String,
803 ) -> Result<Table> {
804 let db_name = validate_namespace(table_ident.namespace())?;
805 let table_name = table_ident.name();
806 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
807
808 let table_input = convert_to_glue_table(
809 table_name,
810 metadata_location.clone(),
811 &metadata,
812 metadata.properties(),
813 None,
814 )?;
815
816 let builder = self
817 .client
818 .0
819 .create_table()
820 .database_name(&db_name)
821 .table_input(table_input);
822 let builder = with_catalog_id!(builder, self.config);
823
824 builder.send().await.map_err(|e| {
825 let error = e.into_service_error();
826 match error {
827 CreateTableError::EntityNotFoundException(_) => Error::new(
828 ErrorKind::NamespaceNotFound,
829 format!("Database {db_name} does not exist"),
830 ),
831 CreateTableError::AlreadyExistsException(_) => Error::new(
832 ErrorKind::TableAlreadyExists,
833 format!("Table {table_ident} already exists"),
834 ),
835 _ => Error::new(
836 ErrorKind::Unexpected,
837 format!("Failed to register table {table_ident} due to AWS SDK error"),
838 ),
839 }
840 .with_source(anyhow!("aws sdk error: {error:?}"))
841 })?;
842
843 Ok(Table::builder()
844 .identifier(table_ident.clone())
845 .metadata_location(metadata_location)
846 .metadata(metadata)
847 .file_io(self.file_io())
848 .build()?)
849 }
850
851 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
852 let table_ident = commit.identifier().clone();
853 let table_namespace = validate_namespace(table_ident.namespace())?;
854
855 let (current_table, current_version_id) =
856 self.load_table_with_version_id(&table_ident).await?;
857 let current_metadata_location = current_table.metadata_location_result()?.to_string();
858
859 let staged_table = commit.apply(current_table)?;
860 let staged_metadata_location_str = staged_table.metadata_location_result()?;
861 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
862
863 staged_table
865 .metadata()
866 .write_to(staged_table.file_io(), &staged_metadata_location)
867 .await?;
868
869 let mut builder = self
871 .client
872 .0
873 .update_table()
874 .database_name(table_namespace)
875 .set_skip_archive(Some(true)) .table_input(convert_to_glue_table(
877 table_ident.name(),
878 staged_metadata_location.to_string(),
879 staged_table.metadata(),
880 staged_table.metadata().properties(),
881 Some(current_metadata_location),
882 )?);
883
884 if let Some(version_id) = current_version_id {
886 builder = builder.version_id(version_id);
887 }
888
889 let builder = with_catalog_id!(builder, self.config);
890 let _ = builder.send().await.map_err(|e| {
891 let error = e.into_service_error();
892 match error {
893 UpdateTableError::EntityNotFoundException(_) => Error::new(
894 ErrorKind::TableNotFound,
895 format!("Table {table_ident} is not found"),
896 ),
897 UpdateTableError::ConcurrentModificationException(_) => Error::new(
898 ErrorKind::CatalogCommitConflicts,
899 format!("Commit failed for table: {table_ident}"),
900 )
901 .with_retryable(true),
902 _ => Error::new(
903 ErrorKind::Unexpected,
904 format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
905 ),
906 }
907 .with_source(anyhow!("aws sdk error: {error:?}"))
908 })?;
909
910 Ok(staged_table)
911 }
912}