1use std::collections::HashMap;
19use std::fmt::Debug;
20
21use anyhow::anyhow;
22use async_trait::async_trait;
23use aws_sdk_glue::operation::create_table::CreateTableError;
24use aws_sdk_glue::operation::update_table::UpdateTableError;
25use aws_sdk_glue::types::TableInput;
26use iceberg::io::{
27 FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN,
28};
29use iceberg::spec::{TableMetadata, TableMetadataBuilder};
30use iceberg::table::Table;
31use iceberg::{
32 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
33 TableCommit, TableCreation, TableIdent,
34};
35
36use crate::error::{from_aws_build_error, from_aws_sdk_error};
37use crate::utils::{
38 convert_to_database, convert_to_glue_table, convert_to_namespace, create_sdk_config,
39 get_default_table_location, get_metadata_location, validate_namespace,
40};
41use crate::{
42 AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
43};
44
/// Property key under which `load` looks up the optional Glue endpoint URI.
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Property key under which `load` looks up the optional Glue catalog id.
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Property key under which `load` looks up the warehouse location.
/// `load` rejects a catalog whose warehouse ends up empty.
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
51
/// Builder for [`GlueCatalog`].
///
/// A newtype around the [`GlueCatalogConfig`] that is being assembled.
#[derive(Debug)]
pub struct GlueCatalogBuilder(GlueCatalogConfig);
55
56impl Default for GlueCatalogBuilder {
57 fn default() -> Self {
58 Self(GlueCatalogConfig {
59 name: None,
60 uri: None,
61 catalog_id: None,
62 warehouse: "".to_string(),
63 props: HashMap::new(),
64 })
65 }
66}
67
68impl CatalogBuilder for GlueCatalogBuilder {
69 type C = GlueCatalog;
70
71 fn load(
72 mut self,
73 name: impl Into<String>,
74 props: HashMap<String, String>,
75 ) -> impl Future<Output = Result<Self::C>> + Send {
76 self.0.name = Some(name.into());
77
78 if props.contains_key(GLUE_CATALOG_PROP_URI) {
79 self.0.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
80 }
81
82 if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
83 self.0.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
84 }
85
86 if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
87 self.0.warehouse = props
88 .get(GLUE_CATALOG_PROP_WAREHOUSE)
89 .cloned()
90 .unwrap_or_default();
91 }
92
93 self.0.props = props
95 .into_iter()
96 .filter(|(k, _)| {
97 k != GLUE_CATALOG_PROP_URI
98 && k != GLUE_CATALOG_PROP_CATALOG_ID
99 && k != GLUE_CATALOG_PROP_WAREHOUSE
100 })
101 .collect();
102
103 async move {
104 if self.0.name.is_none() {
105 return Err(Error::new(
106 ErrorKind::DataInvalid,
107 "Catalog name is required",
108 ));
109 }
110 if self.0.warehouse.is_empty() {
111 return Err(Error::new(
112 ErrorKind::DataInvalid,
113 "Catalog warehouse is required",
114 ));
115 }
116
117 GlueCatalog::new(self.0).await
118 }
119 }
120}
121
/// Configuration a [`GlueCatalog`] is created from.
#[derive(Debug)]
pub(crate) struct GlueCatalogConfig {
    /// Catalog name; filled in by `GlueCatalogBuilder::load`.
    name: Option<String>,
    /// Optional endpoint URI. It is handed to `create_sdk_config` and also used
    /// as the S3 endpoint for `FileIO` when no S3 endpoint property is present.
    uri: Option<String>,
    /// Optional catalog id, taken from the `catalog_id` property.
    // NOTE(review): presumably consumed by the `with_catalog_id!` macro — confirm.
    catalog_id: Option<String>,
    /// Warehouse location; used to create the `FileIO` and as input when
    /// deriving a default table location.
    warehouse: String,
    /// Remaining properties, forwarded to `create_sdk_config` and `FileIO`.
    props: HashMap<String, String>,
}
131
/// Newtype wrapper around the AWS SDK Glue client.
struct GlueClient(aws_sdk_glue::Client);
133
/// Iceberg catalog backed by AWS Glue.
pub struct GlueCatalog {
    /// Configuration the catalog was created with.
    config: GlueCatalogConfig,
    /// Glue client used for every database and table request.
    client: GlueClient,
    /// File IO used to read and write table metadata files.
    file_io: FileIO,
}
140
141impl Debug for GlueCatalog {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 f.debug_struct("GlueCatalog")
144 .field("config", &self.config)
145 .finish_non_exhaustive()
146 }
147}
148
149impl GlueCatalog {
150 async fn new(config: GlueCatalogConfig) -> Result<Self> {
152 let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
153 let mut file_io_props = config.props.clone();
154 if !file_io_props.contains_key(S3_ACCESS_KEY_ID) {
155 if let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID) {
156 file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
157 }
158 }
159 if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY) {
160 if let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY) {
161 file_io_props.insert(
162 S3_SECRET_ACCESS_KEY.to_string(),
163 secret_access_key.to_string(),
164 );
165 }
166 }
167 if !file_io_props.contains_key(S3_REGION) {
168 if let Some(region) = file_io_props.get(AWS_REGION_NAME) {
169 file_io_props.insert(S3_REGION.to_string(), region.to_string());
170 }
171 }
172 if !file_io_props.contains_key(S3_SESSION_TOKEN) {
173 if let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN) {
174 file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
175 }
176 }
177 if !file_io_props.contains_key(S3_ENDPOINT) {
178 if let Some(aws_endpoint) = config.uri.as_ref() {
179 file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
180 }
181 }
182
183 let client = aws_sdk_glue::Client::new(&sdk_config);
184
185 let file_io = FileIO::from_path(&config.warehouse)?
186 .with_props(file_io_props)
187 .build()?;
188
189 Ok(GlueCatalog {
190 config,
191 client: GlueClient(client),
192 file_io,
193 })
194 }
195 pub fn file_io(&self) -> FileIO {
197 self.file_io.clone()
198 }
199
200 async fn load_table_with_version_id(
213 &self,
214 table: &TableIdent,
215 ) -> Result<(Table, Option<String>)> {
216 let db_name = validate_namespace(table.namespace())?;
217 let table_name = table.name();
218
219 let builder = self
220 .client
221 .0
222 .get_table()
223 .database_name(&db_name)
224 .name(table_name);
225 let builder = with_catalog_id!(builder, self.config);
226
227 let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
228
229 let glue_table = glue_table_output.table().ok_or_else(|| {
230 Error::new(
231 ErrorKind::TableNotFound,
232 format!(
233 "Table object for database: {db_name} and table: {table_name} does not exist"
234 ),
235 )
236 })?;
237
238 let version_id = glue_table.version_id.clone();
239 let metadata_location = get_metadata_location(&glue_table.parameters)?;
240
241 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
242
243 let table = Table::builder()
244 .file_io(self.file_io())
245 .metadata_location(metadata_location)
246 .metadata(metadata)
247 .identifier(TableIdent::new(
248 NamespaceIdent::new(db_name),
249 table_name.to_owned(),
250 ))
251 .build()?;
252
253 Ok((table, version_id))
254 }
255}
256
257#[async_trait]
258impl Catalog for GlueCatalog {
259 async fn list_namespaces(
264 &self,
265 parent: Option<&NamespaceIdent>,
266 ) -> Result<Vec<NamespaceIdent>> {
267 if parent.is_some() {
268 return Ok(vec![]);
269 }
270
271 let mut database_list: Vec<NamespaceIdent> = Vec::new();
272 let mut next_token: Option<String> = None;
273
274 loop {
275 let builder = match &next_token {
276 Some(token) => self.client.0.get_databases().next_token(token),
277 None => self.client.0.get_databases(),
278 };
279 let builder = with_catalog_id!(builder, self.config);
280 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
281
282 let dbs: Vec<NamespaceIdent> = resp
283 .database_list()
284 .iter()
285 .map(|db| NamespaceIdent::new(db.name().to_string()))
286 .collect();
287
288 database_list.extend(dbs);
289
290 next_token = resp.next_token().map(ToOwned::to_owned);
291 if next_token.is_none() {
292 break;
293 }
294 }
295
296 Ok(database_list)
297 }
298
299 async fn create_namespace(
313 &self,
314 namespace: &NamespaceIdent,
315 properties: HashMap<String, String>,
316 ) -> Result<Namespace> {
317 let db_input = convert_to_database(namespace, &properties)?;
318
319 let builder = self.client.0.create_database().database_input(db_input);
320 let builder = with_catalog_id!(builder, self.config);
321
322 builder.send().await.map_err(from_aws_sdk_error)?;
323
324 Ok(Namespace::with_properties(namespace.clone(), properties))
325 }
326
327 async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
338 let db_name = validate_namespace(namespace)?;
339
340 let builder = self.client.0.get_database().name(&db_name);
341 let builder = with_catalog_id!(builder, self.config);
342
343 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
344
345 match resp.database() {
346 Some(db) => {
347 let namespace = convert_to_namespace(db);
348 Ok(namespace)
349 }
350 None => Err(Error::new(
351 ErrorKind::DataInvalid,
352 format!("Database with name: {db_name} does not exist"),
353 )),
354 }
355 }
356
357 async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
370 let db_name = validate_namespace(namespace)?;
371
372 let builder = self.client.0.get_database().name(&db_name);
373 let builder = with_catalog_id!(builder, self.config);
374
375 let resp = builder.send().await;
376
377 match resp {
378 Ok(_) => Ok(true),
379 Err(err) => {
380 if err
381 .as_service_error()
382 .map(|e| e.is_entity_not_found_exception())
383 == Some(true)
384 {
385 return Ok(false);
386 }
387 Err(from_aws_sdk_error(err))
388 }
389 }
390 }
391
392 async fn update_namespace(
403 &self,
404 namespace: &NamespaceIdent,
405 properties: HashMap<String, String>,
406 ) -> Result<()> {
407 let db_name = validate_namespace(namespace)?;
408 let db_input = convert_to_database(namespace, &properties)?;
409
410 let builder = self
411 .client
412 .0
413 .update_database()
414 .name(&db_name)
415 .database_input(db_input);
416 let builder = with_catalog_id!(builder, self.config);
417
418 builder.send().await.map_err(from_aws_sdk_error)?;
419
420 Ok(())
421 }
422
423 async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
434 let db_name = validate_namespace(namespace)?;
435 let table_list = self.list_tables(namespace).await?;
436
437 if !table_list.is_empty() {
438 return Err(Error::new(
439 ErrorKind::DataInvalid,
440 format!("Database with name: {} is not empty", &db_name),
441 ));
442 }
443
444 let builder = self.client.0.delete_database().name(db_name);
445 let builder = with_catalog_id!(builder, self.config);
446
447 builder.send().await.map_err(from_aws_sdk_error)?;
448
449 Ok(())
450 }
451
452 async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
461 let db_name = validate_namespace(namespace)?;
462
463 let mut table_list: Vec<TableIdent> = Vec::new();
464 let mut next_token: Option<String> = None;
465
466 loop {
467 let builder = match &next_token {
468 Some(token) => self
469 .client
470 .0
471 .get_tables()
472 .database_name(&db_name)
473 .next_token(token),
474 None => self.client.0.get_tables().database_name(&db_name),
475 };
476 let builder = with_catalog_id!(builder, self.config);
477 let resp = builder.send().await.map_err(from_aws_sdk_error)?;
478
479 let tables: Vec<_> = resp
480 .table_list()
481 .iter()
482 .map(|tbl| TableIdent::new(namespace.clone(), tbl.name().to_string()))
483 .collect();
484
485 table_list.extend(tables);
486
487 next_token = resp.next_token().map(ToOwned::to_owned);
488 if next_token.is_none() {
489 break;
490 }
491 }
492
493 Ok(table_list)
494 }
495
496 async fn create_table(
509 &self,
510 namespace: &NamespaceIdent,
511 mut creation: TableCreation,
512 ) -> Result<Table> {
513 let db_name = validate_namespace(namespace)?;
514 let table_name = creation.name.clone();
515
516 let location = match &creation.location {
517 Some(location) => location.clone(),
518 None => {
519 let ns = self.get_namespace(namespace).await?;
520 let location =
521 get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
522 creation.location = Some(location.clone());
523 location
524 }
525 };
526 let metadata = TableMetadataBuilder::from_table_creation(creation)?
527 .build()?
528 .metadata;
529 let metadata_location =
530 MetadataLocation::new_with_table_location(location.clone()).to_string();
531
532 metadata.write_to(&self.file_io, &metadata_location).await?;
533
534 let glue_table = convert_to_glue_table(
535 &table_name,
536 metadata_location.clone(),
537 &metadata,
538 metadata.properties(),
539 None,
540 )?;
541
542 let builder = self
543 .client
544 .0
545 .create_table()
546 .database_name(&db_name)
547 .table_input(glue_table);
548 let builder = with_catalog_id!(builder, self.config);
549
550 builder.send().await.map_err(from_aws_sdk_error)?;
551
552 Table::builder()
553 .file_io(self.file_io())
554 .metadata_location(metadata_location)
555 .metadata(metadata)
556 .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
557 .build()
558 }
559
560 async fn load_table(&self, table: &TableIdent) -> Result<Table> {
573 let (table, _) = self.load_table_with_version_id(table).await?;
574 Ok(table)
575 }
576
577 async fn drop_table(&self, table: &TableIdent) -> Result<()> {
588 let db_name = validate_namespace(table.namespace())?;
589 let table_name = table.name();
590
591 let builder = self
592 .client
593 .0
594 .delete_table()
595 .database_name(&db_name)
596 .name(table_name);
597 let builder = with_catalog_id!(builder, self.config);
598
599 builder.send().await.map_err(from_aws_sdk_error)?;
600
601 Ok(())
602 }
603
604 async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
612 let db_name = validate_namespace(table.namespace())?;
613 let table_name = table.name();
614
615 let builder = self
616 .client
617 .0
618 .get_table()
619 .database_name(&db_name)
620 .name(table_name);
621 let builder = with_catalog_id!(builder, self.config);
622
623 let resp = builder.send().await;
624
625 match resp {
626 Ok(_) => Ok(true),
627 Err(err) => {
628 if err
629 .as_service_error()
630 .map(|e| e.is_entity_not_found_exception())
631 == Some(true)
632 {
633 return Ok(false);
634 }
635 Err(from_aws_sdk_error(err))
636 }
637 }
638 }
639
640 async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
647 let src_db_name = validate_namespace(src.namespace())?;
648 let dest_db_name = validate_namespace(dest.namespace())?;
649
650 let src_table_name = src.name();
651 let dest_table_name = dest.name();
652
653 let builder = self
654 .client
655 .0
656 .get_table()
657 .database_name(&src_db_name)
658 .name(src_table_name);
659 let builder = with_catalog_id!(builder, self.config);
660
661 let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
662
663 match glue_table_output.table() {
664 None => Err(Error::new(
665 ErrorKind::TableNotFound,
666 format!(
667 "'Table' object for database: {src_db_name} and table: {src_table_name} does not exist"
668 ),
669 )),
670 Some(table) => {
671 let rename_table_input = TableInput::builder()
672 .name(dest_table_name)
673 .set_parameters(table.parameters.clone())
674 .set_storage_descriptor(table.storage_descriptor.clone())
675 .set_table_type(table.table_type.clone())
676 .set_description(table.description.clone())
677 .build()
678 .map_err(from_aws_build_error)?;
679
680 let builder = self
681 .client
682 .0
683 .create_table()
684 .database_name(&dest_db_name)
685 .table_input(rename_table_input);
686 let builder = with_catalog_id!(builder, self.config);
687
688 builder.send().await.map_err(from_aws_sdk_error)?;
689
690 let drop_src_table_result = self.drop_table(src).await;
691
692 match drop_src_table_result {
693 Ok(_) => Ok(()),
694 Err(_) => {
695 let err_msg_src_table =
696 format!("Failed to drop old table {src_db_name}.{src_table_name}.");
697
698 let drop_dest_table_result = self.drop_table(dest).await;
699
700 match drop_dest_table_result {
701 Ok(_) => Err(Error::new(
702 ErrorKind::Unexpected,
703 format!(
704 "{err_msg_src_table} Rolled back table creation for {dest_db_name}.{dest_table_name}."
705 ),
706 )),
707 Err(_) => Err(Error::new(
708 ErrorKind::Unexpected,
709 format!(
710 "{err_msg_src_table} Failed to roll back table creation for {dest_db_name}.{dest_table_name}. Please clean up manually."
711 ),
712 )),
713 }
714 }
715 }
716 }
717 }
718 }
719
720 async fn register_table(
732 &self,
733 table_ident: &TableIdent,
734 metadata_location: String,
735 ) -> Result<Table> {
736 let db_name = validate_namespace(table_ident.namespace())?;
737 let table_name = table_ident.name();
738 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
739
740 let table_input = convert_to_glue_table(
741 table_name,
742 metadata_location.clone(),
743 &metadata,
744 metadata.properties(),
745 None,
746 )?;
747
748 let builder = self
749 .client
750 .0
751 .create_table()
752 .database_name(&db_name)
753 .table_input(table_input);
754 let builder = with_catalog_id!(builder, self.config);
755
756 builder.send().await.map_err(|e| {
757 let error = e.into_service_error();
758 match error {
759 CreateTableError::EntityNotFoundException(_) => Error::new(
760 ErrorKind::NamespaceNotFound,
761 format!("Database {db_name} does not exist"),
762 ),
763 CreateTableError::AlreadyExistsException(_) => Error::new(
764 ErrorKind::TableAlreadyExists,
765 format!("Table {table_ident} already exists"),
766 ),
767 _ => Error::new(
768 ErrorKind::Unexpected,
769 format!("Failed to register table {table_ident} due to AWS SDK error"),
770 ),
771 }
772 .with_source(anyhow!("aws sdk error: {error:?}"))
773 })?;
774
775 Ok(Table::builder()
776 .identifier(table_ident.clone())
777 .metadata_location(metadata_location)
778 .metadata(metadata)
779 .file_io(self.file_io())
780 .build()?)
781 }
782
783 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
784 let table_ident = commit.identifier().clone();
785 let table_namespace = validate_namespace(table_ident.namespace())?;
786
787 let (current_table, current_version_id) =
788 self.load_table_with_version_id(&table_ident).await?;
789 let current_metadata_location = current_table.metadata_location_result()?.to_string();
790
791 let staged_table = commit.apply(current_table)?;
792 let staged_metadata_location = staged_table.metadata_location_result()?;
793
794 staged_table
796 .metadata()
797 .write_to(staged_table.file_io(), staged_metadata_location)
798 .await?;
799
800 let mut builder = self
802 .client
803 .0
804 .update_table()
805 .database_name(table_namespace)
806 .set_skip_archive(Some(true)) .table_input(convert_to_glue_table(
808 table_ident.name(),
809 staged_metadata_location.to_string(),
810 staged_table.metadata(),
811 staged_table.metadata().properties(),
812 Some(current_metadata_location),
813 )?);
814
815 if let Some(version_id) = current_version_id {
817 builder = builder.version_id(version_id);
818 }
819
820 let builder = with_catalog_id!(builder, self.config);
821 let _ = builder.send().await.map_err(|e| {
822 let error = e.into_service_error();
823 match error {
824 UpdateTableError::EntityNotFoundException(_) => Error::new(
825 ErrorKind::TableNotFound,
826 format!("Table {table_ident} is not found"),
827 ),
828 UpdateTableError::ConcurrentModificationException(_) => Error::new(
829 ErrorKind::CatalogCommitConflicts,
830 format!("Commit failed for table: {table_ident}"),
831 )
832 .with_retryable(true),
833 _ => Error::new(
834 ErrorKind::Unexpected,
835 format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
836 ),
837 }
838 .with_source(anyhow!("aws sdk error: {error:?}"))
839 })?;
840
841 Ok(staged_table)
842 }
843}