iceberg_catalog_glue/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
/// Property `iceberg.field.id` for `Column`.
///
/// Key of the Glue column parameter that stores the Iceberg field id,
/// rendered as a decimal string.
pub(crate) const ICEBERG_FIELD_ID: &str = "iceberg.field.id";
/// Property `iceberg.field.optional` for `Column`.
///
/// Key of the Glue column parameter that stores `"true"` when the Iceberg
/// field is not required and `"false"` otherwise.
pub(crate) const ICEBERG_FIELD_OPTIONAL: &str = "iceberg.field.optional";
/// Property `iceberg.field.current` for `Column`.
///
/// Key of the Glue column parameter that stores `"true"` when the column was
/// produced while visiting the table's current schema and `"false"` otherwise.
pub(crate) const ICEBERG_FIELD_CURRENT: &str = "iceberg.field.current";
24
25use std::collections::HashMap;
26
27use aws_sdk_glue::types::Column;
28use iceberg::spec::{PrimitiveType, SchemaVisitor, TableMetadata, visit_schema};
29use iceberg::{Error, ErrorKind, Result};
30
31use crate::error::from_aws_build_error;
32
/// A Glue table schema: the ordered list of Glue `Column`s produced from an
/// iceberg table's schemas.
type GlueSchema = Vec<Column>;
34
/// Schema visitor that converts iceberg schemas into a list of Glue `Column`s.
///
/// Each top-level iceberg field becomes one Glue column; nested types are
/// rendered into the column's type string.
#[derive(Debug, Default)]
pub(crate) struct GlueSchemaBuilder {
    // Columns collected so far, in the order their fields were visited.
    schema: GlueSchema,
    // Whether the schema currently being visited is the table's current schema;
    // written into the `iceberg.field.current` column parameter.
    is_current: bool,
    // Struct-field nesting level: incremented before a struct field's type is
    // visited and decremented afterwards.
    depth: usize,
}
41
42impl GlueSchemaBuilder {
43    /// Creates a new `GlueSchemaBuilder` from iceberg `Schema`
44    pub fn from_iceberg(metadata: &TableMetadata) -> Result<GlueSchemaBuilder> {
45        let current_schema = metadata.current_schema();
46
47        let mut builder = Self {
48            schema: Vec::new(),
49            is_current: true,
50            depth: 0,
51        };
52
53        visit_schema(current_schema, &mut builder)?;
54
55        builder.is_current = false;
56
57        for schema in metadata.schemas_iter() {
58            if schema.schema_id() == current_schema.schema_id() {
59                continue;
60            }
61
62            visit_schema(schema, &mut builder)?;
63        }
64
65        Ok(builder)
66    }
67
68    /// Returns the newly converted `GlueSchema`
69    pub fn build(self) -> GlueSchema {
70        self.schema
71    }
72
73    /// Check if is in `StructType` while traversing schema
74    fn is_inside_struct(&self) -> bool {
75        self.depth > 0
76    }
77}
78
79impl SchemaVisitor for GlueSchemaBuilder {
80    type T = String;
81
82    fn schema(
83        &mut self,
84        _schema: &iceberg::spec::Schema,
85        value: Self::T,
86    ) -> iceberg::Result<String> {
87        Ok(value)
88    }
89
90    fn before_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
91        self.depth += 1;
92        Ok(())
93    }
94
95    fn r#struct(
96        &mut self,
97        r#_struct: &iceberg::spec::StructType,
98        results: Vec<String>,
99    ) -> iceberg::Result<String> {
100        Ok(format!("struct<{}>", results.join(", ")))
101    }
102
103    fn after_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
104        self.depth -= 1;
105        Ok(())
106    }
107
108    fn field(
109        &mut self,
110        field: &iceberg::spec::NestedFieldRef,
111        value: String,
112    ) -> iceberg::Result<String> {
113        if self.is_inside_struct() {
114            return Ok(format!("{}:{}", field.name, &value));
115        }
116
117        let parameters = HashMap::from([
118            (ICEBERG_FIELD_ID.to_string(), format!("{}", field.id)),
119            (
120                ICEBERG_FIELD_OPTIONAL.to_string(),
121                format!("{}", !field.required).to_lowercase(),
122            ),
123            (
124                ICEBERG_FIELD_CURRENT.to_string(),
125                format!("{}", self.is_current).to_lowercase(),
126            ),
127        ]);
128
129        let mut builder = Column::builder()
130            .name(field.name.clone())
131            .r#type(&value)
132            .set_parameters(Some(parameters));
133
134        if let Some(comment) = field.doc.as_ref() {
135            builder = builder.comment(comment);
136        }
137
138        let column = builder.build().map_err(from_aws_build_error)?;
139
140        self.schema.push(column);
141
142        Ok(value)
143    }
144
145    fn list(&mut self, _list: &iceberg::spec::ListType, value: String) -> iceberg::Result<String> {
146        Ok(format!("array<{value}>"))
147    }
148
149    fn map(
150        &mut self,
151        _map: &iceberg::spec::MapType,
152        key_value: String,
153        value: String,
154    ) -> iceberg::Result<String> {
155        Ok(format!("map<{key_value},{value}>"))
156    }
157
158    fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<Self::T> {
159        let glue_type = match p {
160            PrimitiveType::Boolean => "boolean".to_string(),
161            PrimitiveType::Int => "int".to_string(),
162            PrimitiveType::Long => "bigint".to_string(),
163            PrimitiveType::Float => "float".to_string(),
164            PrimitiveType::Double => "double".to_string(),
165            PrimitiveType::Date => "date".to_string(),
166            PrimitiveType::Timestamp => "timestamp".to_string(),
167            PrimitiveType::TimestampNs => "timestamp_ns".to_string(),
168            PrimitiveType::Timestamptz | PrimitiveType::TimestamptzNs => {
169                return Err(Error::new(
170                    ErrorKind::FeatureUnsupported,
171                    format!("Conversion from {p:?} is not supported"),
172                ));
173            }
174            PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => {
175                "string".to_string()
176            }
177            PrimitiveType::Binary | PrimitiveType::Fixed(_) => "binary".to_string(),
178            PrimitiveType::Decimal { precision, scale } => {
179                format!("decimal({precision},{scale})")
180            }
181        };
182
183        Ok(glue_type)
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use iceberg::TableCreation;
190    use iceberg::spec::{Schema, TableMetadataBuilder};
191
192    use super::*;
193
194    fn create_metadata(schema: Schema) -> Result<TableMetadata> {
195        let table_creation = TableCreation::builder()
196            .name("my_table".to_string())
197            .location("my_location".to_string())
198            .schema(schema)
199            .build();
200        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
201            .build()?
202            .metadata;
203
204        Ok(metadata)
205    }
206
207    fn create_column(
208        name: impl Into<String>,
209        r#type: impl Into<String>,
210        id: impl Into<String>,
211        optional: bool,
212    ) -> Result<Column> {
213        let parameters = HashMap::from([
214            (ICEBERG_FIELD_ID.to_string(), id.into()),
215            (ICEBERG_FIELD_OPTIONAL.to_string(), optional.to_string()),
216            (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
217        ]);
218
219        Column::builder()
220            .name(name)
221            .r#type(r#type)
222            .set_comment(None)
223            .set_parameters(Some(parameters))
224            .build()
225            .map_err(from_aws_build_error)
226    }
227
228    #[test]
229    fn test_schema_with_simple_fields() -> Result<()> {
230        let record = r#"{
231            "type": "struct",
232            "schema-id": 1,
233            "fields": [
234                {
235                    "id": 1,
236                    "name": "c1",
237                    "required": true,
238                    "type": "boolean"
239                },
240                {
241                    "id": 2,
242                    "name": "c2",
243                    "required": true,
244                    "type": "int"
245                },
246                {
247                    "id": 3,
248                    "name": "c3",
249                    "required": true,
250                    "type": "long"
251                },
252                {
253                    "id": 4,
254                    "name": "c4",
255                    "required": true,
256                    "type": "float"
257                },
258                {
259                    "id": 5,
260                    "name": "c5",
261                    "required": true,
262                    "type": "double"
263                },
264                {
265                    "id": 6,
266                    "name": "c6",
267                    "required": true,
268                    "type": "decimal(2,2)"
269                },
270                {
271                    "id": 7,
272                    "name": "c7",
273                    "required": true,
274                    "type": "date"
275                },
276                {
277                    "id": 8,
278                    "name": "c8",
279                    "required": true,
280                    "type": "time"
281                },
282                {
283                    "id": 9,
284                    "name": "c9",
285                    "required": true,
286                    "type": "timestamp"
287                },
288                {
289                    "id": 10,
290                    "name": "c10",
291                    "required": true,
292                    "type": "string"
293                },
294                {
295                    "id": 11,
296                    "name": "c11",
297                    "required": true,
298                    "type": "uuid"
299                },
300                {
301                    "id": 12,
302                    "name": "c12",
303                    "required": true,
304                    "type": "fixed[4]"
305                },
306                {
307                    "id": 13,
308                    "name": "c13",
309                    "required": true,
310                    "type": "binary"
311                }
312            ]
313        }"#;
314
315        let schema = serde_json::from_str::<Schema>(record)?;
316        let metadata = create_metadata(schema)?;
317
318        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
319
320        let expected = vec![
321            create_column("c1", "boolean", "1", false)?,
322            create_column("c2", "int", "2", false)?,
323            create_column("c3", "bigint", "3", false)?,
324            create_column("c4", "float", "4", false)?,
325            create_column("c5", "double", "5", false)?,
326            create_column("c6", "decimal(2,2)", "6", false)?,
327            create_column("c7", "date", "7", false)?,
328            create_column("c8", "string", "8", false)?,
329            create_column("c9", "timestamp", "9", false)?,
330            create_column("c10", "string", "10", false)?,
331            create_column("c11", "string", "11", false)?,
332            create_column("c12", "binary", "12", false)?,
333            create_column("c13", "binary", "13", false)?,
334        ];
335
336        assert_eq!(result, expected);
337
338        Ok(())
339    }
340
341    #[test]
342    fn test_schema_with_structs() -> Result<()> {
343        let record = r#"{
344            "type": "struct",
345            "schema-id": 1,
346            "fields": [
347                {
348                    "id": 1,
349                    "name": "person",
350                    "required": true,
351                    "type": {
352                        "type": "struct",
353                        "fields": [
354                            {
355                                "id": 2,
356                                "name": "name",
357                                "required": true,
358                                "type": "string"
359                            },
360                            {
361                                "id": 3,
362                                "name": "age",
363                                "required": false,
364                                "type": "int"
365                            }
366                        ]
367                    }
368                }
369            ]
370        }"#;
371
372        let schema = serde_json::from_str::<Schema>(record)?;
373        let metadata = create_metadata(schema)?;
374
375        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
376
377        let expected = vec![create_column(
378            "person",
379            "struct<name:string, age:int>",
380            "1",
381            false,
382        )?];
383
384        assert_eq!(result, expected);
385
386        Ok(())
387    }
388
389    #[test]
390    fn test_schema_with_struct_inside_list() -> Result<()> {
391        let record = r#"
392        {
393            "schema-id": 1,
394            "type": "struct",
395            "fields": [
396                {
397                    "id": 1,
398                    "name": "location",
399                    "required": true,
400                    "type": {
401                        "type": "list",
402                        "element-id": 2,
403                        "element-required": true,
404                        "element": {
405                            "type": "struct",
406                            "fields": [
407                                {
408                                    "id": 3,
409                                    "name": "latitude",
410                                    "required": false,
411                                    "type": "float"
412                                },
413                                {
414                                    "id": 4,
415                                    "name": "longitude",
416                                    "required": false,
417                                    "type": "float"
418                                }
419                            ]
420                        }
421                    }
422                }
423            ]
424        }
425        "#;
426
427        let schema = serde_json::from_str::<Schema>(record)?;
428        let metadata = create_metadata(schema)?;
429
430        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
431
432        let expected = vec![create_column(
433            "location",
434            "array<struct<latitude:float, longitude:float>>",
435            "1",
436            false,
437        )?];
438
439        assert_eq!(result, expected);
440
441        Ok(())
442    }
443
444    #[test]
445    fn test_schema_with_nested_maps() -> Result<()> {
446        let record = r#"
447            {
448                "schema-id": 1,
449                "type": "struct",
450                "fields": [
451                    {
452                        "id": 1,
453                        "name": "quux",
454                        "required": true,
455                        "type": {
456                            "type": "map",
457                            "key-id": 2,
458                            "key": "string",
459                            "value-id": 3,
460                            "value-required": true,
461                            "value": {
462                                "type": "map",
463                                "key-id": 4,
464                                "key": "string",
465                                "value-id": 5,
466                                "value-required": true,
467                                "value": "int"
468                            }
469                        }
470                    }
471                ]
472            }
473        "#;
474
475        let schema = serde_json::from_str::<Schema>(record)?;
476        let metadata = create_metadata(schema)?;
477
478        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
479
480        let expected = vec![create_column(
481            "quux",
482            "map<string,map<string,int>>",
483            "1",
484            false,
485        )?];
486
487        assert_eq!(result, expected);
488
489        Ok(())
490    }
491
492    #[test]
493    fn test_schema_with_optional_fields() -> Result<()> {
494        let record = r#"{
495            "type": "struct",
496            "schema-id": 1,
497            "fields": [
498                {
499                    "id": 1,
500                    "name": "required_field",
501                    "required": true,
502                    "type": "string"
503                },
504                {
505                    "id": 2,
506                    "name": "optional_field",
507                    "required": false,
508                    "type": "int"
509                }
510            ]
511        }"#;
512
513        let schema = serde_json::from_str::<Schema>(record)?;
514        let metadata = create_metadata(schema)?;
515
516        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
517
518        let expected = vec![
519            create_column("required_field", "string", "1", false)?,
520            create_column("optional_field", "int", "2", true)?,
521        ];
522
523        assert_eq!(result, expected);
524        Ok(())
525    }
526}