1use std::collections::HashMap;
19
20use aws_config::{BehaviorVersion, Region, SdkConfig};
21use aws_sdk_glue::config::Credentials;
22use aws_sdk_glue::types::{Database, DatabaseInput, StorageDescriptor, TableInput};
23use iceberg::spec::TableMetadata;
24use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
25
26use crate::error::from_aws_build_error;
27use crate::schema::GlueSchemaBuilder;
28
/// Property key holding the AWS profile name applied to the SDK config.
pub const AWS_PROFILE_NAME: &str = "profile_name";
/// Property key holding the AWS region name applied to the SDK config.
pub const AWS_REGION_NAME: &str = "region_name";
/// Property key holding the AWS access key id used for static credentials.
pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id";
/// Property key holding the AWS secret access key used for static credentials.
pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key";
/// Property key holding the optional AWS session token used for static credentials.
pub const AWS_SESSION_TOKEN: &str = "aws_session_token";
/// Property key for a namespace or table description.
const DESCRIPTION: &str = "description";
/// Property key for a namespace (database) location URI.
const LOCATION: &str = "location_uri";
/// Table parameter key under which the current metadata location is stored.
const METADATA_LOCATION: &str = "metadata_location";
/// Table parameter key under which the previous metadata location is stored.
const PREV_METADATA_LOCATION: &str = "previous_metadata_location";
/// Value passed as the table type of a Glue `TableInput`.
const EXTERNAL_TABLE: &str = "EXTERNAL_TABLE";
/// Table parameter key identifying the table format.
const TABLE_TYPE: &str = "table_type";
/// Table parameter value stored under `TABLE_TYPE`.
const ICEBERG: &str = "ICEBERG";
53
54pub(crate) async fn create_sdk_config(
57 properties: &HashMap<String, String>,
58 endpoint_uri: Option<&String>,
59) -> SdkConfig {
60 let mut config = aws_config::defaults(BehaviorVersion::latest());
61
62 if let Some(endpoint) = endpoint_uri {
63 config = config.endpoint_url(endpoint)
64 };
65
66 if properties.is_empty() {
67 return config.load().await;
68 }
69
70 if let (Some(access_key), Some(secret_key)) = (
71 properties.get(AWS_ACCESS_KEY_ID),
72 properties.get(AWS_SECRET_ACCESS_KEY),
73 ) {
74 let session_token = properties.get(AWS_SESSION_TOKEN).cloned();
75 let credentials_provider =
76 Credentials::new(access_key, secret_key, session_token, None, "properties");
77
78 config = config.credentials_provider(credentials_provider)
79 };
80
81 if let Some(profile_name) = properties.get(AWS_PROFILE_NAME) {
82 config = config.profile_name(profile_name);
83 }
84
85 if let Some(region_name) = properties.get(AWS_REGION_NAME) {
86 let region = Region::new(region_name.clone());
87 config = config.region(region);
88 }
89
90 config.load().await
91}
92
93pub(crate) fn convert_to_database(
95 namespace: &NamespaceIdent,
96 properties: &HashMap<String, String>,
97) -> Result<DatabaseInput> {
98 let db_name = validate_namespace(namespace)?;
99 let mut builder = DatabaseInput::builder().name(db_name);
100
101 for (k, v) in properties.iter() {
102 match k.as_ref() {
103 DESCRIPTION => {
104 builder = builder.description(v);
105 }
106 LOCATION => {
107 builder = builder.location_uri(v);
108 }
109 _ => {
110 builder = builder.parameters(k, v);
111 }
112 }
113 }
114
115 builder.build().map_err(from_aws_build_error)
116}
117
118pub(crate) fn convert_to_namespace(database: &Database) -> Namespace {
120 let db_name = database.name().to_string();
121 let mut properties = database
122 .parameters()
123 .map_or_else(HashMap::new, |p| p.clone());
124
125 if let Some(location_uri) = database.location_uri() {
126 properties.insert(LOCATION.to_string(), location_uri.to_string());
127 };
128
129 if let Some(description) = database.description() {
130 properties.insert(DESCRIPTION.to_string(), description.to_string());
131 }
132
133 Namespace::with_properties(NamespaceIdent::new(db_name), properties)
134}
135
136pub(crate) fn convert_to_glue_table(
143 table_name: impl Into<String>,
144 metadata_location: String,
145 metadata: &TableMetadata,
146 properties: &HashMap<String, String>,
147 prev_metadata_location: Option<String>,
148) -> Result<TableInput> {
149 let glue_schema = GlueSchemaBuilder::from_iceberg(metadata)?.build();
150
151 let storage_descriptor = StorageDescriptor::builder()
152 .set_columns(Some(glue_schema))
153 .location(metadata.location().to_string())
154 .build();
155
156 let mut parameters = HashMap::from([
157 (TABLE_TYPE.to_string(), ICEBERG.to_string()),
158 (METADATA_LOCATION.to_string(), metadata_location),
159 ]);
160
161 if let Some(prev) = prev_metadata_location {
162 parameters.insert(PREV_METADATA_LOCATION.to_string(), prev);
163 }
164
165 let mut table_input_builder = TableInput::builder()
166 .name(table_name)
167 .set_parameters(Some(parameters))
168 .storage_descriptor(storage_descriptor)
169 .table_type(EXTERNAL_TABLE);
170
171 if let Some(description) = properties.get(DESCRIPTION) {
172 table_input_builder = table_input_builder.description(description);
173 }
174
175 let table_input = table_input_builder.build().map_err(from_aws_build_error)?;
176
177 Ok(table_input)
178}
179
180pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> {
182 let name = namespace.as_ref();
183
184 if name.len() != 1 {
185 return Err(Error::new(
186 ErrorKind::DataInvalid,
187 format!(
188 "Invalid database name: {namespace:?}, hierarchical namespaces are not supported"
189 ),
190 ));
191 }
192
193 let name = name[0].clone();
194
195 if name.is_empty() {
196 return Err(Error::new(
197 ErrorKind::DataInvalid,
198 "Invalid database, provided namespace is empty.",
199 ));
200 }
201
202 Ok(name)
203}
204
205pub(crate) fn get_default_table_location(
207 namespace: &Namespace,
208 db_name: impl AsRef<str>,
209 table_name: impl AsRef<str>,
210 warehouse: impl AsRef<str>,
211) -> String {
212 let properties = namespace.properties();
213
214 match properties.get(LOCATION) {
215 Some(location) => format!("{}/{}", location, table_name.as_ref()),
216 None => {
217 let warehouse_location = warehouse.as_ref().trim_end_matches('/');
218
219 format!(
220 "{}/{}.db/{}",
221 warehouse_location,
222 db_name.as_ref(),
223 table_name.as_ref()
224 )
225 }
226 }
227}
228
229pub(crate) fn get_metadata_location(
231 parameters: &Option<HashMap<String, String>>,
232) -> Result<String> {
233 match parameters {
234 Some(properties) => match properties.get(METADATA_LOCATION) {
235 Some(location) => Ok(location.to_string()),
236 None => Err(Error::new(
237 ErrorKind::DataInvalid,
238 format!("No '{METADATA_LOCATION}' set on table"),
239 )),
240 },
241 None => Err(Error::new(
242 ErrorKind::DataInvalid,
243 "No 'parameters' set on table. Location of metadata is undefined",
244 )),
245 }
246}
247
/// Applies the configured catalog id to a request builder, when one is set.
///
/// Expands to `$builder.catalog_id(id)` if `$config.catalog_id` is `Some(id)`,
/// and to `$builder` unchanged otherwise.
#[macro_export]
macro_rules! with_catalog_id {
    ($builder:expr, $config:expr) => {{
        match &$config.catalog_id {
            Some(id) => $builder.catalog_id(id),
            None => $builder,
        }
    }};
}
259
/// Unit tests for the Glue conversion helpers and the SDK config builder.
#[cfg(test)]
mod tests {
    use aws_sdk_glue::config::ProvideCredentials;
    use aws_sdk_glue::types::Column;
    use iceberg::spec::{NestedField, PrimitiveType, Schema, TableMetadataBuilder, Type};
    use iceberg::{MetadataLocation, Namespace, Result, TableCreation};

    use super::*;
    use crate::schema::{ICEBERG_FIELD_CURRENT, ICEBERG_FIELD_ID, ICEBERG_FIELD_OPTIONAL};

    /// Builds table metadata for a table named `my_table` located at `my_location`.
    fn create_metadata(schema: Schema) -> Result<TableMetadata> {
        let table_creation = TableCreation::builder()
            .name("my_table".to_string())
            .location("my_location".to_string())
            .schema(schema)
            .build();
        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
            .build()?
            .metadata;

        Ok(metadata)
    }

    #[test]
    fn test_get_metadata_location() -> Result<()> {
        let params_valid = Some(HashMap::from([(
            METADATA_LOCATION.to_string(),
            "my_location".to_string(),
        )]));
        let params_missing_key = Some(HashMap::from([(
            "not_here".to_string(),
            "my_location".to_string(),
        )]));

        let result_valid = get_metadata_location(&params_valid)?;
        let result_missing_key = get_metadata_location(&params_missing_key);
        let result_no_params = get_metadata_location(&None);

        assert_eq!(result_valid, "my_location");
        assert!(result_missing_key.is_err());
        assert!(result_no_params.is_err());

        Ok(())
    }

    #[test]
    fn test_convert_to_glue_table() -> Result<()> {
        let table_name = "my_table".to_string();
        let location = "s3a://warehouse/hive".to_string();
        let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
        let properties = HashMap::new();
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()?;

        let metadata = create_metadata(schema)?;

        // Column parameters expected for the single required field `foo`.
        let parameters = HashMap::from([
            (ICEBERG_FIELD_ID.to_string(), "1".to_string()),
            (ICEBERG_FIELD_OPTIONAL.to_string(), "false".to_string()),
            (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
        ]);

        let column = Column::builder()
            .name("foo")
            .r#type("int")
            .set_parameters(Some(parameters))
            .set_comment(None)
            .build()
            .map_err(from_aws_build_error)?;

        let storage_descriptor = StorageDescriptor::builder()
            .set_columns(Some(vec![column]))
            .location(metadata.location())
            .build();

        let result =
            convert_to_glue_table(&table_name, metadata_location, &metadata, &properties, None)?;

        assert_eq!(result.name(), &table_name);
        assert_eq!(result.description(), None);
        assert_eq!(result.storage_descriptor, Some(storage_descriptor));

        Ok(())
    }

    #[test]
    fn test_get_default_table_location() -> Result<()> {
        let properties = HashMap::from([(LOCATION.to_string(), "db_location".to_string())]);

        let namespace =
            Namespace::with_properties(NamespaceIdent::new("default".into()), properties);
        let db_name = validate_namespace(namespace.name())?;
        let table_name = "my_table";

        // The namespace location takes precedence over the warehouse.
        let expected = "db_location/my_table";
        let result =
            get_default_table_location(&namespace, db_name, table_name, "warehouse_location");

        assert_eq!(expected, result);

        Ok(())
    }

    #[test]
    fn test_get_default_table_location_warehouse() -> Result<()> {
        let namespace = Namespace::new(NamespaceIdent::new("default".into()));
        let db_name = validate_namespace(namespace.name())?;
        let table_name = "my_table";

        // Without a namespace location the warehouse path is used.
        let expected = "warehouse_location/default.db/my_table";
        let result =
            get_default_table_location(&namespace, db_name, table_name, "warehouse_location");

        assert_eq!(expected, result);

        Ok(())
    }

    #[test]
    fn test_convert_to_namespace() -> Result<()> {
        let db = Database::builder()
            .name("my_db")
            .location_uri("my_location")
            .description("my_description")
            .build()
            .map_err(from_aws_build_error)?;

        let properties = HashMap::from([
            (DESCRIPTION.to_string(), "my_description".to_string()),
            (LOCATION.to_string(), "my_location".to_string()),
        ]);

        let expected =
            Namespace::with_properties(NamespaceIdent::new("my_db".to_string()), properties);
        let result = convert_to_namespace(&db);

        assert_eq!(result, expected);

        Ok(())
    }

    #[test]
    fn test_convert_to_database() -> Result<()> {
        let namespace = NamespaceIdent::new("my_database".to_string());
        let properties = HashMap::from([(LOCATION.to_string(), "my_location".to_string())]);

        let result = convert_to_database(&namespace, &properties)?;

        assert_eq!("my_database", result.name());
        assert_eq!(Some("my_location".to_string()), result.location_uri);

        Ok(())
    }

    #[test]
    fn test_validate_namespace() {
        let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string()));
        let empty_ns = Namespace::new(NamespaceIdent::new("".to_string()));
        let hierarchical_ns = Namespace::new(
            NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap(),
        );

        let valid = validate_namespace(valid_ns.name());
        let empty = validate_namespace(empty_ns.name());
        let hierarchical = validate_namespace(hierarchical_ns.name());

        assert!(valid.is_ok());
        assert!(empty.is_err());
        assert!(hierarchical.is_err());
    }

    #[tokio::test]
    async fn test_config_with_custom_endpoint() {
        let properties = HashMap::new();
        let endpoint_url = "http://custom_url:5000";

        let sdk_config = create_sdk_config(&properties, Some(&endpoint_url.to_string())).await;

        let result = sdk_config.endpoint_url().unwrap();

        assert_eq!(result, endpoint_url);
    }

    #[tokio::test]
    async fn test_config_with_properties() {
        let properties = HashMap::from([
            (AWS_PROFILE_NAME.to_string(), "my_profile".to_string()),
            (AWS_REGION_NAME.to_string(), "us-east-1".to_string()),
            (AWS_ACCESS_KEY_ID.to_string(), "my-access-id".to_string()),
            (
                AWS_SECRET_ACCESS_KEY.to_string(),
                "my-secret-key".to_string(),
            ),
            (AWS_SESSION_TOKEN.to_string(), "my-token".to_string()),
        ]);

        let sdk_config = create_sdk_config(&properties, None).await;

        let region = sdk_config.region().unwrap().as_ref();
        let credentials = sdk_config
            .credentials_provider()
            .unwrap()
            .provide_credentials()
            .await
            .unwrap();

        assert_eq!("us-east-1", region);
        assert_eq!("my-access-id", credentials.access_key_id());
        assert_eq!("my-secret-key", credentials.secret_access_key());
        assert_eq!("my-token", credentials.session_token().unwrap());
    }
}