/// Glue column parameter key that stores the Iceberg field id of the column.
pub(crate) const ICEBERG_FIELD_ID: &str = "iceberg.field.id";
/// Glue column parameter key that stores whether the Iceberg field is optional
/// (`"true"`) or required (`"false"`).
pub(crate) const ICEBERG_FIELD_OPTIONAL: &str = "iceberg.field.optional";
/// Glue column parameter key that stores whether the column belongs to the
/// table's current Iceberg schema (`"true"`) or only to an older one (`"false"`).
pub(crate) const ICEBERG_FIELD_CURRENT: &str = "iceberg.field.current";
24
25use std::collections::HashMap;
26
27use aws_sdk_glue::types::Column;
28use iceberg::spec::{PrimitiveType, SchemaVisitor, TableMetadata, visit_schema};
29use iceberg::{Error, ErrorKind, Result};
30
31use crate::error::from_aws_build_error;
32
33type GlueSchema = Vec<Column>;
34
35#[derive(Debug, Default)]
36pub(crate) struct GlueSchemaBuilder {
37 schema: GlueSchema,
38 is_current: bool,
39 depth: usize,
40}
41
42impl GlueSchemaBuilder {
43 pub fn from_iceberg(metadata: &TableMetadata) -> Result<GlueSchemaBuilder> {
45 let current_schema = metadata.current_schema();
46
47 let mut builder = Self {
48 schema: Vec::new(),
49 is_current: true,
50 depth: 0,
51 };
52
53 visit_schema(current_schema, &mut builder)?;
54
55 builder.is_current = false;
56
57 for schema in metadata.schemas_iter() {
58 if schema.schema_id() == current_schema.schema_id() {
59 continue;
60 }
61
62 visit_schema(schema, &mut builder)?;
63 }
64
65 Ok(builder)
66 }
67
68 pub fn build(self) -> GlueSchema {
70 self.schema
71 }
72
73 fn is_inside_struct(&self) -> bool {
75 self.depth > 0
76 }
77}
78
79impl SchemaVisitor for GlueSchemaBuilder {
80 type T = String;
81
82 fn schema(
83 &mut self,
84 _schema: &iceberg::spec::Schema,
85 value: Self::T,
86 ) -> iceberg::Result<String> {
87 Ok(value)
88 }
89
90 fn before_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
91 self.depth += 1;
92 Ok(())
93 }
94
95 fn r#struct(
96 &mut self,
97 r#_struct: &iceberg::spec::StructType,
98 results: Vec<String>,
99 ) -> iceberg::Result<String> {
100 Ok(format!("struct<{}>", results.join(", ")))
101 }
102
103 fn after_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
104 self.depth -= 1;
105 Ok(())
106 }
107
108 fn field(
109 &mut self,
110 field: &iceberg::spec::NestedFieldRef,
111 value: String,
112 ) -> iceberg::Result<String> {
113 if self.is_inside_struct() {
114 return Ok(format!("{}:{}", field.name, &value));
115 }
116
117 let parameters = HashMap::from([
118 (ICEBERG_FIELD_ID.to_string(), format!("{}", field.id)),
119 (
120 ICEBERG_FIELD_OPTIONAL.to_string(),
121 format!("{}", !field.required).to_lowercase(),
122 ),
123 (
124 ICEBERG_FIELD_CURRENT.to_string(),
125 format!("{}", self.is_current).to_lowercase(),
126 ),
127 ]);
128
129 let mut builder = Column::builder()
130 .name(field.name.clone())
131 .r#type(&value)
132 .set_parameters(Some(parameters));
133
134 if let Some(comment) = field.doc.as_ref() {
135 builder = builder.comment(comment);
136 }
137
138 let column = builder.build().map_err(from_aws_build_error)?;
139
140 self.schema.push(column);
141
142 Ok(value)
143 }
144
145 fn list(&mut self, _list: &iceberg::spec::ListType, value: String) -> iceberg::Result<String> {
146 Ok(format!("array<{value}>"))
147 }
148
149 fn map(
150 &mut self,
151 _map: &iceberg::spec::MapType,
152 key_value: String,
153 value: String,
154 ) -> iceberg::Result<String> {
155 Ok(format!("map<{key_value},{value}>"))
156 }
157
158 fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<Self::T> {
159 let glue_type = match p {
160 PrimitiveType::Boolean => "boolean".to_string(),
161 PrimitiveType::Int => "int".to_string(),
162 PrimitiveType::Long => "bigint".to_string(),
163 PrimitiveType::Float => "float".to_string(),
164 PrimitiveType::Double => "double".to_string(),
165 PrimitiveType::Date => "date".to_string(),
166 PrimitiveType::Timestamp => "timestamp".to_string(),
167 PrimitiveType::TimestampNs => "timestamp_ns".to_string(),
168 PrimitiveType::TimestamptzNs => "timestamptz_ns".to_string(),
169 PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => {
170 "string".to_string()
171 }
172 PrimitiveType::Binary | PrimitiveType::Fixed(_) => "binary".to_string(),
173 PrimitiveType::Decimal { precision, scale } => {
174 format!("decimal({precision},{scale})")
175 }
176 _ => {
177 return Err(Error::new(
178 ErrorKind::FeatureUnsupported,
179 "Conversion from 'Timestamptz' is not supported",
180 ));
181 }
182 };
183
184 Ok(glue_type)
185 }
186}
187
#[cfg(test)]
mod tests {
    use iceberg::TableCreation;
    use iceberg::spec::{Schema, TableMetadataBuilder};

    use super::*;

    /// Builds table metadata whose only (and therefore current) schema is `schema`.
    fn create_metadata(schema: Schema) -> Result<TableMetadata> {
        let table_creation = TableCreation::builder()
            .name("my_table".to_string())
            .location("my_location".to_string())
            .schema(schema)
            .build();
        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
            .build()?
            .metadata;

        Ok(metadata)
    }

    /// Builds the Glue column expected for an undocumented top-level field of
    /// the current schema.
    fn create_column(
        name: impl Into<String>,
        r#type: impl Into<String>,
        id: impl Into<String>,
        optional: bool,
    ) -> Result<Column> {
        let parameters = HashMap::from([
            (ICEBERG_FIELD_ID.to_string(), id.into()),
            (ICEBERG_FIELD_OPTIONAL.to_string(), optional.to_string()),
            (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
        ]);

        Column::builder()
            .name(name)
            .r#type(r#type)
            .set_comment(None)
            .set_parameters(Some(parameters))
            .build()
            .map_err(from_aws_build_error)
    }

    #[test]
    fn test_schema_with_simple_fields() -> Result<()> {
        let record = r#"{
            "type": "struct",
            "schema-id": 1,
            "fields": [
                { "id": 1, "name": "c1", "required": true, "type": "boolean" },
                { "id": 2, "name": "c2", "required": true, "type": "int" },
                { "id": 3, "name": "c3", "required": true, "type": "long" },
                { "id": 4, "name": "c4", "required": true, "type": "float" },
                { "id": 5, "name": "c5", "required": true, "type": "double" },
                { "id": 6, "name": "c6", "required": true, "type": "decimal(2,2)" },
                { "id": 7, "name": "c7", "required": true, "type": "date" },
                { "id": 8, "name": "c8", "required": true, "type": "time" },
                { "id": 9, "name": "c9", "required": true, "type": "timestamp" },
                { "id": 10, "name": "c10", "required": true, "type": "string" },
                { "id": 11, "name": "c11", "required": true, "type": "uuid" },
                { "id": 12, "name": "c12", "required": true, "type": "fixed[4]" },
                { "id": 13, "name": "c13", "required": true, "type": "binary" }
            ]
        }"#;

        let schema = serde_json::from_str::<Schema>(record)?;
        let metadata = create_metadata(schema)?;

        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();

        let expected = vec![
            create_column("c1", "boolean", "1", false)?,
            create_column("c2", "int", "2", false)?,
            create_column("c3", "bigint", "3", false)?,
            create_column("c4", "float", "4", false)?,
            create_column("c5", "double", "5", false)?,
            create_column("c6", "decimal(2,2)", "6", false)?,
            create_column("c7", "date", "7", false)?,
            create_column("c8", "string", "8", false)?,
            create_column("c9", "timestamp", "9", false)?,
            create_column("c10", "string", "10", false)?,
            create_column("c11", "string", "11", false)?,
            create_column("c12", "binary", "12", false)?,
            create_column("c13", "binary", "13", false)?,
        ];

        assert_eq!(result, expected);

        Ok(())
    }

    #[test]
    fn test_schema_with_structs() -> Result<()> {
        let record = r#"{
            "type": "struct",
            "schema-id": 1,
            "fields": [
                {
                    "id": 1,
                    "name": "person",
                    "required": true,
                    "type": {
                        "type": "struct",
                        "fields": [
                            { "id": 2, "name": "name", "required": true, "type": "string" },
                            { "id": 3, "name": "age", "required": false, "type": "int" }
                        ]
                    }
                }
            ]
        }"#;

        let schema = serde_json::from_str::<Schema>(record)?;
        let metadata = create_metadata(schema)?;

        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();

        let expected = vec![create_column(
            "person",
            "struct<name:string, age:int>",
            "1",
            false,
        )?];

        assert_eq!(result, expected);

        Ok(())
    }

    #[test]
    fn test_schema_with_struct_inside_list() -> Result<()> {
        let record = r#"
        {
            "schema-id": 1,
            "type": "struct",
            "fields": [
                {
                    "id": 1,
                    "name": "location",
                    "required": true,
                    "type": {
                        "type": "list",
                        "element-id": 2,
                        "element-required": true,
                        "element": {
                            "type": "struct",
                            "fields": [
                                { "id": 3, "name": "latitude", "required": false, "type": "float" },
                                { "id": 4, "name": "longitude", "required": false, "type": "float" }
                            ]
                        }
                    }
                }
            ]
        }
        "#;

        let schema = serde_json::from_str::<Schema>(record)?;
        let metadata = create_metadata(schema)?;

        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();

        let expected = vec![create_column(
            "location",
            "array<struct<latitude:float, longitude:float>>",
            "1",
            false,
        )?];

        assert_eq!(result, expected);

        Ok(())
    }

    #[test]
    fn test_schema_with_nested_maps() -> Result<()> {
        let record = r#"
        {
            "schema-id": 1,
            "type": "struct",
            "fields": [
                {
                    "id": 1,
                    "name": "quux",
                    "required": true,
                    "type": {
                        "type": "map",
                        "key-id": 2,
                        "key": "string",
                        "value-id": 3,
                        "value-required": true,
                        "value": {
                            "type": "map",
                            "key-id": 4,
                            "key": "string",
                            "value-id": 5,
                            "value-required": true,
                            "value": "int"
                        }
                    }
                }
            ]
        }
        "#;

        let schema = serde_json::from_str::<Schema>(record)?;
        let metadata = create_metadata(schema)?;

        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();

        let expected = vec![create_column(
            "quux",
            "map<string,map<string,int>>",
            "1",
            false,
        )?];

        assert_eq!(result, expected);

        Ok(())
    }

    #[test]
    fn test_schema_with_optional_fields() -> Result<()> {
        let record = r#"{
            "type": "struct",
            "schema-id": 1,
            "fields": [
                { "id": 1, "name": "required_field", "required": true, "type": "string" },
                { "id": 2, "name": "optional_field", "required": false, "type": "int" }
            ]
        }"#;

        let schema = serde_json::from_str::<Schema>(record)?;
        let metadata = create_metadata(schema)?;

        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();

        let expected = vec![
            create_column("required_field", "string", "1", false)?,
            create_column("optional_field", "int", "2", true)?,
        ];

        assert_eq!(result, expected);
        Ok(())
    }

    /// A field's Iceberg doc string must become the Glue column comment.
    #[test]
    fn test_schema_with_field_doc() -> Result<()> {
        let record = r#"{
            "type": "struct",
            "schema-id": 1,
            "fields": [
                { "id": 1, "name": "id", "required": true, "type": "long", "doc": "primary key" }
            ]
        }"#;

        let schema = serde_json::from_str::<Schema>(record)?;
        let metadata = create_metadata(schema)?;

        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();

        let expected = vec![
            Column::builder()
                .name("id")
                .r#type("bigint")
                .comment("primary key")
                .set_parameters(Some(HashMap::from([
                    (ICEBERG_FIELD_ID.to_string(), "1".to_string()),
                    (ICEBERG_FIELD_OPTIONAL.to_string(), "false".to_string()),
                    (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
                ])))
                .build()
                .map_err(from_aws_build_error)?,
        ];

        assert_eq!(result, expected);
        Ok(())
    }

    /// `timestamptz` has no Glue mapping and must be rejected as unsupported.
    #[test]
    fn test_schema_with_timestamptz_is_unsupported() -> Result<()> {
        let record = r#"{
            "type": "struct",
            "schema-id": 1,
            "fields": [
                { "id": 1, "name": "ts", "required": true, "type": "timestamptz" }
            ]
        }"#;

        let schema = serde_json::from_str::<Schema>(record)?;
        let metadata = create_metadata(schema)?;

        let err = GlueSchemaBuilder::from_iceberg(&metadata)
            .expect_err("timestamptz has no Glue type mapping");
        assert!(matches!(err.kind(), ErrorKind::FeatureUnsupported));

        Ok(())
    }
}