/// Glue column parameter key under which the Iceberg field id is stored.
pub(crate) const ICEBERG_FIELD_ID: &str = "iceberg.field.id";
/// Glue column parameter key recording whether the Iceberg field is optional
/// (the stored value is the negation of the field's `required` flag).
pub(crate) const ICEBERG_FIELD_OPTIONAL: &str = "iceberg.field.optional";
/// Glue column parameter key recording whether the column was produced while
/// visiting the table's current schema.
pub(crate) const ICEBERG_FIELD_CURRENT: &str = "iceberg.field.current";
24
25use std::collections::HashMap;
26
27use aws_sdk_glue::types::Column;
28use iceberg::spec::{PrimitiveType, SchemaVisitor, TableMetadata, visit_schema};
29use iceberg::{Error, ErrorKind, Result};
30
31use crate::error::from_aws_build_error;
32
/// A Glue schema, represented as the list of its `Column`s.
type GlueSchema = Vec<Column>;
34
/// Schema visitor that accumulates Glue `Column`s while Iceberg schemas are
/// being visited.
#[derive(Debug, Default)]
pub(crate) struct GlueSchemaBuilder {
    /// Columns collected so far, in the order they were pushed.
    schema: GlueSchema,
    /// Value written to the `iceberg.field.current` parameter of every column
    /// pushed while this flag is in effect.
    is_current: bool,
    /// Incremented in `before_struct_field` and decremented in
    /// `after_struct_field`; `field` treats any non-zero value as "nested".
    depth: usize,
}
41
42impl GlueSchemaBuilder {
43 pub fn from_iceberg(metadata: &TableMetadata) -> Result<GlueSchemaBuilder> {
45 let current_schema = metadata.current_schema();
46
47 let mut builder = Self {
48 schema: Vec::new(),
49 is_current: true,
50 depth: 0,
51 };
52
53 visit_schema(current_schema, &mut builder)?;
54
55 builder.is_current = false;
56
57 for schema in metadata.schemas_iter() {
58 if schema.schema_id() == current_schema.schema_id() {
59 continue;
60 }
61
62 visit_schema(schema, &mut builder)?;
63 }
64
65 Ok(builder)
66 }
67
68 pub fn build(self) -> GlueSchema {
70 self.schema
71 }
72
73 fn is_inside_struct(&self) -> bool {
75 self.depth > 0
76 }
77}
78
79impl SchemaVisitor for GlueSchemaBuilder {
80 type T = String;
81
82 fn schema(
83 &mut self,
84 _schema: &iceberg::spec::Schema,
85 value: Self::T,
86 ) -> iceberg::Result<String> {
87 Ok(value)
88 }
89
90 fn before_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
91 self.depth += 1;
92 Ok(())
93 }
94
95 fn r#struct(
96 &mut self,
97 r#_struct: &iceberg::spec::StructType,
98 results: Vec<String>,
99 ) -> iceberg::Result<String> {
100 Ok(format!("struct<{}>", results.join(", ")))
101 }
102
103 fn after_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
104 self.depth -= 1;
105 Ok(())
106 }
107
108 fn field(
109 &mut self,
110 field: &iceberg::spec::NestedFieldRef,
111 value: String,
112 ) -> iceberg::Result<String> {
113 if self.is_inside_struct() {
114 return Ok(format!("{}:{}", field.name, &value));
115 }
116
117 let parameters = HashMap::from([
118 (ICEBERG_FIELD_ID.to_string(), format!("{}", field.id)),
119 (
120 ICEBERG_FIELD_OPTIONAL.to_string(),
121 format!("{}", !field.required).to_lowercase(),
122 ),
123 (
124 ICEBERG_FIELD_CURRENT.to_string(),
125 format!("{}", self.is_current).to_lowercase(),
126 ),
127 ]);
128
129 let mut builder = Column::builder()
130 .name(field.name.clone())
131 .r#type(&value)
132 .set_parameters(Some(parameters));
133
134 if let Some(comment) = field.doc.as_ref() {
135 builder = builder.comment(comment);
136 }
137
138 let column = builder.build().map_err(from_aws_build_error)?;
139
140 self.schema.push(column);
141
142 Ok(value)
143 }
144
145 fn list(&mut self, _list: &iceberg::spec::ListType, value: String) -> iceberg::Result<String> {
146 Ok(format!("array<{value}>"))
147 }
148
149 fn map(
150 &mut self,
151 _map: &iceberg::spec::MapType,
152 key_value: String,
153 value: String,
154 ) -> iceberg::Result<String> {
155 Ok(format!("map<{key_value},{value}>"))
156 }
157
158 fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<Self::T> {
159 let glue_type = match p {
160 PrimitiveType::Boolean => "boolean".to_string(),
161 PrimitiveType::Int => "int".to_string(),
162 PrimitiveType::Long => "bigint".to_string(),
163 PrimitiveType::Float => "float".to_string(),
164 PrimitiveType::Double => "double".to_string(),
165 PrimitiveType::Date => "date".to_string(),
166 PrimitiveType::Timestamp => "timestamp".to_string(),
167 PrimitiveType::TimestampNs => "timestamp_ns".to_string(),
168 PrimitiveType::Timestamptz | PrimitiveType::TimestamptzNs => {
169 return Err(Error::new(
170 ErrorKind::FeatureUnsupported,
171 format!("Conversion from {p:?} is not supported"),
172 ));
173 }
174 PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => {
175 "string".to_string()
176 }
177 PrimitiveType::Binary | PrimitiveType::Fixed(_) => "binary".to_string(),
178 PrimitiveType::Decimal { precision, scale } => {
179 format!("decimal({precision},{scale})")
180 }
181 };
182
183 Ok(glue_type)
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use iceberg::TableCreation;
190 use iceberg::spec::{Schema, TableMetadataBuilder};
191
192 use super::*;
193
194 fn create_metadata(schema: Schema) -> Result<TableMetadata> {
195 let table_creation = TableCreation::builder()
196 .name("my_table".to_string())
197 .location("my_location".to_string())
198 .schema(schema)
199 .build();
200 let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
201 .build()?
202 .metadata;
203
204 Ok(metadata)
205 }
206
207 fn create_column(
208 name: impl Into<String>,
209 r#type: impl Into<String>,
210 id: impl Into<String>,
211 optional: bool,
212 ) -> Result<Column> {
213 let parameters = HashMap::from([
214 (ICEBERG_FIELD_ID.to_string(), id.into()),
215 (ICEBERG_FIELD_OPTIONAL.to_string(), optional.to_string()),
216 (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
217 ]);
218
219 Column::builder()
220 .name(name)
221 .r#type(r#type)
222 .set_comment(None)
223 .set_parameters(Some(parameters))
224 .build()
225 .map_err(from_aws_build_error)
226 }
227
228 #[test]
229 fn test_schema_with_simple_fields() -> Result<()> {
230 let record = r#"{
231 "type": "struct",
232 "schema-id": 1,
233 "fields": [
234 {
235 "id": 1,
236 "name": "c1",
237 "required": true,
238 "type": "boolean"
239 },
240 {
241 "id": 2,
242 "name": "c2",
243 "required": true,
244 "type": "int"
245 },
246 {
247 "id": 3,
248 "name": "c3",
249 "required": true,
250 "type": "long"
251 },
252 {
253 "id": 4,
254 "name": "c4",
255 "required": true,
256 "type": "float"
257 },
258 {
259 "id": 5,
260 "name": "c5",
261 "required": true,
262 "type": "double"
263 },
264 {
265 "id": 6,
266 "name": "c6",
267 "required": true,
268 "type": "decimal(2,2)"
269 },
270 {
271 "id": 7,
272 "name": "c7",
273 "required": true,
274 "type": "date"
275 },
276 {
277 "id": 8,
278 "name": "c8",
279 "required": true,
280 "type": "time"
281 },
282 {
283 "id": 9,
284 "name": "c9",
285 "required": true,
286 "type": "timestamp"
287 },
288 {
289 "id": 10,
290 "name": "c10",
291 "required": true,
292 "type": "string"
293 },
294 {
295 "id": 11,
296 "name": "c11",
297 "required": true,
298 "type": "uuid"
299 },
300 {
301 "id": 12,
302 "name": "c12",
303 "required": true,
304 "type": "fixed[4]"
305 },
306 {
307 "id": 13,
308 "name": "c13",
309 "required": true,
310 "type": "binary"
311 }
312 ]
313 }"#;
314
315 let schema = serde_json::from_str::<Schema>(record)?;
316 let metadata = create_metadata(schema)?;
317
318 let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
319
320 let expected = vec![
321 create_column("c1", "boolean", "1", false)?,
322 create_column("c2", "int", "2", false)?,
323 create_column("c3", "bigint", "3", false)?,
324 create_column("c4", "float", "4", false)?,
325 create_column("c5", "double", "5", false)?,
326 create_column("c6", "decimal(2,2)", "6", false)?,
327 create_column("c7", "date", "7", false)?,
328 create_column("c8", "string", "8", false)?,
329 create_column("c9", "timestamp", "9", false)?,
330 create_column("c10", "string", "10", false)?,
331 create_column("c11", "string", "11", false)?,
332 create_column("c12", "binary", "12", false)?,
333 create_column("c13", "binary", "13", false)?,
334 ];
335
336 assert_eq!(result, expected);
337
338 Ok(())
339 }
340
341 #[test]
342 fn test_schema_with_structs() -> Result<()> {
343 let record = r#"{
344 "type": "struct",
345 "schema-id": 1,
346 "fields": [
347 {
348 "id": 1,
349 "name": "person",
350 "required": true,
351 "type": {
352 "type": "struct",
353 "fields": [
354 {
355 "id": 2,
356 "name": "name",
357 "required": true,
358 "type": "string"
359 },
360 {
361 "id": 3,
362 "name": "age",
363 "required": false,
364 "type": "int"
365 }
366 ]
367 }
368 }
369 ]
370 }"#;
371
372 let schema = serde_json::from_str::<Schema>(record)?;
373 let metadata = create_metadata(schema)?;
374
375 let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
376
377 let expected = vec![create_column(
378 "person",
379 "struct<name:string, age:int>",
380 "1",
381 false,
382 )?];
383
384 assert_eq!(result, expected);
385
386 Ok(())
387 }
388
389 #[test]
390 fn test_schema_with_struct_inside_list() -> Result<()> {
391 let record = r#"
392 {
393 "schema-id": 1,
394 "type": "struct",
395 "fields": [
396 {
397 "id": 1,
398 "name": "location",
399 "required": true,
400 "type": {
401 "type": "list",
402 "element-id": 2,
403 "element-required": true,
404 "element": {
405 "type": "struct",
406 "fields": [
407 {
408 "id": 3,
409 "name": "latitude",
410 "required": false,
411 "type": "float"
412 },
413 {
414 "id": 4,
415 "name": "longitude",
416 "required": false,
417 "type": "float"
418 }
419 ]
420 }
421 }
422 }
423 ]
424 }
425 "#;
426
427 let schema = serde_json::from_str::<Schema>(record)?;
428 let metadata = create_metadata(schema)?;
429
430 let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
431
432 let expected = vec![create_column(
433 "location",
434 "array<struct<latitude:float, longitude:float>>",
435 "1",
436 false,
437 )?];
438
439 assert_eq!(result, expected);
440
441 Ok(())
442 }
443
444 #[test]
445 fn test_schema_with_nested_maps() -> Result<()> {
446 let record = r#"
447 {
448 "schema-id": 1,
449 "type": "struct",
450 "fields": [
451 {
452 "id": 1,
453 "name": "quux",
454 "required": true,
455 "type": {
456 "type": "map",
457 "key-id": 2,
458 "key": "string",
459 "value-id": 3,
460 "value-required": true,
461 "value": {
462 "type": "map",
463 "key-id": 4,
464 "key": "string",
465 "value-id": 5,
466 "value-required": true,
467 "value": "int"
468 }
469 }
470 }
471 ]
472 }
473 "#;
474
475 let schema = serde_json::from_str::<Schema>(record)?;
476 let metadata = create_metadata(schema)?;
477
478 let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
479
480 let expected = vec![create_column(
481 "quux",
482 "map<string,map<string,int>>",
483 "1",
484 false,
485 )?];
486
487 assert_eq!(result, expected);
488
489 Ok(())
490 }
491
492 #[test]
493 fn test_schema_with_optional_fields() -> Result<()> {
494 let record = r#"{
495 "type": "struct",
496 "schema-id": 1,
497 "fields": [
498 {
499 "id": 1,
500 "name": "required_field",
501 "required": true,
502 "type": "string"
503 },
504 {
505 "id": 2,
506 "name": "optional_field",
507 "required": false,
508 "type": "int"
509 }
510 ]
511 }"#;
512
513 let schema = serde_json::from_str::<Schema>(record)?;
514 let metadata = create_metadata(schema)?;
515
516 let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
517
518 let expected = vec![
519 create_column("required_field", "string", "1", false)?,
520 create_column("optional_field", "int", "2", true)?,
521 ];
522
523 assert_eq!(result, expected);
524 Ok(())
525 }
526}