iceberg_catalog_hms/
schema.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use hive_metastore::FieldSchema;
19use iceberg::spec::{PrimitiveType, Schema, SchemaVisitor, visit_schema};
20use iceberg::{Error, ErrorKind, Result};
21
22type HiveSchema = Vec<FieldSchema>;
23
24#[derive(Debug, Default)]
25pub(crate) struct HiveSchemaBuilder {
26    schema: HiveSchema,
27    depth: usize,
28}
29
30impl HiveSchemaBuilder {
31    /// Creates a new `HiveSchemaBuilder` from iceberg `Schema`
32    pub fn from_iceberg(schema: &Schema) -> Result<HiveSchemaBuilder> {
33        let mut builder = Self::default();
34        visit_schema(schema, &mut builder)?;
35        Ok(builder)
36    }
37
38    /// Returns the newly converted `HiveSchema`
39    pub fn build(self) -> HiveSchema {
40        self.schema
41    }
42
43    /// Check if is in `StructType` while traversing schema
44    fn is_inside_struct(&self) -> bool {
45        self.depth > 0
46    }
47}
48
49impl SchemaVisitor for HiveSchemaBuilder {
50    type T = String;
51
52    fn schema(
53        &mut self,
54        _schema: &iceberg::spec::Schema,
55        value: String,
56    ) -> iceberg::Result<String> {
57        Ok(value)
58    }
59
60    fn before_struct_field(
61        &mut self,
62        _field: &iceberg::spec::NestedFieldRef,
63    ) -> iceberg::Result<()> {
64        self.depth += 1;
65        Ok(())
66    }
67
68    fn r#struct(
69        &mut self,
70        r#_struct: &iceberg::spec::StructType,
71        results: Vec<String>,
72    ) -> iceberg::Result<String> {
73        Ok(format!("struct<{}>", results.join(", ")))
74    }
75
76    fn after_struct_field(
77        &mut self,
78        _field: &iceberg::spec::NestedFieldRef,
79    ) -> iceberg::Result<()> {
80        self.depth -= 1;
81        Ok(())
82    }
83
84    fn field(
85        &mut self,
86        field: &iceberg::spec::NestedFieldRef,
87        value: String,
88    ) -> iceberg::Result<String> {
89        if self.is_inside_struct() {
90            return Ok(format!("{}:{}", field.name, value));
91        }
92
93        self.schema.push(FieldSchema {
94            name: Some(field.name.clone().into()),
95            r#type: Some(value.clone().into()),
96            comment: field.doc.clone().map(|doc| doc.into()),
97        });
98
99        Ok(value)
100    }
101
102    fn list(&mut self, _list: &iceberg::spec::ListType, value: String) -> iceberg::Result<String> {
103        Ok(format!("array<{value}>"))
104    }
105
106    fn map(
107        &mut self,
108        _map: &iceberg::spec::MapType,
109        key_value: String,
110        value: String,
111    ) -> iceberg::Result<String> {
112        Ok(format!("map<{key_value},{value}>"))
113    }
114
115    fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<String> {
116        let hive_type = match p {
117            PrimitiveType::Boolean => "boolean".to_string(),
118            PrimitiveType::Int => "int".to_string(),
119            PrimitiveType::Long => "bigint".to_string(),
120            PrimitiveType::Float => "float".to_string(),
121            PrimitiveType::Double => "double".to_string(),
122            PrimitiveType::Date => "date".to_string(),
123            PrimitiveType::Timestamp => "timestamp".to_string(),
124            PrimitiveType::TimestampNs => "timestamp_ns".to_string(),
125            PrimitiveType::Timestamptz | PrimitiveType::TimestamptzNs => {
126                return Err(Error::new(
127                    ErrorKind::FeatureUnsupported,
128                    format!("Conversion from {p:?} is not supported"),
129                ));
130            }
131            PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => {
132                "string".to_string()
133            }
134            PrimitiveType::Binary | PrimitiveType::Fixed(_) => "binary".to_string(),
135            PrimitiveType::Decimal { precision, scale } => {
136                format!("decimal({precision},{scale})")
137            }
138        };
139
140        Ok(hive_type)
141    }
142}
143
#[cfg(test)]
mod tests {
    use iceberg::Result;
    use iceberg::spec::Schema;

    use super::*;

    /// Parses `json` as an iceberg `Schema` and converts it with `HiveSchemaBuilder`.
    fn convert(json: &str) -> Result<Vec<FieldSchema>> {
        let schema = serde_json::from_str::<Schema>(json)?;
        let builder = HiveSchemaBuilder::from_iceberg(&schema)?;
        Ok(builder.build())
    }

    /// Expected entry named `name` with Hive type `hive_type` and no comment.
    fn column(name: &'static str, hive_type: &'static str) -> FieldSchema {
        FieldSchema {
            name: Some(name.into()),
            r#type: Some(hive_type.into()),
            comment: None,
        }
    }

    /// A map whose value is itself a map is rendered as a nested `map<...>`.
    #[test]
    fn test_schema_with_nested_maps() -> Result<()> {
        let json = r#"
            {
                "schema-id": 1,
                "type": "struct",
                "fields": [
                    {
                        "id": 1,
                        "name": "quux",
                        "required": true,
                        "type": {
                            "type": "map",
                            "key-id": 2,
                            "key": "string",
                            "value-id": 3,
                            "value-required": true,
                            "value": {
                                "type": "map",
                                "key-id": 4,
                                "key": "string",
                                "value-id": 5,
                                "value-required": true,
                                "value": "int"
                            }
                        }
                    }
                ]
            }
        "#;

        let actual = convert(json)?;

        assert_eq!(actual, vec![column("quux", "map<string,map<string,int>>")]);

        Ok(())
    }

    /// A list of structs is rendered as `array<struct<...>>` on a single entry.
    #[test]
    fn test_schema_with_struct_inside_list() -> Result<()> {
        let json = r#"
        {
            "schema-id": 1,
            "type": "struct",
            "fields": [
                {
                    "id": 1,
                    "name": "location",
                    "required": true,
                    "type": {
                        "type": "list",
                        "element-id": 2,
                        "element-required": true,
                        "element": {
                            "type": "struct",
                            "fields": [
                                {
                                    "id": 3,
                                    "name": "latitude",
                                    "required": false,
                                    "type": "float"
                                },
                                {
                                    "id": 4,
                                    "name": "longitude",
                                    "required": false,
                                    "type": "float"
                                }
                            ]
                        }
                    }
                }
            ]
        }
        "#;

        let actual = convert(json)?;

        assert_eq!(
            actual,
            vec![column(
                "location",
                "array<struct<latitude:float, longitude:float>>"
            )]
        );

        Ok(())
    }

    /// A struct-typed field yields one entry whose type lists the members.
    #[test]
    fn test_schema_with_structs() -> Result<()> {
        let json = r#"{
            "type": "struct",
            "schema-id": 1,
            "fields": [
                {
                    "id": 1,
                    "name": "person",
                    "required": true,
                    "type": {
                        "type": "struct",
                        "fields": [
                            {
                                "id": 2,
                                "name": "name",
                                "required": true,
                                "type": "string"
                            },
                            {
                                "id": 3,
                                "name": "age",
                                "required": false,
                                "type": "int"
                            }
                        ]
                    }
                }
            ]
        }"#;

        let actual = convert(json)?;

        assert_eq!(actual, vec![column("person", "struct<name:string, age:int>")]);

        Ok(())
    }

    /// Each primitive field becomes its own entry, in declaration order.
    #[test]
    fn test_schema_with_simple_fields() -> Result<()> {
        let json = r#"{
            "type": "struct",
            "schema-id": 1,
            "fields": [
                {
                    "id": 1,
                    "name": "c1",
                    "required": true,
                    "type": "boolean"
                },
                {
                    "id": 2,
                    "name": "c2",
                    "required": true,
                    "type": "int"
                },
                {
                    "id": 3,
                    "name": "c3",
                    "required": true,
                    "type": "long"
                },
                {
                    "id": 4,
                    "name": "c4",
                    "required": true,
                    "type": "float"
                },
                {
                    "id": 5,
                    "name": "c5",
                    "required": true,
                    "type": "double"
                },
                {
                    "id": 6,
                    "name": "c6",
                    "required": true,
                    "type": "decimal(2,2)"
                },
                {
                    "id": 7,
                    "name": "c7",
                    "required": true,
                    "type": "date"
                },
                {
                    "id": 8,
                    "name": "c8",
                    "required": true,
                    "type": "time"
                },
                {
                    "id": 9,
                    "name": "c9",
                    "required": true,
                    "type": "timestamp"
                },
                {
                    "id": 10,
                    "name": "c10",
                    "required": true,
                    "type": "string"
                },
                {
                    "id": 11,
                    "name": "c11",
                    "required": true,
                    "type": "uuid"
                },
                {
                    "id": 12,
                    "name": "c12",
                    "required": true,
                    "type": "fixed[4]"
                },
                {
                    "id": 13,
                    "name": "c13",
                    "required": true,
                    "type": "binary"
                }
            ]
        }"#;

        let actual = convert(json)?;

        // (field name, expected Hive type), in the same order as the fixture.
        let expected: Vec<FieldSchema> = [
            ("c1", "boolean"),
            ("c2", "int"),
            ("c3", "bigint"),
            ("c4", "float"),
            ("c5", "double"),
            ("c6", "decimal(2,2)"),
            ("c7", "date"),
            ("c8", "string"),
            ("c9", "timestamp"),
            ("c10", "string"),
            ("c11", "string"),
            ("c12", "binary"),
            ("c13", "binary"),
        ]
        .into_iter()
        .map(|(name, hive_type)| column(name, hive_type))
        .collect();

        assert_eq!(actual, expected);

        Ok(())
    }
}
459}