1use std::collections::BTreeMap;
20
21use apache_avro::Schema as AvroSchema;
22use apache_avro::schema::{
23 ArraySchema, DecimalSchema, FixedSchema, MapSchema, Name, RecordField as AvroRecordField,
24 RecordFieldOrder, RecordSchema, UnionSchema,
25};
26use itertools::{Either, Itertools};
27use serde_json::{Number, Value};
28
29use crate::spec::{
30 ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, SchemaVisitor,
31 StructType, Type, visit_schema,
32};
33use crate::{Error, ErrorKind, Result, ensure_data_valid};
34
/// Attribute on an Avro array schema that stores the Iceberg list element field id.
const ELEMENT_ID: &str = "element-id";
/// Custom attribute on an Avro record field that stores the Iceberg field id.
const FIELD_ID_PROP: &str = "field-id";
/// Attribute on an Avro map schema that stores the Iceberg map key field id.
const KEY_ID: &str = "key-id";
/// Attribute on an Avro map schema that stores the Iceberg map value field id.
const VALUE_ID: &str = "value-id";
/// Value of the `logicalType` attribute that marks an Avro array of key/value records as a map.
const MAP_LOGICAL_TYPE: &str = "map";
/// Attribute key under which an Avro schema's logical type is stored.
const LOGICAL_TYPE: &str = "logicalType";
42
/// Visitor state used to convert an Iceberg [`Schema`] into an Avro schema.
struct SchemaToAvroSchema {
    /// Name given to the top-level Avro record produced by the conversion.
    schema: String,
}
46
/// Intermediate result of the Iceberg-to-Avro visitor: type callbacks yield an
/// Avro schema (`Left`), the field callback yields an Avro record field (`Right`).
type AvroSchemaOrField = Either<AvroSchema, AvroRecordField>;
48
49impl SchemaVisitor for SchemaToAvroSchema {
50 type T = AvroSchemaOrField;
51
52 fn schema(&mut self, _schema: &Schema, value: AvroSchemaOrField) -> Result<AvroSchemaOrField> {
53 let mut avro_schema = value.unwrap_left();
54
55 if let AvroSchema::Record(record) = &mut avro_schema {
56 record.name = Name::from(self.schema.as_str());
57 } else {
58 return Err(Error::new(
59 ErrorKind::Unexpected,
60 "Schema result must be avro record!",
61 ));
62 }
63
64 Ok(Either::Left(avro_schema))
65 }
66
67 fn field(
68 &mut self,
69 field: &NestedFieldRef,
70 avro_schema: AvroSchemaOrField,
71 ) -> Result<AvroSchemaOrField> {
72 let mut field_schema = avro_schema.unwrap_left();
73 if let AvroSchema::Record(record) = &mut field_schema {
74 record.name = Name::from(format!("r{}", field.id).as_str());
75 }
76
77 if !field.required {
78 field_schema = avro_optional(field_schema)?;
79 }
80
81 let default = if let Some(literal) = &field.initial_default {
82 Some(literal.clone().try_into_json(&field.field_type)?)
83 } else if !field.required {
84 Some(Value::Null)
85 } else {
86 None
87 };
88
89 let mut avro_record_field = AvroRecordField {
90 name: field.name.clone(),
91 schema: field_schema,
92 order: RecordFieldOrder::Ignore,
93 position: 0,
94 doc: field.doc.clone(),
95 aliases: None,
96 default,
97 custom_attributes: Default::default(),
98 };
99
100 avro_record_field.custom_attributes.insert(
101 FIELD_ID_PROP.to_string(),
102 Value::Number(Number::from(field.id)),
103 );
104
105 Ok(Either::Right(avro_record_field))
106 }
107
108 fn r#struct(
109 &mut self,
110 _struct: &StructType,
111 results: Vec<AvroSchemaOrField>,
112 ) -> Result<AvroSchemaOrField> {
113 let avro_fields = results.into_iter().map(|r| r.unwrap_right()).collect_vec();
114
115 Ok(Either::Left(
116 avro_record_schema("null", avro_fields)?,
119 ))
120 }
121
122 fn list(&mut self, list: &ListType, value: AvroSchemaOrField) -> Result<AvroSchemaOrField> {
123 let mut field_schema = value.unwrap_left();
124
125 if let AvroSchema::Record(record) = &mut field_schema {
126 record.name = Name::from(format!("r{}", list.element_field.id).as_str());
127 }
128
129 if !list.element_field.required {
130 field_schema = avro_optional(field_schema)?;
131 }
132
133 Ok(Either::Left(AvroSchema::Array(ArraySchema {
134 items: Box::new(field_schema),
135 attributes: BTreeMap::from([(
136 ELEMENT_ID.to_string(),
137 Value::Number(Number::from(list.element_field.id)),
138 )]),
139 })))
140 }
141
142 fn map(
143 &mut self,
144 map: &MapType,
145 key_value: AvroSchemaOrField,
146 value: AvroSchemaOrField,
147 ) -> Result<AvroSchemaOrField> {
148 let key_field_schema = key_value.unwrap_left();
149 let mut value_field_schema = value.unwrap_left();
150 if !map.value_field.required {
151 value_field_schema = avro_optional(value_field_schema)?;
152 }
153
154 if matches!(key_field_schema, AvroSchema::String) {
155 Ok(Either::Left(AvroSchema::Map(MapSchema {
156 types: Box::new(value_field_schema),
157 attributes: BTreeMap::from([
158 (
159 KEY_ID.to_string(),
160 Value::Number(Number::from(map.key_field.id)),
161 ),
162 (
163 VALUE_ID.to_string(),
164 Value::Number(Number::from(map.value_field.id)),
165 ),
166 ]),
167 })))
168 } else {
169 let key_field = {
172 let mut field = AvroRecordField {
173 name: map.key_field.name.clone(),
174 doc: None,
175 aliases: None,
176 default: None,
177 schema: key_field_schema,
178 order: RecordFieldOrder::Ascending,
179 position: 0,
180 custom_attributes: Default::default(),
181 };
182 field.custom_attributes.insert(
183 FIELD_ID_PROP.to_string(),
184 Value::Number(Number::from(map.key_field.id)),
185 );
186 field
187 };
188
189 let value_field = {
190 let mut field = AvroRecordField {
191 name: map.value_field.name.clone(),
192 doc: None,
193 aliases: None,
194 default: None,
195 schema: value_field_schema,
196 order: RecordFieldOrder::Ignore,
197 position: 0,
198 custom_attributes: Default::default(),
199 };
200 field.custom_attributes.insert(
201 FIELD_ID_PROP.to_string(),
202 Value::Number(Number::from(map.value_field.id)),
203 );
204 field
205 };
206
207 let fields = vec![key_field, value_field];
208 let item_avro_schema = avro_record_schema(
209 format!("k{}_v{}", map.key_field.id, map.value_field.id).as_str(),
210 fields,
211 )?;
212
213 Ok(Either::Left(AvroSchema::Array(ArraySchema {
214 items: Box::new(item_avro_schema),
215 attributes: BTreeMap::from([(
216 LOGICAL_TYPE.to_string(),
217 Value::String(MAP_LOGICAL_TYPE.to_string()),
218 )]),
219 })))
220 }
221 }
222
223 fn primitive(&mut self, p: &PrimitiveType) -> Result<AvroSchemaOrField> {
224 let avro_schema = match p {
225 PrimitiveType::Boolean => AvroSchema::Boolean,
226 PrimitiveType::Int => AvroSchema::Int,
227 PrimitiveType::Long => AvroSchema::Long,
228 PrimitiveType::Float => AvroSchema::Float,
229 PrimitiveType::Double => AvroSchema::Double,
230 PrimitiveType::Date => AvroSchema::Date,
231 PrimitiveType::Time => AvroSchema::TimeMicros,
232 PrimitiveType::Timestamp => AvroSchema::TimestampMicros,
233 PrimitiveType::Timestamptz => AvroSchema::TimestampMicros,
234 PrimitiveType::TimestampNs => AvroSchema::TimestampNanos,
235 PrimitiveType::TimestamptzNs => AvroSchema::TimestampNanos,
236 PrimitiveType::String => AvroSchema::String,
237 PrimitiveType::Uuid => AvroSchema::Uuid,
238 PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize)?,
239 PrimitiveType::Binary => AvroSchema::Bytes,
240 PrimitiveType::Decimal { precision, scale } => {
241 avro_decimal_schema(*precision as usize, *scale as usize)?
242 }
243 };
244 Ok(Either::Left(avro_schema))
245 }
246}
247
248pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result<AvroSchema> {
250 let mut converter = SchemaToAvroSchema {
251 schema: name.to_string(),
252 };
253
254 visit_schema(schema, &mut converter).map(Either::unwrap_left)
255}
256
257fn avro_record_schema(name: &str, fields: Vec<AvroRecordField>) -> Result<AvroSchema> {
258 let lookup = fields
259 .iter()
260 .enumerate()
261 .map(|f| (f.1.name.clone(), f.0))
262 .collect();
263
264 Ok(AvroSchema::Record(RecordSchema {
265 name: Name::new(name)?,
266 aliases: None,
267 doc: None,
268 fields,
269 lookup,
270 attributes: Default::default(),
271 }))
272}
273
274pub(crate) fn avro_fixed_schema(len: usize) -> Result<AvroSchema> {
275 Ok(AvroSchema::Fixed(FixedSchema {
276 name: Name::new(format!("fixed_{len}").as_str())?,
277 aliases: None,
278 doc: None,
279 size: len,
280 attributes: Default::default(),
281 default: None,
282 }))
283}
284
285pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result<AvroSchema> {
286 Ok(AvroSchema::Decimal(DecimalSchema {
291 precision,
292 scale,
293 inner: Box::new(AvroSchema::Fixed(FixedSchema {
294 name: Name::new(&format!("decimal_{precision}_{scale}")).unwrap(),
297 aliases: None,
298 doc: None,
299 size: crate::spec::Type::decimal_required_bytes(precision as u32)? as usize,
300 attributes: Default::default(),
301 default: None,
302 })),
303 }))
304}
305
306fn avro_optional(avro_schema: AvroSchema) -> Result<AvroSchema> {
307 Ok(AvroSchema::Union(UnionSchema::new(vec![
308 AvroSchema::Null,
309 avro_schema,
310 ])?))
311}
312
313fn is_avro_optional(avro_schema: &AvroSchema) -> bool {
314 match avro_schema {
315 AvroSchema::Union(union) => union.is_nullable(),
316 _ => false,
317 }
318}
319
/// Visitor over an Avro schema, driven by [`visit`].
///
/// [`visit`] converts child schemas first and hands their results to the
/// callback of the enclosing schema.
pub(crate) trait AvroSchemaVisitor {
    /// Result produced for every visited schema node.
    type T;

    /// Called for a record schema with the results of its fields, in field order.
    fn record(&mut self, record: &RecordSchema, fields: Vec<Self::T>) -> Result<Self::T>;

    /// Called for a union schema with the results of its variants, in variant order.
    fn union(&mut self, union: &UnionSchema, options: Vec<Self::T>) -> Result<Self::T>;

    /// Called for an array schema without a string `logicalType` attribute,
    /// with the result of its item schema.
    fn array(&mut self, array: &ArraySchema, item: Self::T) -> Result<Self::T>;
    /// Called for a map schema with the result of its value schema.
    fn map(&mut self, map: &MapSchema, value: Self::T) -> Result<Self::T>;
    /// Called for an array tagged with the `map` logical type whose items are
    /// records. `array` is that item record; `key` and `value` are the results
    /// of its first and second field.
    fn map_array(&mut self, array: &RecordSchema, key: Self::T, value: Self::T) -> Result<Self::T>;

    /// Called for every schema that is not a record, union, array or map.
    fn primitive(&mut self, schema: &AvroSchema) -> Result<Self::T>;
}
336
337pub(crate) fn visit<V: AvroSchemaVisitor>(schema: &AvroSchema, visitor: &mut V) -> Result<V::T> {
339 match schema {
340 AvroSchema::Record(record) => {
341 let field_results = record
342 .fields
343 .iter()
344 .map(|f| visit(&f.schema, visitor))
345 .collect::<Result<Vec<V::T>>>()?;
346
347 visitor.record(record, field_results)
348 }
349 AvroSchema::Union(union) => {
350 let option_results = union
351 .variants()
352 .iter()
353 .map(|f| visit(f, visitor))
354 .collect::<Result<Vec<V::T>>>()?;
355
356 visitor.union(union, option_results)
357 }
358 AvroSchema::Array(item) => {
359 if let Some(logical_type) = item
360 .attributes
361 .get(LOGICAL_TYPE)
362 .and_then(|v| Value::as_str(v))
363 {
364 if logical_type == MAP_LOGICAL_TYPE {
365 if let AvroSchema::Record(record_schema) = &*item.items {
366 let key = visit(&record_schema.fields[0].schema, visitor)?;
367 let value = visit(&record_schema.fields[1].schema, visitor)?;
368 return visitor.map_array(record_schema, key, value);
369 } else {
370 return Err(Error::new(
371 ErrorKind::DataInvalid,
372 "Can't convert avro map schema, item is not a record.",
373 ));
374 }
375 } else {
376 return Err(Error::new(
377 ErrorKind::FeatureUnsupported,
378 format!(
379 "Logical type {logical_type} is not support in iceberg array type.",
380 ),
381 ));
382 }
383 }
384 let item_result = visit(&item.items, visitor)?;
385 visitor.array(item, item_result)
386 }
387 AvroSchema::Map(inner) => {
388 let item_result = visit(&inner.types, visitor)?;
389 visitor.map(inner, item_result)
390 }
391 schema => visitor.primitive(schema),
392 }
393}
394
/// Stateless visitor that converts an Avro schema into Iceberg types.
struct AvroSchemaToSchema;
396
397impl AvroSchemaToSchema {
398 #[inline]
400 fn get_element_id_from_attributes(
401 attributes: &BTreeMap<String, Value>,
402 name: &str,
403 ) -> Result<i32> {
404 attributes
405 .get(name)
406 .ok_or_else(|| {
407 Error::new(
408 ErrorKind::DataInvalid,
409 "Can't convert avro array schema, missing element id.",
410 )
411 })?
412 .as_i64()
413 .ok_or_else(|| {
414 Error::new(
415 ErrorKind::DataInvalid,
416 "Can't convert avro array schema, element id is not a valid i64 number.",
417 )
418 })?
419 .try_into()
420 .map_err(|_| {
421 Error::new(
422 ErrorKind::DataInvalid,
423 "Can't convert avro array schema, element id is not a valid i32.",
424 )
425 })
426 }
427}
428
429impl AvroSchemaVisitor for AvroSchemaToSchema {
430 type T = Option<Type>;
432
433 fn record(
434 &mut self,
435 record: &RecordSchema,
436 field_types: Vec<Option<Type>>,
437 ) -> Result<Option<Type>> {
438 let mut fields = Vec::with_capacity(field_types.len());
439 for (avro_field, field_type) in record.fields.iter().zip_eq(field_types) {
440 let field_id =
441 Self::get_element_id_from_attributes(&avro_field.custom_attributes, FIELD_ID_PROP)?;
442
443 let optional = is_avro_optional(&avro_field.schema);
444
445 let mut field =
446 NestedField::new(field_id, &avro_field.name, field_type.unwrap(), !optional);
447
448 if let Some(doc) = &avro_field.doc {
449 field = field.with_doc(doc);
450 }
451
452 fields.push(field.into());
453 }
454
455 Ok(Some(Type::Struct(StructType::new(fields))))
456 }
457
458 fn union(
459 &mut self,
460 union: &UnionSchema,
461 mut options: Vec<Option<Type>>,
462 ) -> Result<Option<Type>> {
463 ensure_data_valid!(
464 options.len() <= 2 && !options.is_empty(),
465 "Can't convert avro union type {:?} to iceberg.",
466 union
467 );
468
469 if options.len() > 1 {
470 ensure_data_valid!(
471 options[0].is_none(),
472 "Can't convert avro union type {:?} to iceberg.",
473 union
474 );
475 }
476
477 if options.len() == 1 {
478 Ok(Some(options.remove(0).unwrap()))
479 } else {
480 Ok(Some(options.remove(1).unwrap()))
481 }
482 }
483
484 fn array(&mut self, array: &ArraySchema, item: Option<Type>) -> Result<Self::T> {
485 let element_field_id = Self::get_element_id_from_attributes(&array.attributes, ELEMENT_ID)?;
486 let element_field = NestedField::list_element(
487 element_field_id,
488 item.unwrap(),
489 !is_avro_optional(&array.items),
490 )
491 .into();
492 Ok(Some(Type::List(ListType { element_field })))
493 }
494
495 fn map(&mut self, map: &MapSchema, value: Option<Type>) -> Result<Option<Type>> {
496 let key_field_id = Self::get_element_id_from_attributes(&map.attributes, KEY_ID)?;
497 let key_field =
498 NestedField::map_key_element(key_field_id, Type::Primitive(PrimitiveType::String));
499 let value_field_id = Self::get_element_id_from_attributes(&map.attributes, VALUE_ID)?;
500 let value_field = NestedField::map_value_element(
501 value_field_id,
502 value.unwrap(),
503 !is_avro_optional(&map.types),
504 );
505 Ok(Some(Type::Map(MapType {
506 key_field: key_field.into(),
507 value_field: value_field.into(),
508 })))
509 }
510
511 fn primitive(&mut self, schema: &AvroSchema) -> Result<Option<Type>> {
512 let schema_type = match schema {
513 AvroSchema::Decimal(decimal) => {
514 Type::decimal(decimal.precision as u32, decimal.scale as u32)?
515 }
516 AvroSchema::Date => Type::Primitive(PrimitiveType::Date),
517 AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time),
518 AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp),
519 AvroSchema::TimestampNanos => Type::Primitive(PrimitiveType::TimestampNs),
520 AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean),
521 AvroSchema::Int => Type::Primitive(PrimitiveType::Int),
522 AvroSchema::Long => Type::Primitive(PrimitiveType::Long),
523 AvroSchema::Float => Type::Primitive(PrimitiveType::Float),
524 AvroSchema::Double => Type::Primitive(PrimitiveType::Double),
525 AvroSchema::Uuid => Type::Primitive(PrimitiveType::Uuid),
526 AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String),
527 AvroSchema::Fixed(fixed) => Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)),
528 AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary),
529 AvroSchema::Null => return Ok(None),
530 _ => {
531 return Err(Error::new(
532 ErrorKind::Unexpected,
533 "Unable to convert avro {schema} to iceberg primitive type.",
534 ));
535 }
536 };
537
538 Ok(Some(schema_type))
539 }
540
541 fn map_array(
542 &mut self,
543 array: &RecordSchema,
544 key: Option<Type>,
545 value: Option<Type>,
546 ) -> Result<Self::T> {
547 let key = key.ok_or_else(|| {
548 Error::new(
549 ErrorKind::DataInvalid,
550 "Can't convert avro map schema, missing key schema.",
551 )
552 })?;
553 let value = value.ok_or_else(|| {
554 Error::new(
555 ErrorKind::DataInvalid,
556 "Can't convert avro map schema, missing value schema.",
557 )
558 })?;
559 let key_id = Self::get_element_id_from_attributes(
560 &array.fields[0].custom_attributes,
561 FIELD_ID_PROP,
562 )?;
563 let value_id = Self::get_element_id_from_attributes(
564 &array.fields[1].custom_attributes,
565 FIELD_ID_PROP,
566 )?;
567 let key_field = NestedField::map_key_element(key_id, key);
568 let value_field = NestedField::map_value_element(
569 value_id,
570 value,
571 !is_avro_optional(&array.fields[1].schema),
572 );
573 Ok(Some(Type::Map(MapType {
574 key_field: key_field.into(),
575 value_field: value_field.into(),
576 })))
577 }
578}
579
580#[allow(unused)]
583pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result<Schema> {
585 if let AvroSchema::Record(_) = avro_schema {
586 let mut converter = AvroSchemaToSchema;
587 let schema_type =
588 visit(avro_schema, &mut converter)?.expect("Iceberg schema should not be none.");
589 if let Type::Struct(s) = schema_type {
590 Schema::builder()
591 .with_fields(s.fields().iter().cloned())
592 .build()
593 } else {
594 Err(Error::new(
595 ErrorKind::Unexpected,
596 format!("Expected to convert avro record schema to struct type, but {schema_type}"),
597 ))
598 }
599 } else {
600 Err(Error::new(
601 ErrorKind::DataInvalid,
602 "Can't convert non record avro schema to iceberg schema: {avro_schema}",
603 ))
604 }
605}
606
#[cfg(test)]
mod tests {
    use std::fs::read_to_string;
    use std::sync::Arc;

    use apache_avro::Schema as AvroSchema;
    use apache_avro::schema::{Namespace, UnionSchema};

    use super::*;
    use crate::avro::schema::AvroSchemaToSchema;
    use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type};

    /// Loads `testdata/<filename>` from the crate directory and parses it as an Avro schema.
    fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema {
        let path = format!("{}/testdata/{}", env!("CARGO_MANIFEST_DIR"), filename);
        let contents = read_to_string(path).unwrap();

        AvroSchema::parse_str(&contents).unwrap()
    }

    /// Parses an inline JSON Avro schema, panicking on invalid input.
    fn parse_avro(json: &str) -> AvroSchema {
        AvroSchema::parse_str(json).unwrap()
    }

    /// Builds an Iceberg schema from the given top-level fields.
    fn iceberg_schema_of(fields: Vec<Arc<NestedField>>) -> Schema {
        Schema::builder().with_fields(fields).build().unwrap()
    }

    /// Schema with one required list field (id 100) of required strings (id 101).
    fn required_string_list_schema() -> Schema {
        let element =
            NestedField::list_element(101, Type::Primitive(PrimitiveType::String), true);
        let list = Type::List(ListType {
            element_field: Arc::new(element),
        });
        iceberg_schema_of(vec![Arc::new(NestedField::required(
            100,
            "array_with_string",
            list,
        ))])
    }

    /// Checks avro -> iceberg, iceberg -> avro and the avro -> iceberg round trip.
    fn check_schema_conversion(avro_schema: AvroSchema, iceberg_schema: Schema) {
        let from_avro = avro_schema_to_schema(&avro_schema).unwrap();
        assert_eq!(iceberg_schema, from_avro);

        let record_name = avro_schema.name().unwrap().fullname(Namespace::None);
        let from_iceberg = schema_to_avro_schema(record_name, &iceberg_schema).unwrap();
        assert_eq!(avro_schema, from_iceberg);

        let round_tripped = avro_schema_to_schema(&from_iceberg).unwrap();
        assert_eq!(iceberg_schema, round_tripped);
    }

    #[test]
    fn test_manifest_file_v1_schema() {
        fn required(id: i32, name: &str, ty: PrimitiveType, doc: &str) -> Arc<NestedField> {
            Arc::new(NestedField::required(id, name, Type::Primitive(ty)).with_doc(doc))
        }

        fn optional(id: i32, name: &str, ty: PrimitiveType, doc: &str) -> Arc<NestedField> {
            Arc::new(NestedField::optional(id, name, Type::Primitive(ty)).with_doc(doc))
        }

        let partition_summary = StructType::new(vec![
            required(
                509,
                "contains_null",
                PrimitiveType::Boolean,
                "True if any file has a null partition value",
            ),
            optional(
                518,
                "contains_nan",
                PrimitiveType::Boolean,
                "True if any file has a nan partition value",
            ),
            optional(
                510,
                "lower_bound",
                PrimitiveType::Binary,
                "Partition lower bound for all files",
            ),
            optional(
                511,
                "upper_bound",
                PrimitiveType::Binary,
                "Partition upper bound for all files",
            ),
        ]);

        let partitions = Type::List(ListType {
            element_field: Arc::new(NestedField::list_element(
                508,
                Type::Struct(partition_summary),
                true,
            )),
        });

        let fields = vec![
            required(
                500,
                "manifest_path",
                PrimitiveType::String,
                "Location URI with FS scheme",
            ),
            required(
                501,
                "manifest_length",
                PrimitiveType::Long,
                "Total file size in bytes",
            ),
            required(
                502,
                "partition_spec_id",
                PrimitiveType::Int,
                "Spec ID used to write",
            ),
            optional(
                503,
                "added_snapshot_id",
                PrimitiveType::Long,
                "Snapshot ID that added the manifest",
            ),
            optional(
                504,
                "added_data_files_count",
                PrimitiveType::Int,
                "Added entry count",
            ),
            optional(
                505,
                "existing_data_files_count",
                PrimitiveType::Int,
                "Existing entry count",
            ),
            optional(
                506,
                "deleted_data_files_count",
                PrimitiveType::Int,
                "Deleted entry count",
            ),
            Arc::new(
                NestedField::optional(507, "partitions", partitions)
                    .with_doc("Summary for each partition"),
            ),
            optional(
                512,
                "added_rows_count",
                PrimitiveType::Long,
                "Added rows count",
            ),
            optional(
                513,
                "existing_rows_count",
                PrimitiveType::Long,
                "Existing rows count",
            ),
            optional(
                514,
                "deleted_rows_count",
                PrimitiveType::Long,
                "Deleted rows count",
            ),
        ];

        check_schema_conversion(
            read_test_data_file_to_avro_schema("avro_schema_manifest_file_v1.json"),
            iceberg_schema_of(fields),
        );
    }

    #[test]
    fn test_avro_list_required_primitive() {
        let avro_schema = parse_avro(
            r#"
{
    "type": "record",
    "name": "avro_schema",
    "fields": [
        {
            "name": "array_with_string",
            "type": {
                "type": "array",
                "items": "string",
                "default": [],
                "element-id": 101
            },
            "field-id": 100
        }
    ]
}"#,
        );

        check_schema_conversion(avro_schema, required_string_list_schema());
    }

    #[test]
    fn test_avro_list_wrapped_primitive() {
        let avro_schema = parse_avro(
            r#"
{
    "type": "record",
    "name": "avro_schema",
    "fields": [
        {
            "name": "array_with_string",
            "type": {
                "type": "array",
                "items": {"type": "string"},
                "default": [],
                "element-id": 101
            },
            "field-id": 100
        }
    ]
}
"#,
        );

        check_schema_conversion(avro_schema, required_string_list_schema());
    }

    #[test]
    fn test_avro_list_required_record() {
        let avro_schema = parse_avro(
            r#"
{
    "type": "record",
    "name": "avro_schema",
    "fields": [
        {
            "name": "array_with_record",
            "type": {
                "type": "array",
                "items": {
                    "type": "record",
                    "name": "r101",
                    "fields": [
                        {
                            "name": "contains_null",
                            "type": "boolean",
                            "field-id": 102
                        },
                        {
                            "name": "contains_nan",
                            "type": ["null", "boolean"],
                            "field-id": 103
                        }
                    ]
                },
                "element-id": 101
            },
            "field-id": 100
        }
    ]
}
"#,
        );

        let element_struct = StructType::new(vec![
            Arc::new(NestedField::required(
                102,
                "contains_null",
                Type::Primitive(PrimitiveType::Boolean),
            )),
            Arc::new(NestedField::optional(
                103,
                "contains_nan",
                Type::Primitive(PrimitiveType::Boolean),
            )),
        ]);
        let list = Type::List(ListType {
            element_field: Arc::new(NestedField::list_element(
                101,
                Type::Struct(element_struct),
                true,
            )),
        });
        let iceberg_schema = iceberg_schema_of(vec![Arc::new(NestedField::required(
            100,
            "array_with_record",
            list,
        ))]);

        check_schema_conversion(avro_schema, iceberg_schema);
    }

    #[test]
    fn test_schema_with_array_map() {
        fn required_map(
            id: i32,
            name: &str,
            (key_id, key_type): (i32, PrimitiveType),
            (value_id, value_type, value_required): (i32, PrimitiveType, bool),
        ) -> Arc<NestedField> {
            let map_type = MapType {
                key_field: Arc::new(NestedField::map_key_element(
                    key_id,
                    Type::Primitive(key_type),
                )),
                value_field: Arc::new(NestedField::map_value_element(
                    value_id,
                    Type::Primitive(value_type),
                    value_required,
                )),
            };
            Arc::new(NestedField::required(id, name, Type::Map(map_type)))
        }

        let avro_schema = parse_avro(
            r#"
{
    "type": "record",
    "name": "avro_schema",
    "fields": [
        {
            "name": "optional",
            "type": {
                "type": "array",
                "items": {
                    "type": "record",
                    "name": "k102_v103",
                    "fields": [
                        {
                            "name": "key",
                            "type": "boolean",
                            "field-id": 102
                        },
                        {
                            "name": "value",
                            "type": ["null", "boolean"],
                            "field-id": 103
                        }
                    ]
                },
                "default": [],
                "element-id": 101,
                "logicalType": "map"
            },
            "field-id": 100
        },{
            "name": "required",
            "type": {
                "type": "array",
                "items": {
                    "type": "record",
                    "name": "k105_v106",
                    "fields": [
                        {
                            "name": "key",
                            "type": "boolean",
                            "field-id": 105
                        },
                        {
                            "name": "value",
                            "type": "boolean",
                            "field-id": 106
                        }
                    ]
                },
                "default": [],
                "logicalType": "map"
            },
            "field-id": 104
        }, {
            "name": "string_map",
            "type": {
                "type": "map",
                "values": ["null", "long"],
                "key-id": 108,
                "value-id": 109
            },
            "field-id": 107
        }
    ]
}
"#,
        );

        let iceberg_schema = iceberg_schema_of(vec![
            required_map(
                100,
                "optional",
                (102, PrimitiveType::Boolean),
                (103, PrimitiveType::Boolean, false),
            ),
            required_map(
                104,
                "required",
                (105, PrimitiveType::Boolean),
                (106, PrimitiveType::Boolean, true),
            ),
            required_map(
                107,
                "string_map",
                (108, PrimitiveType::String),
                (109, PrimitiveType::Long, false),
            ),
        ]);

        check_schema_conversion(avro_schema, iceberg_schema);
    }

    #[test]
    fn test_resolve_union() {
        let union_schema = UnionSchema::new(vec![
            AvroSchema::Null,
            AvroSchema::String,
            AvroSchema::Boolean,
        ])
        .unwrap();

        let mut converter = AvroSchemaToSchema;

        let mut options = Vec::new();
        for variant in union_schema.variants() {
            options.push(converter.primitive(variant).unwrap());
        }

        // Three variants cannot be expressed as an optional iceberg type.
        assert!(converter.union(&union_schema, options).is_err());
    }

    #[test]
    fn test_string_type() {
        let mut converter = AvroSchemaToSchema;
        let converted = converter.primitive(&AvroSchema::String).unwrap();

        assert_eq!(Some(Type::Primitive(PrimitiveType::String)), converted);
    }

    #[test]
    fn test_map_type() {
        let parsed = parse_avro(
            r#"
{
    "type": "map",
    "values": ["null", "long"],
    "key-id": 101,
    "value-id": 102
}
"#,
        );

        let map_schema = match parsed {
            AvroSchema::Map(map_schema) => map_schema,
            _ => unreachable!(),
        };

        let expected = Type::Map(MapType {
            key_field: Arc::new(NestedField::map_key_element(
                101,
                Type::Primitive(PrimitiveType::String),
            )),
            value_field: Arc::new(NestedField::map_value_element(
                102,
                Type::Primitive(PrimitiveType::Long),
                false,
            )),
        });

        let mut converter = AvroSchemaToSchema;
        let converted = converter
            .map(&map_schema, Some(Type::Primitive(PrimitiveType::Long)))
            .unwrap()
            .unwrap();

        assert_eq!(expected, converted);
    }

    #[test]
    fn test_fixed_type() {
        let avro_schema = parse_avro(
            r#"
            {"name": "test", "type": "fixed", "size": 22}
            "#,
        );

        let mut converter = AvroSchemaToSchema;
        let converted = converter.primitive(&avro_schema).unwrap().unwrap();

        assert_eq!(Type::Primitive(PrimitiveType::Fixed(22)), converted);
    }

    #[test]
    fn test_unknown_primitive() {
        let mut converter = AvroSchemaToSchema;
        let outcome = converter.primitive(&AvroSchema::Duration);

        assert!(outcome.is_err());
    }

    #[test]
    fn test_no_field_id() {
        let avro_schema = parse_avro(
            r#"
{
    "type": "record",
    "name": "avro_schema",
    "fields": [
        {
            "name": "array_with_string",
            "type": "string"
        }
    ]
}
"#,
        );

        // The field carries no `field-id`, so the conversion must fail.
        assert!(avro_schema_to_schema(&avro_schema).is_err());
    }

    #[test]
    fn test_decimal_type() {
        let avro_schema = parse_avro(
            r#"
            {"type": "bytes", "logicalType": "decimal", "precision": 25, "scale": 19}
            "#,
        );

        let mut converter = AvroSchemaToSchema;
        let converted = converter.primitive(&avro_schema).unwrap().unwrap();

        assert_eq!(Type::decimal(25, 19).unwrap(), converted);
    }

    #[test]
    fn test_date_type() {
        let mut converter = AvroSchemaToSchema;
        let converted = converter.primitive(&AvroSchema::Date).unwrap().unwrap();

        assert_eq!(Type::Primitive(PrimitiveType::Date), converted);
    }

    #[test]
    fn test_uuid_type() {
        let avro_schema = parse_avro(
            r#"
            {"name": "test", "type": "fixed", "size": 16, "logicalType": "uuid"}
            "#,
        );

        let mut converter = AvroSchemaToSchema;
        let converted = converter.primitive(&avro_schema).unwrap().unwrap();

        assert_eq!(Type::Primitive(PrimitiveType::Uuid), converted);
    }
}