1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::str::FromStr;
21use std::sync::Arc;
22
23use anyhow::anyhow;
24use async_trait::async_trait;
25use aws_sdk_glue::operation::create_table::CreateTableError;
26use aws_sdk_glue::operation::update_table::UpdateTableError;
27use aws_sdk_glue::types::TableInput;
28use iceberg::io::{
29 FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
30 S3_SESSION_TOKEN, StorageFactory,
31};
32use iceberg::spec::{TableMetadata, TableMetadataBuilder};
33use iceberg::table::Table;
34use iceberg::{
35 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
36 Runtime, TableCommit, TableCreation, TableIdent,
37};
38use iceberg_storage_opendal::OpenDalStorageFactory;
39
40use crate::error::{from_aws_build_error, from_aws_sdk_error};
41use crate::utils::{
42 convert_to_database, convert_to_glue_table, convert_to_namespace, create_sdk_config,
43 get_default_table_location, get_metadata_location, validate_namespace,
44};
45use crate::{
46 AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
47};
48
/// Catalog property key for a custom Glue endpoint URI.
///
/// When set, the same URI is also used as the S3 endpoint for the catalog's
/// `FileIO`, unless an explicit S3 endpoint property is provided.
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Catalog property key for the Glue catalog id attached to Glue requests
/// (applied through the `with_catalog_id!` macro).
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Catalog property key for the warehouse location.
///
/// Required: loading fails when it is missing or empty. It is used to derive
/// default table locations.
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
55
/// Builder for [`GlueCatalog`], driven through the [`CatalogBuilder`] trait.
#[derive(Debug)]
pub struct GlueCatalogBuilder {
    // Configuration filled in by `CatalogBuilder::load` from the catalog name
    // and property map.
    config: GlueCatalogConfig,
    // Optional storage backend. When `None`, `GlueCatalog::new` falls back to
    // an OpenDAL S3 storage factory.
    storage_factory: Option<Arc<dyn StorageFactory>>,
    // Optional runtime. When `None`, `Runtime::try_current()` is used at load time.
    runtime: Option<Runtime>,
}
63
64impl Default for GlueCatalogBuilder {
65 fn default() -> Self {
66 Self {
67 config: GlueCatalogConfig {
68 name: None,
69 uri: None,
70 catalog_id: None,
71 warehouse: "".to_string(),
72 props: HashMap::new(),
73 },
74 storage_factory: None,
75 runtime: None,
76 }
77 }
78}
79
80impl CatalogBuilder for GlueCatalogBuilder {
81 type C = GlueCatalog;
82
83 fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
84 self.storage_factory = Some(storage_factory);
85 self
86 }
87
88 fn with_runtime(mut self, runtime: Runtime) -> Self {
89 self.runtime = Some(runtime);
90 self
91 }
92
93 fn load(
94 mut self,
95 name: impl Into<String>,
96 props: HashMap<String, String>,
97 ) -> impl Future<Output = Result<Self::C>> + Send {
98 self.config.name = Some(name.into());
99
100 if props.contains_key(GLUE_CATALOG_PROP_URI) {
101 self.config.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
102 }
103
104 if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
105 self.config.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
106 }
107
108 if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
109 self.config.warehouse = props
110 .get(GLUE_CATALOG_PROP_WAREHOUSE)
111 .cloned()
112 .unwrap_or_default();
113 }
114
115 self.config.props = props
117 .into_iter()
118 .filter(|(k, _)| {
119 k != GLUE_CATALOG_PROP_URI
120 && k != GLUE_CATALOG_PROP_CATALOG_ID
121 && k != GLUE_CATALOG_PROP_WAREHOUSE
122 })
123 .collect();
124
125 async move {
126 if self.config.name.is_none() {
127 return Err(Error::new(
128 ErrorKind::DataInvalid,
129 "Catalog name is required",
130 ));
131 }
132 if self.config.warehouse.is_empty() {
133 return Err(Error::new(
134 ErrorKind::DataInvalid,
135 "Catalog warehouse is required",
136 ));
137 }
138
139 let runtime = match self.runtime {
140 Some(rt) => rt,
141 None => Runtime::try_current()?,
142 };
143 GlueCatalog::new(self.config, self.storage_factory, runtime).await
144 }
145 }
146}
147
/// Resolved configuration for a [`GlueCatalog`].
#[derive(Debug)]
pub(crate) struct GlueCatalogConfig {
    // Catalog name. `load` always sets it before validation.
    name: Option<String>,
    // Optional custom Glue endpoint. It is also used as the S3 endpoint when
    // none is configured explicitly.
    uri: Option<String>,
    // Optional Glue catalog id attached to requests via `with_catalog_id!`.
    catalog_id: Option<String>,
    // Warehouse location. Must be non-empty; used for default table locations.
    warehouse: String,
    // All remaining catalog properties (AWS SDK and FileIO settings).
    props: HashMap<String, String>,
}
157
/// Thin newtype around the AWS SDK Glue client used by [`GlueCatalog`].
struct GlueClient(aws_sdk_glue::Client);
159
/// Iceberg catalog backed by AWS Glue.
///
/// Glue databases are exposed as namespaces and Glue tables as Iceberg tables.
/// Each table's Iceberg metadata is read from and written to the location
/// stored in the table parameters.
pub struct GlueCatalog {
    // Validated catalog configuration.
    config: GlueCatalogConfig,
    // AWS Glue client built from the catalog properties.
    client: GlueClient,
    // FileIO used to read and write table metadata files.
    file_io: FileIO,
    // Runtime handed to every `Table` this catalog builds.
    runtime: Runtime,
}
167
168impl Debug for GlueCatalog {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 f.debug_struct("GlueCatalog")
171 .field("config", &self.config)
172 .finish_non_exhaustive()
173 }
174}
175
176impl GlueCatalog {
177 async fn new(
179 config: GlueCatalogConfig,
180 storage_factory: Option<Arc<dyn StorageFactory>>,
181 runtime: Runtime,
182 ) -> Result<Self> {
183 let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
184 let mut file_io_props = config.props.clone();
185 if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
186 && let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID)
187 {
188 file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
189 }
190 if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY)
191 && let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY)
192 {
193 file_io_props.insert(
194 S3_SECRET_ACCESS_KEY.to_string(),
195 secret_access_key.to_string(),
196 );
197 }
198 if !file_io_props.contains_key(S3_REGION)
199 && let Some(region) = file_io_props.get(AWS_REGION_NAME)
200 {
201 file_io_props.insert(S3_REGION.to_string(), region.to_string());
202 }
203 if !file_io_props.contains_key(S3_SESSION_TOKEN)
204 && let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN)
205 {
206 file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
207 }
208 if !file_io_props.contains_key(S3_ENDPOINT)
209 && let Some(aws_endpoint) = config.uri.as_ref()
210 {
211 file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
212 }
213
214 let client = aws_sdk_glue::Client::new(&sdk_config);
215
216 let factory = storage_factory.unwrap_or_else(|| {
218 Arc::new(OpenDalStorageFactory::S3 {
219 customized_credential_load: None,
220 })
221 });
222 let file_io = FileIOBuilder::new(factory)
223 .with_props(file_io_props)
224 .build();
225
226 Ok(GlueCatalog {
227 config,
228 client: GlueClient(client),
229 file_io,
230 runtime,
231 })
232 }
233 pub fn file_io(&self) -> FileIO {
235 self.file_io.clone()
236 }
237
238 async fn load_table_with_version_id(
251 &self,
252 table: &TableIdent,
253 ) -> Result<(Table, Option<String>)> {
254 let db_name = validate_namespace(table.namespace())?;
255 let table_name = table.name();
256
257 let builder = self
258 .client
259 .0
260 .get_table()
261 .database_name(&db_name)
262 .name(table_name);
263 let builder = with_catalog_id!(builder, self.config);
264
265 let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
266
267 let glue_table = glue_table_output.table().ok_or_else(|| {
268 Error::new(
269 ErrorKind::TableNotFound,
270 format!(
271 "Table object for database: {db_name} and table: {table_name} does not exist"
272 ),
273 )
274 })?;
275
276 let version_id = glue_table.version_id.clone();
277 let metadata_location = get_metadata_location(&glue_table.parameters)?;
278
279 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
280
281 let table = Table::builder()
282 .file_io(self.file_io())
283 .metadata_location(metadata_location)
284 .metadata(metadata)
285 .identifier(TableIdent::new(
286 NamespaceIdent::new(db_name),
287 table_name.to_owned(),
288 ))
289 .runtime(self.runtime.clone())
290 .build()?;
291
292 Ok((table, version_id))
293 }
294}
295
296#[async_trait]
297impl Catalog for GlueCatalog {
298 async fn list_namespaces(
303 &self,
304 parent: Option<&NamespaceIdent>,
305 ) -> Result<Vec<NamespaceIdent>> {
306 if parent.is_some() {
307 return Ok(vec![]);
308 }
309
310 let mut database_list: Vec<NamespaceIdent> = Vec::new();
311 let mut next_token: Option<String> = None;
312
313 loop {
314 let builder = match &next_token {
315 Some(token) => self.client.0.get_databases().next_token(token),
316 None => self.client.0.get_databases(),
317 };
318 let builder = with_catalog_id!(builder, self.config);
319 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
320
321 let dbs: Vec<NamespaceIdent> = resp
322 .database_list()
323 .iter()
324 .map(|db| NamespaceIdent::new(db.name().to_string()))
325 .collect();
326
327 database_list.extend(dbs);
328
329 next_token = resp.next_token().map(ToOwned::to_owned);
330 if next_token.is_none() {
331 break;
332 }
333 }
334
335 Ok(database_list)
336 }
337
338 async fn create_namespace(
352 &self,
353 namespace: &NamespaceIdent,
354 properties: HashMap<String, String>,
355 ) -> Result<Namespace> {
356 if self.namespace_exists(namespace).await? {
357 return Err(Error::new(
358 ErrorKind::NamespaceAlreadyExists,
359 format!("Namespace {namespace:?} already exists"),
360 ));
361 }
362
363 let db_input = convert_to_database(namespace, &properties)?;
364
365 let builder = self.client.0.create_database().database_input(db_input);
366 let builder = with_catalog_id!(builder, self.config);
367
368 builder.send().await.map_err(from_aws_sdk_error)?;
369
370 Ok(Namespace::with_properties(namespace.clone(), properties))
371 }
372
373 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
384 let db_name = validate_namespace(namespace)?;
385
386 let builder = self.client.0.get_database().name(&db_name);
387 let builder = with_catalog_id!(builder, self.config);
388
389 let resp = builder.send().await.map_err(|err| {
390 if err
391 .as_service_error()
392 .map(|e| e.is_entity_not_found_exception())
393 == Some(true)
394 {
395 return Error::new(
396 ErrorKind::NamespaceNotFound,
397 format!("Namespace {namespace:?} does not exist"),
398 );
399 }
400 from_aws_sdk_error(err)
401 })?;
402
403 match resp.database() {
404 Some(db) => {
405 let namespace = convert_to_namespace(db);
406 Ok(namespace)
407 }
408 None => Err(Error::new(
409 ErrorKind::NamespaceNotFound,
410 format!("Database with name: {db_name} does not exist"),
411 )),
412 }
413 }
414
415 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
428 let db_name = validate_namespace(namespace)?;
429
430 let builder = self.client.0.get_database().name(&db_name);
431 let builder = with_catalog_id!(builder, self.config);
432
433 let resp = builder.send().await;
434
435 match resp {
436 Ok(_) => Ok(true),
437 Err(err) => {
438 if err
439 .as_service_error()
440 .map(|e| e.is_entity_not_found_exception())
441 == Some(true)
442 {
443 return Ok(false);
444 }
445 Err(from_aws_sdk_error(err))
446 }
447 }
448 }
449
450 async fn update_namespace(
461 &self,
462 namespace: &NamespaceIdent,
463 properties: HashMap<String, String>,
464 ) -> Result<()> {
465 if !self.namespace_exists(namespace).await? {
466 return Err(Error::new(
467 ErrorKind::NamespaceNotFound,
468 format!("Namespace {namespace:?} does not exist"),
469 ));
470 }
471
472 let db_name = validate_namespace(namespace)?;
473 let db_input = convert_to_database(namespace, &properties)?;
474
475 let builder = self
476 .client
477 .0
478 .update_database()
479 .name(&db_name)
480 .database_input(db_input);
481 let builder = with_catalog_id!(builder, self.config);
482
483 builder.send().await.map_err(from_aws_sdk_error)?;
484
485 Ok(())
486 }
487
488 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
499 if !self.namespace_exists(namespace).await? {
500 return Err(Error::new(
501 ErrorKind::NamespaceNotFound,
502 format!("Namespace {namespace:?} does not exist"),
503 ));
504 }
505
506 let db_name = validate_namespace(namespace)?;
507 let table_list = self.list_tables(namespace).await?;
508
509 if !table_list.is_empty() {
510 return Err(Error::new(
511 ErrorKind::DataInvalid,
512 format!("Database with name: {} is not empty", &db_name),
513 ));
514 }
515
516 let builder = self.client.0.delete_database().name(db_name);
517 let builder = with_catalog_id!(builder, self.config);
518
519 builder.send().await.map_err(from_aws_sdk_error)?;
520
521 Ok(())
522 }
523
524 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
533 let db_name = validate_namespace(namespace)?;
534
535 let mut table_list: Vec<TableIdent> = Vec::new();
536 let mut next_token: Option<String> = None;
537
538 loop {
539 let builder = match &next_token {
540 Some(token) => self
541 .client
542 .0
543 .get_tables()
544 .database_name(&db_name)
545 .next_token(token),
546 None => self.client.0.get_tables().database_name(&db_name),
547 };
548 let builder = with_catalog_id!(builder, self.config);
549 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
550
551 let tables: Vec<_> = resp
552 .table_list()
553 .iter()
554 .map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
555 .collect();
556
557 table_list.extend(tables);
558
559 next_token = resp.next_token().map(ToOwned::to_owned);
560 if next_token.is_none() {
561 break;
562 }
563 }
564
565 Ok(table_list)
566 }
567
568 async fn create_table(
581 &self,
582 namespace: &NamespaceIdent,
583 mut creation: TableCreation,
584 ) -> Result<Table> {
585 let db_name = validate_namespace(namespace)?;
586 let table_name = creation.name.clone();
587
588 let location = match &creation.location {
589 Some(location) => location.clone(),
590 None => {
591 let ns = self.get_namespace(namespace).await?;
592 let location =
593 get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
594 creation.location = Some(location.clone());
595 location
596 }
597 };
598 let metadata = TableMetadataBuilder::from_table_creation(creation)?
599 .build()?
600 .metadata;
601 let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
602
603 metadata.write_to(&self.file_io, &metadata_location).await?;
604
605 let metadata_location_str = metadata_location.to_string();
606 let glue_table = convert_to_glue_table(
607 &table_name,
608 metadata_location_str.clone(),
609 &metadata,
610 metadata.properties(),
611 None,
612 )?;
613
614 let builder = self
615 .client
616 .0
617 .create_table()
618 .database_name(&db_name)
619 .table_input(glue_table);
620 let builder = with_catalog_id!(builder, self.config);
621
622 builder.send().await.map_err(from_aws_sdk_error)?;
623
624 Table::builder()
625 .file_io(self.file_io())
626 .metadata_location(metadata_location_str)
627 .metadata(metadata)
628 .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
629 .runtime(self.runtime.clone())
630 .build()
631 }
632
633 async fn load_table(&self, table: &TableIdent) -> Result<Table> {
646 let (table, _) = self.load_table_with_version_id(table).await?;
647 Ok(table)
648 }
649
650 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
661 let db_name = validate_namespace(table.namespace())?;
662 let table_name = table.name();
663
664 let builder = self
665 .client
666 .0
667 .delete_table()
668 .database_name(&db_name)
669 .name(table_name);
670 let builder = with_catalog_id!(builder, self.config);
671
672 builder.send().await.map_err(from_aws_sdk_error)?;
673
674 Ok(())
675 }
676
677 async fn purge_table(&self, table: &TableIdent) -> Result<()> {
678 let table_info = self.load_table(table).await?;
679 self.drop_table(table).await?;
680 iceberg::drop_table_data(
681 table_info.file_io(),
682 table_info.metadata(),
683 table_info.metadata_location(),
684 )
685 .await
686 }
687
688 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
696 let db_name = validate_namespace(table.namespace())?;
697 let table_name = table.name();
698
699 let builder = self
700 .client
701 .0
702 .get_table()
703 .database_name(&db_name)
704 .name(table_name);
705 let builder = with_catalog_id!(builder, self.config);
706
707 let resp = builder.send().await;
708
709 match resp {
710 Ok(_) => Ok(true),
711 Err(err) => {
712 if err
713 .as_service_error()
714 .map(|e| e.is_entity_not_found_exception())
715 == Some(true)
716 {
717 return Ok(false);
718 }
719 Err(from_aws_sdk_error(err))
720 }
721 }
722 }
723
724 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
731 let src_db_name = validate_namespace(src.namespace())?;
732 let dest_db_name = validate_namespace(dest.namespace())?;
733
734 let src_table_name = src.name();
735 let dest_table_name = dest.name();
736
737 let builder = self
738 .client
739 .0
740 .get_table()
741 .database_name(&src_db_name)
742 .name(src_table_name);
743 let builder = with_catalog_id!(builder, self.config);
744
745 let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
746
747 match glue_table_output.table() {
748 None => Err(Error::new(
749 ErrorKind::TableNotFound,
750 format!(
751 "'Table' object for database: {src_db_name} and table: {src_table_name} does not exist"
752 ),
753 )),
754 Some(table) => {
755 let rename_table_input = TableInput::builder()
756 .name(dest_table_name)
757 .set_parameters(table.parameters.clone())
758 .set_storage_descriptor(table.storage_descriptor.clone())
759 .set_table_type(table.table_type.clone())
760 .set_description(table.description.clone())
761 .build()
762 .map_err(from_aws_build_error)?;
763
764 let builder = self
765 .client
766 .0
767 .create_table()
768 .database_name(&dest_db_name)
769 .table_input(rename_table_input);
770 let builder = with_catalog_id!(builder, self.config);
771
772 builder.send().await.map_err(from_aws_sdk_error)?;
773
774 let drop_src_table_result = self.drop_table(src).await;
775
776 match drop_src_table_result {
777 Ok(_) => Ok(()),
778 Err(_) => {
779 let err_msg_src_table =
780 format!("Failed to drop old table {src_db_name}.{src_table_name}.");
781
782 let drop_dest_table_result = self.drop_table(dest).await;
783
784 match drop_dest_table_result {
785 Ok(_) => Err(Error::new(
786 ErrorKind::Unexpected,
787 format!(
788 "{err_msg_src_table} Rolled back table creation for {dest_db_name}.{dest_table_name}."
789 ),
790 )),
791 Err(_) => Err(Error::new(
792 ErrorKind::Unexpected,
793 format!(
794 "{err_msg_src_table} Failed to roll back table creation for {dest_db_name}.{dest_table_name}. Please clean up manually."
795 ),
796 )),
797 }
798 }
799 }
800 }
801 }
802 }
803
804 async fn register_table(
816 &self,
817 table_ident: &TableIdent,
818 metadata_location: String,
819 ) -> Result<Table> {
820 let db_name = validate_namespace(table_ident.namespace())?;
821 let table_name = table_ident.name();
822 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
823
824 let table_input = convert_to_glue_table(
825 table_name,
826 metadata_location.clone(),
827 &metadata,
828 metadata.properties(),
829 None,
830 )?;
831
832 let builder = self
833 .client
834 .0
835 .create_table()
836 .database_name(&db_name)
837 .table_input(table_input);
838 let builder = with_catalog_id!(builder, self.config);
839
840 builder.send().await.map_err(|e| {
841 let error = e.into_service_error();
842 match error {
843 CreateTableError::EntityNotFoundException(_) => Error::new(
844 ErrorKind::NamespaceNotFound,
845 format!("Database {db_name} does not exist"),
846 ),
847 CreateTableError::AlreadyExistsException(_) => Error::new(
848 ErrorKind::TableAlreadyExists,
849 format!("Table {table_ident} already exists"),
850 ),
851 _ => Error::new(
852 ErrorKind::Unexpected,
853 format!("Failed to register table {table_ident} due to AWS SDK error"),
854 ),
855 }
856 .with_source(anyhow!("aws sdk error: {error:?}"))
857 })?;
858
859 Ok(Table::builder()
860 .identifier(table_ident.clone())
861 .metadata_location(metadata_location)
862 .metadata(metadata)
863 .file_io(self.file_io())
864 .runtime(self.runtime.clone())
865 .build()?)
866 }
867
868 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
869 let table_ident = commit.identifier().clone();
870 let table_namespace = validate_namespace(table_ident.namespace())?;
871
872 let (current_table, current_version_id) =
873 self.load_table_with_version_id(&table_ident).await?;
874 let current_metadata_location = current_table.metadata_location_result()?.to_string();
875
876 let staged_table = commit.apply(current_table)?;
877 let staged_metadata_location_str = staged_table.metadata_location_result()?;
878 let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
879
880 staged_table
882 .metadata()
883 .write_to(staged_table.file_io(), &staged_metadata_location)
884 .await?;
885
886 let mut builder = self
888 .client
889 .0
890 .update_table()
891 .database_name(table_namespace)
892 .set_skip_archive(Some(true)) .table_input(convert_to_glue_table(
894 table_ident.name(),
895 staged_metadata_location.to_string(),
896 staged_table.metadata(),
897 staged_table.metadata().properties(),
898 Some(current_metadata_location),
899 )?);
900
901 if let Some(version_id) = current_version_id {
903 builder = builder.version_id(version_id);
904 }
905
906 let builder = with_catalog_id!(builder, self.config);
907 let _ = builder.send().await.map_err(|e| {
908 let error = e.into_service_error();
909 match error {
910 UpdateTableError::EntityNotFoundException(_) => Error::new(
911 ErrorKind::TableNotFound,
912 format!("Table {table_ident} is not found"),
913 ),
914 UpdateTableError::ConcurrentModificationException(_) => Error::new(
915 ErrorKind::CatalogCommitConflicts,
916 format!("Commit failed for table: {table_ident}"),
917 )
918 .with_retryable(true),
919 _ => Error::new(
920 ErrorKind::Unexpected,
921 format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
922 ),
923 }
924 .with_source(anyhow!("aws sdk error: {error:?}"))
925 })?;
926
927 Ok(staged_table)
928 }
929}