iceberg_catalog_glue/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
/// Key of the Glue `Column` parameter that carries the Iceberg field id
/// (stored as the decimal string form of the id).
pub(crate) const ICEBERG_FIELD_ID: &str = "iceberg.field.id";
/// Key of the Glue `Column` parameter that records whether the Iceberg field
/// is optional (`"true"`) or required (`"false"`).
pub(crate) const ICEBERG_FIELD_OPTIONAL: &str = "iceberg.field.optional";
/// Key of the Glue `Column` parameter that records whether the column was
/// produced while visiting the table's current schema (`"true"`/`"false"`).
pub(crate) const ICEBERG_FIELD_CURRENT: &str = "iceberg.field.current";
24
25use std::collections::HashMap;
26
27use aws_sdk_glue::types::Column;
28use iceberg::spec::{PrimitiveType, SchemaVisitor, TableMetadata, visit_schema};
29use iceberg::{Error, ErrorKind, Result};
30
31use crate::error::from_aws_build_error;
32
33type GlueSchema = Vec<Column>;
34
35#[derive(Debug, Default)]
36pub(crate) struct GlueSchemaBuilder {
37    schema: GlueSchema,
38    is_current: bool,
39    depth: usize,
40}
41
42impl GlueSchemaBuilder {
43    /// Creates a new `GlueSchemaBuilder` from iceberg `Schema`
44    pub fn from_iceberg(metadata: &TableMetadata) -> Result<GlueSchemaBuilder> {
45        let current_schema = metadata.current_schema();
46
47        let mut builder = Self {
48            schema: Vec::new(),
49            is_current: true,
50            depth: 0,
51        };
52
53        visit_schema(current_schema, &mut builder)?;
54
55        builder.is_current = false;
56
57        for schema in metadata.schemas_iter() {
58            if schema.schema_id() == current_schema.schema_id() {
59                continue;
60            }
61
62            visit_schema(schema, &mut builder)?;
63        }
64
65        Ok(builder)
66    }
67
68    /// Returns the newly converted `GlueSchema`
69    pub fn build(self) -> GlueSchema {
70        self.schema
71    }
72
73    /// Check if is in `StructType` while traversing schema
74    fn is_inside_struct(&self) -> bool {
75        self.depth > 0
76    }
77}
78
79impl SchemaVisitor for GlueSchemaBuilder {
80    type T = String;
81
82    fn schema(
83        &mut self,
84        _schema: &iceberg::spec::Schema,
85        value: Self::T,
86    ) -> iceberg::Result<String> {
87        Ok(value)
88    }
89
90    fn before_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
91        self.depth += 1;
92        Ok(())
93    }
94
95    fn r#struct(
96        &mut self,
97        r#_struct: &iceberg::spec::StructType,
98        results: Vec<String>,
99    ) -> iceberg::Result<String> {
100        Ok(format!("struct<{}>", results.join(", ")))
101    }
102
103    fn after_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
104        self.depth -= 1;
105        Ok(())
106    }
107
108    fn field(
109        &mut self,
110        field: &iceberg::spec::NestedFieldRef,
111        value: String,
112    ) -> iceberg::Result<String> {
113        if self.is_inside_struct() {
114            return Ok(format!("{}:{}", field.name, &value));
115        }
116
117        let parameters = HashMap::from([
118            (ICEBERG_FIELD_ID.to_string(), format!("{}", field.id)),
119            (
120                ICEBERG_FIELD_OPTIONAL.to_string(),
121                format!("{}", !field.required).to_lowercase(),
122            ),
123            (
124                ICEBERG_FIELD_CURRENT.to_string(),
125                format!("{}", self.is_current).to_lowercase(),
126            ),
127        ]);
128
129        let mut builder = Column::builder()
130            .name(field.name.clone())
131            .r#type(&value)
132            .set_parameters(Some(parameters));
133
134        if let Some(comment) = field.doc.as_ref() {
135            builder = builder.comment(comment);
136        }
137
138        let column = builder.build().map_err(from_aws_build_error)?;
139
140        self.schema.push(column);
141
142        Ok(value)
143    }
144
145    fn list(&mut self, _list: &iceberg::spec::ListType, value: String) -> iceberg::Result<String> {
146        Ok(format!("array<{value}>"))
147    }
148
149    fn map(
150        &mut self,
151        _map: &iceberg::spec::MapType,
152        key_value: String,
153        value: String,
154    ) -> iceberg::Result<String> {
155        Ok(format!("map<{key_value},{value}>"))
156    }
157
158    fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<Self::T> {
159        let glue_type = match p {
160            PrimitiveType::Boolean => "boolean".to_string(),
161            PrimitiveType::Int => "int".to_string(),
162            PrimitiveType::Long => "bigint".to_string(),
163            PrimitiveType::Float => "float".to_string(),
164            PrimitiveType::Double => "double".to_string(),
165            PrimitiveType::Date => "date".to_string(),
166            PrimitiveType::Timestamp => "timestamp".to_string(),
167            PrimitiveType::TimestampNs => "timestamp_ns".to_string(),
168            PrimitiveType::TimestamptzNs => "timestamptz_ns".to_string(),
169            PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => {
170                "string".to_string()
171            }
172            PrimitiveType::Binary | PrimitiveType::Fixed(_) => "binary".to_string(),
173            PrimitiveType::Decimal { precision, scale } => {
174                format!("decimal({precision},{scale})")
175            }
176            _ => {
177                return Err(Error::new(
178                    ErrorKind::FeatureUnsupported,
179                    "Conversion from 'Timestamptz' is not supported",
180                ));
181            }
182        };
183
184        Ok(glue_type)
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use iceberg::TableCreation;
191    use iceberg::spec::{Schema, TableMetadataBuilder};
192
193    use super::*;
194
195    fn create_metadata(schema: Schema) -> Result<TableMetadata> {
196        let table_creation = TableCreation::builder()
197            .name("my_table".to_string())
198            .location("my_location".to_string())
199            .schema(schema)
200            .build();
201        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
202            .build()?
203            .metadata;
204
205        Ok(metadata)
206    }
207
208    fn create_column(
209        name: impl Into<String>,
210        r#type: impl Into<String>,
211        id: impl Into<String>,
212        optional: bool,
213    ) -> Result<Column> {
214        let parameters = HashMap::from([
215            (ICEBERG_FIELD_ID.to_string(), id.into()),
216            (ICEBERG_FIELD_OPTIONAL.to_string(), optional.to_string()),
217            (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
218        ]);
219
220        Column::builder()
221            .name(name)
222            .r#type(r#type)
223            .set_comment(None)
224            .set_parameters(Some(parameters))
225            .build()
226            .map_err(from_aws_build_error)
227    }
228
229    #[test]
230    fn test_schema_with_simple_fields() -> Result<()> {
231        let record = r#"{
232            "type": "struct",
233            "schema-id": 1,
234            "fields": [
235                {
236                    "id": 1,
237                    "name": "c1",
238                    "required": true,
239                    "type": "boolean"
240                },
241                {
242                    "id": 2,
243                    "name": "c2",
244                    "required": true,
245                    "type": "int"
246                },
247                {
248                    "id": 3,
249                    "name": "c3",
250                    "required": true,
251                    "type": "long"
252                },
253                {
254                    "id": 4,
255                    "name": "c4",
256                    "required": true,
257                    "type": "float"
258                },
259                {
260                    "id": 5,
261                    "name": "c5",
262                    "required": true,
263                    "type": "double"
264                },
265                {
266                    "id": 6,
267                    "name": "c6",
268                    "required": true,
269                    "type": "decimal(2,2)"
270                },
271                {
272                    "id": 7,
273                    "name": "c7",
274                    "required": true,
275                    "type": "date"
276                },
277                {
278                    "id": 8,
279                    "name": "c8",
280                    "required": true,
281                    "type": "time"
282                },
283                {
284                    "id": 9,
285                    "name": "c9",
286                    "required": true,
287                    "type": "timestamp"
288                },
289                {
290                    "id": 10,
291                    "name": "c10",
292                    "required": true,
293                    "type": "string"
294                },
295                {
296                    "id": 11,
297                    "name": "c11",
298                    "required": true,
299                    "type": "uuid"
300                },
301                {
302                    "id": 12,
303                    "name": "c12",
304                    "required": true,
305                    "type": "fixed[4]"
306                },
307                {
308                    "id": 13,
309                    "name": "c13",
310                    "required": true,
311                    "type": "binary"
312                }
313            ]
314        }"#;
315
316        let schema = serde_json::from_str::<Schema>(record)?;
317        let metadata = create_metadata(schema)?;
318
319        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
320
321        let expected = vec![
322            create_column("c1", "boolean", "1", false)?,
323            create_column("c2", "int", "2", false)?,
324            create_column("c3", "bigint", "3", false)?,
325            create_column("c4", "float", "4", false)?,
326            create_column("c5", "double", "5", false)?,
327            create_column("c6", "decimal(2,2)", "6", false)?,
328            create_column("c7", "date", "7", false)?,
329            create_column("c8", "string", "8", false)?,
330            create_column("c9", "timestamp", "9", false)?,
331            create_column("c10", "string", "10", false)?,
332            create_column("c11", "string", "11", false)?,
333            create_column("c12", "binary", "12", false)?,
334            create_column("c13", "binary", "13", false)?,
335        ];
336
337        assert_eq!(result, expected);
338
339        Ok(())
340    }
341
342    #[test]
343    fn test_schema_with_structs() -> Result<()> {
344        let record = r#"{
345            "type": "struct",
346            "schema-id": 1,
347            "fields": [
348                {
349                    "id": 1,
350                    "name": "person",
351                    "required": true,
352                    "type": {
353                        "type": "struct",
354                        "fields": [
355                            {
356                                "id": 2,
357                                "name": "name",
358                                "required": true,
359                                "type": "string"
360                            },
361                            {
362                                "id": 3,
363                                "name": "age",
364                                "required": false,
365                                "type": "int"
366                            }
367                        ]
368                    }
369                }
370            ]
371        }"#;
372
373        let schema = serde_json::from_str::<Schema>(record)?;
374        let metadata = create_metadata(schema)?;
375
376        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
377
378        let expected = vec![create_column(
379            "person",
380            "struct<name:string, age:int>",
381            "1",
382            false,
383        )?];
384
385        assert_eq!(result, expected);
386
387        Ok(())
388    }
389
390    #[test]
391    fn test_schema_with_struct_inside_list() -> Result<()> {
392        let record = r#"
393        {
394            "schema-id": 1,
395            "type": "struct",
396            "fields": [
397                {
398                    "id": 1,
399                    "name": "location",
400                    "required": true,
401                    "type": {
402                        "type": "list",
403                        "element-id": 2,
404                        "element-required": true,
405                        "element": {
406                            "type": "struct",
407                            "fields": [
408                                {
409                                    "id": 3,
410                                    "name": "latitude",
411                                    "required": false,
412                                    "type": "float"
413                                },
414                                {
415                                    "id": 4,
416                                    "name": "longitude",
417                                    "required": false,
418                                    "type": "float"
419                                }
420                            ]
421                        }
422                    }
423                }
424            ]
425        }
426        "#;
427
428        let schema = serde_json::from_str::<Schema>(record)?;
429        let metadata = create_metadata(schema)?;
430
431        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
432
433        let expected = vec![create_column(
434            "location",
435            "array<struct<latitude:float, longitude:float>>",
436            "1",
437            false,
438        )?];
439
440        assert_eq!(result, expected);
441
442        Ok(())
443    }
444
445    #[test]
446    fn test_schema_with_nested_maps() -> Result<()> {
447        let record = r#"
448            {
449                "schema-id": 1,
450                "type": "struct",
451                "fields": [
452                    {
453                        "id": 1,
454                        "name": "quux",
455                        "required": true,
456                        "type": {
457                            "type": "map",
458                            "key-id": 2,
459                            "key": "string",
460                            "value-id": 3,
461                            "value-required": true,
462                            "value": {
463                                "type": "map",
464                                "key-id": 4,
465                                "key": "string",
466                                "value-id": 5,
467                                "value-required": true,
468                                "value": "int"
469                            }
470                        }
471                    }
472                ]
473            }
474        "#;
475
476        let schema = serde_json::from_str::<Schema>(record)?;
477        let metadata = create_metadata(schema)?;
478
479        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
480
481        let expected = vec![create_column(
482            "quux",
483            "map<string,map<string,int>>",
484            "1",
485            false,
486        )?];
487
488        assert_eq!(result, expected);
489
490        Ok(())
491    }
492
493    #[test]
494    fn test_schema_with_optional_fields() -> Result<()> {
495        let record = r#"{
496            "type": "struct",
497            "schema-id": 1,
498            "fields": [
499                {
500                    "id": 1,
501                    "name": "required_field",
502                    "required": true,
503                    "type": "string"
504                },
505                {
506                    "id": 2,
507                    "name": "optional_field",
508                    "required": false,
509                    "type": "int"
510                }
511            ]
512        }"#;
513
514        let schema = serde_json::from_str::<Schema>(record)?;
515        let metadata = create_metadata(schema)?;
516
517        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
518
519        let expected = vec![
520            create_column("required_field", "string", "1", false)?,
521            create_column("optional_field", "int", "2", true)?,
522        ];
523
524        assert_eq!(result, expected);
525        Ok(())
526    }
527}