1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use aws_sdk_glue::operation::create_table::CreateTableError;
26use aws_sdk_glue::operation::update_table::UpdateTableError;
27use aws_sdk_glue::types::TableInput;
28use iceberg::io::{
29 FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
30 S3_SESSION_TOKEN, StorageFactory,
31};
32use iceberg::spec::{TableMetadata, TableMetadataBuilder};
33use iceberg::table::Table;
34use iceberg::{
35 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
36 Runtime, TableCommit, TableCreation, TableIdent,
37};
38use iceberg_storage_opendal::OpenDalStorageFactory;
39
40use crate::error::{from_aws_build_error, from_aws_sdk_error};
41use crate::utils::{
42 convert_to_database, convert_to_glue_table, convert_to_namespace, create_sdk_config,
43 get_default_table_location, get_metadata_location, is_iceberg_table, validate_namespace,
44};
45use crate::{
46 AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
47};
48
/// Property key (`"uri"`) whose value `GlueCatalogBuilder::load` stores as the
/// catalog configuration's `uri`.
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Property key (`"catalog_id"`) whose value `GlueCatalogBuilder::load` stores as the
/// catalog configuration's `catalog_id`.
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Property key (`"warehouse"`) whose value `GlueCatalogBuilder::load` stores as the
/// catalog configuration's `warehouse`; loading fails when it ends up empty.
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
55
/// Builder for [`GlueCatalog`].
///
/// Holds the configuration plus the optional collaborators that
/// `CatalogBuilder::load` hands to `GlueCatalog::new`.
#[derive(Debug)]
pub struct GlueCatalogBuilder {
    /// Catalog configuration; filled in from the name and properties passed to `load`.
    config: GlueCatalogConfig,
    /// Optional storage factory; when `None`, `GlueCatalog::new` falls back to
    /// `OpenDalStorageFactory::S3`.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    /// Optional runtime; when `None`, `load` uses `Runtime::try_current()`.
    runtime: Option<Runtime>,
}
63
64impl Default for GlueCatalogBuilder {
65 fn default() -> Self {
66 Self {
67 config: GlueCatalogConfig {
68 name: None,
69 uri: None,
70 catalog_id: None,
71 warehouse: "".to_string(),
72 props: HashMap::new(),
73 },
74 storage_factory: None,
75 runtime: None,
76 }
77 }
78}
79
80impl CatalogBuilder for GlueCatalogBuilder {
81 type C = GlueCatalog;
82
83 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
84 self.storage_factory = Some(storage_factory);
85 self
86 }
87
88 fn with_runtime(mut self, runtime: Runtime) -> Self {
89 self.runtime = Some(runtime);
90 self
91 }
92
93 fn load(
94 mut self,
95 name: impl Into<String>,
96 props: HashMap<String, String>,
97 ) -> impl Future<Output = Result<Self::C>> + Send {
98 self.config.name = Some(name.into());
99
100 if props.contains_key(GLUE_CATALOG_PROP_URI) {
101 self.config.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
102 }
103
104 if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
105 self.config.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
106 }
107
108 if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
109 self.config.warehouse = props
110 .get(GLUE_CATALOG_PROP_WAREHOUSE)
111 .cloned()
112 .unwrap_or_default();
113 }
114
115 self.config.props = props
117 .into_iter()
118 .filter(|(k, _)| {
119 k != GLUE_CATALOG_PROP_URI
120 && k != GLUE_CATALOG_PROP_CATALOG_ID
121 && k != GLUE_CATALOG_PROP_WAREHOUSE
122 })
123 .collect();
124
125 async move {
126 if self.config.name.is_none() {
127 return Err(Error::new(
128 ErrorKind::DataInvalid,
129 "Catalog name is required",
130 ));
131 }
132 if self.config.warehouse.is_empty() {
133 return Err(Error::new(
134 ErrorKind::DataInvalid,
135 "Catalog warehouse is required",
136 ));
137 }
138
139 let runtime = match self.runtime {
140 Some(rt) => rt,
141 None => Runtime::try_current()?,
142 };
143 GlueCatalog::new(self.config, self.storage_factory, runtime).await
144 }
145 }
146}
147
/// Configuration of a [`GlueCatalog`], assembled by `GlueCatalogBuilder::load`.
#[derive(Debug)]
pub(crate) struct GlueCatalogConfig {
    /// Catalog name passed to `load`.
    name: Option<String>,
    /// Value of the `uri` property, if supplied. Passed to `create_sdk_config` and
    /// used as the `S3_ENDPOINT` file-IO property when that key is not set explicitly.
    uri: Option<String>,
    /// Value of the `catalog_id` property, if supplied. Consumed by the
    /// `with_catalog_id!` macro (defined outside this file) on every Glue request —
    /// presumably to attach the id to the request; TODO confirm against the macro.
    catalog_id: Option<String>,
    /// Value of the `warehouse` property; `load` rejects an empty value. Passed to
    /// `get_default_table_location` when a table is created without a location.
    warehouse: String,
    /// Remaining properties (without the `uri`, `catalog_id` and `warehouse` keys).
    /// Passed to `create_sdk_config` and used as the base of the file-IO properties.
    props: HashMap<String, String>,
}
157
/// Newtype wrapper around the AWS SDK Glue client; the inner client is reached as `.0`.
struct GlueClient(aws_sdk_glue::Client);
159
/// Iceberg [`Catalog`] implementation backed by the AWS Glue client.
pub struct GlueCatalog {
    /// Configuration the catalog was built with.
    config: GlueCatalogConfig,
    /// Glue client used for every database and table request.
    client: GlueClient,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
    /// Runtime attached to every `Table` this catalog builds.
    runtime: Runtime,
}
167
168impl Debug for GlueCatalog {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 f.debug_struct("GlueCatalog")
171 .field("config", &self.config)
172 .finish_non_exhaustive()
173 }
174}
175
176impl GlueCatalog {
177 async fn new(
179 config: GlueCatalogConfig,
180 storage_factory: Option<Arc<dyn StorageFactory>>,
181 runtime: Runtime,
182 ) -> Result<Self> {
183 let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
184 let mut file_io_props = config.props.clone();
185 if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
186 && let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID)
187 {
188 file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
189 }
190 if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY)
191 && let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY)
192 {
193 file_io_props.insert(
194 S3_SECRET_ACCESS_KEY.to_string(),
195 secret_access_key.to_string(),
196 );
197 }
198 if !file_io_props.contains_key(S3_REGION)
199 && let Some(region) = file_io_props.get(AWS_REGION_NAME)
200 {
201 file_io_props.insert(S3_REGION.to_string(), region.to_string());
202 }
203 if !file_io_props.contains_key(S3_SESSION_TOKEN)
204 && let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN)
205 {
206 file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
207 }
208 if !file_io_props.contains_key(S3_ENDPOINT)
209 && let Some(aws_endpoint) = config.uri.as_ref()
210 {
211 file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
212 }
213
214 let client = aws_sdk_glue::Client::new(&sdk_config);
215
216 let factory = storage_factory.unwrap_or_else(|| {
218 Arc::new(OpenDalStorageFactory::S3 {
219 customized_credential_load: None,
220 })
221 });
222 let file_io = FileIOBuilder::new(factory)
223 .with_props(file_io_props)
224 .build();
225
226 Ok(GlueCatalog {
227 config,
228 client: GlueClient(client),
229 file_io,
230 runtime,
231 })
232 }
233 pub fn file_io(&self) -> FileIO {
235 self.file_io.clone()
236 }
237
238 async fn load_table_with_version_id(
251 &self,
252 table: &TableIdent,
253 ) -> Result<(Table, Option<String>)> {
254 let db_name = validate_namespace(table.namespace())?;
255 let table_name = table.name();
256
257 let builder = self
258 .client
259 .0
260 .get_table()
261 .database_name(&db_name)
262 .name(table_name);
263 let builder = with_catalog_id!(builder, self.config);
264
265 let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
266
267 let glue_table = glue_table_output.table().ok_or_else(|| {
268 Error::new(
269 ErrorKind::TableNotFound,
270 format!(
271 "Table object for database: {db_name} and table: {table_name} does not exist"
272 ),
273 )
274 })?;
275
276 let version_id = glue_table.version_id.clone();
277 let metadata_location = get_metadata_location(&glue_table.parameters)?;
278
279 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
280
281 let table = Table::builder()
282 .file_io(self.file_io())
283 .metadata_location(metadata_location)
284 .metadata(metadata)
285 .identifier(TableIdent::new(
286 NamespaceIdent::new(db_name),
287 table_name.to_owned(),
288 ))
289 .runtime(self.runtime.clone())
290 .build()?;
291
292 Ok((table, version_id))
293 }
294}
295
296#[async_trait]
297impl Catalog for GlueCatalog {
298 async fn list_namespaces(
303 &self,
304 parent: Option<&NamespaceIdent>,
305 ) -> Result<Vec<NamespaceIdent>> {
306 if parent.is_some() {
307 return Ok(vec![]);
308 }
309
310 let mut database_list: Vec<NamespaceIdent> = Vec::new();
311 let mut next_token: Option<String> = None;
312
313 loop {
314 let builder = match &next_token {
315 Some(token) => self.client.0.get_databases().next_token(token),
316 None => self.client.0.get_databases(),
317 };
318 let builder = with_catalog_id!(builder, self.config);
319 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
320
321 let dbs: Vec<NamespaceIdent> = resp
322 .database_list()
323 .iter()
324 .map(|db| NamespaceIdent::new(db.name().to_string()))
325 .collect();
326
327 database_list.extend(dbs);
328
329 next_token = resp.next_token().map(ToOwned::to_owned);
330 if next_token.is_none() {
331 break;
332 }
333 }
334
335 Ok(database_list)
336 }
337
338 async fn create_namespace(
352 &self,
353 namespace: &NamespaceIdent,
354 properties: HashMap<String, String>,
355 ) -> Result<Namespace> {
356 if self.namespace_exists(namespace).await? {
357 return Err(Error::new(
358 ErrorKind::NamespaceAlreadyExists,
359 format!("Namespace {namespace:?} already exists"),
360 ));
361 }
362
363 let db_input = convert_to_database(namespace, &properties)?;
364
365 let builder = self.client.0.create_database().database_input(db_input);
366 let builder = with_catalog_id!(builder, self.config);
367
368 builder.send().await.map_err(from_aws_sdk_error)?;
369
370 Ok(Namespace::with_properties(namespace.clone(), properties))
371 }
372
373 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
384 let db_name = validate_namespace(namespace)?;
385
386 let builder = self.client.0.get_database().name(&db_name);
387 let builder = with_catalog_id!(builder, self.config);
388
389 let resp = builder.send().await.map_err(|err| {
390 if err
391 .as_service_error()
392 .map(|e| e.is_entity_not_found_exception())
393 == Some(true)
394 {
395 return Error::new(
396 ErrorKind::NamespaceNotFound,
397 format!("Namespace {namespace:?} does not exist"),
398 );
399 }
400 from_aws_sdk_error(err)
401 })?;
402
403 match resp.database() {
404 Some(db) => {
405 let namespace = convert_to_namespace(db);
406 Ok(namespace)
407 }
408 None => Err(Error::new(
409 ErrorKind::NamespaceNotFound,
410 format!("Database with name: {db_name} does not exist"),
411 )),
412 }
413 }
414
415 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
428 let db_name = validate_namespace(namespace)?;
429
430 let builder = self.client.0.get_database().name(&db_name);
431 let builder = with_catalog_id!(builder, self.config);
432
433 let resp = builder.send().await;
434
435 match resp {
436 Ok(_) => Ok(true),
437 Err(err) => {
438 if err
439 .as_service_error()
440 .map(|e| e.is_entity_not_found_exception())
441 == Some(true)
442 {
443 return Ok(false);
444 }
445 Err(from_aws_sdk_error(err))
446 }
447 }
448 }
449
450 async fn update_namespace(
461 &self,
462 namespace: &NamespaceIdent,
463 properties: HashMap<String, String>,
464 ) -> Result<()> {
465 if !self.namespace_exists(namespace).await? {
466 return Err(Error::new(
467 ErrorKind::NamespaceNotFound,
468 format!("Namespace {namespace:?} does not exist"),
469 ));
470 }
471
472 let db_name = validate_namespace(namespace)?;
473 let db_input = convert_to_database(namespace, &properties)?;
474
475 let builder = self
476 .client
477 .0
478 .update_database()
479 .name(&db_name)
480 .database_input(db_input);
481 let builder = with_catalog_id!(builder, self.config);
482
483 builder.send().await.map_err(from_aws_sdk_error)?;
484
485 Ok(())
486 }
487
488 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
499 if !self.namespace_exists(namespace).await? {
500 return Err(Error::new(
501 ErrorKind::NamespaceNotFound,
502 format!("Namespace {namespace:?} does not exist"),
503 ));
504 }
505
506 let db_name = validate_namespace(namespace)?;
507
508 let builder = self
513 .client
514 .0
515 .get_tables()
516 .database_name(&db_name)
517 .max_results(1);
518 let builder = with_catalog_id!(builder, self.config);
519 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
520
521 if !resp.table_list().is_empty() {
522 return Err(Error::new(
523 ErrorKind::DataInvalid,
524 format!("Database with name: {} is not empty", &db_name),
525 ));
526 }
527
528 let builder = self.client.0.delete_database().name(db_name);
529 let builder = with_catalog_id!(builder, self.config);
530
531 builder.send().await.map_err(from_aws_sdk_error)?;
532
533 Ok(())
534 }
535
536 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
549 let db_name = validate_namespace(namespace)?;
550
551 let mut table_list: Vec<TableIdent> = Vec::new();
552 let mut next_token: Option<String> = None;
553
554 loop {
555 let builder = match &next_token {
556 Some(token) => self
557 .client
558 .0
559 .get_tables()
560 .database_name(&db_name)
561 .next_token(token),
562 None => self.client.0.get_tables().database_name(&db_name),
563 };
564 let builder = with_catalog_id!(builder, self.config);
565 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
566
567 let tables: Vec<_> = resp
568 .table_list()
569 .iter()
570 .filter(|tbl| is_iceberg_table(&tbl.parameters))
571 .map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
572 .collect();
573
574 table_list.extend(tables);
575
576 next_token = resp.next_token().map(ToOwned::to_owned);
577 if next_token.is_none() {
578 break;
579 }
580 }
581
582 Ok(table_list)
583 }
584
585 async fn create_table(
598 &self,
599 namespace: &NamespaceIdent,
600 mut creation: TableCreation,
601 ) -> Result<Table> {
602 let db_name = validate_namespace(namespace)?;
603 let table_name = creation.name.clone();
604
605 let location = match &creation.location {
606 Some(location) => location.clone(),
607 None => {
608 let ns = self.get_namespace(namespace).await?;
609 let location =
610 get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
611 creation.location = Some(location.clone());
612 location
613 }
614 };
615 let metadata = TableMetadataBuilder::from_table_creation(creation)?
616 .build()?
617 .metadata;
618 let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
619
620 metadata.write_to(&self.file_io, &metadata_location).await?;
621
622 let metadata_location_str = metadata_location.to_string();
623 let glue_table = convert_to_glue_table(
624 &table_name,
625 metadata_location_str.clone(),
626 &metadata,
627 metadata.properties(),
628 None,
629 )?;
630
631 let builder = self
632 .client
633 .0
634 .create_table()
635 .database_name(&db_name)
636 .table_input(glue_table);
637 let builder = with_catalog_id!(builder, self.config);
638
639 builder.send().await.map_err(from_aws_sdk_error)?;
640
641 Table::builder()
642 .file_io(self.file_io())
643 .metadata_location(metadata_location_str)
644 .metadata(metadata)
645 .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
646 .runtime(self.runtime.clone())
647 .build()
648 }
649
650 async fn load_table(&self, table: &TableIdent) -> Result<Table> {
663 let (table, _) = self.load_table_with_version_id(table).await?;
664 Ok(table)
665 }
666
667 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
678 let db_name = validate_namespace(table.namespace())?;
679 let table_name = table.name();
680
681 let builder = self
682 .client
683 .0
684 .delete_table()
685 .database_name(&db_name)
686 .name(table_name);
687 let builder = with_catalog_id!(builder, self.config);
688
689 builder.send().await.map_err(from_aws_sdk_error)?;
690
691 Ok(())
692 }
693
694 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
695 let table_info = self.load_table(table).await?;
696 self.drop_table(table).await?;
697 iceberg::drop_table_data(&table_info).await
698 }
699
700 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
708 let db_name = validate_namespace(table.namespace())?;
709 let table_name = table.name();
710
711 let builder = self
712 .client
713 .0
714 .get_table()
715 .database_name(&db_name)
716 .name(table_name);
717 let builder = with_catalog_id!(builder, self.config);
718
719 let resp = builder.send().await;
720
721 match resp {
722 Ok(_) => Ok(true),
723 Err(err) => {
724 if err
725 .as_service_error()
726 .map(|e| e.is_entity_not_found_exception())
727 == Some(true)
728 {
729 return Ok(false);
730 }
731 Err(from_aws_sdk_error(err))
732 }
733 }
734 }
735
736 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
743 let src_db_name = validate_namespace(src.namespace())?;
744 let dest_db_name = validate_namespace(dest.namespace())?;
745
746 let src_table_name = src.name();
747 let dest_table_name = dest.name();
748
749 let builder = self
750 .client
751 .0
752 .get_table()
753 .database_name(&src_db_name)
754 .name(src_table_name);
755 let builder = with_catalog_id!(builder, self.config);
756
757 let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
758
759 match glue_table_output.table() {
760 None => Err(Error::new(
761 ErrorKind::TableNotFound,
762 format!(
763 "'Table' object for database: {src_db_name} and table: {src_table_name} does not exist"
764 ),
765 )),
766 Some(table) => {
767 let rename_table_input = TableInput::builder()
768 .name(dest_table_name)
769 .set_parameters(table.parameters.clone())
770 .set_storage_descriptor(table.storage_descriptor.clone())
771 .set_table_type(table.table_type.clone())
772 .set_description(table.description.clone())
773 .build()
774 .map_err(from_aws_build_error)?;
775
776 let builder = self
777 .client
778 .0
779 .create_table()
780 .database_name(&dest_db_name)
781 .table_input(rename_table_input);
782 let builder = with_catalog_id!(builder, self.config);
783
784 builder.send().await.map_err(from_aws_sdk_error)?;
785
786 let drop_src_table_result = self.drop_table(src).await;
787
788 match drop_src_table_result {
789 Ok(_) => Ok(()),
790 Err(_) => {
791 let err_msg_src_table =
792 format!("Failed to drop old table {src_db_name}.{src_table_name}.");
793
794 let drop_dest_table_result = self.drop_table(dest).await;
795
796 match drop_dest_table_result {
797 Ok(_) => Err(Error::new(
798 ErrorKind::Unexpected,
799 format!(
800 "{err_msg_src_table} Rolled back table creation for {dest_db_name}.{dest_table_name}."
801 ),
802 )),
803 Err(_) => Err(Error::new(
804 ErrorKind::Unexpected,
805 format!(
806 "{err_msg_src_table} Failed to roll back table creation for {dest_db_name}.{dest_table_name}. Please clean up manually."
807 ),
808 )),
809 }
810 }
811 }
812 }
813 }
814 }
815
816 async fn register_table(
828 &self,
829 table_ident: &TableIdent,
830 metadata_location: String,
831 ) -> Result<Table> {
832 let db_name = validate_namespace(table_ident.namespace())?;
833 let table_name = table_ident.name();
834 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
835
836 let table_input = convert_to_glue_table(
837 table_name,
838 metadata_location.clone(),
839 &metadata,
840 metadata.properties(),
841 None,
842 )?;
843
844 let builder = self
845 .client
846 .0
847 .create_table()
848 .database_name(&db_name)
849 .table_input(table_input);
850 let builder = with_catalog_id!(builder, self.config);
851
852 builder.send().await.map_err(|e| {
853 let error = e.into_service_error();
854 match error {
855 CreateTableError::EntityNotFoundException(_) => Error::new(
856 ErrorKind::NamespaceNotFound,
857 format!("Database {db_name} does not exist"),
858 ),
859 CreateTableError::AlreadyExistsException(_) => Error::new(
860 ErrorKind::TableAlreadyExists,
861 format!("Table {table_ident} already exists"),
862 ),
863 _ => Error::new(
864 ErrorKind::Unexpected,
865 format!("Failed to register table {table_ident} due to AWS SDK error"),
866 ),
867 }
868 .with_source(anyhow!("aws sdk error: {error:?}"))
869 })?;
870
871 Ok(Table::builder()
872 .identifier(table_ident.clone())
873 .metadata_location(metadata_location)
874 .metadata(metadata)
875 .file_io(self.file_io())
876 .runtime(self.runtime.clone())
877 .build()?)
878 }
879
880 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
881 let table_ident = commit.identifier().clone();
882 let table_namespace = validate_namespace(table_ident.namespace())?;
883
884 let (current_table, current_version_id) =
885 self.load_table_with_version_id(&table_ident).await?;
886 let current_metadata_location = current_table.metadata_location_result()?.to_string();
887
888 let staged_table = commit.apply(current_table)?;
889 let staged_metadata_location_str = staged_table.metadata_location_result()?;
890 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
891
892 staged_table
894 .metadata()
895 .write_to(staged_table.file_io(), &staged_metadata_location)
896 .await?;
897
898 let mut builder = self
900 .client
901 .0
902 .update_table()
903 .database_name(table_namespace)
904 .set_skip_archive(Some(true)) .table_input(convert_to_glue_table(
906 table_ident.name(),
907 staged_metadata_location.to_string(),
908 staged_table.metadata(),
909 staged_table.metadata().properties(),
910 Some(current_metadata_location),
911 )?);
912
913 if let Some(version_id) = current_version_id {
915 builder = builder.version_id(version_id);
916 }
917
918 let builder = with_catalog_id!(builder, self.config);
919 let _ = builder.send().await.map_err(|e| {
920 let error = e.into_service_error();
921 match error {
922 UpdateTableError::EntityNotFoundException(_) => Error::new(
923 ErrorKind::TableNotFound,
924 format!("Table {table_ident} is not found"),
925 ),
926 UpdateTableError::ConcurrentModificationException(_) => Error::new(
927 ErrorKind::CatalogCommitConflicts,
928 format!("Commit failed for table: {table_ident}"),
929 )
930 .with_retryable(true),
931 _ => Error::new(
932 ErrorKind::Unexpected,
933 format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
934 ),
935 }
936 .with_source(anyhow!("aws sdk error: {error:?}"))
937 })?;
938
939 Ok(staged_table)
940 }
941}