iceberg/arrow/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Conversion between Arrow schema and Iceberg schema.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
24use arrow_array::{
25    BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
26    FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
27    TimestampMicrosecondArray,
28};
29use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
30use num_bigint::BigInt;
31use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
32use parquet::file::statistics::Statistics;
33use rust_decimal::prelude::ToPrimitive;
34use uuid::Uuid;
35
36use crate::error::Result;
37use crate::spec::{
38    Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema,
39    SchemaVisitor, StructType, Type,
40};
41use crate::{Error, ErrorKind};
42
/// Name given to the entries field of the Arrow map type produced when an Iceberg map type is
/// converted to Arrow.
pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
/// UTC time zone string attached to Arrow timestamp types that carry a time zone.
pub const UTC_TIME_ZONE: &str = "+00:00";
47
/// A post order arrow schema visitor.
///
/// Children are visited before the node that contains them, so every aggregate callback
/// (`schema`, `struct`, `list`, `map`) receives the results already computed for its children.
/// All `before_*` / `after_*` hooks default to doing nothing.
///
/// For order of methods called, please refer to [`visit_schema`].
pub trait ArrowSchemaVisitor {
    /// Return type of this visitor on arrow field.
    type T;

    /// Return type of this visitor on arrow schema.
    type U;

    /// Called before the type of a schema field or struct field is visited.
    fn before_field(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the type of a schema field or struct field has been visited.
    fn after_field(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before the element type of a list is visited.
    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the element type of a list has been visited.
    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before the key type of a map is visited.
    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the key type of a map has been visited.
    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before the value type of a map is visited.
    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the value type of a map has been visited.
    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called once all top-level fields of the schema have been visited; `values` holds one
    /// result per field, in field order.
    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U>;

    /// Called once all fields of a struct have been visited; `results` holds one result per
    /// field, in field order.
    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T>;

    /// Called once the element of a list-like type has been visited; `list` is the list data
    /// type itself and `value` is the result for its element.
    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;

    /// Called once the key and value of a map have been visited; `map` is the map data type
    /// itself, `key_value` is the result for the key and `value` the result for the value.
    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T>;

    /// Called for every leaf (non-nested) data type.
    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
}
113
114/// Visiting a type in post order.
115fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Result<V::T> {
116    match r#type {
117        p if p.is_primitive()
118            || matches!(
119                p,
120                DataType::Boolean
121                    | DataType::Utf8
122                    | DataType::LargeUtf8
123                    | DataType::Utf8View
124                    | DataType::Binary
125                    | DataType::LargeBinary
126                    | DataType::BinaryView
127                    | DataType::FixedSizeBinary(_)
128            ) =>
129        {
130            visitor.primitive(p)
131        }
132        DataType::List(element_field) => visit_list(r#type, element_field, visitor),
133        DataType::LargeList(element_field) => visit_list(r#type, element_field, visitor),
134        DataType::FixedSizeList(element_field, _) => visit_list(r#type, element_field, visitor),
135        DataType::Map(field, _) => match field.data_type() {
136            DataType::Struct(fields) => {
137                if fields.len() != 2 {
138                    return Err(Error::new(
139                        ErrorKind::DataInvalid,
140                        "Map field must have exactly 2 fields",
141                    ));
142                }
143
144                let key_field = &fields[0];
145                let value_field = &fields[1];
146
147                let key_result = {
148                    visitor.before_map_key(key_field)?;
149                    let ret = visit_type(key_field.data_type(), visitor)?;
150                    visitor.after_map_key(key_field)?;
151                    ret
152                };
153
154                let value_result = {
155                    visitor.before_map_value(value_field)?;
156                    let ret = visit_type(value_field.data_type(), visitor)?;
157                    visitor.after_map_value(value_field)?;
158                    ret
159                };
160
161                visitor.map(r#type, key_result, value_result)
162            }
163            _ => Err(Error::new(
164                ErrorKind::DataInvalid,
165                "Map field must have struct type",
166            )),
167        },
168        DataType::Struct(fields) => visit_struct(fields, visitor),
169        DataType::Dictionary(_key_type, value_type) => visit_type(value_type, visitor),
170        other => Err(Error::new(
171            ErrorKind::DataInvalid,
172            format!("Cannot visit Arrow data type: {other}"),
173        )),
174    }
175}
176
177/// Visit list types in post order.
178fn visit_list<V: ArrowSchemaVisitor>(
179    data_type: &DataType,
180    element_field: &Field,
181    visitor: &mut V,
182) -> Result<V::T> {
183    visitor.before_list_element(element_field)?;
184    let value = visit_type(element_field.data_type(), visitor)?;
185    visitor.after_list_element(element_field)?;
186    visitor.list(data_type, value)
187}
188
189/// Visit struct type in post order.
190fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> Result<V::T> {
191    let mut results = Vec::with_capacity(fields.len());
192    for field in fields {
193        visitor.before_field(field)?;
194        let result = visit_type(field.data_type(), visitor)?;
195        visitor.after_field(field)?;
196        results.push(result);
197    }
198
199    visitor.r#struct(fields, results)
200}
201
202/// Visit schema in post order.
203fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) -> Result<V::U> {
204    let mut results = Vec::with_capacity(schema.fields().len());
205    for field in schema.fields() {
206        visitor.before_field(field)?;
207        let result = visit_type(field.data_type(), visitor)?;
208        visitor.after_field(field)?;
209        results.push(result);
210    }
211    visitor.schema(schema, results)
212}
213
214/// Convert Arrow schema to Iceberg schema.
215///
216/// Iceberg schema fields require a unique field id, and this function assumes that each field
217/// in the provided Arrow schema contains a field id in its metadata. If the metadata is missing
218/// or the field id is not set, the conversion will fail
219pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
220    let mut visitor = ArrowSchemaConverter::new();
221    visit_schema(schema, &mut visitor)
222}
223
224/// Convert Arrow type to iceberg type.
225pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
226    let mut visitor = ArrowSchemaConverter::new();
227    visit_type(ty, &mut visitor)
228}
229
/// Arrow field metadata key under which a field's documentation string is stored.
const ARROW_FIELD_DOC_KEY: &str = "doc";
231
232pub(super) fn get_field_id(field: &Field) -> Result<i32> {
233    if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
234        return value.parse::<i32>().map_err(|e| {
235            Error::new(
236                ErrorKind::DataInvalid,
237                "Failed to parse field id".to_string(),
238            )
239            .with_context("value", value)
240            .with_source(e)
241        });
242    }
243    Err(Error::new(
244        ErrorKind::DataInvalid,
245        "Field id not found in metadata",
246    ))
247}
248
249fn get_field_doc(field: &Field) -> Option<String> {
250    if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
251        return Some(value.clone());
252    }
253    None
254}
255
256struct ArrowSchemaConverter;
257
258impl ArrowSchemaConverter {
259    fn new() -> Self {
260        Self {}
261    }
262
263    fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result<Vec<NestedFieldRef>> {
264        let mut results = Vec::with_capacity(fields.len());
265        for i in 0..fields.len() {
266            let field = &fields[i];
267            let field_type = &field_results[i];
268            let id = get_field_id(field)?;
269            let doc = get_field_doc(field);
270            let nested_field = NestedField {
271                id,
272                doc,
273                name: field.name().clone(),
274                required: !field.is_nullable(),
275                field_type: Box::new(field_type.clone()),
276                initial_default: None,
277                write_default: None,
278            };
279            results.push(Arc::new(nested_field));
280        }
281        Ok(results)
282    }
283}
284
285impl ArrowSchemaVisitor for ArrowSchemaConverter {
286    type T = Type;
287    type U = Schema;
288
289    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
290        let fields = Self::convert_fields(schema.fields(), &values)?;
291        let builder = Schema::builder().with_fields(fields);
292        builder.build()
293    }
294
295    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
296        let fields = Self::convert_fields(fields, &results)?;
297        Ok(Type::Struct(StructType::new(fields)))
298    }
299
300    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
301        let element_field = match list {
302            DataType::List(element_field) => element_field,
303            DataType::LargeList(element_field) => element_field,
304            DataType::FixedSizeList(element_field, _) => element_field,
305            _ => {
306                return Err(Error::new(
307                    ErrorKind::DataInvalid,
308                    "List type must have list data type",
309                ));
310            }
311        };
312
313        let id = get_field_id(element_field)?;
314        let doc = get_field_doc(element_field);
315        let mut element_field =
316            NestedField::list_element(id, value.clone(), !element_field.is_nullable());
317        if let Some(doc) = doc {
318            element_field = element_field.with_doc(doc);
319        }
320        let element_field = Arc::new(element_field);
321        Ok(Type::List(ListType { element_field }))
322    }
323
324    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
325        match map {
326            DataType::Map(field, _) => match field.data_type() {
327                DataType::Struct(fields) => {
328                    if fields.len() != 2 {
329                        return Err(Error::new(
330                            ErrorKind::DataInvalid,
331                            "Map field must have exactly 2 fields",
332                        ));
333                    }
334
335                    let key_field = &fields[0];
336                    let value_field = &fields[1];
337
338                    let key_id = get_field_id(key_field)?;
339                    let key_doc = get_field_doc(key_field);
340                    let mut key_field = NestedField::map_key_element(key_id, key_value.clone());
341                    if let Some(doc) = key_doc {
342                        key_field = key_field.with_doc(doc);
343                    }
344                    let key_field = Arc::new(key_field);
345
346                    let value_id = get_field_id(value_field)?;
347                    let value_doc = get_field_doc(value_field);
348                    let mut value_field = NestedField::map_value_element(
349                        value_id,
350                        value.clone(),
351                        !value_field.is_nullable(),
352                    );
353                    if let Some(doc) = value_doc {
354                        value_field = value_field.with_doc(doc);
355                    }
356                    let value_field = Arc::new(value_field);
357
358                    Ok(Type::Map(MapType {
359                        key_field,
360                        value_field,
361                    }))
362                }
363                _ => Err(Error::new(
364                    ErrorKind::DataInvalid,
365                    "Map field must have struct type",
366                )),
367            },
368            _ => Err(Error::new(
369                ErrorKind::DataInvalid,
370                "Map type must have map data type",
371            )),
372        }
373    }
374
375    fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
376        match p {
377            DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
378            DataType::Int8 | DataType::Int16 | DataType::Int32 => {
379                Ok(Type::Primitive(PrimitiveType::Int))
380            }
381            DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)),
382            DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)),
383            DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
384            DataType::UInt64 => {
385                // Block uint64 - no safe casting option
386                Err(Error::new(
387                    ErrorKind::DataInvalid,
388                    "UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for full uint64 range.",
389                ))
390            }
391            DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
392            DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
393            DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| {
394                Error::new(
395                    ErrorKind::DataInvalid,
396                    "Failed to create decimal type".to_string(),
397                )
398                .with_source(e)
399            }),
400            DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)),
401            DataType::Time64(unit) if unit == &TimeUnit::Microsecond => {
402                Ok(Type::Primitive(PrimitiveType::Time))
403            }
404            DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => {
405                Ok(Type::Primitive(PrimitiveType::Timestamp))
406            }
407            DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => {
408                Ok(Type::Primitive(PrimitiveType::TimestampNs))
409            }
410            DataType::Timestamp(unit, Some(zone))
411                if unit == &TimeUnit::Microsecond
412                    && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
413            {
414                Ok(Type::Primitive(PrimitiveType::Timestamptz))
415            }
416            DataType::Timestamp(unit, Some(zone))
417                if unit == &TimeUnit::Nanosecond
418                    && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
419            {
420                Ok(Type::Primitive(PrimitiveType::TimestamptzNs))
421            }
422            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
423                Ok(Type::Primitive(PrimitiveType::Binary))
424            }
425            DataType::FixedSizeBinary(width) => {
426                Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64)))
427            }
428            DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => {
429                Ok(Type::Primitive(PrimitiveType::String))
430            }
431            _ => Err(Error::new(
432                ErrorKind::DataInvalid,
433                format!("Unsupported Arrow data type: {p}"),
434            )),
435        }
436    }
437}
438
/// Visitor that converts Iceberg schemas and types into Arrow ones.
struct ToArrowSchemaConverter;

/// Intermediate result of [`ToArrowSchemaConverter`]: depending on the node visited, the
/// converter produces a whole Arrow schema, a single Arrow field, or a bare Arrow data type.
enum ArrowSchemaOrFieldOrType {
    /// Result of converting an Iceberg schema.
    Schema(ArrowSchema),
    /// Result of converting an Iceberg nested field.
    Field(Field),
    /// Result of converting an Iceberg struct, list, map or primitive type.
    Type(DataType),
}
446
447impl SchemaVisitor for ToArrowSchemaConverter {
448    type T = ArrowSchemaOrFieldOrType;
449
450    fn schema(
451        &mut self,
452        _schema: &crate::spec::Schema,
453        value: ArrowSchemaOrFieldOrType,
454    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
455        let struct_type = match value {
456            ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)) => fields,
457            _ => unreachable!(),
458        };
459        Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new(
460            struct_type,
461        )))
462    }
463
464    fn field(
465        &mut self,
466        field: &crate::spec::NestedFieldRef,
467        value: ArrowSchemaOrFieldOrType,
468    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
469        let ty = match value {
470            ArrowSchemaOrFieldOrType::Type(ty) => ty,
471            _ => unreachable!(),
472        };
473        let metadata = if let Some(doc) = &field.doc {
474            HashMap::from([
475                (PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()),
476                (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
477            ])
478        } else {
479            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string())])
480        };
481        Ok(ArrowSchemaOrFieldOrType::Field(
482            Field::new(field.name.clone(), ty, !field.required).with_metadata(metadata),
483        ))
484    }
485
486    fn r#struct(
487        &mut self,
488        _: &crate::spec::StructType,
489        results: Vec<ArrowSchemaOrFieldOrType>,
490    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
491        let fields = results
492            .into_iter()
493            .map(|result| match result {
494                ArrowSchemaOrFieldOrType::Field(field) => field,
495                _ => unreachable!(),
496            })
497            .collect();
498        Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)))
499    }
500
501    fn list(
502        &mut self,
503        list: &crate::spec::ListType,
504        value: ArrowSchemaOrFieldOrType,
505    ) -> crate::Result<Self::T> {
506        let field = match self.field(&list.element_field, value)? {
507            ArrowSchemaOrFieldOrType::Field(field) => field,
508            _ => unreachable!(),
509        };
510        let meta = if let Some(doc) = &list.element_field.doc {
511            HashMap::from([
512                (
513                    PARQUET_FIELD_ID_META_KEY.to_string(),
514                    list.element_field.id.to_string(),
515                ),
516                (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
517            ])
518        } else {
519            HashMap::from([(
520                PARQUET_FIELD_ID_META_KEY.to_string(),
521                list.element_field.id.to_string(),
522            )])
523        };
524        let field = field.with_metadata(meta);
525        Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new(
526            field,
527        ))))
528    }
529
530    fn map(
531        &mut self,
532        map: &crate::spec::MapType,
533        key_value: ArrowSchemaOrFieldOrType,
534        value: ArrowSchemaOrFieldOrType,
535    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
536        let key_field = match self.field(&map.key_field, key_value)? {
537            ArrowSchemaOrFieldOrType::Field(field) => field,
538            _ => unreachable!(),
539        };
540        let value_field = match self.field(&map.value_field, value)? {
541            ArrowSchemaOrFieldOrType::Field(field) => field,
542            _ => unreachable!(),
543        };
544        let field = Field::new(
545            DEFAULT_MAP_FIELD_NAME,
546            DataType::Struct(vec![key_field, value_field].into()),
547            // Map field is always not nullable
548            false,
549        );
550
551        Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
552            field.into(),
553            false,
554        )))
555    }
556
557    fn primitive(
558        &mut self,
559        p: &crate::spec::PrimitiveType,
560    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
561        match p {
562            crate::spec::PrimitiveType::Boolean => {
563                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
564            }
565            crate::spec::PrimitiveType::Int => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int32)),
566            crate::spec::PrimitiveType::Long => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int64)),
567            crate::spec::PrimitiveType::Float => {
568                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float32))
569            }
570            crate::spec::PrimitiveType::Double => {
571                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float64))
572            }
573            crate::spec::PrimitiveType::Decimal { precision, scale } => {
574                let (precision, scale) = {
575                    let precision: u8 = precision.to_owned().try_into().map_err(|err| {
576                        Error::new(
577                            crate::ErrorKind::DataInvalid,
578                            "incompatible precision for decimal type convert",
579                        )
580                        .with_source(err)
581                    })?;
582                    let scale = scale.to_owned().try_into().map_err(|err| {
583                        Error::new(
584                            crate::ErrorKind::DataInvalid,
585                            "incompatible scale for decimal type convert",
586                        )
587                        .with_source(err)
588                    })?;
589                    (precision, scale)
590                };
591                validate_decimal_precision_and_scale::<Decimal128Type>(precision, scale).map_err(
592                    |err| {
593                        Error::new(
594                            crate::ErrorKind::DataInvalid,
595                            "incompatible precision and scale for decimal type convert",
596                        )
597                        .with_source(err)
598                    },
599                )?;
600                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Decimal128(
601                    precision, scale,
602                )))
603            }
604            crate::spec::PrimitiveType::Date => {
605                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
606            }
607            crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
608                DataType::Time64(TimeUnit::Microsecond),
609            )),
610            crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
611                DataType::Timestamp(TimeUnit::Microsecond, None),
612            )),
613            crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
614                // Timestampz always stored as UTC
615                DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
616            )),
617            crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
618                DataType::Timestamp(TimeUnit::Nanosecond, None),
619            )),
620            crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
621                // Store timestamptz_ns as UTC
622                DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
623            )),
624            crate::spec::PrimitiveType::String => {
625                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
626            }
627            crate::spec::PrimitiveType::Uuid => Ok(ArrowSchemaOrFieldOrType::Type(
628                DataType::FixedSizeBinary(16),
629            )),
630            crate::spec::PrimitiveType::Fixed(len) => Ok(ArrowSchemaOrFieldOrType::Type(
631                len.to_i32()
632                    .map(DataType::FixedSizeBinary)
633                    .unwrap_or(DataType::LargeBinary),
634            )),
635            crate::spec::PrimitiveType::Binary => {
636                Ok(ArrowSchemaOrFieldOrType::Type(DataType::LargeBinary))
637            }
638        }
639    }
640}
641
642/// Convert iceberg schema to an arrow schema.
643pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
644    let mut converter = ToArrowSchemaConverter;
645    match crate::spec::visit_schema(schema, &mut converter)? {
646        ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema),
647        _ => unreachable!(),
648    }
649}
650
651/// Convert iceberg type to an arrow type.
652pub fn type_to_arrow_type(ty: &crate::spec::Type) -> crate::Result<DataType> {
653    let mut converter = ToArrowSchemaConverter;
654    match crate::spec::visit_type(ty, &mut converter)? {
655        ArrowSchemaOrFieldOrType::Type(ty) => Ok(ty),
656        _ => unreachable!(),
657    }
658}
659
660/// Convert Iceberg Datum to Arrow Datum.
661pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send + Sync>> {
662    match (datum.data_type(), datum.literal()) {
663        (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(value)) => {
664            Ok(Arc::new(BooleanArray::new_scalar(*value)))
665        }
666        (PrimitiveType::Int, PrimitiveLiteral::Int(value)) => {
667            Ok(Arc::new(Int32Array::new_scalar(*value)))
668        }
669        (PrimitiveType::Long, PrimitiveLiteral::Long(value)) => {
670            Ok(Arc::new(Int64Array::new_scalar(*value)))
671        }
672        (PrimitiveType::Float, PrimitiveLiteral::Float(value)) => {
673            Ok(Arc::new(Float32Array::new_scalar(value.to_f32().unwrap())))
674        }
675        (PrimitiveType::Double, PrimitiveLiteral::Double(value)) => {
676            Ok(Arc::new(Float64Array::new_scalar(value.to_f64().unwrap())))
677        }
678        (PrimitiveType::String, PrimitiveLiteral::String(value)) => {
679            Ok(Arc::new(StringArray::new_scalar(value.as_str())))
680        }
681        (PrimitiveType::Binary, PrimitiveLiteral::Binary(value)) => {
682            Ok(Arc::new(BinaryArray::new_scalar(value.as_slice())))
683        }
684        (PrimitiveType::Date, PrimitiveLiteral::Int(value)) => {
685            Ok(Arc::new(Date32Array::new_scalar(*value)))
686        }
687        (PrimitiveType::Timestamp, PrimitiveLiteral::Long(value)) => {
688            Ok(Arc::new(TimestampMicrosecondArray::new_scalar(*value)))
689        }
690        (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
691            TimestampMicrosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
692        ))),
693        (PrimitiveType::Decimal { precision, scale }, PrimitiveLiteral::Int128(value)) => {
694            let array = Decimal128Array::from_value(*value, 1)
695                .with_precision_and_scale(*precision as _, *scale as _)
696                .unwrap();
697            Ok(Arc::new(Scalar::new(array)))
698        }
699        (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(value)) => {
700            let bytes = Uuid::from_u128(*value).into_bytes();
701            let array = FixedSizeBinaryArray::try_from_iter(vec![bytes].into_iter()).unwrap();
702            Ok(Arc::new(Scalar::new(array)))
703        }
704
705        (primitive_type, _) => Err(Error::new(
706            ErrorKind::FeatureUnsupported,
707            format!("Converting datum from type {primitive_type:?} to arrow not supported yet."),
708        )),
709    }
710}
711
712pub(crate) fn get_parquet_stat_min_as_datum(
713    primitive_type: &PrimitiveType,
714    stats: &Statistics,
715) -> Result<Option<Datum>> {
716    Ok(match (primitive_type, stats) {
717        (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
718            stats.min_opt().map(|val| Datum::bool(*val))
719        }
720        (PrimitiveType::Int, Statistics::Int32(stats)) => {
721            stats.min_opt().map(|val| Datum::int(*val))
722        }
723        (PrimitiveType::Date, Statistics::Int32(stats)) => {
724            stats.min_opt().map(|val| Datum::date(*val))
725        }
726        (PrimitiveType::Long, Statistics::Int64(stats)) => {
727            stats.min_opt().map(|val| Datum::long(*val))
728        }
729        (PrimitiveType::Time, Statistics::Int64(stats)) => {
730            let Some(val) = stats.min_opt() else {
731                return Ok(None);
732            };
733
734            Some(Datum::time_micros(*val)?)
735        }
736        (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
737            stats.min_opt().map(|val| Datum::timestamp_micros(*val))
738        }
739        (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
740            stats.min_opt().map(|val| Datum::timestamptz_micros(*val))
741        }
742        (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
743            stats.min_opt().map(|val| Datum::timestamp_nanos(*val))
744        }
745        (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
746            stats.min_opt().map(|val| Datum::timestamptz_nanos(*val))
747        }
748        (PrimitiveType::Float, Statistics::Float(stats)) => {
749            stats.min_opt().map(|val| Datum::float(*val))
750        }
751        (PrimitiveType::Double, Statistics::Double(stats)) => {
752            stats.min_opt().map(|val| Datum::double(*val))
753        }
754        (PrimitiveType::String, Statistics::ByteArray(stats)) => {
755            let Some(val) = stats.min_opt() else {
756                return Ok(None);
757            };
758
759            Some(Datum::string(val.as_utf8()?))
760        }
761        (
762            PrimitiveType::Decimal {
763                precision: _,
764                scale: _,
765            },
766            Statistics::ByteArray(stats),
767        ) => {
768            let Some(bytes) = stats.min_bytes_opt() else {
769                return Ok(None);
770            };
771            Some(Datum::new(
772                primitive_type.clone(),
773                PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
774            ))
775        }
776        (
777            PrimitiveType::Decimal {
778                precision: _,
779                scale: _,
780            },
781            Statistics::FixedLenByteArray(stats),
782        ) => {
783            let Some(bytes) = stats.min_bytes_opt() else {
784                return Ok(None);
785            };
786            let unscaled_value = BigInt::from_signed_bytes_be(bytes);
787            Some(Datum::new(
788                primitive_type.clone(),
789                PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
790                    Error::new(
791                        ErrorKind::DataInvalid,
792                        format!("Can't convert bytes to i128: {bytes:?}"),
793                    )
794                })?),
795            ))
796        }
797        (
798            PrimitiveType::Decimal {
799                precision: _,
800                scale: _,
801            },
802            Statistics::Int32(stats),
803        ) => stats.min_opt().map(|val| {
804            Datum::new(
805                primitive_type.clone(),
806                PrimitiveLiteral::Int128(i128::from(*val)),
807            )
808        }),
809
810        (
811            PrimitiveType::Decimal {
812                precision: _,
813                scale: _,
814            },
815            Statistics::Int64(stats),
816        ) => stats.min_opt().map(|val| {
817            Datum::new(
818                primitive_type.clone(),
819                PrimitiveLiteral::Int128(i128::from(*val)),
820            )
821        }),
822        (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
823            let Some(bytes) = stats.min_bytes_opt() else {
824                return Ok(None);
825            };
826            if bytes.len() != 16 {
827                return Err(Error::new(
828                    ErrorKind::Unexpected,
829                    "Invalid length of uuid bytes.",
830                ));
831            }
832            Some(Datum::uuid(Uuid::from_bytes(
833                bytes[..16].try_into().unwrap(),
834            )))
835        }
836        (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
837            let Some(bytes) = stat.min_bytes_opt() else {
838                return Ok(None);
839            };
840            if bytes.len() != *len as usize {
841                return Err(Error::new(
842                    ErrorKind::Unexpected,
843                    "Invalid length of fixed bytes.",
844                ));
845            }
846            Some(Datum::fixed(bytes.to_vec()))
847        }
848        (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
849            return Ok(stat
850                .min_bytes_opt()
851                .map(|bytes| Datum::binary(bytes.to_vec())));
852        }
853        _ => {
854            return Ok(None);
855        }
856    })
857}
858
859pub(crate) fn get_parquet_stat_max_as_datum(
860    primitive_type: &PrimitiveType,
861    stats: &Statistics,
862) -> Result<Option<Datum>> {
863    Ok(match (primitive_type, stats) {
864        (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
865            stats.max_opt().map(|val| Datum::bool(*val))
866        }
867        (PrimitiveType::Int, Statistics::Int32(stats)) => {
868            stats.max_opt().map(|val| Datum::int(*val))
869        }
870        (PrimitiveType::Date, Statistics::Int32(stats)) => {
871            stats.max_opt().map(|val| Datum::date(*val))
872        }
873        (PrimitiveType::Long, Statistics::Int64(stats)) => {
874            stats.max_opt().map(|val| Datum::long(*val))
875        }
876        (PrimitiveType::Time, Statistics::Int64(stats)) => {
877            let Some(val) = stats.max_opt() else {
878                return Ok(None);
879            };
880
881            Some(Datum::time_micros(*val)?)
882        }
883        (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
884            stats.max_opt().map(|val| Datum::timestamp_micros(*val))
885        }
886        (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
887            stats.max_opt().map(|val| Datum::timestamptz_micros(*val))
888        }
889        (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
890            stats.max_opt().map(|val| Datum::timestamp_nanos(*val))
891        }
892        (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
893            stats.max_opt().map(|val| Datum::timestamptz_nanos(*val))
894        }
895        (PrimitiveType::Float, Statistics::Float(stats)) => {
896            stats.max_opt().map(|val| Datum::float(*val))
897        }
898        (PrimitiveType::Double, Statistics::Double(stats)) => {
899            stats.max_opt().map(|val| Datum::double(*val))
900        }
901        (PrimitiveType::String, Statistics::ByteArray(stats)) => {
902            let Some(val) = stats.max_opt() else {
903                return Ok(None);
904            };
905
906            Some(Datum::string(val.as_utf8()?))
907        }
908        (
909            PrimitiveType::Decimal {
910                precision: _,
911                scale: _,
912            },
913            Statistics::ByteArray(stats),
914        ) => {
915            let Some(bytes) = stats.max_bytes_opt() else {
916                return Ok(None);
917            };
918            Some(Datum::new(
919                primitive_type.clone(),
920                PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
921            ))
922        }
923        (
924            PrimitiveType::Decimal {
925                precision: _,
926                scale: _,
927            },
928            Statistics::FixedLenByteArray(stats),
929        ) => {
930            let Some(bytes) = stats.max_bytes_opt() else {
931                return Ok(None);
932            };
933            let unscaled_value = BigInt::from_signed_bytes_be(bytes);
934            Some(Datum::new(
935                primitive_type.clone(),
936                PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
937                    Error::new(
938                        ErrorKind::DataInvalid,
939                        format!("Can't convert bytes to i128: {bytes:?}"),
940                    )
941                })?),
942            ))
943        }
944        (
945            PrimitiveType::Decimal {
946                precision: _,
947                scale: _,
948            },
949            Statistics::Int32(stats),
950        ) => stats.max_opt().map(|val| {
951            Datum::new(
952                primitive_type.clone(),
953                PrimitiveLiteral::Int128(i128::from(*val)),
954            )
955        }),
956
957        (
958            PrimitiveType::Decimal {
959                precision: _,
960                scale: _,
961            },
962            Statistics::Int64(stats),
963        ) => stats.max_opt().map(|val| {
964            Datum::new(
965                primitive_type.clone(),
966                PrimitiveLiteral::Int128(i128::from(*val)),
967            )
968        }),
969        (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
970            let Some(bytes) = stats.max_bytes_opt() else {
971                return Ok(None);
972            };
973            if bytes.len() != 16 {
974                return Err(Error::new(
975                    ErrorKind::Unexpected,
976                    "Invalid length of uuid bytes.",
977                ));
978            }
979            Some(Datum::uuid(Uuid::from_bytes(
980                bytes[..16].try_into().unwrap(),
981            )))
982        }
983        (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
984            let Some(bytes) = stat.max_bytes_opt() else {
985                return Ok(None);
986            };
987            if bytes.len() != *len as usize {
988                return Err(Error::new(
989                    ErrorKind::Unexpected,
990                    "Invalid length of fixed bytes.",
991                ));
992            }
993            Some(Datum::fixed(bytes.to_vec()))
994        }
995        (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
996            return Ok(stat
997                .max_bytes_opt()
998                .map(|bytes| Datum::binary(bytes.to_vec())));
999        }
1000        _ => {
1001            return Ok(None);
1002        }
1003    })
1004}
1005
1006impl TryFrom<&ArrowSchema> for crate::spec::Schema {
1007    type Error = Error;
1008
1009    fn try_from(schema: &ArrowSchema) -> crate::Result<Self> {
1010        arrow_schema_to_schema(schema)
1011    }
1012}
1013
1014impl TryFrom<&crate::spec::Schema> for ArrowSchema {
1015    type Error = Error;
1016
1017    fn try_from(schema: &crate::spec::Schema) -> crate::Result<Self> {
1018        schema_to_arrow_schema(schema)
1019    }
1020}
1021
1022#[cfg(test)]
1023mod tests {
1024    use std::collections::HashMap;
1025    use std::sync::Arc;
1026
1027    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1028    use rust_decimal::Decimal;
1029
1030    use super::*;
1031    use crate::spec::{Literal, Schema};
1032
1033    /// Create a simple field with metadata.
1034    fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
1035        Field::new(name, ty, nullable).with_metadata(HashMap::from([(
1036            PARQUET_FIELD_ID_META_KEY.to_string(),
1037            value.to_string(),
1038        )]))
1039    }
1040
1041    fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema {
1042        let fields = Fields::from(vec![
1043            simple_field("key", DataType::Int32, false, "28"),
1044            simple_field("value", DataType::Utf8, true, "29"),
1045        ]);
1046
1047        let r#struct = DataType::Struct(fields);
1048        let map = DataType::Map(
1049            Arc::new(simple_field(DEFAULT_MAP_FIELD_NAME, r#struct, false, "17")),
1050            false,
1051        );
1052        let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1053
1054        let fields = Fields::from(vec![
1055            simple_field("aa", DataType::Int32, false, "18"),
1056            simple_field("bb", DataType::Utf8, true, "19"),
1057            simple_field(
1058                "cc",
1059                DataType::Timestamp(TimeUnit::Microsecond, None),
1060                false,
1061                "20",
1062            ),
1063        ]);
1064
1065        let r#struct = DataType::Struct(fields);
1066
1067        ArrowSchema::new(vec![
1068            simple_field("a", DataType::Int32, false, "2"),
1069            simple_field("b", DataType::Int64, false, "1"),
1070            simple_field("c", DataType::Utf8, false, "3"),
1071            simple_field("n", DataType::Utf8, false, "21"),
1072            simple_field(
1073                "d",
1074                DataType::Timestamp(TimeUnit::Microsecond, None),
1075                true,
1076                "4",
1077            ),
1078            simple_field("e", DataType::Boolean, true, "6"),
1079            simple_field("f", DataType::Float32, false, "5"),
1080            simple_field("g", DataType::Float64, false, "7"),
1081            simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1082            simple_field("h", DataType::Date32, false, "8"),
1083            simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1084            simple_field(
1085                "j",
1086                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1087                false,
1088                "10",
1089            ),
1090            simple_field(
1091                "k",
1092                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1093                false,
1094                "12",
1095            ),
1096            simple_field("l", DataType::Binary, false, "13"),
1097            simple_field("o", DataType::LargeBinary, false, "22"),
1098            simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1099            simple_field(
1100                "list",
1101                DataType::List(Arc::new(simple_field(
1102                    "element",
1103                    DataType::Int32,
1104                    false,
1105                    "15",
1106                ))),
1107                true,
1108                "14",
1109            ),
1110            simple_field(
1111                "large_list",
1112                DataType::LargeList(Arc::new(simple_field(
1113                    "element",
1114                    DataType::Utf8,
1115                    false,
1116                    "23",
1117                ))),
1118                true,
1119                "24",
1120            ),
1121            simple_field(
1122                "fixed_list",
1123                DataType::FixedSizeList(
1124                    Arc::new(simple_field("element", DataType::Binary, false, "26")),
1125                    10,
1126                ),
1127                true,
1128                "25",
1129            ),
1130            simple_field("map", map, false, "16"),
1131            simple_field("struct", r#struct, false, "17"),
1132            simple_field("dictionary", dictionary, false, "30"),
1133        ])
1134    }
1135
1136    fn iceberg_schema_for_arrow_schema_to_schema_test() -> Schema {
1137        let schema_json = r#"{
1138            "type":"struct",
1139            "schema-id":0,
1140            "fields":[
1141                {
1142                    "id":2,
1143                    "name":"a",
1144                    "required":true,
1145                    "type":"int"
1146                },
1147                {
1148                    "id":1,
1149                    "name":"b",
1150                    "required":true,
1151                    "type":"long"
1152                },
1153                {
1154                    "id":3,
1155                    "name":"c",
1156                    "required":true,
1157                    "type":"string"
1158                },
1159                {
1160                    "id":21,
1161                    "name":"n",
1162                    "required":true,
1163                    "type":"string"
1164                },
1165                {
1166                    "id":4,
1167                    "name":"d",
1168                    "required":false,
1169                    "type":"timestamp"
1170                },
1171                {
1172                    "id":6,
1173                    "name":"e",
1174                    "required":false,
1175                    "type":"boolean"
1176                },
1177                {
1178                    "id":5,
1179                    "name":"f",
1180                    "required":true,
1181                    "type":"float"
1182                },
1183                {
1184                    "id":7,
1185                    "name":"g",
1186                    "required":true,
1187                    "type":"double"
1188                },
1189                {
1190                    "id":27,
1191                    "name":"p",
1192                    "required":true,
1193                    "type":"decimal(10,2)"
1194                },
1195                {
1196                    "id":8,
1197                    "name":"h",
1198                    "required":true,
1199                    "type":"date"
1200                },
1201                {
1202                    "id":9,
1203                    "name":"i",
1204                    "required":true,
1205                    "type":"time"
1206                },
1207                {
1208                    "id":10,
1209                    "name":"j",
1210                    "required":true,
1211                    "type":"timestamptz"
1212                },
1213                {
1214                    "id":12,
1215                    "name":"k",
1216                    "required":true,
1217                    "type":"timestamptz"
1218                },
1219                {
1220                    "id":13,
1221                    "name":"l",
1222                    "required":true,
1223                    "type":"binary"
1224                },
1225                {
1226                    "id":22,
1227                    "name":"o",
1228                    "required":true,
1229                    "type":"binary"
1230                },
1231                {
1232                    "id":11,
1233                    "name":"m",
1234                    "required":true,
1235                    "type":"fixed[10]"
1236                },
1237                {
1238                    "id":14,
1239                    "name":"list",
1240                    "required": false,
1241                    "type": {
1242                        "type": "list",
1243                        "element-id": 15,
1244                        "element-required": true,
1245                        "element": "int"
1246                    }
1247                },
1248                {
1249                    "id":24,
1250                    "name":"large_list",
1251                    "required": false,
1252                    "type": {
1253                        "type": "list",
1254                        "element-id": 23,
1255                        "element-required": true,
1256                        "element": "string"
1257                    }
1258                },
1259                {
1260                    "id":25,
1261                    "name":"fixed_list",
1262                    "required": false,
1263                    "type": {
1264                        "type": "list",
1265                        "element-id": 26,
1266                        "element-required": true,
1267                        "element": "binary"
1268                    }
1269                },
1270                {
1271                    "id":16,
1272                    "name":"map",
1273                    "required": true,
1274                    "type": {
1275                        "type": "map",
1276                        "key-id": 28,
1277                        "key": "int",
1278                        "value-id": 29,
1279                        "value-required": false,
1280                        "value": "string"
1281                    }
1282                },
1283                {
1284                    "id":17,
1285                    "name":"struct",
1286                    "required": true,
1287                    "type": {
1288                        "type": "struct",
1289                        "fields": [
1290                            {
1291                                "id":18,
1292                                "name":"aa",
1293                                "required":true,
1294                                "type":"int"
1295                            },
1296                            {
1297                                "id":19,
1298                                "name":"bb",
1299                                "required":false,
1300                                "type":"string"
1301                            },
1302                            {
1303                                "id":20,
1304                                "name":"cc",
1305                                "required":true,
1306                                "type":"timestamp"
1307                            }
1308                        ]
1309                    }
1310                },
1311                {
1312                    "id":30,
1313                    "name":"dictionary",
1314                    "required":true,
1315                    "type":"string"
1316                }
1317            ],
1318            "identifier-field-ids":[]
1319        }"#;
1320
1321        let schema: Schema = serde_json::from_str(schema_json).unwrap();
1322        schema
1323    }
1324
1325    #[test]
1326    fn test_arrow_schema_to_schema() {
1327        let arrow_schema = arrow_schema_for_arrow_schema_to_schema_test();
1328        let schema = iceberg_schema_for_arrow_schema_to_schema_test();
1329        let converted_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
1330        pretty_assertions::assert_eq!(converted_schema, schema);
1331    }
1332
1333    fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema {
1334        let fields = Fields::from(vec![
1335            simple_field("key", DataType::Int32, false, "28"),
1336            simple_field("value", DataType::Utf8, true, "29"),
1337        ]);
1338
1339        let r#struct = DataType::Struct(fields);
1340        let map = DataType::Map(
1341            Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
1342            false,
1343        );
1344
1345        let fields = Fields::from(vec![
1346            simple_field("aa", DataType::Int32, false, "18"),
1347            simple_field("bb", DataType::Utf8, true, "19"),
1348            simple_field(
1349                "cc",
1350                DataType::Timestamp(TimeUnit::Microsecond, None),
1351                false,
1352                "20",
1353            ),
1354        ]);
1355
1356        let r#struct = DataType::Struct(fields);
1357
1358        ArrowSchema::new(vec![
1359            simple_field("a", DataType::Int32, false, "2"),
1360            simple_field("b", DataType::Int64, false, "1"),
1361            simple_field("c", DataType::Utf8, false, "3"),
1362            simple_field("n", DataType::Utf8, false, "21"),
1363            simple_field(
1364                "d",
1365                DataType::Timestamp(TimeUnit::Microsecond, None),
1366                true,
1367                "4",
1368            ),
1369            simple_field("e", DataType::Boolean, true, "6"),
1370            simple_field("f", DataType::Float32, false, "5"),
1371            simple_field("g", DataType::Float64, false, "7"),
1372            simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1373            simple_field("h", DataType::Date32, false, "8"),
1374            simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1375            simple_field(
1376                "j",
1377                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1378                false,
1379                "10",
1380            ),
1381            simple_field(
1382                "k",
1383                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1384                false,
1385                "12",
1386            ),
1387            simple_field("l", DataType::LargeBinary, false, "13"),
1388            simple_field("o", DataType::LargeBinary, false, "22"),
1389            simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1390            simple_field(
1391                "list",
1392                DataType::List(Arc::new(simple_field(
1393                    "element",
1394                    DataType::Int32,
1395                    false,
1396                    "15",
1397                ))),
1398                true,
1399                "14",
1400            ),
1401            simple_field(
1402                "large_list",
1403                DataType::List(Arc::new(simple_field(
1404                    "element",
1405                    DataType::Utf8,
1406                    false,
1407                    "23",
1408                ))),
1409                true,
1410                "24",
1411            ),
1412            simple_field(
1413                "fixed_list",
1414                DataType::List(Arc::new(simple_field(
1415                    "element",
1416                    DataType::LargeBinary,
1417                    false,
1418                    "26",
1419                ))),
1420                true,
1421                "25",
1422            ),
1423            simple_field("map", map, false, "16"),
1424            simple_field("struct", r#struct, false, "17"),
1425            simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"),
1426        ])
1427    }
1428
1429    fn iceberg_schema_for_schema_to_arrow_schema() -> Schema {
1430        let schema_json = r#"{
1431            "type":"struct",
1432            "schema-id":0,
1433            "fields":[
1434                {
1435                    "id":2,
1436                    "name":"a",
1437                    "required":true,
1438                    "type":"int"
1439                },
1440                {
1441                    "id":1,
1442                    "name":"b",
1443                    "required":true,
1444                    "type":"long"
1445                },
1446                {
1447                    "id":3,
1448                    "name":"c",
1449                    "required":true,
1450                    "type":"string"
1451                },
1452                {
1453                    "id":21,
1454                    "name":"n",
1455                    "required":true,
1456                    "type":"string"
1457                },
1458                {
1459                    "id":4,
1460                    "name":"d",
1461                    "required":false,
1462                    "type":"timestamp"
1463                },
1464                {
1465                    "id":6,
1466                    "name":"e",
1467                    "required":false,
1468                    "type":"boolean"
1469                },
1470                {
1471                    "id":5,
1472                    "name":"f",
1473                    "required":true,
1474                    "type":"float"
1475                },
1476                {
1477                    "id":7,
1478                    "name":"g",
1479                    "required":true,
1480                    "type":"double"
1481                },
1482                {
1483                    "id":27,
1484                    "name":"p",
1485                    "required":true,
1486                    "type":"decimal(10,2)"
1487                },
1488                {
1489                    "id":8,
1490                    "name":"h",
1491                    "required":true,
1492                    "type":"date"
1493                },
1494                {
1495                    "id":9,
1496                    "name":"i",
1497                    "required":true,
1498                    "type":"time"
1499                },
1500                {
1501                    "id":10,
1502                    "name":"j",
1503                    "required":true,
1504                    "type":"timestamptz"
1505                },
1506                {
1507                    "id":12,
1508                    "name":"k",
1509                    "required":true,
1510                    "type":"timestamptz"
1511                },
1512                {
1513                    "id":13,
1514                    "name":"l",
1515                    "required":true,
1516                    "type":"binary"
1517                },
1518                {
1519                    "id":22,
1520                    "name":"o",
1521                    "required":true,
1522                    "type":"binary"
1523                },
1524                {
1525                    "id":11,
1526                    "name":"m",
1527                    "required":true,
1528                    "type":"fixed[10]"
1529                },
1530                {
1531                    "id":14,
1532                    "name":"list",
1533                    "required": false,
1534                    "type": {
1535                        "type": "list",
1536                        "element-id": 15,
1537                        "element-required": true,
1538                        "element": "int"
1539                    }
1540                },
1541                {
1542                    "id":24,
1543                    "name":"large_list",
1544                    "required": false,
1545                    "type": {
1546                        "type": "list",
1547                        "element-id": 23,
1548                        "element-required": true,
1549                        "element": "string"
1550                    }
1551                },
1552                {
1553                    "id":25,
1554                    "name":"fixed_list",
1555                    "required": false,
1556                    "type": {
1557                        "type": "list",
1558                        "element-id": 26,
1559                        "element-required": true,
1560                        "element": "binary"
1561                    }
1562                },
1563                {
1564                    "id":16,
1565                    "name":"map",
1566                    "required": true,
1567                    "type": {
1568                        "type": "map",
1569                        "key-id": 28,
1570                        "key": "int",
1571                        "value-id": 29,
1572                        "value-required": false,
1573                        "value": "string"
1574                    }
1575                },
1576                {
1577                    "id":17,
1578                    "name":"struct",
1579                    "required": true,
1580                    "type": {
1581                        "type": "struct",
1582                        "fields": [
1583                            {
1584                                "id":18,
1585                                "name":"aa",
1586                                "required":true,
1587                                "type":"int"
1588                            },
1589                            {
1590                                "id":19,
1591                                "name":"bb",
1592                                "required":false,
1593                                "type":"string"
1594                            },
1595                            {
1596                                "id":20,
1597                                "name":"cc",
1598                                "required":true,
1599                                "type":"timestamp"
1600                            }
1601                        ]
1602                    }
1603                },
1604                {
1605                    "id":30,
1606                    "name":"uuid",
1607                    "required":true,
1608                    "type":"uuid"
1609                }
1610            ],
1611            "identifier-field-ids":[]
1612        }"#;
1613
1614        let schema: Schema = serde_json::from_str(schema_json).unwrap();
1615        schema
1616    }
1617
1618    #[test]
1619    fn test_schema_to_arrow_schema() {
1620        let arrow_schema = arrow_schema_for_schema_to_arrow_schema_test();
1621        let schema = iceberg_schema_for_schema_to_arrow_schema();
1622        let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap();
1623        assert_eq!(converted_arrow_schema, arrow_schema);
1624    }
1625
1626    #[test]
1627    fn test_type_conversion() {
1628        // test primitive type
1629        {
1630            let arrow_type = DataType::Int32;
1631            let iceberg_type = Type::Primitive(PrimitiveType::Int);
1632            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1633            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1634        }
1635
1636        // test struct type
1637        {
1638            // no metadata will cause error
1639            let arrow_type = DataType::Struct(Fields::from(vec![
1640                Field::new("a", DataType::Int64, false),
1641                Field::new("b", DataType::Utf8, true),
1642            ]));
1643            assert_eq!(
1644                &arrow_type_to_type(&arrow_type).unwrap_err().to_string(),
1645                "DataInvalid => Field id not found in metadata"
1646            );
1647
1648            let arrow_type = DataType::Struct(Fields::from(vec![
1649                Field::new("a", DataType::Int64, false).with_metadata(HashMap::from_iter([(
1650                    PARQUET_FIELD_ID_META_KEY.to_string(),
1651                    1.to_string(),
1652                )])),
1653                Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from_iter([(
1654                    PARQUET_FIELD_ID_META_KEY.to_string(),
1655                    2.to_string(),
1656                )])),
1657            ]));
1658            let iceberg_type = Type::Struct(StructType::new(vec![
1659                NestedField {
1660                    id: 1,
1661                    doc: None,
1662                    name: "a".to_string(),
1663                    required: true,
1664                    field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1665                    initial_default: None,
1666                    write_default: None,
1667                }
1668                .into(),
1669                NestedField {
1670                    id: 2,
1671                    doc: None,
1672                    name: "b".to_string(),
1673                    required: false,
1674                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1675                    initial_default: None,
1676                    write_default: None,
1677                }
1678                .into(),
1679            ]));
1680            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1681            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1682
1683            // initial_default and write_default is ignored
1684            let iceberg_type = Type::Struct(StructType::new(vec![
1685                NestedField {
1686                    id: 1,
1687                    doc: None,
1688                    name: "a".to_string(),
1689                    required: true,
1690                    field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1691                    initial_default: Some(Literal::Primitive(PrimitiveLiteral::Int(114514))),
1692                    write_default: None,
1693                }
1694                .into(),
1695                NestedField {
1696                    id: 2,
1697                    doc: None,
1698                    name: "b".to_string(),
1699                    required: false,
1700                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1701                    initial_default: None,
1702                    write_default: Some(Literal::Primitive(PrimitiveLiteral::String(
1703                        "514".to_string(),
1704                    ))),
1705                }
1706                .into(),
1707            ]));
1708            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1709        }
1710
1711        // test dictionary type
1712        {
1713            let arrow_type =
1714                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int8));
1715            let iceberg_type = Type::Primitive(PrimitiveType::Int);
1716            assert_eq!(
1717                iceberg_type,
1718                arrow_type_to_type(&arrow_type).unwrap(),
1719                "Expected dictionary conversion to use the contained value"
1720            );
1721
1722            let arrow_type =
1723                DataType::Dictionary(Box::new(DataType::Utf8), Box::new(DataType::Boolean));
1724            let iceberg_type = Type::Primitive(PrimitiveType::Boolean);
1725            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1726        }
1727    }
1728
1729    #[test]
1730    fn test_unsigned_integer_type_conversion() {
1731        let test_cases = vec![
1732            (DataType::UInt8, PrimitiveType::Int),
1733            (DataType::UInt16, PrimitiveType::Int),
1734            (DataType::UInt32, PrimitiveType::Long),
1735        ];
1736
1737        for (arrow_type, expected_iceberg_type) in test_cases {
1738            let arrow_field = Field::new("test", arrow_type.clone(), false).with_metadata(
1739                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1740            );
1741            let arrow_schema = ArrowSchema::new(vec![arrow_field]);
1742
1743            let iceberg_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
1744            let iceberg_field = iceberg_schema.as_struct().fields().first().unwrap();
1745
1746            assert!(
1747                matches!(iceberg_field.field_type.as_ref(), Type::Primitive(t) if *t == expected_iceberg_type),
1748                "Expected {arrow_type:?} to map to {expected_iceberg_type:?}"
1749            );
1750        }
1751
1752        // Test UInt64 blocking
1753        {
1754            let arrow_field = Field::new("test", DataType::UInt64, false).with_metadata(
1755                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1756            );
1757            let arrow_schema = ArrowSchema::new(vec![arrow_field]);
1758
1759            let result = arrow_schema_to_schema(&arrow_schema);
1760            assert!(result.is_err());
1761            assert!(
1762                result
1763                    .unwrap_err()
1764                    .to_string()
1765                    .contains("UInt64 is not supported")
1766            );
1767        }
1768    }
1769
1770    #[test]
1771    fn test_datum_conversion() {
1772        {
1773            let datum = Datum::bool(true);
1774            let arrow_datum = get_arrow_datum(&datum).unwrap();
1775            let (array, is_scalar) = arrow_datum.get();
1776            let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
1777            assert!(is_scalar);
1778            assert!(array.value(0));
1779        }
1780        {
1781            let datum = Datum::int(42);
1782            let arrow_datum = get_arrow_datum(&datum).unwrap();
1783            let (array, is_scalar) = arrow_datum.get();
1784            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
1785            assert!(is_scalar);
1786            assert_eq!(array.value(0), 42);
1787        }
1788        {
1789            let datum = Datum::long(42);
1790            let arrow_datum = get_arrow_datum(&datum).unwrap();
1791            let (array, is_scalar) = arrow_datum.get();
1792            let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
1793            assert!(is_scalar);
1794            assert_eq!(array.value(0), 42);
1795        }
1796        {
1797            let datum = Datum::float(42.42);
1798            let arrow_datum = get_arrow_datum(&datum).unwrap();
1799            let (array, is_scalar) = arrow_datum.get();
1800            let array = array.as_any().downcast_ref::<Float32Array>().unwrap();
1801            assert!(is_scalar);
1802            assert_eq!(array.value(0), 42.42);
1803        }
1804        {
1805            let datum = Datum::double(42.42);
1806            let arrow_datum = get_arrow_datum(&datum).unwrap();
1807            let (array, is_scalar) = arrow_datum.get();
1808            let array = array.as_any().downcast_ref::<Float64Array>().unwrap();
1809            assert!(is_scalar);
1810            assert_eq!(array.value(0), 42.42);
1811        }
1812        {
1813            let datum = Datum::string("abc");
1814            let arrow_datum = get_arrow_datum(&datum).unwrap();
1815            let (array, is_scalar) = arrow_datum.get();
1816            let array = array.as_any().downcast_ref::<StringArray>().unwrap();
1817            assert!(is_scalar);
1818            assert_eq!(array.value(0), "abc");
1819        }
1820        {
1821            let datum = Datum::binary(vec![1, 2, 3, 4]);
1822            let arrow_datum = get_arrow_datum(&datum).unwrap();
1823            let (array, is_scalar) = arrow_datum.get();
1824            let array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
1825            assert!(is_scalar);
1826            assert_eq!(array.value(0), &[1, 2, 3, 4]);
1827        }
1828        {
1829            let datum = Datum::date(42);
1830            let arrow_datum = get_arrow_datum(&datum).unwrap();
1831            let (array, is_scalar) = arrow_datum.get();
1832            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
1833            assert!(is_scalar);
1834            assert_eq!(array.value(0), 42);
1835        }
1836        {
1837            let datum = Datum::timestamp_micros(42);
1838            let arrow_datum = get_arrow_datum(&datum).unwrap();
1839            let (array, is_scalar) = arrow_datum.get();
1840            let array = array
1841                .as_any()
1842                .downcast_ref::<TimestampMicrosecondArray>()
1843                .unwrap();
1844            assert!(is_scalar);
1845            assert_eq!(array.value(0), 42);
1846        }
1847        {
1848            let datum = Datum::timestamptz_micros(42);
1849            let arrow_datum = get_arrow_datum(&datum).unwrap();
1850            let (array, is_scalar) = arrow_datum.get();
1851            let array = array
1852                .as_any()
1853                .downcast_ref::<TimestampMicrosecondArray>()
1854                .unwrap();
1855            assert!(is_scalar);
1856            assert_eq!(array.timezone(), Some("+00:00"));
1857            assert_eq!(array.value(0), 42);
1858        }
1859        {
1860            let datum = Datum::decimal_with_precision(Decimal::new(123, 2), 30).unwrap();
1861            let arrow_datum = get_arrow_datum(&datum).unwrap();
1862            let (array, is_scalar) = arrow_datum.get();
1863            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
1864            assert!(is_scalar);
1865            assert_eq!(array.precision(), 30);
1866            assert_eq!(array.scale(), 2);
1867            assert_eq!(array.value(0), 123);
1868        }
1869        {
1870            let datum = Datum::uuid_from_str("42424242-4242-4242-4242-424242424242").unwrap();
1871            let arrow_datum = get_arrow_datum(&datum).unwrap();
1872            let (array, is_scalar) = arrow_datum.get();
1873            let array = array
1874                .as_any()
1875                .downcast_ref::<FixedSizeBinaryArray>()
1876                .unwrap();
1877            assert!(is_scalar);
1878            assert_eq!(array.value(0), [66u8; 16]);
1879        }
1880    }
1881}