iceberg/avro/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Conversion between iceberg and avro schema.
19use std::collections::BTreeMap;
20
21use apache_avro::Schema as AvroSchema;
22use apache_avro::schema::{
23    ArraySchema, DecimalSchema, FixedSchema, MapSchema, Name, RecordField as AvroRecordField,
24    RecordFieldOrder, RecordSchema, UnionSchema,
25};
26use itertools::{Either, Itertools};
27use serde_json::{Number, Value};
28
29use crate::spec::{
30    ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, SchemaVisitor,
31    StructType, Type, visit_schema,
32};
33use crate::{Error, ErrorKind, Result, ensure_data_valid};
34
35const ELEMENT_ID: &str = "element-id";
36const FIELD_ID_PROP: &str = "field-id";
37const KEY_ID: &str = "key-id";
38const VALUE_ID: &str = "value-id";
39const MAP_LOGICAL_TYPE: &str = "map";
// This constant would be better maintained in avro-rs.
41const LOGICAL_TYPE: &str = "logicalType";
42
43struct SchemaToAvroSchema {
44    schema: String,
45}
46
47type AvroSchemaOrField = Either<AvroSchema, AvroRecordField>;
48
49impl SchemaVisitor for SchemaToAvroSchema {
50    type T = AvroSchemaOrField;
51
52    fn schema(&mut self, _schema: &Schema, value: AvroSchemaOrField) -> Result<AvroSchemaOrField> {
53        let mut avro_schema = value.unwrap_left();
54
55        if let AvroSchema::Record(record) = &mut avro_schema {
56            record.name = Name::from(self.schema.as_str());
57        } else {
58            return Err(Error::new(
59                ErrorKind::Unexpected,
60                "Schema result must be avro record!",
61            ));
62        }
63
64        Ok(Either::Left(avro_schema))
65    }
66
67    fn field(
68        &mut self,
69        field: &NestedFieldRef,
70        avro_schema: AvroSchemaOrField,
71    ) -> Result<AvroSchemaOrField> {
72        let mut field_schema = avro_schema.unwrap_left();
73        if let AvroSchema::Record(record) = &mut field_schema {
74            record.name = Name::from(format!("r{}", field.id).as_str());
75        }
76
77        if !field.required {
78            field_schema = avro_optional(field_schema)?;
79        }
80
81        let default = if let Some(literal) = &field.initial_default {
82            Some(literal.clone().try_into_json(&field.field_type)?)
83        } else if !field.required {
84            Some(Value::Null)
85        } else {
86            None
87        };
88
89        let mut avro_record_field = AvroRecordField {
90            name: field.name.clone(),
91            schema: field_schema,
92            order: RecordFieldOrder::Ignore,
93            position: 0,
94            doc: field.doc.clone(),
95            aliases: None,
96            default,
97            custom_attributes: Default::default(),
98        };
99
100        avro_record_field.custom_attributes.insert(
101            FIELD_ID_PROP.to_string(),
102            Value::Number(Number::from(field.id)),
103        );
104
105        Ok(Either::Right(avro_record_field))
106    }
107
108    fn r#struct(
109        &mut self,
110        _struct: &StructType,
111        results: Vec<AvroSchemaOrField>,
112    ) -> Result<AvroSchemaOrField> {
113        let avro_fields = results.into_iter().map(|r| r.unwrap_right()).collect_vec();
114
115        Ok(Either::Left(
116            // The name of this record schema should be determined later, by schema name or field
117            // name, here we use a temporary placeholder to do it.
118            avro_record_schema("null", avro_fields)?,
119        ))
120    }
121
122    fn list(&mut self, list: &ListType, value: AvroSchemaOrField) -> Result<AvroSchemaOrField> {
123        let mut field_schema = value.unwrap_left();
124
125        if let AvroSchema::Record(record) = &mut field_schema {
126            record.name = Name::from(format!("r{}", list.element_field.id).as_str());
127        }
128
129        if !list.element_field.required {
130            field_schema = avro_optional(field_schema)?;
131        }
132
133        Ok(Either::Left(AvroSchema::Array(ArraySchema {
134            items: Box::new(field_schema),
135            attributes: BTreeMap::from([(
136                ELEMENT_ID.to_string(),
137                Value::Number(Number::from(list.element_field.id)),
138            )]),
139        })))
140    }
141
142    fn map(
143        &mut self,
144        map: &MapType,
145        key_value: AvroSchemaOrField,
146        value: AvroSchemaOrField,
147    ) -> Result<AvroSchemaOrField> {
148        let key_field_schema = key_value.unwrap_left();
149        let mut value_field_schema = value.unwrap_left();
150        if !map.value_field.required {
151            value_field_schema = avro_optional(value_field_schema)?;
152        }
153
154        if matches!(key_field_schema, AvroSchema::String) {
155            Ok(Either::Left(AvroSchema::Map(MapSchema {
156                types: Box::new(value_field_schema),
157                attributes: BTreeMap::from([
158                    (
159                        KEY_ID.to_string(),
160                        Value::Number(Number::from(map.key_field.id)),
161                    ),
162                    (
163                        VALUE_ID.to_string(),
164                        Value::Number(Number::from(map.value_field.id)),
165                    ),
166                ]),
167            })))
168        } else {
169            // Avro map requires that key must be string type. Here we convert it to array if key is
170            // not string type.
171            let key_field = {
172                let mut field = AvroRecordField {
173                    name: map.key_field.name.clone(),
174                    doc: None,
175                    aliases: None,
176                    default: None,
177                    schema: key_field_schema,
178                    order: RecordFieldOrder::Ascending,
179                    position: 0,
180                    custom_attributes: Default::default(),
181                };
182                field.custom_attributes.insert(
183                    FIELD_ID_PROP.to_string(),
184                    Value::Number(Number::from(map.key_field.id)),
185                );
186                field
187            };
188
189            let value_field = {
190                let mut field = AvroRecordField {
191                    name: map.value_field.name.clone(),
192                    doc: None,
193                    aliases: None,
194                    default: None,
195                    schema: value_field_schema,
196                    order: RecordFieldOrder::Ignore,
197                    position: 0,
198                    custom_attributes: Default::default(),
199                };
200                field.custom_attributes.insert(
201                    FIELD_ID_PROP.to_string(),
202                    Value::Number(Number::from(map.value_field.id)),
203                );
204                field
205            };
206
207            let fields = vec![key_field, value_field];
208            let item_avro_schema = avro_record_schema(
209                format!("k{}_v{}", map.key_field.id, map.value_field.id).as_str(),
210                fields,
211            )?;
212
213            Ok(Either::Left(AvroSchema::Array(ArraySchema {
214                items: Box::new(item_avro_schema),
215                attributes: BTreeMap::from([(
216                    LOGICAL_TYPE.to_string(),
217                    Value::String(MAP_LOGICAL_TYPE.to_string()),
218                )]),
219            })))
220        }
221    }
222
223    fn primitive(&mut self, p: &PrimitiveType) -> Result<AvroSchemaOrField> {
224        let avro_schema = match p {
225            PrimitiveType::Boolean => AvroSchema::Boolean,
226            PrimitiveType::Int => AvroSchema::Int,
227            PrimitiveType::Long => AvroSchema::Long,
228            PrimitiveType::Float => AvroSchema::Float,
229            PrimitiveType::Double => AvroSchema::Double,
230            PrimitiveType::Date => AvroSchema::Date,
231            PrimitiveType::Time => AvroSchema::TimeMicros,
232            PrimitiveType::Timestamp => AvroSchema::TimestampMicros,
233            PrimitiveType::Timestamptz => AvroSchema::TimestampMicros,
234            PrimitiveType::TimestampNs => AvroSchema::TimestampNanos,
235            PrimitiveType::TimestamptzNs => AvroSchema::TimestampNanos,
236            PrimitiveType::String => AvroSchema::String,
237            PrimitiveType::Uuid => AvroSchema::Uuid,
238            PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize)?,
239            PrimitiveType::Binary => AvroSchema::Bytes,
240            PrimitiveType::Decimal { precision, scale } => {
241                avro_decimal_schema(*precision as usize, *scale as usize)?
242            }
243        };
244        Ok(Either::Left(avro_schema))
245    }
246}
247
248/// Converting iceberg schema to avro schema.
249pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result<AvroSchema> {
250    let mut converter = SchemaToAvroSchema {
251        schema: name.to_string(),
252    };
253
254    visit_schema(schema, &mut converter).map(Either::unwrap_left)
255}
256
257fn avro_record_schema(name: &str, fields: Vec<AvroRecordField>) -> Result<AvroSchema> {
258    let lookup = fields
259        .iter()
260        .enumerate()
261        .map(|f| (f.1.name.clone(), f.0))
262        .collect();
263
264    Ok(AvroSchema::Record(RecordSchema {
265        name: Name::new(name)?,
266        aliases: None,
267        doc: None,
268        fields,
269        lookup,
270        attributes: Default::default(),
271    }))
272}
273
274pub(crate) fn avro_fixed_schema(len: usize) -> Result<AvroSchema> {
275    Ok(AvroSchema::Fixed(FixedSchema {
276        name: Name::new(format!("fixed_{len}").as_str())?,
277        aliases: None,
278        doc: None,
279        size: len,
280        attributes: Default::default(),
281        default: None,
282    }))
283}
284
285pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result<AvroSchema> {
286    // Avro decimal logical type annotates Avro bytes _or_ fixed types.
287    // https://avro.apache.org/docs/1.11.1/specification/_print/#decimal
288    // Iceberg spec: Stored as _fixed_ using the minimum number of bytes for the given precision.
289    // https://iceberg.apache.org/spec/#avro
290    Ok(AvroSchema::Decimal(DecimalSchema {
291        precision,
292        scale,
293        inner: Box::new(AvroSchema::Fixed(FixedSchema {
294            // Name is not restricted by the spec.
295            // Refer to iceberg-python https://github.com/apache/iceberg-python/blob/d8bc1ca9af7957ce4d4db99a52c701ac75db7688/pyiceberg/utils/schema_conversion.py#L574-L582
296            name: Name::new(&format!("decimal_{precision}_{scale}")).unwrap(),
297            aliases: None,
298            doc: None,
299            size: crate::spec::Type::decimal_required_bytes(precision as u32)? as usize,
300            attributes: Default::default(),
301            default: None,
302        })),
303    }))
304}
305
306fn avro_optional(avro_schema: AvroSchema) -> Result<AvroSchema> {
307    Ok(AvroSchema::Union(UnionSchema::new(vec![
308        AvroSchema::Null,
309        avro_schema,
310    ])?))
311}
312
313fn is_avro_optional(avro_schema: &AvroSchema) -> bool {
314    match avro_schema {
315        AvroSchema::Union(union) => union.is_nullable(),
316        _ => false,
317    }
318}
319
320/// Post order avro schema visitor.
321pub(crate) trait AvroSchemaVisitor {
322    type T;
323
324    fn record(&mut self, record: &RecordSchema, fields: Vec<Self::T>) -> Result<Self::T>;
325
326    fn union(&mut self, union: &UnionSchema, options: Vec<Self::T>) -> Result<Self::T>;
327
328    fn array(&mut self, array: &ArraySchema, item: Self::T) -> Result<Self::T>;
329    fn map(&mut self, map: &MapSchema, value: Self::T) -> Result<Self::T>;
330    // There are two representation for iceberg map in avro: array of key-value records, or map when keys are strings (optional),
331    // ref: https://iceberg.apache.org/spec/#avro
332    fn map_array(&mut self, array: &RecordSchema, key: Self::T, value: Self::T) -> Result<Self::T>;
333
334    fn primitive(&mut self, schema: &AvroSchema) -> Result<Self::T>;
335}
336
337/// Visit avro schema in post order visitor.
338pub(crate) fn visit<V: AvroSchemaVisitor>(schema: &AvroSchema, visitor: &mut V) -> Result<V::T> {
339    match schema {
340        AvroSchema::Record(record) => {
341            let field_results = record
342                .fields
343                .iter()
344                .map(|f| visit(&f.schema, visitor))
345                .collect::<Result<Vec<V::T>>>()?;
346
347            visitor.record(record, field_results)
348        }
349        AvroSchema::Union(union) => {
350            let option_results = union
351                .variants()
352                .iter()
353                .map(|f| visit(f, visitor))
354                .collect::<Result<Vec<V::T>>>()?;
355
356            visitor.union(union, option_results)
357        }
358        AvroSchema::Array(item) => {
359            if let Some(logical_type) = item
360                .attributes
361                .get(LOGICAL_TYPE)
362                .and_then(|v| Value::as_str(v))
363            {
364                if logical_type == MAP_LOGICAL_TYPE {
365                    if let AvroSchema::Record(record_schema) = &*item.items {
366                        let key = visit(&record_schema.fields[0].schema, visitor)?;
367                        let value = visit(&record_schema.fields[1].schema, visitor)?;
368                        return visitor.map_array(record_schema, key, value);
369                    } else {
370                        return Err(Error::new(
371                            ErrorKind::DataInvalid,
372                            "Can't convert avro map schema, item is not a record.",
373                        ));
374                    }
375                } else {
376                    return Err(Error::new(
377                        ErrorKind::FeatureUnsupported,
378                        format!(
379                            "Logical type {logical_type} is not support in iceberg array type.",
380                        ),
381                    ));
382                }
383            }
384            let item_result = visit(&item.items, visitor)?;
385            visitor.array(item, item_result)
386        }
387        AvroSchema::Map(inner) => {
388            let item_result = visit(&inner.types, visitor)?;
389            visitor.map(inner, item_result)
390        }
391        schema => visitor.primitive(schema),
392    }
393}
394
395struct AvroSchemaToSchema;
396
397impl AvroSchemaToSchema {
398    /// A convenient way to get element id(i32) from attributes.
399    #[inline]
400    fn get_element_id_from_attributes(
401        attributes: &BTreeMap<String, Value>,
402        name: &str,
403    ) -> Result<i32> {
404        attributes
405            .get(name)
406            .ok_or_else(|| {
407                Error::new(
408                    ErrorKind::DataInvalid,
409                    "Can't convert avro array schema, missing element id.",
410                )
411            })?
412            .as_i64()
413            .ok_or_else(|| {
414                Error::new(
415                    ErrorKind::DataInvalid,
416                    "Can't convert avro array schema, element id is not a valid i64 number.",
417                )
418            })?
419            .try_into()
420            .map_err(|_| {
421                Error::new(
422                    ErrorKind::DataInvalid,
423                    "Can't convert avro array schema, element id is not a valid i32.",
424                )
425            })
426    }
427}
428
429impl AvroSchemaVisitor for AvroSchemaToSchema {
430    // Only `AvroSchema::Null` will return `None`
431    type T = Option<Type>;
432
433    fn record(
434        &mut self,
435        record: &RecordSchema,
436        field_types: Vec<Option<Type>>,
437    ) -> Result<Option<Type>> {
438        let mut fields = Vec::with_capacity(field_types.len());
439        for (avro_field, field_type) in record.fields.iter().zip_eq(field_types) {
440            let field_id =
441                Self::get_element_id_from_attributes(&avro_field.custom_attributes, FIELD_ID_PROP)?;
442
443            let optional = is_avro_optional(&avro_field.schema);
444
445            let mut field =
446                NestedField::new(field_id, &avro_field.name, field_type.unwrap(), !optional);
447
448            if let Some(doc) = &avro_field.doc {
449                field = field.with_doc(doc);
450            }
451
452            fields.push(field.into());
453        }
454
455        Ok(Some(Type::Struct(StructType::new(fields))))
456    }
457
458    fn union(
459        &mut self,
460        union: &UnionSchema,
461        mut options: Vec<Option<Type>>,
462    ) -> Result<Option<Type>> {
463        ensure_data_valid!(
464            options.len() <= 2 && !options.is_empty(),
465            "Can't convert avro union type {:?} to iceberg.",
466            union
467        );
468
469        if options.len() > 1 {
470            ensure_data_valid!(
471                options[0].is_none(),
472                "Can't convert avro union type {:?} to iceberg.",
473                union
474            );
475        }
476
477        if options.len() == 1 {
478            Ok(Some(options.remove(0).unwrap()))
479        } else {
480            Ok(Some(options.remove(1).unwrap()))
481        }
482    }
483
484    fn array(&mut self, array: &ArraySchema, item: Option<Type>) -> Result<Self::T> {
485        let element_field_id = Self::get_element_id_from_attributes(&array.attributes, ELEMENT_ID)?;
486        let element_field = NestedField::list_element(
487            element_field_id,
488            item.unwrap(),
489            !is_avro_optional(&array.items),
490        )
491        .into();
492        Ok(Some(Type::List(ListType { element_field })))
493    }
494
495    fn map(&mut self, map: &MapSchema, value: Option<Type>) -> Result<Option<Type>> {
496        let key_field_id = Self::get_element_id_from_attributes(&map.attributes, KEY_ID)?;
497        let key_field =
498            NestedField::map_key_element(key_field_id, Type::Primitive(PrimitiveType::String));
499        let value_field_id = Self::get_element_id_from_attributes(&map.attributes, VALUE_ID)?;
500        let value_field = NestedField::map_value_element(
501            value_field_id,
502            value.unwrap(),
503            !is_avro_optional(&map.types),
504        );
505        Ok(Some(Type::Map(MapType {
506            key_field: key_field.into(),
507            value_field: value_field.into(),
508        })))
509    }
510
511    fn primitive(&mut self, schema: &AvroSchema) -> Result<Option<Type>> {
512        let schema_type = match schema {
513            AvroSchema::Decimal(decimal) => {
514                Type::decimal(decimal.precision as u32, decimal.scale as u32)?
515            }
516            AvroSchema::Date => Type::Primitive(PrimitiveType::Date),
517            AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time),
518            AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp),
519            AvroSchema::TimestampNanos => Type::Primitive(PrimitiveType::TimestampNs),
520            AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean),
521            AvroSchema::Int => Type::Primitive(PrimitiveType::Int),
522            AvroSchema::Long => Type::Primitive(PrimitiveType::Long),
523            AvroSchema::Float => Type::Primitive(PrimitiveType::Float),
524            AvroSchema::Double => Type::Primitive(PrimitiveType::Double),
525            AvroSchema::Uuid => Type::Primitive(PrimitiveType::Uuid),
526            AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String),
527            AvroSchema::Fixed(fixed) => Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)),
528            AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary),
529            AvroSchema::Null => return Ok(None),
530            _ => {
531                return Err(Error::new(
532                    ErrorKind::Unexpected,
533                    "Unable to convert avro {schema} to iceberg primitive type.",
534                ));
535            }
536        };
537
538        Ok(Some(schema_type))
539    }
540
541    fn map_array(
542        &mut self,
543        array: &RecordSchema,
544        key: Option<Type>,
545        value: Option<Type>,
546    ) -> Result<Self::T> {
547        let key = key.ok_or_else(|| {
548            Error::new(
549                ErrorKind::DataInvalid,
550                "Can't convert avro map schema, missing key schema.",
551            )
552        })?;
553        let value = value.ok_or_else(|| {
554            Error::new(
555                ErrorKind::DataInvalid,
556                "Can't convert avro map schema, missing value schema.",
557            )
558        })?;
559        let key_id = Self::get_element_id_from_attributes(
560            &array.fields[0].custom_attributes,
561            FIELD_ID_PROP,
562        )?;
563        let value_id = Self::get_element_id_from_attributes(
564            &array.fields[1].custom_attributes,
565            FIELD_ID_PROP,
566        )?;
567        let key_field = NestedField::map_key_element(key_id, key);
568        let value_field = NestedField::map_value_element(
569            value_id,
570            value,
571            !is_avro_optional(&array.fields[1].schema),
572        );
573        Ok(Some(Type::Map(MapType {
574            key_field: key_field.into(),
575            value_field: value_field.into(),
576        })))
577    }
578}
579
580// # TODO
581// Fix this when we have used `avro_schema_to_schema` inner.
582#[allow(unused)]
583/// Converts avro schema to iceberg schema.
584pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result<Schema> {
585    if let AvroSchema::Record(_) = avro_schema {
586        let mut converter = AvroSchemaToSchema;
587        let schema_type =
588            visit(avro_schema, &mut converter)?.expect("Iceberg schema should not be none.");
589        if let Type::Struct(s) = schema_type {
590            Schema::builder()
591                .with_fields(s.fields().iter().cloned())
592                .build()
593        } else {
594            Err(Error::new(
595                ErrorKind::Unexpected,
596                format!("Expected to convert avro record schema to struct type, but {schema_type}"),
597            ))
598        }
599    } else {
600        Err(Error::new(
601            ErrorKind::DataInvalid,
602            "Can't convert non record avro schema to iceberg schema: {avro_schema}",
603        ))
604    }
605}
606
607#[cfg(test)]
608mod tests {
609    use std::fs::read_to_string;
610    use std::sync::Arc;
611
612    use apache_avro::Schema as AvroSchema;
613    use apache_avro::schema::{Namespace, UnionSchema};
614
615    use super::*;
616    use crate::avro::schema::AvroSchemaToSchema;
617    use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type};
618
619    fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema {
620        let input = read_to_string(format!(
621            "{}/testdata/{}",
622            env!("CARGO_MANIFEST_DIR"),
623            filename
624        ))
625        .unwrap();
626
627        AvroSchema::parse_str(input.as_str()).unwrap()
628    }
629
630    /// Help function to check schema conversion between avro and iceberg:
631    /// 1. avro to iceberg
632    /// 2. iceberg to avro
633    /// 3. iceberg to avro to iceberg back
634    fn check_schema_conversion(avro_schema: AvroSchema, iceberg_schema: Schema) {
635        // 1. avro to iceberg
636        let converted_iceberg_schema = avro_schema_to_schema(&avro_schema).unwrap();
637        assert_eq!(iceberg_schema, converted_iceberg_schema);
638
639        // 2. iceberg to avro
640        let converted_avro_schema = schema_to_avro_schema(
641            avro_schema.name().unwrap().fullname(Namespace::None),
642            &iceberg_schema,
643        )
644        .unwrap();
645        assert_eq!(avro_schema, converted_avro_schema);
646
647        // 3.iceberg to avro to iceberg back
648        let converted_avro_converted_iceberg_schema =
649            avro_schema_to_schema(&converted_avro_schema).unwrap();
650        assert_eq!(iceberg_schema, converted_avro_converted_iceberg_schema);
651    }
652
653    #[test]
654    fn test_manifest_file_v1_schema() {
655        let fields = vec![
656            NestedField::required(500, "manifest_path", PrimitiveType::String.into())
657                .with_doc("Location URI with FS scheme")
658                .into(),
659            NestedField::required(501, "manifest_length", PrimitiveType::Long.into())
660                .with_doc("Total file size in bytes")
661                .into(),
662            NestedField::required(502, "partition_spec_id", PrimitiveType::Int.into())
663                .with_doc("Spec ID used to write")
664                .into(),
665            NestedField::optional(503, "added_snapshot_id", PrimitiveType::Long.into())
666                .with_doc("Snapshot ID that added the manifest")
667                .into(),
668            NestedField::optional(504, "added_data_files_count", PrimitiveType::Int.into())
669                .with_doc("Added entry count")
670                .into(),
671            NestedField::optional(505, "existing_data_files_count", PrimitiveType::Int.into())
672                .with_doc("Existing entry count")
673                .into(),
674            NestedField::optional(506, "deleted_data_files_count", PrimitiveType::Int.into())
675                .with_doc("Deleted entry count")
676                .into(),
677            NestedField::optional(
678                507,
679                "partitions",
680                ListType {
681                    element_field: NestedField::list_element(
682                        508,
683                        StructType::new(vec![
684                            NestedField::required(
685                                509,
686                                "contains_null",
687                                PrimitiveType::Boolean.into(),
688                            )
689                            .with_doc("True if any file has a null partition value")
690                            .into(),
691                            NestedField::optional(
692                                518,
693                                "contains_nan",
694                                PrimitiveType::Boolean.into(),
695                            )
696                            .with_doc("True if any file has a nan partition value")
697                            .into(),
698                            NestedField::optional(510, "lower_bound", PrimitiveType::Binary.into())
699                                .with_doc("Partition lower bound for all files")
700                                .into(),
701                            NestedField::optional(511, "upper_bound", PrimitiveType::Binary.into())
702                                .with_doc("Partition upper bound for all files")
703                                .into(),
704                        ])
705                        .into(),
706                        true,
707                    )
708                    .into(),
709                }
710                .into(),
711            )
712            .with_doc("Summary for each partition")
713            .into(),
714            NestedField::optional(512, "added_rows_count", PrimitiveType::Long.into())
715                .with_doc("Added rows count")
716                .into(),
717            NestedField::optional(513, "existing_rows_count", PrimitiveType::Long.into())
718                .with_doc("Existing rows count")
719                .into(),
720            NestedField::optional(514, "deleted_rows_count", PrimitiveType::Long.into())
721                .with_doc("Deleted rows count")
722                .into(),
723        ];
724
725        let iceberg_schema = Schema::builder().with_fields(fields).build().unwrap();
726        check_schema_conversion(
727            read_test_data_file_to_avro_schema("avro_schema_manifest_file_v1.json"),
728            iceberg_schema,
729        );
730    }
731
732    #[test]
733    fn test_avro_list_required_primitive() {
734        let avro_schema = {
735            AvroSchema::parse_str(
736                r#"
737{
738    "type": "record",
739    "name": "avro_schema",
740    "fields": [
741        {
742            "name": "array_with_string",
743            "type": {
744                "type": "array",
745                "items": "string",
746                "default": [],
747                "element-id": 101
748            },
749            "field-id": 100
750        }
751    ]
752}"#,
753            )
754            .unwrap()
755        };
756
757        let iceberg_schema = {
758            Schema::builder()
759                .with_fields(vec![
760                    NestedField::required(
761                        100,
762                        "array_with_string",
763                        ListType {
764                            element_field: NestedField::list_element(
765                                101,
766                                PrimitiveType::String.into(),
767                                true,
768                            )
769                            .into(),
770                        }
771                        .into(),
772                    )
773                    .into(),
774                ])
775                .build()
776                .unwrap()
777        };
778
779        check_schema_conversion(avro_schema, iceberg_schema);
780    }
781
782    #[test]
783    fn test_avro_list_wrapped_primitive() {
784        let avro_schema = {
785            AvroSchema::parse_str(
786                r#"
787{
788    "type": "record",
789    "name": "avro_schema",
790    "fields": [
791        {
792            "name": "array_with_string",
793            "type": {
794                "type": "array",
795                "items": {"type": "string"},
796                "default": [],
797                "element-id": 101
798            },
799            "field-id": 100
800        }
801    ]
802}
803"#,
804            )
805            .unwrap()
806        };
807
808        let iceberg_schema = {
809            Schema::builder()
810                .with_fields(vec![
811                    NestedField::required(
812                        100,
813                        "array_with_string",
814                        ListType {
815                            element_field: NestedField::list_element(
816                                101,
817                                PrimitiveType::String.into(),
818                                true,
819                            )
820                            .into(),
821                        }
822                        .into(),
823                    )
824                    .into(),
825                ])
826                .build()
827                .unwrap()
828        };
829
830        check_schema_conversion(avro_schema, iceberg_schema);
831    }
832
833    #[test]
834    fn test_avro_list_required_record() {
835        let avro_schema = {
836            AvroSchema::parse_str(
837                r#"
838{
839    "type": "record",
840    "name": "avro_schema",
841    "fields": [
842        {
843            "name": "array_with_record",
844            "type": {
845                "type": "array",
846                "items": {
847                    "type": "record",
848                    "name": "r101",
849                    "fields": [
850                        {
851                            "name": "contains_null",
852                            "type": "boolean",
853                            "field-id": 102
854                        },
855                        {
856                            "name": "contains_nan",
857                            "type": ["null", "boolean"],
858                            "field-id": 103
859                        }
860                    ]
861                },
862                "element-id": 101
863            },
864            "field-id": 100
865        }
866    ]
867}
868"#,
869            )
870            .unwrap()
871        };
872
873        let iceberg_schema = {
874            Schema::builder()
875                .with_fields(vec![
876                    NestedField::required(
877                        100,
878                        "array_with_record",
879                        ListType {
880                            element_field: NestedField::list_element(
881                                101,
882                                StructType::new(vec![
883                                    NestedField::required(
884                                        102,
885                                        "contains_null",
886                                        PrimitiveType::Boolean.into(),
887                                    )
888                                    .into(),
889                                    NestedField::optional(
890                                        103,
891                                        "contains_nan",
892                                        PrimitiveType::Boolean.into(),
893                                    )
894                                    .into(),
895                                ])
896                                .into(),
897                                true,
898                            )
899                            .into(),
900                        }
901                        .into(),
902                    )
903                    .into(),
904                ])
905                .build()
906                .unwrap()
907        };
908
909        check_schema_conversion(avro_schema, iceberg_schema);
910    }
911
912    #[test]
913    fn test_schema_with_array_map() {
914        let avro_schema = {
915            AvroSchema::parse_str(
916                r#"
917{
918    "type": "record",
919    "name": "avro_schema",
920    "fields": [
921        {
922            "name": "optional",
923            "type": {
924                "type": "array",
925                "items": {
926                    "type": "record",
927                    "name": "k102_v103",
928                    "fields": [
929                        {
930                            "name": "key",
931                            "type": "boolean",
932                            "field-id": 102
933                        },
934                        {
935                            "name": "value",
936                            "type": ["null", "boolean"],
937                            "field-id": 103
938                        }
939                    ]
940                },
941                "default": [],
942                "element-id": 101,
943                "logicalType": "map"
944            },
945            "field-id": 100
946        },{
947            "name": "required",
948            "type": {
949                "type": "array",
950                "items": {
951                    "type": "record",
952                    "name": "k105_v106",
953                    "fields": [
954                        {
955                            "name": "key",
956                            "type": "boolean",
957                            "field-id": 105
958                        },
959                        {
960                            "name": "value",
961                            "type": "boolean",
962                            "field-id": 106
963                        }
964                    ]
965                },
966                "default": [],
967                "logicalType": "map"
968            },
969            "field-id": 104
970        }, {
971            "name": "string_map",
972            "type": {
973                "type": "map",
974                "values": ["null", "long"],
975                "key-id": 108,
976                "value-id": 109
977            },
978            "field-id": 107
979        }
980    ]
981}
982"#,
983            )
984            .unwrap()
985        };
986
987        let iceberg_schema = {
988            Schema::builder()
989                .with_fields(vec![
990                    Arc::new(NestedField::required(
991                        100,
992                        "optional",
993                        Type::Map(MapType {
994                            key_field: NestedField::map_key_element(
995                                102,
996                                PrimitiveType::Boolean.into(),
997                            )
998                            .into(),
999                            value_field: NestedField::map_value_element(
1000                                103,
1001                                PrimitiveType::Boolean.into(),
1002                                false,
1003                            )
1004                            .into(),
1005                        }),
1006                    )),
1007                    Arc::new(NestedField::required(
1008                        104,
1009                        "required",
1010                        Type::Map(MapType {
1011                            key_field: NestedField::map_key_element(
1012                                105,
1013                                PrimitiveType::Boolean.into(),
1014                            )
1015                            .into(),
1016                            value_field: NestedField::map_value_element(
1017                                106,
1018                                PrimitiveType::Boolean.into(),
1019                                true,
1020                            )
1021                            .into(),
1022                        }),
1023                    )),
1024                    Arc::new(NestedField::required(
1025                        107,
1026                        "string_map",
1027                        Type::Map(MapType {
1028                            key_field: NestedField::map_key_element(
1029                                108,
1030                                PrimitiveType::String.into(),
1031                            )
1032                            .into(),
1033                            value_field: NestedField::map_value_element(
1034                                109,
1035                                PrimitiveType::Long.into(),
1036                                false,
1037                            )
1038                            .into(),
1039                        }),
1040                    )),
1041                ])
1042                .build()
1043                .unwrap()
1044        };
1045
1046        check_schema_conversion(avro_schema, iceberg_schema);
1047    }
1048
1049    #[test]
1050    fn test_resolve_union() {
1051        let avro_schema = UnionSchema::new(vec![
1052            AvroSchema::Null,
1053            AvroSchema::String,
1054            AvroSchema::Boolean,
1055        ])
1056        .unwrap();
1057
1058        let mut converter = AvroSchemaToSchema;
1059
1060        let options = avro_schema
1061            .variants()
1062            .iter()
1063            .map(|v| converter.primitive(v).unwrap())
1064            .collect();
1065        assert!(converter.union(&avro_schema, options).is_err());
1066    }
1067
1068    #[test]
1069    fn test_string_type() {
1070        let mut converter = AvroSchemaToSchema;
1071        let avro_schema = AvroSchema::String;
1072
1073        assert_eq!(
1074            Some(PrimitiveType::String.into()),
1075            converter.primitive(&avro_schema).unwrap()
1076        );
1077    }
1078
1079    #[test]
1080    fn test_map_type() {
1081        let avro_schema = {
1082            AvroSchema::parse_str(
1083                r#"
1084{
1085    "type": "map",
1086    "values": ["null", "long"],
1087    "key-id": 101,
1088    "value-id": 102
1089}
1090"#,
1091            )
1092            .unwrap()
1093        };
1094
1095        let AvroSchema::Map(avro_schema) = avro_schema else {
1096            unreachable!()
1097        };
1098
1099        let mut converter = AvroSchemaToSchema;
1100        let iceberg_type = Type::Map(MapType {
1101            key_field: NestedField::map_key_element(101, PrimitiveType::String.into()).into(),
1102            value_field: NestedField::map_value_element(102, PrimitiveType::Long.into(), false)
1103                .into(),
1104        });
1105
1106        assert_eq!(
1107            iceberg_type,
1108            converter
1109                .map(&avro_schema, Some(PrimitiveType::Long.into()))
1110                .unwrap()
1111                .unwrap()
1112        );
1113    }
1114
1115    #[test]
1116    fn test_fixed_type() {
1117        let avro_schema = {
1118            AvroSchema::parse_str(
1119                r#"
1120            {"name": "test", "type": "fixed", "size": 22}
1121            "#,
1122            )
1123            .unwrap()
1124        };
1125
1126        let mut converter = AvroSchemaToSchema;
1127
1128        let iceberg_type = Type::from(PrimitiveType::Fixed(22));
1129
1130        assert_eq!(
1131            iceberg_type,
1132            converter.primitive(&avro_schema).unwrap().unwrap()
1133        );
1134    }
1135
1136    #[test]
1137    fn test_unknown_primitive() {
1138        let mut converter = AvroSchemaToSchema;
1139
1140        assert!(converter.primitive(&AvroSchema::Duration).is_err());
1141    }
1142
1143    #[test]
1144    fn test_no_field_id() {
1145        let avro_schema = {
1146            AvroSchema::parse_str(
1147                r#"
1148{
1149    "type": "record",
1150    "name": "avro_schema",
1151    "fields": [
1152        {
1153            "name": "array_with_string",
1154            "type": "string"
1155        }
1156    ]
1157}
1158"#,
1159            )
1160            .unwrap()
1161        };
1162
1163        assert!(avro_schema_to_schema(&avro_schema).is_err());
1164    }
1165
1166    #[test]
1167    fn test_decimal_type() {
1168        let avro_schema = {
1169            AvroSchema::parse_str(
1170                r#"
1171      {"type": "bytes", "logicalType": "decimal", "precision": 25, "scale": 19}
1172            "#,
1173            )
1174            .unwrap()
1175        };
1176
1177        let mut converter = AvroSchemaToSchema;
1178
1179        assert_eq!(
1180            Type::decimal(25, 19).unwrap(),
1181            converter.primitive(&avro_schema).unwrap().unwrap()
1182        );
1183    }
1184
1185    #[test]
1186    fn test_date_type() {
1187        let mut converter = AvroSchemaToSchema;
1188
1189        assert_eq!(
1190            Type::from(PrimitiveType::Date),
1191            converter.primitive(&AvroSchema::Date).unwrap().unwrap()
1192        );
1193    }
1194
1195    #[test]
1196    fn test_uuid_type() {
1197        let avro_schema = {
1198            AvroSchema::parse_str(
1199                r#"
1200            {"name": "test", "type": "fixed", "size": 16, "logicalType": "uuid"}
1201            "#,
1202            )
1203            .unwrap()
1204        };
1205
1206        let mut converter = AvroSchemaToSchema;
1207
1208        let iceberg_type = Type::from(PrimitiveType::Uuid);
1209
1210        assert_eq!(
1211            iceberg_type,
1212            converter.primitive(&avro_schema).unwrap().unwrap()
1213        );
1214    }
1215}