iceberg_catalog_hms/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19
20use chrono::Utc;
21use hive_metastore::{Database, PrincipalType, SerDeInfo, StorageDescriptor};
22use iceberg::spec::Schema;
23use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
24use pilota::{AHashMap, FastStr};
25
26use crate::schema::HiveSchemaBuilder;
27
28/// Namespace property key for the database owner name (`hive.metastore.database.owner`).
29const HMS_DB_OWNER: &str = "hive.metastore.database.owner";
30/// Owner name used as the fallback when no owner is supplied.
31const HMS_DEFAULT_DB_OWNER: &str = "user.name";
32/// Namespace property key for the database owner type; accepted values are `user`, `group` and `role` (case-insensitive).
33const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type";
34/// Table property key read to determine the Hive table owner.
35const OWNER: &str = "owner";
36/// Namespace property key mapped to the Hive database `description`.
37const COMMENT: &str = "comment";
38/// Namespace property key mapped to the Hive database `location_uri`.
39const LOCATION: &str = "location";
40/// Hive table parameter key holding the table's metadata location.
41const METADATA_LOCATION: &str = "metadata_location";
42/// Hive table parameter key that is set to `TRUE` on created tables.
43const EXTERNAL: &str = "EXTERNAL";
44/// Value written to the Hive table's `table_type` field.
45const EXTERNAL_TABLE: &str = "EXTERNAL_TABLE";
46/// Hive table parameter key that is set to `ICEBERG` on created tables.
47const TABLE_TYPE: &str = "table_type";
48/// Hive metastore `SerDeInfo` `serialization_lib` value.
49const SERIALIZATION_LIB: &str = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
50/// Hive metastore `StorageDescriptor` input format value.
51const INPUT_FORMAT: &str = "org.apache.hadoop.mapred.FileInputFormat";
52/// Hive metastore `StorageDescriptor` output format value.
53const OUTPUT_FORMAT: &str = "org.apache.hadoop.mapred.FileOutputFormat";
54
55/// Returns a `Namespace` by extracting database name and properties
56/// from `hive_metastore::hms::Database`
57pub(crate) fn convert_to_namespace(database: &Database) -> Result<Namespace> {
58    let mut properties = HashMap::new();
59
60    let name = database
61        .name
62        .as_ref()
63        .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Database name must be specified"))?
64        .to_string();
65
66    if let Some(description) = &database.description {
67        properties.insert(COMMENT.to_string(), description.to_string());
68    };
69
70    if let Some(location) = &database.location_uri {
71        properties.insert(LOCATION.to_string(), location.to_string());
72    };
73
74    if let Some(owner) = &database.owner_name {
75        properties.insert(HMS_DB_OWNER.to_string(), owner.to_string());
76    };
77
78    if let Some(owner_type) = database.owner_type {
79        let value = if owner_type == PrincipalType::USER {
80            "User"
81        } else if owner_type == PrincipalType::GROUP {
82            "Group"
83        } else if owner_type == PrincipalType::ROLE {
84            "Role"
85        } else {
86            unreachable!("Invalid owner type")
87        };
88
89        properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string());
90    };
91
92    if let Some(params) = &database.parameters {
93        params.iter().for_each(|(k, v)| {
94            properties.insert(k.clone().into(), v.clone().into());
95        });
96    };
97
98    Ok(Namespace::with_properties(
99        NamespaceIdent::new(name),
100        properties,
101    ))
102}
103
104/// Converts name and properties into `hive_metastore::hms::Database`
105/// after validating the `namespace` and `owner-settings`.
106pub(crate) fn convert_to_database(
107    namespace: &NamespaceIdent,
108    properties: &HashMap<String, String>,
109) -> Result<Database> {
110    let name = validate_namespace(namespace)?;
111    validate_owner_settings(properties)?;
112
113    let mut db = Database::default();
114    let mut parameters = AHashMap::new();
115
116    db.name = Some(name.into());
117
118    for (k, v) in properties {
119        match k.as_str() {
120            COMMENT => db.description = Some(v.clone().into()),
121            LOCATION => db.location_uri = Some(format_location_uri(v.clone()).into()),
122            HMS_DB_OWNER => db.owner_name = Some(v.clone().into()),
123            HMS_DB_OWNER_TYPE => {
124                let owner_type = match v.to_lowercase().as_str() {
125                    "user" => PrincipalType::USER,
126                    "group" => PrincipalType::GROUP,
127                    "role" => PrincipalType::ROLE,
128                    _ => {
129                        return Err(Error::new(
130                            ErrorKind::DataInvalid,
131                            format!("Invalid value for setting 'owner_type': {v}"),
132                        ));
133                    }
134                };
135                db.owner_type = Some(owner_type);
136            }
137            _ => {
138                parameters.insert(
139                    FastStr::from_string(k.clone()),
140                    FastStr::from_string(v.clone()),
141                );
142            }
143        }
144    }
145
146    db.parameters = Some(parameters);
147
148    // Set default owner, if none provided
149    // https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44
150    if db.owner_name.is_none() {
151        db.owner_name = Some(HMS_DEFAULT_DB_OWNER.into());
152        db.owner_type = Some(PrincipalType::USER);
153    }
154
155    Ok(db)
156}
157
158pub(crate) fn convert_to_hive_table(
159    db_name: String,
160    schema: &Schema,
161    table_name: String,
162    location: String,
163    metadata_location: String,
164    properties: &HashMap<String, String>,
165) -> Result<hive_metastore::Table> {
166    let serde_info = SerDeInfo {
167        serialization_lib: Some(SERIALIZATION_LIB.into()),
168        ..Default::default()
169    };
170
171    let hive_schema = HiveSchemaBuilder::from_iceberg(schema)?.build();
172
173    let storage_descriptor = StorageDescriptor {
174        location: Some(location.into()),
175        cols: Some(hive_schema),
176        input_format: Some(INPUT_FORMAT.into()),
177        output_format: Some(OUTPUT_FORMAT.into()),
178        serde_info: Some(serde_info),
179        ..Default::default()
180    };
181
182    let parameters = AHashMap::from([
183        (FastStr::from(EXTERNAL), FastStr::from("TRUE")),
184        (FastStr::from(TABLE_TYPE), FastStr::from("ICEBERG")),
185        (
186            FastStr::from(METADATA_LOCATION),
187            FastStr::from(metadata_location),
188        ),
189    ]);
190
191    let current_time_ms = get_current_time()?;
192    let owner = properties
193        .get(OWNER)
194        .map_or(HMS_DEFAULT_DB_OWNER.to_string(), |v| v.into());
195
196    Ok(hive_metastore::Table {
197        table_name: Some(table_name.into()),
198        db_name: Some(db_name.into()),
199        table_type: Some(EXTERNAL_TABLE.into()),
200        owner: Some(owner.into()),
201        create_time: Some(current_time_ms),
202        last_access_time: Some(current_time_ms),
203        sd: Some(storage_descriptor),
204        parameters: Some(parameters),
205        ..Default::default()
206    })
207}
208
209/// Checks if provided `NamespaceIdent` is valid.
210pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> {
211    let name = namespace.as_ref();
212
213    if name.len() != 1 {
214        return Err(Error::new(
215            ErrorKind::DataInvalid,
216            format!(
217                "Invalid database name: {namespace:?}, hierarchical namespaces are not supported"
218            ),
219        ));
220    }
221
222    let name = name[0].clone();
223
224    if name.is_empty() {
225        return Err(Error::new(
226            ErrorKind::DataInvalid,
227            "Invalid database, provided namespace is empty.",
228        ));
229    }
230
231    Ok(name)
232}
233
234/// Get default table location from `Namespace` properties
235pub(crate) fn get_default_table_location(
236    namespace: &Namespace,
237    table_name: impl AsRef<str>,
238    warehouse: impl AsRef<str>,
239) -> String {
240    let properties = namespace.properties();
241
242    let location = match properties.get(LOCATION) {
243        Some(location) => location,
244        None => warehouse.as_ref(),
245    };
246
247    format!("{}/{}", location, table_name.as_ref())
248}
249
250/// Get metadata location from `HiveTable` parameters
251pub(crate) fn get_metadata_location(
252    parameters: &Option<AHashMap<FastStr, FastStr>>,
253) -> Result<String> {
254    match parameters {
255        Some(properties) => match properties.get(METADATA_LOCATION) {
256            Some(location) => Ok(location.to_string()),
257            None => Err(Error::new(
258                ErrorKind::DataInvalid,
259                format!("No '{METADATA_LOCATION}' set on table"),
260            )),
261        },
262        None => Err(Error::new(
263            ErrorKind::DataInvalid,
264            "No 'parameters' set on table. Location of metadata is undefined",
265        )),
266    }
267}
268
/// Normalizes a database location before it is stored as `location_uri`.
///
/// All trailing slashes are removed and a leading `/` is added when missing.
/// A location that consists only of slashes (or is empty) becomes `/`.
///
/// NOTE(review): a location that carries a URI scheme (e.g. `s3a://bucket/db`)
/// also receives a leading `/` - confirm that this is intended.
fn format_location_uri(location: String) -> String {
    // Strip every trailing slash, not just one, so `"db//"` and `"db/"`
    // normalize to the same value.
    let trimmed = location.trim_end_matches('/');

    if trimmed.starts_with('/') {
        trimmed.to_string()
    } else {
        // Also covers the empty remainder of `""`, `"/"`, `"//"`, ...: the
        // result is the root `/`.
        format!("/{trimmed}")
    }
}
283
284/// Checks if `owner-settings` are valid.
285/// If `owner_type` is set, then `owner` must also be set.
286fn validate_owner_settings(properties: &HashMap<String, String>) -> Result<()> {
287    let owner_is_set = properties.get(HMS_DB_OWNER).is_some();
288    let owner_type_is_set = properties.get(HMS_DB_OWNER_TYPE).is_some();
289
290    if owner_type_is_set && !owner_is_set {
291        return Err(Error::new(
292            ErrorKind::DataInvalid,
293            format!(
294                "Setting '{HMS_DB_OWNER_TYPE}' without setting '{HMS_DB_OWNER}' is not allowed"
295            ),
296        ));
297    }
298
299    Ok(())
300}
301
302fn get_current_time() -> Result<i32> {
303    let now = Utc::now();
304    now.timestamp().try_into().map_err(|_| {
305        Error::new(
306            ErrorKind::Unexpected,
307            "Current time is out of range for i32",
308        )
309    })
310}
311
/// Unit tests for the Hive metastore conversion and validation helpers.
#[cfg(test)]
mod tests {
    use iceberg::spec::{NestedField, PrimitiveType, Type};
    use iceberg::{MetadataLocation, Namespace, NamespaceIdent};

    use super::*;

    /// The metadata location is returned when present; a missing key or
    /// missing parameters are errors.
    #[test]
    fn test_get_metadata_location() -> Result<()> {
        let params_valid = Some(AHashMap::from([(
            FastStr::new(METADATA_LOCATION),
            FastStr::new("my_location"),
        )]));
        let params_missing_key = Some(AHashMap::from([(
            FastStr::new("not_here"),
            FastStr::new("my_location"),
        )]));

        let result_valid = get_metadata_location(&params_valid)?;
        let result_missing_key = get_metadata_location(&params_missing_key);
        let result_no_params = get_metadata_location(&None);

        assert_eq!(result_valid, "my_location");
        assert!(result_missing_key.is_err());
        assert!(result_no_params.is_err());

        Ok(())
    }

    /// The Hive table carries names, type, default owner, storage descriptor
    /// and the iceberg-specific parameters.
    #[test]
    fn test_convert_to_hive_table() -> Result<()> {
        let db_name = "my_db".to_string();
        let table_name = "my_table".to_string();
        let location = "s3a://warehouse/hms".to_string();
        let metadata_location =
            MetadataLocation::new_with_table_location(location.clone()).to_string();
        let properties = HashMap::new();
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()?;

        let result = convert_to_hive_table(
            db_name.clone(),
            &schema,
            table_name.clone(),
            location.clone(),
            metadata_location.clone(),
            &properties,
        )?;

        let serde_info = SerDeInfo {
            serialization_lib: Some(SERIALIZATION_LIB.into()),
            ..Default::default()
        };

        let hive_schema = HiveSchemaBuilder::from_iceberg(&schema)?.build();

        let sd = StorageDescriptor {
            location: Some(location.into()),
            cols: Some(hive_schema),
            input_format: Some(INPUT_FORMAT.into()),
            output_format: Some(OUTPUT_FORMAT.into()),
            serde_info: Some(serde_info),
            ..Default::default()
        };

        assert_eq!(result.db_name, Some(db_name.into()));
        assert_eq!(result.table_name, Some(table_name.into()));
        assert_eq!(result.table_type, Some(EXTERNAL_TABLE.into()));
        assert_eq!(result.owner, Some(HMS_DEFAULT_DB_OWNER.into()));
        assert_eq!(result.sd, Some(sd));

        // The parameters are what marks the table as an iceberg table, so
        // they must be checked explicitly.
        let params = result
            .parameters
            .expect("table parameters must always be set");
        assert_eq!(params.get(EXTERNAL), Some(&FastStr::from("TRUE")));
        assert_eq!(params.get(TABLE_TYPE), Some(&FastStr::from("ICEBERG")));
        assert_eq!(
            params.get(METADATA_LOCATION),
            Some(&FastStr::from(metadata_location))
        );

        Ok(())
    }

    /// The namespace `location` property takes precedence over the warehouse.
    #[test]
    fn test_get_default_table_location() -> Result<()> {
        let properties = HashMap::from([(LOCATION.to_string(), "db_location".to_string())]);

        let namespace =
            Namespace::with_properties(NamespaceIdent::new("default".into()), properties);
        let table_name = "my_table";

        let expected = "db_location/my_table";
        let result = get_default_table_location(&namespace, table_name, "warehouse_location");

        assert_eq!(expected, result);

        Ok(())
    }

    /// Without a namespace `location` the warehouse is used as the base.
    #[test]
    fn test_get_default_table_location_warehouse() -> Result<()> {
        let namespace = Namespace::new(NamespaceIdent::new("default".into()));
        let table_name = "my_table";

        let expected = "warehouse_location/my_table";
        let result = get_default_table_location(&namespace, table_name, "warehouse_location");

        assert_eq!(expected, result);

        Ok(())
    }

    /// Properties survive a round trip through `Database`.
    #[test]
    fn test_convert_to_namespace() -> Result<()> {
        let properties = HashMap::from([
            (COMMENT.to_string(), "my_description".to_string()),
            (LOCATION.to_string(), "/my_location".to_string()),
            (HMS_DB_OWNER.to_string(), "apache".to_string()),
            (HMS_DB_OWNER_TYPE.to_string(), "User".to_string()),
            ("key1".to_string(), "value1".to_string()),
        ]);

        let ident = NamespaceIdent::new("my_namespace".into());
        let db = convert_to_database(&ident, &properties)?;

        let expected_ns = Namespace::with_properties(ident, properties);
        let result_ns = convert_to_namespace(&db)?;

        assert_eq!(expected_ns, result_ns);

        Ok(())
    }

    /// An owner type without an owner is rejected.
    #[test]
    fn test_validate_owner_settings() {
        let valid = HashMap::from([
            (HMS_DB_OWNER.to_string(), "apache".to_string()),
            (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()),
        ]);
        let invalid = HashMap::from([(HMS_DB_OWNER_TYPE.to_string(), "user".to_string())]);

        assert!(validate_owner_settings(&valid).is_ok());
        assert!(validate_owner_settings(&invalid).is_err());
    }

    /// Well-known properties land in the dedicated fields, the rest in the
    /// database parameters.
    #[test]
    fn test_convert_to_database() -> Result<()> {
        let ns = NamespaceIdent::new("my_namespace".into());
        let properties = HashMap::from([
            (COMMENT.to_string(), "my_description".to_string()),
            (LOCATION.to_string(), "my_location".to_string()),
            (HMS_DB_OWNER.to_string(), "apache".to_string()),
            (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()),
            ("key1".to_string(), "value1".to_string()),
        ]);

        let db = convert_to_database(&ns, &properties)?;

        assert_eq!(db.name, Some(FastStr::from("my_namespace")));
        assert_eq!(db.description, Some(FastStr::from("my_description")));
        // The location is normalized with a leading slash.
        assert_eq!(db.location_uri, Some(FastStr::from("/my_location")));
        assert_eq!(db.owner_name, Some(FastStr::from("apache")));
        assert_eq!(db.owner_type, Some(PrincipalType::USER));

        // Fail loudly when the parameters are missing instead of silently
        // skipping the assertion.
        let params = db
            .parameters
            .expect("database parameters must always be set");
        assert_eq!(params.len(), 1);
        assert_eq!(params.get("key1"), Some(&FastStr::from("value1")));

        Ok(())
    }

    /// Without owner settings the default owner with type `USER` is used.
    #[test]
    fn test_convert_to_database_with_default_user() -> Result<()> {
        let ns = NamespaceIdent::new("my_namespace".into());
        let properties = HashMap::new();

        let db = convert_to_database(&ns, &properties)?;

        assert_eq!(db.name, Some(FastStr::from("my_namespace")));
        assert_eq!(db.owner_name, Some(FastStr::from(HMS_DEFAULT_DB_OWNER)));
        assert_eq!(db.owner_type, Some(PrincipalType::USER));

        Ok(())
    }

    /// Only single-level, non-empty namespaces are valid.
    #[test]
    fn test_validate_namespace() {
        let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string()));
        let empty_ns = Namespace::new(NamespaceIdent::new("".to_string()));
        let hierarchical_ns = Namespace::new(
            NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap(),
        );

        let valid = validate_namespace(valid_ns.name());
        let empty = validate_namespace(empty_ns.name());
        let hierarchical = validate_namespace(hierarchical_ns.name());

        assert!(valid.is_ok());
        assert!(empty.is_err());
        assert!(hierarchical.is_err());
    }

    /// Locations get a leading slash and lose their trailing slash.
    #[test]
    fn test_format_location_uri() {
        let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"];
        let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"];

        inputs.into_iter().zip(outputs).for_each(|(inp, out)| {
            let location = format_location_uri(inp.to_string());
            assert_eq!(location, out);
        })
    }
}