iceberg_catalog_glue/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19
20use aws_config::{BehaviorVersion, Region, SdkConfig};
21use aws_sdk_glue::config::Credentials;
22use aws_sdk_glue::types::{Database, DatabaseInput, StorageDescriptor, TableInput};
23use iceberg::spec::TableMetadata;
24use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
25
26use crate::error::from_aws_build_error;
27use crate::schema::GlueSchemaBuilder;
28
// Catalog properties read by `create_sdk_config` to configure the AWS SDK.

/// Catalog property selecting the AWS profile to load.
pub const AWS_PROFILE_NAME: &str = "profile_name";
/// Catalog property selecting the AWS region.
pub const AWS_REGION_NAME: &str = "region_name";
/// Catalog property holding the AWS access key id.
pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id";
/// Catalog property holding the AWS secret access key.
pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key";
/// Catalog property holding an optional AWS session token.
pub const AWS_SESSION_TOKEN: &str = "aws_session_token";

// Namespace (Glue database) property keys mapped to first-class Glue fields.

/// Namespace property mapped to the Glue database description.
const DESCRIPTION: &str = "description";
/// Namespace property mapped to the Glue database location URI.
const LOCATION: &str = "location_uri";

// Glue `TableInput` parameters and table type used for Iceberg tables.

/// `TableInput` parameter key whose value identifies the table format.
const TABLE_TYPE: &str = "table_type";
/// Value stored under `table_type` for Iceberg tables.
const ICEBERG: &str = "ICEBERG";
/// `TableInput` parameter key pointing at the current metadata file.
const METADATA_LOCATION: &str = "metadata_location";
/// `TableInput` parameter key pointing at the previous metadata file.
const PREV_METADATA_LOCATION: &str = "previous_metadata_location";
/// Glue table type assigned to every `TableInput` produced here.
const EXTERNAL_TABLE: &str = "EXTERNAL_TABLE";
53
54/// Creates an aws sdk configuration based on
55/// provided properties and an optional endpoint URL.
56pub(crate) async fn create_sdk_config(
57    properties: &HashMap<String, String>,
58    endpoint_uri: Option<&String>,
59) -> SdkConfig {
60    let mut config = aws_config::defaults(BehaviorVersion::latest());
61
62    if let Some(endpoint) = endpoint_uri {
63        config = config.endpoint_url(endpoint)
64    };
65
66    if properties.is_empty() {
67        return config.load().await;
68    }
69
70    if let (Some(access_key), Some(secret_key)) = (
71        properties.get(AWS_ACCESS_KEY_ID),
72        properties.get(AWS_SECRET_ACCESS_KEY),
73    ) {
74        let session_token = properties.get(AWS_SESSION_TOKEN).cloned();
75        let credentials_provider =
76            Credentials::new(access_key, secret_key, session_token, None, "properties");
77
78        config = config.credentials_provider(credentials_provider)
79    };
80
81    if let Some(profile_name) = properties.get(AWS_PROFILE_NAME) {
82        config = config.profile_name(profile_name);
83    }
84
85    if let Some(region_name) = properties.get(AWS_REGION_NAME) {
86        let region = Region::new(region_name.clone());
87        config = config.region(region);
88    }
89
90    config.load().await
91}
92
93/// Create `DatabaseInput` from `NamespaceIdent` and properties
94pub(crate) fn convert_to_database(
95    namespace: &NamespaceIdent,
96    properties: &HashMap<String, String>,
97) -> Result<DatabaseInput> {
98    let db_name = validate_namespace(namespace)?;
99    let mut builder = DatabaseInput::builder().name(db_name);
100
101    for (k, v) in properties.iter() {
102        match k.as_ref() {
103            DESCRIPTION => {
104                builder = builder.description(v);
105            }
106            LOCATION => {
107                builder = builder.location_uri(v);
108            }
109            _ => {
110                builder = builder.parameters(k, v);
111            }
112        }
113    }
114
115    builder.build().map_err(from_aws_build_error)
116}
117
118/// Create `Namespace` from aws sdk glue `Database`
119pub(crate) fn convert_to_namespace(database: &Database) -> Namespace {
120    let db_name = database.name().to_string();
121    let mut properties = database
122        .parameters()
123        .map_or_else(HashMap::new, |p| p.clone());
124
125    if let Some(location_uri) = database.location_uri() {
126        properties.insert(LOCATION.to_string(), location_uri.to_string());
127    };
128
129    if let Some(description) = database.description() {
130        properties.insert(DESCRIPTION.to_string(), description.to_string());
131    }
132
133    Namespace::with_properties(NamespaceIdent::new(db_name), properties)
134}
135
136/// Converts Iceberg table metadata into an
137/// AWS Glue `TableInput` representation.
138///
139/// This function facilitates the integration of Iceberg tables with AWS Glue
140/// by converting Iceberg table metadata into a Glue-compatible `TableInput`
141/// structure.
142pub(crate) fn convert_to_glue_table(
143    table_name: impl Into<String>,
144    metadata_location: String,
145    metadata: &TableMetadata,
146    properties: &HashMap<String, String>,
147    prev_metadata_location: Option<String>,
148) -> Result<TableInput> {
149    let glue_schema = GlueSchemaBuilder::from_iceberg(metadata)?.build();
150
151    let storage_descriptor = StorageDescriptor::builder()
152        .set_columns(Some(glue_schema))
153        .location(metadata.location().to_string())
154        .build();
155
156    let mut parameters = HashMap::from([
157        (TABLE_TYPE.to_string(), ICEBERG.to_string()),
158        (METADATA_LOCATION.to_string(), metadata_location),
159    ]);
160
161    if let Some(prev) = prev_metadata_location {
162        parameters.insert(PREV_METADATA_LOCATION.to_string(), prev);
163    }
164
165    let mut table_input_builder = TableInput::builder()
166        .name(table_name)
167        .set_parameters(Some(parameters))
168        .storage_descriptor(storage_descriptor)
169        .table_type(EXTERNAL_TABLE);
170
171    if let Some(description) = properties.get(DESCRIPTION) {
172        table_input_builder = table_input_builder.description(description);
173    }
174
175    let table_input = table_input_builder.build().map_err(from_aws_build_error)?;
176
177    Ok(table_input)
178}
179
180/// Checks if provided `NamespaceIdent` is valid
181pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> {
182    let name = namespace.as_ref();
183
184    if name.len() != 1 {
185        return Err(Error::new(
186            ErrorKind::DataInvalid,
187            format!(
188                "Invalid database name: {namespace:?}, hierarchical namespaces are not supported"
189            ),
190        ));
191    }
192
193    let name = name[0].clone();
194
195    if name.is_empty() {
196        return Err(Error::new(
197            ErrorKind::DataInvalid,
198            "Invalid database, provided namespace is empty.",
199        ));
200    }
201
202    Ok(name)
203}
204
205/// Get default table location from `Namespace` properties
206pub(crate) fn get_default_table_location(
207    namespace: &Namespace,
208    db_name: impl AsRef<str>,
209    table_name: impl AsRef<str>,
210    warehouse: impl AsRef<str>,
211) -> String {
212    let properties = namespace.properties();
213
214    match properties.get(LOCATION) {
215        Some(location) => format!("{}/{}", location, table_name.as_ref()),
216        None => {
217            let warehouse_location = warehouse.as_ref().trim_end_matches('/');
218
219            format!(
220                "{}/{}.db/{}",
221                warehouse_location,
222                db_name.as_ref(),
223                table_name.as_ref()
224            )
225        }
226    }
227}
228
229/// Get metadata location from `GlueTable` parameters
230pub(crate) fn get_metadata_location(
231    parameters: &Option<HashMap<String, String>>,
232) -> Result<String> {
233    match parameters {
234        Some(properties) => match properties.get(METADATA_LOCATION) {
235            Some(location) => Ok(location.to_string()),
236            None => Err(Error::new(
237                ErrorKind::DataInvalid,
238                format!("No '{METADATA_LOCATION}' set on table"),
239            )),
240        },
241        None => Err(Error::new(
242            ErrorKind::DataInvalid,
243            "No 'parameters' set on table. Location of metadata is undefined",
244        )),
245    }
246}
247
#[macro_export]
/// Conditionally sets a Glue catalog id on an AWS SDK request builder.
///
/// Expands to `$builder.catalog_id(id)` when `$config.catalog_id` is `Some(id)`,
/// and to `$builder` unchanged otherwise. Only one arm runs, so `$builder` is
/// evaluated exactly once.
macro_rules! with_catalog_id {
    ($builder:expr, $config:expr) => {{
        match &$config.catalog_id {
            Some(catalog_id) => $builder.catalog_id(catalog_id),
            None => $builder,
        }
    }};
}
259
/// Unit tests for the Glue catalog conversion and configuration helpers.
#[cfg(test)]
mod tests {
    use aws_sdk_glue::config::ProvideCredentials;
    use aws_sdk_glue::types::Column;
    use iceberg::spec::{NestedField, PrimitiveType, Schema, TableMetadataBuilder, Type};
    use iceberg::{MetadataLocation, Namespace, Result, TableCreation};

    use super::*;
    use crate::schema::{ICEBERG_FIELD_CURRENT, ICEBERG_FIELD_ID, ICEBERG_FIELD_OPTIONAL};

    /// Builds `TableMetadata` for a table named `my_table` at `my_location`
    /// using the given schema.
    fn create_metadata(schema: Schema) -> Result<TableMetadata> {
        let table_creation = TableCreation::builder()
            .name("my_table".to_string())
            .location("my_location".to_string())
            .schema(schema)
            .build();
        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
            .build()?
            .metadata;

        Ok(metadata)
    }

    /// Covers the success path and both error paths (missing key, no parameters).
    #[test]
    fn test_get_metadata_location() -> Result<()> {
        let params_valid = Some(HashMap::from([(
            METADATA_LOCATION.to_string(),
            "my_location".to_string(),
        )]));
        let params_missing_key = Some(HashMap::from([(
            "not_here".to_string(),
            "my_location".to_string(),
        )]));

        let result_valid = get_metadata_location(&params_valid)?;
        let result_missing_key = get_metadata_location(&params_missing_key);
        let result_no_params = get_metadata_location(&None);

        assert_eq!(result_valid, "my_location");
        assert!(result_missing_key.is_err());
        assert!(result_no_params.is_err());

        Ok(())
    }

    /// Checks the name, absent description and storage descriptor (columns and
    /// location) of the generated `TableInput` for a single required int column.
    #[test]
    fn test_convert_to_glue_table() -> Result<()> {
        let table_name = "my_table".to_string();
        let location = "s3a://warehouse/hive".to_string();
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()?;

        let metadata = create_metadata(schema)?;
        let metadata_location =
            MetadataLocation::new_with_metadata(location, &metadata).to_string();

        // Column parameters the schema builder is expected to emit for field `foo`.
        let parameters = HashMap::from([
            (ICEBERG_FIELD_ID.to_string(), "1".to_string()),
            (ICEBERG_FIELD_OPTIONAL.to_string(), "false".to_string()),
            (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
        ]);

        let column = Column::builder()
            .name("foo")
            .r#type("int")
            .set_parameters(Some(parameters))
            .set_comment(None)
            .build()
            .map_err(from_aws_build_error)?;

        let storage_descriptor = StorageDescriptor::builder()
            .set_columns(Some(vec![column]))
            .location(metadata.location())
            .build();

        let result = convert_to_glue_table(
            &table_name,
            metadata_location,
            &metadata,
            metadata.properties(),
            None,
        )?;

        assert_eq!(result.name(), &table_name);
        assert_eq!(result.description(), None);
        assert_eq!(result.storage_descriptor, Some(storage_descriptor));

        Ok(())
    }

    /// A namespace `location_uri` property takes the place of the warehouse path.
    #[test]
    fn test_get_default_table_location() -> Result<()> {
        let properties = HashMap::from([(LOCATION.to_string(), "db_location".to_string())]);

        let namespace =
            Namespace::with_properties(NamespaceIdent::new("default".into()), properties);
        let db_name = validate_namespace(namespace.name())?;
        let table_name = "my_table";

        let expected = "db_location/my_table";
        let result =
            get_default_table_location(&namespace, db_name, table_name, "warehouse_location");

        assert_eq!(expected, result);

        Ok(())
    }

    /// Without `location_uri`, the location is `<warehouse>/<db>.db/<table>`.
    #[test]
    fn test_get_default_table_location_warehouse() -> Result<()> {
        let namespace = Namespace::new(NamespaceIdent::new("default".into()));
        let db_name = validate_namespace(namespace.name())?;
        let table_name = "my_table";

        let expected = "warehouse_location/default.db/my_table";
        let result =
            get_default_table_location(&namespace, db_name, table_name, "warehouse_location");

        assert_eq!(expected, result);

        Ok(())
    }

    /// Glue `location_uri` and `description` become namespace properties.
    #[test]
    fn test_convert_to_namespace() -> Result<()> {
        let db = Database::builder()
            .name("my_db")
            .location_uri("my_location")
            .description("my_description")
            .build()
            .map_err(from_aws_build_error)?;

        let properties = HashMap::from([
            (DESCRIPTION.to_string(), "my_description".to_string()),
            (LOCATION.to_string(), "my_location".to_string()),
        ]);

        let expected =
            Namespace::with_properties(NamespaceIdent::new("my_db".to_string()), properties);
        let result = convert_to_namespace(&db);

        assert_eq!(result, expected);

        Ok(())
    }

    /// The `location_uri` property maps onto `DatabaseInput::location_uri`.
    #[test]
    fn test_convert_to_database() -> Result<()> {
        let namespace = NamespaceIdent::new("my_database".to_string());
        let properties = HashMap::from([(LOCATION.to_string(), "my_location".to_string())]);

        let result = convert_to_database(&namespace, &properties)?;

        assert_eq!("my_database", result.name());
        assert_eq!(Some("my_location".to_string()), result.location_uri);

        Ok(())
    }

    /// Single-level names pass; empty and multi-level names are rejected.
    #[test]
    fn test_validate_namespace() {
        let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string()));
        let empty_ns = Namespace::new(NamespaceIdent::new("".to_string()));
        let hierarchical_ns = Namespace::new(
            NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap(),
        );

        let valid = validate_namespace(valid_ns.name());
        let empty = validate_namespace(empty_ns.name());
        let hierarchical = validate_namespace(hierarchical_ns.name());

        assert!(valid.is_ok());
        assert!(empty.is_err());
        assert!(hierarchical.is_err());
    }

    /// The endpoint override is applied even when no properties are given.
    #[tokio::test]
    async fn test_config_with_custom_endpoint() {
        let properties = HashMap::new();
        let endpoint_url = "http://custom_url:5001";

        let sdk_config = create_sdk_config(&properties, Some(&endpoint_url.to_string())).await;

        let result = sdk_config.endpoint_url().unwrap();

        assert_eq!(result, endpoint_url);
    }

    /// Region and static credentials (including session token) come from properties.
    /// NOTE(review): `my_profile` presumably does not exist on the test machine; the
    /// assertions rely only on the explicitly provided region and credentials — confirm.
    #[tokio::test]
    async fn test_config_with_properties() {
        let properties = HashMap::from([
            (AWS_PROFILE_NAME.to_string(), "my_profile".to_string()),
            (AWS_REGION_NAME.to_string(), "us-east-1".to_string()),
            (AWS_ACCESS_KEY_ID.to_string(), "my-access-id".to_string()),
            (
                AWS_SECRET_ACCESS_KEY.to_string(),
                "my-secret-key".to_string(),
            ),
            (AWS_SESSION_TOKEN.to_string(), "my-token".to_string()),
        ]);

        let sdk_config = create_sdk_config(&properties, None).await;

        let region = sdk_config.region().unwrap().as_ref();
        let credentials = sdk_config
            .credentials_provider()
            .unwrap()
            .provide_credentials()
            .await
            .unwrap();

        assert_eq!("us-east-1", region);
        assert_eq!("my-access-id", credentials.access_key_id());
        assert_eq!("my-secret-key", credentials.secret_access_key());
        assert_eq!("my-token", credentials.session_token().unwrap());
    }
}