iceberg_catalog_glue/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19
20use aws_config::{BehaviorVersion, Region, SdkConfig};
21use aws_sdk_glue::config::Credentials;
22use aws_sdk_glue::types::{Database, DatabaseInput, StorageDescriptor, TableInput};
23use iceberg::spec::TableMetadata;
24use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
25
26use crate::error::from_aws_build_error;
27use crate::schema::GlueSchemaBuilder;
28
/// Property key holding the AWS profile name; read by `create_sdk_config`.
pub const AWS_PROFILE_NAME: &str = "profile_name";
/// Property key holding the AWS region name; read by `create_sdk_config`.
pub const AWS_REGION_NAME: &str = "region_name";
/// Property key holding the AWS access key id. It is only used when
/// [`AWS_SECRET_ACCESS_KEY`] is supplied as well.
pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id";
/// Property key holding the AWS secret access key. It is only used when
/// [`AWS_ACCESS_KEY_ID`] is supplied as well.
pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key";
/// Property key holding the optional AWS session token that accompanies the
/// access key id / secret access key pair.
pub const AWS_SESSION_TOKEN: &str = "aws_session_token";
/// Property key for a description. Used for the namespace description and
/// looked up in the table properties when building a `TableInput`.
const DESCRIPTION: &str = "description";
/// Property key for the namespace location uri.
const LOCATION: &str = "location_uri";
/// Parameter key `metadata_location` stored on a `TableInput`.
const METADATA_LOCATION: &str = "metadata_location";
/// Parameter key `previous_metadata_location` stored on a `TableInput`
/// when a previous metadata location is provided.
const PREV_METADATA_LOCATION: &str = "previous_metadata_location";
/// Table type value set on every `TableInput` built in this module.
const EXTERNAL_TABLE: &str = "EXTERNAL_TABLE";
/// Parameter key `table_type` stored on a `TableInput`.
const TABLE_TYPE: &str = "table_type";
/// Parameter value stored under the `table_type` key of a `TableInput`.
const ICEBERG: &str = "ICEBERG";
53
54/// Creates an aws sdk configuration based on
55/// provided properties and an optional endpoint URL.
56pub(crate) async fn create_sdk_config(
57    properties: &HashMap<String, String>,
58    endpoint_uri: Option<&String>,
59) -> SdkConfig {
60    let mut config = aws_config::defaults(BehaviorVersion::latest());
61
62    if let Some(endpoint) = endpoint_uri {
63        config = config.endpoint_url(endpoint)
64    };
65
66    if properties.is_empty() {
67        return config.load().await;
68    }
69
70    if let (Some(access_key), Some(secret_key)) = (
71        properties.get(AWS_ACCESS_KEY_ID),
72        properties.get(AWS_SECRET_ACCESS_KEY),
73    ) {
74        let session_token = properties.get(AWS_SESSION_TOKEN).cloned();
75        let credentials_provider =
76            Credentials::new(access_key, secret_key, session_token, None, "properties");
77
78        config = config.credentials_provider(credentials_provider)
79    };
80
81    if let Some(profile_name) = properties.get(AWS_PROFILE_NAME) {
82        config = config.profile_name(profile_name);
83    }
84
85    if let Some(region_name) = properties.get(AWS_REGION_NAME) {
86        let region = Region::new(region_name.clone());
87        config = config.region(region);
88    }
89
90    config.load().await
91}
92
93/// Create `DatabaseInput` from `NamespaceIdent` and properties
94pub(crate) fn convert_to_database(
95    namespace: &NamespaceIdent,
96    properties: &HashMap<String, String>,
97) -> Result<DatabaseInput> {
98    let db_name = validate_namespace(namespace)?;
99    let mut builder = DatabaseInput::builder().name(db_name);
100
101    for (k, v) in properties.iter() {
102        match k.as_ref() {
103            DESCRIPTION => {
104                builder = builder.description(v);
105            }
106            LOCATION => {
107                builder = builder.location_uri(v);
108            }
109            _ => {
110                builder = builder.parameters(k, v);
111            }
112        }
113    }
114
115    builder.build().map_err(from_aws_build_error)
116}
117
118/// Create `Namespace` from aws sdk glue `Database`
119pub(crate) fn convert_to_namespace(database: &Database) -> Namespace {
120    let db_name = database.name().to_string();
121    let mut properties = database
122        .parameters()
123        .map_or_else(HashMap::new, |p| p.clone());
124
125    if let Some(location_uri) = database.location_uri() {
126        properties.insert(LOCATION.to_string(), location_uri.to_string());
127    };
128
129    if let Some(description) = database.description() {
130        properties.insert(DESCRIPTION.to_string(), description.to_string());
131    }
132
133    Namespace::with_properties(NamespaceIdent::new(db_name), properties)
134}
135
136/// Converts Iceberg table metadata into an
137/// AWS Glue `TableInput` representation.
138///
139/// This function facilitates the integration of Iceberg tables with AWS Glue
140/// by converting Iceberg table metadata into a Glue-compatible `TableInput`
141/// structure.
142pub(crate) fn convert_to_glue_table(
143    table_name: impl Into<String>,
144    metadata_location: String,
145    metadata: &TableMetadata,
146    properties: &HashMap<String, String>,
147    prev_metadata_location: Option<String>,
148) -> Result<TableInput> {
149    let glue_schema = GlueSchemaBuilder::from_iceberg(metadata)?.build();
150
151    let storage_descriptor = StorageDescriptor::builder()
152        .set_columns(Some(glue_schema))
153        .location(metadata.location().to_string())
154        .build();
155
156    let mut parameters = HashMap::from([
157        (TABLE_TYPE.to_string(), ICEBERG.to_string()),
158        (METADATA_LOCATION.to_string(), metadata_location),
159    ]);
160
161    if let Some(prev) = prev_metadata_location {
162        parameters.insert(PREV_METADATA_LOCATION.to_string(), prev);
163    }
164
165    let mut table_input_builder = TableInput::builder()
166        .name(table_name)
167        .set_parameters(Some(parameters))
168        .storage_descriptor(storage_descriptor)
169        .table_type(EXTERNAL_TABLE);
170
171    if let Some(description) = properties.get(DESCRIPTION) {
172        table_input_builder = table_input_builder.description(description);
173    }
174
175    let table_input = table_input_builder.build().map_err(from_aws_build_error)?;
176
177    Ok(table_input)
178}
179
180/// Checks if provided `NamespaceIdent` is valid
181pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> {
182    let name = namespace.as_ref();
183
184    if name.len() != 1 {
185        return Err(Error::new(
186            ErrorKind::DataInvalid,
187            format!(
188                "Invalid database name: {namespace:?}, hierarchical namespaces are not supported"
189            ),
190        ));
191    }
192
193    let name = name[0].clone();
194
195    if name.is_empty() {
196        return Err(Error::new(
197            ErrorKind::DataInvalid,
198            "Invalid database, provided namespace is empty.",
199        ));
200    }
201
202    Ok(name)
203}
204
205/// Get default table location from `Namespace` properties
206pub(crate) fn get_default_table_location(
207    namespace: &Namespace,
208    db_name: impl AsRef<str>,
209    table_name: impl AsRef<str>,
210    warehouse: impl AsRef<str>,
211) -> String {
212    let properties = namespace.properties();
213
214    match properties.get(LOCATION) {
215        Some(location) => format!("{}/{}", location, table_name.as_ref()),
216        None => {
217            let warehouse_location = warehouse.as_ref().trim_end_matches('/');
218
219            format!(
220                "{}/{}.db/{}",
221                warehouse_location,
222                db_name.as_ref(),
223                table_name.as_ref()
224            )
225        }
226    }
227}
228
229/// Get metadata location from `GlueTable` parameters
230pub(crate) fn get_metadata_location(
231    parameters: &Option<HashMap<String, String>>,
232) -> Result<String> {
233    match parameters {
234        Some(properties) => match properties.get(METADATA_LOCATION) {
235            Some(location) => Ok(location.to_string()),
236            None => Err(Error::new(
237                ErrorKind::DataInvalid,
238                format!("No '{METADATA_LOCATION}' set on table"),
239            )),
240        },
241        None => Err(Error::new(
242            ErrorKind::DataInvalid,
243            "No 'parameters' set on table. Location of metadata is undefined",
244        )),
245    }
246}
247
/// Applies the configured `catalog_id` to an AWS SDK request builder.
///
/// Evaluates to `$builder.catalog_id(..)` when `$config.catalog_id` is
/// `Some`, and to `$builder` unchanged otherwise.
#[macro_export]
macro_rules! with_catalog_id {
    ($builder:expr, $config:expr) => {{
        match &$config.catalog_id {
            Some(id) => $builder.catalog_id(id),
            None => $builder,
        }
    }};
}
259
#[cfg(test)]
mod tests {
    use aws_sdk_glue::config::ProvideCredentials;
    use aws_sdk_glue::types::Column;
    use iceberg::spec::{NestedField, PrimitiveType, Schema, TableMetadataBuilder, Type};
    use iceberg::{MetadataLocation, Namespace, Result, TableCreation};

    use super::*;
    use crate::schema::{ICEBERG_FIELD_CURRENT, ICEBERG_FIELD_ID, ICEBERG_FIELD_OPTIONAL};

    /// Builds the metadata of a table named `my_table`, located at
    /// `my_location`, with the given `schema`.
    fn create_metadata(schema: Schema) -> Result<TableMetadata> {
        let creation = TableCreation::builder()
            .name("my_table".to_string())
            .location("my_location".to_string())
            .schema(schema)
            .build();

        let build_result = TableMetadataBuilder::from_table_creation(creation)?.build()?;

        Ok(build_result.metadata)
    }

    /// The metadata location is returned when present; a missing key or
    /// missing parameters are errors.
    #[test]
    fn test_get_metadata_location() -> Result<()> {
        let with_location = Some(HashMap::from([(
            METADATA_LOCATION.to_string(),
            "my_location".to_string(),
        )]));
        let without_location = Some(HashMap::from([(
            "not_here".to_string(),
            "my_location".to_string(),
        )]));

        assert_eq!(get_metadata_location(&with_location)?, "my_location");
        assert!(get_metadata_location(&without_location).is_err());
        assert!(get_metadata_location(&None).is_err());

        Ok(())
    }

    /// A single required `int` field is converted into the expected Glue
    /// column and storage descriptor.
    #[test]
    fn test_convert_to_glue_table() -> Result<()> {
        let table_name = "my_table";
        let metadata_location =
            MetadataLocation::new_with_table_location("s3a://warehouse/hive".to_string())
                .to_string();
        let properties = HashMap::new();

        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()?;
        let metadata = create_metadata(schema)?;

        let column_parameters = HashMap::from([
            (ICEBERG_FIELD_ID.to_string(), "1".to_string()),
            (ICEBERG_FIELD_OPTIONAL.to_string(), "false".to_string()),
            (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
        ]);
        let expected_column = Column::builder()
            .name("foo")
            .r#type("int")
            .set_parameters(Some(column_parameters))
            .set_comment(None)
            .build()
            .map_err(from_aws_build_error)?;
        let expected_descriptor = StorageDescriptor::builder()
            .set_columns(Some(vec![expected_column]))
            .location(metadata.location())
            .build();

        let table_input =
            convert_to_glue_table(table_name, metadata_location, &metadata, &properties, None)?;

        assert_eq!(table_input.name(), table_name);
        assert!(table_input.description().is_none());
        assert_eq!(table_input.storage_descriptor, Some(expected_descriptor));

        Ok(())
    }

    /// The namespace location takes precedence over the warehouse.
    #[test]
    fn test_get_default_table_location() -> Result<()> {
        let namespace = Namespace::with_properties(
            NamespaceIdent::new("default".into()),
            HashMap::from([(LOCATION.to_string(), "db_location".to_string())]),
        );
        let db_name = validate_namespace(namespace.name())?;

        let actual =
            get_default_table_location(&namespace, db_name, "my_table", "warehouse_location");

        assert_eq!("db_location/my_table", actual);

        Ok(())
    }

    /// Without a namespace location the warehouse based layout is used.
    #[test]
    fn test_get_default_table_location_warehouse() -> Result<()> {
        let namespace = Namespace::new(NamespaceIdent::new("default".into()));
        let db_name = validate_namespace(namespace.name())?;

        let actual =
            get_default_table_location(&namespace, db_name, "my_table", "warehouse_location");

        assert_eq!("warehouse_location/default.db/my_table", actual);

        Ok(())
    }

    /// Location uri and description of a database end up in the namespace
    /// properties.
    #[test]
    fn test_convert_to_namespace() -> Result<()> {
        let database = Database::builder()
            .name("my_db")
            .location_uri("my_location")
            .description("my_description")
            .build()
            .map_err(from_aws_build_error)?;

        let expected = Namespace::with_properties(
            NamespaceIdent::new("my_db".to_string()),
            HashMap::from([
                (DESCRIPTION.to_string(), "my_description".to_string()),
                (LOCATION.to_string(), "my_location".to_string()),
            ]),
        );

        assert_eq!(convert_to_namespace(&database), expected);

        Ok(())
    }

    /// Name and location uri are carried over into the `DatabaseInput`.
    #[test]
    fn test_convert_to_database() -> Result<()> {
        let ident = NamespaceIdent::new("my_database".to_string());
        let properties = HashMap::from([(LOCATION.to_string(), "my_location".to_string())]);

        let database_input = convert_to_database(&ident, &properties)?;

        assert_eq!(database_input.name(), "my_database");
        assert_eq!(database_input.location_uri.as_deref(), Some("my_location"));

        Ok(())
    }

    /// Only a non-empty, single level namespace is valid.
    #[test]
    fn test_validate_namespace() {
        let single_level = NamespaceIdent::new("ns".to_string());
        let empty_level = NamespaceIdent::new("".to_string());
        let two_levels =
            NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap();

        assert!(validate_namespace(&single_level).is_ok());
        assert!(validate_namespace(&empty_level).is_err());
        assert!(validate_namespace(&two_levels).is_err());
    }

    /// A custom endpoint is applied even when no properties are given.
    #[tokio::test]
    async fn test_config_with_custom_endpoint() {
        let endpoint = "http://custom_url:5000".to_string();

        let sdk_config = create_sdk_config(&HashMap::new(), Some(&endpoint)).await;

        assert_eq!(sdk_config.endpoint_url(), Some(endpoint.as_str()));
    }

    /// Region and static credentials are taken from the properties.
    #[tokio::test]
    async fn test_config_with_properties() {
        let properties: HashMap<String, String> = [
            (AWS_PROFILE_NAME, "my_profile"),
            (AWS_REGION_NAME, "us-east-1"),
            (AWS_ACCESS_KEY_ID, "my-access-id"),
            (AWS_SECRET_ACCESS_KEY, "my-secret-key"),
            (AWS_SESSION_TOKEN, "my-token"),
        ]
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect();

        let sdk_config = create_sdk_config(&properties, None).await;

        let region: &str = sdk_config.region().unwrap().as_ref();
        let provider = sdk_config.credentials_provider().unwrap();
        let credentials = provider.provide_credentials().await.unwrap();

        assert_eq!("us-east-1", region);
        assert_eq!("my-access-id", credentials.access_key_id());
        assert_eq!("my-secret-key", credentials.secret_access_key());
        assert_eq!(Some("my-token"), credentials.session_token());
    }
}