1use std::collections::HashMap;
19
20use chrono::Utc;
21use hive_metastore::{Database, PrincipalType, SerDeInfo, StorageDescriptor};
22use iceberg::spec::Schema;
23use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
24use pilota::{AHashMap, FastStr};
25
26use crate::schema::HiveSchemaBuilder;
27
/// Namespace property key that carries the database owner name.
const HMS_DB_OWNER: &str = "hive.metastore.database.owner";
/// Owner name applied to databases and tables when no owner is supplied.
const HMS_DEFAULT_DB_OWNER: &str = "user.name";
/// Namespace property key that carries the database owner type (user, group or role).
const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type";
/// Table property key that is looked up to determine the table owner.
const OWNER: &str = "owner";
/// Namespace property key mapped to the database description.
const COMMENT: &str = "comment";
/// Namespace property key mapped to the database location URI.
const LOCATION: &str = "location";
/// Table parameter key under which the metadata location is stored.
const METADATA_LOCATION: &str = "metadata_location";
/// Table parameter key that is set to `TRUE` on tables built by `convert_to_hive_table`.
const EXTERNAL: &str = "EXTERNAL";
/// Table type assigned to tables built by `convert_to_hive_table`.
const EXTERNAL_TABLE: &str = "EXTERNAL_TABLE";
/// Table parameter key that is set to `ICEBERG` on tables built by `convert_to_hive_table`.
const TABLE_TYPE: &str = "table_type";
/// Serialization library recorded in the SerDe info of created tables.
const SERIALIZATION_LIB: &str = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
/// Input format recorded in the storage descriptor of created tables.
const INPUT_FORMAT: &str = "org.apache.hadoop.mapred.FileInputFormat";
/// Output format recorded in the storage descriptor of created tables.
const OUTPUT_FORMAT: &str = "org.apache.hadoop.mapred.FileOutputFormat";
54
55pub(crate) fn convert_to_namespace(database: &Database) -> Result<Namespace> {
58 let mut properties = HashMap::new();
59
60 let name = database
61 .name
62 .as_ref()
63 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Database name must be specified"))?
64 .to_string();
65
66 if let Some(description) = &database.description {
67 properties.insert(COMMENT.to_string(), description.to_string());
68 };
69
70 if let Some(location) = &database.location_uri {
71 properties.insert(LOCATION.to_string(), location.to_string());
72 };
73
74 if let Some(owner) = &database.owner_name {
75 properties.insert(HMS_DB_OWNER.to_string(), owner.to_string());
76 };
77
78 if let Some(owner_type) = database.owner_type {
79 let value = if owner_type == PrincipalType::USER {
80 "User"
81 } else if owner_type == PrincipalType::GROUP {
82 "Group"
83 } else if owner_type == PrincipalType::ROLE {
84 "Role"
85 } else {
86 unreachable!("Invalid owner type")
87 };
88
89 properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string());
90 };
91
92 if let Some(params) = &database.parameters {
93 params.iter().for_each(|(k, v)| {
94 properties.insert(k.clone().into(), v.clone().into());
95 });
96 };
97
98 Ok(Namespace::with_properties(
99 NamespaceIdent::new(name),
100 properties,
101 ))
102}
103
104pub(crate) fn convert_to_database(
107 namespace: &NamespaceIdent,
108 properties: &HashMap<String, String>,
109) -> Result<Database> {
110 let name = validate_namespace(namespace)?;
111 validate_owner_settings(properties)?;
112
113 let mut db = Database::default();
114 let mut parameters = AHashMap::new();
115
116 db.name = Some(name.into());
117
118 for (k, v) in properties {
119 match k.as_str() {
120 COMMENT => db.description = Some(v.clone().into()),
121 LOCATION => db.location_uri = Some(format_location_uri(v.clone()).into()),
122 HMS_DB_OWNER => db.owner_name = Some(v.clone().into()),
123 HMS_DB_OWNER_TYPE => {
124 let owner_type = match v.to_lowercase().as_str() {
125 "user" => PrincipalType::USER,
126 "group" => PrincipalType::GROUP,
127 "role" => PrincipalType::ROLE,
128 _ => {
129 return Err(Error::new(
130 ErrorKind::DataInvalid,
131 format!("Invalid value for setting 'owner_type': {v}"),
132 ));
133 }
134 };
135 db.owner_type = Some(owner_type);
136 }
137 _ => {
138 parameters.insert(
139 FastStr::from_string(k.clone()),
140 FastStr::from_string(v.clone()),
141 );
142 }
143 }
144 }
145
146 db.parameters = Some(parameters);
147
148 if db.owner_name.is_none() {
151 db.owner_name = Some(HMS_DEFAULT_DB_OWNER.into());
152 db.owner_type = Some(PrincipalType::USER);
153 }
154
155 Ok(db)
156}
157
158pub(crate) fn convert_to_hive_table(
159 db_name: String,
160 schema: &Schema,
161 table_name: String,
162 location: String,
163 metadata_location: String,
164 properties: &HashMap<String, String>,
165) -> Result<hive_metastore::Table> {
166 let serde_info = SerDeInfo {
167 serialization_lib: Some(SERIALIZATION_LIB.into()),
168 ..Default::default()
169 };
170
171 let hive_schema = HiveSchemaBuilder::from_iceberg(schema)?.build();
172
173 let storage_descriptor = StorageDescriptor {
174 location: Some(location.into()),
175 cols: Some(hive_schema),
176 input_format: Some(INPUT_FORMAT.into()),
177 output_format: Some(OUTPUT_FORMAT.into()),
178 serde_info: Some(serde_info),
179 ..Default::default()
180 };
181
182 let parameters = AHashMap::from([
183 (FastStr::from(EXTERNAL), FastStr::from("TRUE")),
184 (FastStr::from(TABLE_TYPE), FastStr::from("ICEBERG")),
185 (
186 FastStr::from(METADATA_LOCATION),
187 FastStr::from(metadata_location),
188 ),
189 ]);
190
191 let current_time_ms = get_current_time()?;
192 let owner = properties
193 .get(OWNER)
194 .map_or(HMS_DEFAULT_DB_OWNER.to_string(), |v| v.into());
195
196 Ok(hive_metastore::Table {
197 table_name: Some(table_name.into()),
198 db_name: Some(db_name.into()),
199 table_type: Some(EXTERNAL_TABLE.into()),
200 owner: Some(owner.into()),
201 create_time: Some(current_time_ms),
202 last_access_time: Some(current_time_ms),
203 sd: Some(storage_descriptor),
204 parameters: Some(parameters),
205 ..Default::default()
206 })
207}
208
209pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> {
211 let name = namespace.as_ref();
212
213 if name.len() != 1 {
214 return Err(Error::new(
215 ErrorKind::DataInvalid,
216 format!(
217 "Invalid database name: {namespace:?}, hierarchical namespaces are not supported"
218 ),
219 ));
220 }
221
222 let name = name[0].clone();
223
224 if name.is_empty() {
225 return Err(Error::new(
226 ErrorKind::DataInvalid,
227 "Invalid database, provided namespace is empty.",
228 ));
229 }
230
231 Ok(name)
232}
233
234pub(crate) fn get_default_table_location(
236 namespace: &Namespace,
237 table_name: impl AsRef<str>,
238 warehouse: impl AsRef<str>,
239) -> String {
240 let properties = namespace.properties();
241
242 let location = match properties.get(LOCATION) {
243 Some(location) => location,
244 None => warehouse.as_ref(),
245 };
246
247 format!("{}/{}", location, table_name.as_ref())
248}
249
250pub(crate) fn get_metadata_location(
252 parameters: &Option<AHashMap<FastStr, FastStr>>,
253) -> Result<String> {
254 match parameters {
255 Some(properties) => match properties.get(METADATA_LOCATION) {
256 Some(location) => Ok(location.to_string()),
257 None => Err(Error::new(
258 ErrorKind::DataInvalid,
259 format!("No '{METADATA_LOCATION}' set on table"),
260 )),
261 },
262 None => Err(Error::new(
263 ErrorKind::DataInvalid,
264 "No 'parameters' set on table. Location of metadata is undefined",
265 )),
266 }
267}
268
/// Normalises a database location before it is stored as the location URI.
///
/// * A plain path gets a leading `/` when it has none.
/// * A location that carries a URI scheme (contains `://`, e.g.
///   `s3a://bucket/db`) is left unprefixed, because prepending `/` would
///   turn it into an invalid location.
/// * A single trailing `/` is removed, unless the location is just `/` or
///   ends in `://`.
fn format_location_uri(location: String) -> String {
    let mut location = location;

    let has_scheme = location.contains("://");
    if !has_scheme && !location.starts_with('/') {
        location.insert(0, '/');
    }

    // Keep the root path and a bare `scheme://` intact.
    if location.len() > 1 && location.ends_with('/') && !location.ends_with("://") {
        location.pop();
    }

    location
}
283
284fn validate_owner_settings(properties: &HashMap<String, String>) -> Result<()> {
287 let owner_is_set = properties.get(HMS_DB_OWNER).is_some();
288 let owner_type_is_set = properties.get(HMS_DB_OWNER_TYPE).is_some();
289
290 if owner_type_is_set && !owner_is_set {
291 return Err(Error::new(
292 ErrorKind::DataInvalid,
293 format!(
294 "Setting '{HMS_DB_OWNER_TYPE}' without setting '{HMS_DB_OWNER}' is not allowed"
295 ),
296 ));
297 }
298
299 Ok(())
300}
301
302fn get_current_time() -> Result<i32> {
303 let now = Utc::now();
304 now.timestamp().try_into().map_err(|_| {
305 Error::new(
306 ErrorKind::Unexpected,
307 "Current time is out of range for i32",
308 )
309 })
310}
311
/// Unit tests for the Hive Metastore conversion and validation helpers.
#[cfg(test)]
mod tests {
    use iceberg::spec::{NestedField, PrimitiveType, Type};
    use iceberg::{MetadataLocation, Namespace, NamespaceIdent};

    use super::*;

    /// The metadata location is returned when present and an error is raised otherwise.
    #[test]
    fn test_get_metadata_location() -> Result<()> {
        let params_valid = Some(AHashMap::from([(
            FastStr::new(METADATA_LOCATION),
            FastStr::new("my_location"),
        )]));
        let params_missing_key = Some(AHashMap::from([(
            FastStr::new("not_here"),
            FastStr::new("my_location"),
        )]));

        let result_valid = get_metadata_location(&params_valid)?;
        let result_missing_key = get_metadata_location(&params_missing_key);
        let result_no_params = get_metadata_location(&None);

        assert_eq!(result_valid, "my_location");
        assert!(result_missing_key.is_err());
        assert!(result_no_params.is_err());

        Ok(())
    }

    /// A table built without an `owner` property gets the default owner and the
    /// expected storage descriptor.
    #[test]
    fn test_convert_to_hive_table() -> Result<()> {
        let db_name = "my_db".to_string();
        let table_name = "my_table".to_string();
        let location = "s3a://warehouse/hms".to_string();
        let metadata_location =
            MetadataLocation::new_with_table_location(location.clone()).to_string();
        let properties = HashMap::new();
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()?;

        let result = convert_to_hive_table(
            db_name.clone(),
            &schema,
            table_name.clone(),
            location.clone(),
            metadata_location,
            &properties,
        )?;

        let serde_info = SerDeInfo {
            serialization_lib: Some(SERIALIZATION_LIB.into()),
            ..Default::default()
        };

        let hive_schema = HiveSchemaBuilder::from_iceberg(&schema)?.build();

        let sd = StorageDescriptor {
            location: Some(location.into()),
            cols: Some(hive_schema),
            input_format: Some(INPUT_FORMAT.into()),
            output_format: Some(OUTPUT_FORMAT.into()),
            serde_info: Some(serde_info),
            ..Default::default()
        };

        assert_eq!(result.db_name, Some(db_name.into()));
        assert_eq!(result.table_name, Some(table_name.into()));
        assert_eq!(result.table_type, Some(EXTERNAL_TABLE.into()));
        assert_eq!(result.owner, Some(HMS_DEFAULT_DB_OWNER.into()));
        assert_eq!(result.sd, Some(sd));

        Ok(())
    }

    /// The namespace `location` property takes precedence over the warehouse.
    #[test]
    fn test_get_default_table_location() -> Result<()> {
        let properties = HashMap::from([(LOCATION.to_string(), "db_location".to_string())]);

        let namespace =
            Namespace::with_properties(NamespaceIdent::new("default".into()), properties);
        let table_name = "my_table";

        let expected = "db_location/my_table";
        let result = get_default_table_location(&namespace, table_name, "warehouse_location");

        assert_eq!(expected, result);

        Ok(())
    }

    /// The warehouse is used when the namespace has no `location` property.
    #[test]
    fn test_get_default_table_location_warehouse() -> Result<()> {
        let namespace = Namespace::new(NamespaceIdent::new("default".into()));
        let table_name = "my_table";

        let expected = "warehouse_location/my_table";
        let result = get_default_table_location(&namespace, table_name, "warehouse_location");

        assert_eq!(expected, result);

        Ok(())
    }

    /// Properties survive a round trip through `convert_to_database` and
    /// `convert_to_namespace`.
    #[test]
    fn test_convert_to_namespace() -> Result<()> {
        let properties = HashMap::from([
            (COMMENT.to_string(), "my_description".to_string()),
            (LOCATION.to_string(), "/my_location".to_string()),
            (HMS_DB_OWNER.to_string(), "apache".to_string()),
            (HMS_DB_OWNER_TYPE.to_string(), "User".to_string()),
            ("key1".to_string(), "value1".to_string()),
        ]);

        let ident = NamespaceIdent::new("my_namespace".into());
        let db = convert_to_database(&ident, &properties)?;

        let expected_ns = Namespace::with_properties(ident, properties);
        let result_ns = convert_to_namespace(&db)?;

        assert_eq!(expected_ns, result_ns);

        Ok(())
    }

    /// An owner type is only accepted together with an owner.
    #[test]
    fn test_validate_owner_settings() {
        let valid = HashMap::from([
            (HMS_DB_OWNER.to_string(), "apache".to_string()),
            (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()),
        ]);
        let invalid = HashMap::from([(HMS_DB_OWNER_TYPE.to_string(), "user".to_string())]);

        assert!(validate_owner_settings(&valid).is_ok());
        assert!(validate_owner_settings(&invalid).is_err());
    }

    /// Known properties land on the dedicated fields, the rest in `parameters`.
    #[test]
    fn test_convert_to_database() -> Result<()> {
        let ns = NamespaceIdent::new("my_namespace".into());
        let properties = HashMap::from([
            (COMMENT.to_string(), "my_description".to_string()),
            (LOCATION.to_string(), "my_location".to_string()),
            (HMS_DB_OWNER.to_string(), "apache".to_string()),
            (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()),
            ("key1".to_string(), "value1".to_string()),
        ]);

        let db = convert_to_database(&ns, &properties)?;

        assert_eq!(db.name, Some(FastStr::from("my_namespace")));
        assert_eq!(db.description, Some(FastStr::from("my_description")));
        assert_eq!(db.location_uri, Some(FastStr::from("/my_location")));
        assert_eq!(db.owner_name, Some(FastStr::from("apache")));
        assert_eq!(db.owner_type, Some(PrincipalType::USER));

        // `parameters` is always populated, so a missing map must fail the test
        // instead of silently skipping the assertion.
        let params = db.parameters.expect("parameters should always be set");
        assert_eq!(params.get("key1"), Some(&FastStr::from("value1")));

        Ok(())
    }

    /// Without owner settings the default owner of type `USER` is applied.
    #[test]
    fn test_convert_to_database_with_default_user() -> Result<()> {
        let ns = NamespaceIdent::new("my_namespace".into());
        let properties = HashMap::new();

        let db = convert_to_database(&ns, &properties)?;

        assert_eq!(db.name, Some(FastStr::from("my_namespace")));
        assert_eq!(db.owner_name, Some(FastStr::from(HMS_DEFAULT_DB_OWNER)));
        assert_eq!(db.owner_type, Some(PrincipalType::USER));

        Ok(())
    }

    /// Only non-empty, single-level namespaces are valid.
    #[test]
    fn test_validate_namespace() {
        let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string()));
        let empty_ns = Namespace::new(NamespaceIdent::new("".to_string()));
        let hierarchical_ns = Namespace::new(
            NamespaceIdent::from_vec(vec!["level1".to_string(), "level2".to_string()]).unwrap(),
        );

        let valid = validate_namespace(valid_ns.name());
        let empty = validate_namespace(empty_ns.name());
        let hierarchical = validate_namespace(hierarchical_ns.name());

        assert!(valid.is_ok());
        assert!(empty.is_err());
        assert!(hierarchical.is_err());
    }

    /// Plain paths get a leading slash and lose a trailing one.
    #[test]
    fn test_format_location_uri() {
        let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"];
        let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"];

        inputs.into_iter().zip(outputs).for_each(|(inp, out)| {
            let location = format_location_uri(inp.to_string());
            assert_eq!(location, out);
        })
    }
}