iceberg/arrow/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Conversion between Arrow schema and Iceberg schema.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
24use arrow_array::{
25    BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
26    FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
27    TimestampMicrosecondArray, TimestampNanosecondArray,
28};
29use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
30use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
31use parquet::file::statistics::Statistics;
32use uuid::Uuid;
33
34use crate::error::Result;
35use crate::spec::decimal_utils::i128_from_be_bytes;
36use crate::spec::{
37    Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral,
38    PrimitiveType, Schema, SchemaVisitor, StructType, Type,
39};
40use crate::{Error, ErrorKind};
41
/// When an Iceberg map type is converted to an Arrow map type, the default map field name is "key_value".
43pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
44/// UTC time zone for Arrow timestamp type.
45pub const UTC_TIME_ZONE: &str = "+00:00";
46
47/// A post order arrow schema visitor.
48///
49/// For order of methods called, please refer to [`visit_schema`].
50pub trait ArrowSchemaVisitor {
51    /// Return type of this visitor on arrow field.
52    type T;
53
54    /// Return type of this visitor on arrow schema.
55    type U;
56
57    /// Called before struct/list/map field.
58    fn before_field(&mut self, _field: &Field) -> Result<()> {
59        Ok(())
60    }
61
62    /// Called after struct/list/map field.
63    fn after_field(&mut self, _field: &Field) -> Result<()> {
64        Ok(())
65    }
66
67    /// Called before list element.
68    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
69        Ok(())
70    }
71
72    /// Called after list element.
73    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
74        Ok(())
75    }
76
77    /// Called before map key.
78    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
79        Ok(())
80    }
81
82    /// Called after map key.
83    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
84        Ok(())
85    }
86
87    /// Called before map value.
88    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
89        Ok(())
90    }
91
92    /// Called after map value.
93    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
94        Ok(())
95    }
96
97    /// Called after schema's type visited.
98    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U>;
99
100    /// Called after struct's fields visited.
101    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T>;
102
103    /// Called after list fields visited.
104    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;
105
106    /// Called after map's key and value fields visited.
107    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T>;
108
109    /// Called when see a primitive type.
110    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
111}
112
113/// Visiting a type in post order.
114fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Result<V::T> {
115    match r#type {
116        p if p.is_primitive()
117            || matches!(
118                p,
119                DataType::Boolean
120                    | DataType::Utf8
121                    | DataType::LargeUtf8
122                    | DataType::Utf8View
123                    | DataType::Binary
124                    | DataType::LargeBinary
125                    | DataType::BinaryView
126                    | DataType::FixedSizeBinary(_)
127            ) =>
128        {
129            visitor.primitive(p)
130        }
131        DataType::List(element_field) => visit_list(r#type, element_field, visitor),
132        DataType::LargeList(element_field) => visit_list(r#type, element_field, visitor),
133        DataType::FixedSizeList(element_field, _) => visit_list(r#type, element_field, visitor),
134        DataType::Map(field, _) => match field.data_type() {
135            DataType::Struct(fields) => {
136                if fields.len() != 2 {
137                    return Err(Error::new(
138                        ErrorKind::DataInvalid,
139                        "Map field must have exactly 2 fields",
140                    ));
141                }
142
143                let key_field = &fields[0];
144                let value_field = &fields[1];
145
146                let key_result = {
147                    visitor.before_map_key(key_field)?;
148                    let ret = visit_type(key_field.data_type(), visitor)?;
149                    visitor.after_map_key(key_field)?;
150                    ret
151                };
152
153                let value_result = {
154                    visitor.before_map_value(value_field)?;
155                    let ret = visit_type(value_field.data_type(), visitor)?;
156                    visitor.after_map_value(value_field)?;
157                    ret
158                };
159
160                visitor.map(r#type, key_result, value_result)
161            }
162            _ => Err(Error::new(
163                ErrorKind::DataInvalid,
164                "Map field must have struct type",
165            )),
166        },
167        DataType::Struct(fields) => visit_struct(fields, visitor),
168        DataType::Dictionary(_key_type, value_type) => visit_type(value_type, visitor),
169        other => Err(Error::new(
170            ErrorKind::DataInvalid,
171            format!("Cannot visit Arrow data type: {other}"),
172        )),
173    }
174}
175
176/// Visit list types in post order.
177fn visit_list<V: ArrowSchemaVisitor>(
178    data_type: &DataType,
179    element_field: &Field,
180    visitor: &mut V,
181) -> Result<V::T> {
182    visitor.before_list_element(element_field)?;
183    let value = visit_type(element_field.data_type(), visitor)?;
184    visitor.after_list_element(element_field)?;
185    visitor.list(data_type, value)
186}
187
188/// Visit struct type in post order.
189fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> Result<V::T> {
190    let mut results = Vec::with_capacity(fields.len());
191    for field in fields {
192        visitor.before_field(field)?;
193        let result = visit_type(field.data_type(), visitor)?;
194        visitor.after_field(field)?;
195        results.push(result);
196    }
197
198    visitor.r#struct(fields, results)
199}
200
201/// Visit schema in post order.
202pub(crate) fn visit_schema<V: ArrowSchemaVisitor>(
203    schema: &ArrowSchema,
204    visitor: &mut V,
205) -> Result<V::U> {
206    let mut results = Vec::with_capacity(schema.fields().len());
207    for field in schema.fields() {
208        visitor.before_field(field)?;
209        let result = visit_type(field.data_type(), visitor)?;
210        visitor.after_field(field)?;
211        results.push(result);
212    }
213    visitor.schema(schema, results)
214}
215
216/// Convert Arrow schema to Iceberg schema.
217///
218/// Iceberg schema fields require a unique field id, and this function assumes that each field
219/// in the provided Arrow schema contains a field id in its metadata. If the metadata is missing
220/// or the field id is not set, the conversion will fail
221pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
222    let mut visitor = ArrowSchemaConverter::new();
223    visit_schema(schema, &mut visitor)
224}
225
226/// Convert Arrow schema to Iceberg schema with automatically assigned field IDs.
227///
228/// Unlike [`arrow_schema_to_schema`], this function does not require field IDs in the Arrow
229/// schema metadata. Instead, it automatically assigns unique field IDs starting from 1,
230/// following Iceberg's field ID assignment rules.
231///
232/// This is useful when converting Arrow schemas that don't originate from Iceberg tables,
233/// such as schemas from DataFusion or other Arrow-based systems.
234pub fn arrow_schema_to_schema_auto_assign_ids(schema: &ArrowSchema) -> Result<Schema> {
235    let mut visitor = ArrowSchemaConverter::new_with_field_ids_from(FIRST_FIELD_ID);
236    visit_schema(schema, &mut visitor)
237}
238
239/// Convert Arrow type to iceberg type.
240pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
241    let mut visitor = ArrowSchemaConverter::new();
242    visit_type(ty, &mut visitor)
243}
244
245const ARROW_FIELD_DOC_KEY: &str = "doc";
246
247pub(super) fn get_field_id_from_metadata(field: &Field) -> Result<i32> {
248    if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
249        return value.parse::<i32>().map_err(|e| {
250            Error::new(
251                ErrorKind::DataInvalid,
252                "Failed to parse field id".to_string(),
253            )
254            .with_context("value", value)
255            .with_source(e)
256        });
257    }
258    Err(Error::new(
259        ErrorKind::DataInvalid,
260        "Field id not found in metadata",
261    ))
262}
263
264fn get_field_doc(field: &Field) -> Option<String> {
265    if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
266        return Some(value.clone());
267    }
268    None
269}
270
/// Visitor state used to turn an Arrow schema or type into its Iceberg counterpart.
struct ArrowSchemaConverter {
    /// `Some(start)` asks the schema builder to reassign all field IDs beginning at
    /// `start`; `None` means field IDs are taken from the Arrow field metadata.
    reassign_field_ids_from: Option<i32>,
    /// Counter handing out distinct placeholder IDs while reassignment is enabled.
    /// Placeholders must be unique because the reassignment step maps old IDs to new ones.
    next_field_id: i32,
}
280
281impl ArrowSchemaConverter {
282    fn new() -> Self {
283        Self {
284            reassign_field_ids_from: None,
285            next_field_id: 0,
286        }
287    }
288
289    fn new_with_field_ids_from(start_from: i32) -> Self {
290        Self {
291            reassign_field_ids_from: Some(start_from),
292            next_field_id: 0,
293        }
294    }
295
296    fn get_field_id(&mut self, field: &Field) -> Result<i32> {
297        if self.reassign_field_ids_from.is_some() {
298            // Field IDs will be reassigned by the schema builder.
299            // We need unique temporary IDs because ReassignFieldIds builds an
300            // old->new ID mapping that requires unique input IDs.
301            let temp_id = self.next_field_id;
302            self.next_field_id += 1;
303            Ok(temp_id)
304        } else {
305            // Get field ID from arrow field metadata
306            get_field_id_from_metadata(field)
307        }
308    }
309
310    fn convert_fields(
311        &mut self,
312        fields: &Fields,
313        field_results: &[Type],
314    ) -> Result<Vec<NestedFieldRef>> {
315        let mut results = Vec::with_capacity(fields.len());
316        for i in 0..fields.len() {
317            let field = &fields[i];
318            let field_type = &field_results[i];
319            let id = self.get_field_id(field)?;
320            let doc = get_field_doc(field);
321            let nested_field = NestedField {
322                id,
323                doc,
324                name: field.name().clone(),
325                required: !field.is_nullable(),
326                field_type: Box::new(field_type.clone()),
327                initial_default: None,
328                write_default: None,
329            };
330            results.push(Arc::new(nested_field));
331        }
332        Ok(results)
333    }
334}
335
336impl ArrowSchemaVisitor for ArrowSchemaConverter {
337    type T = Type;
338    type U = Schema;
339
340    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
341        let fields = self.convert_fields(schema.fields(), &values)?;
342        let mut builder = Schema::builder().with_fields(fields);
343        if let Some(start_from) = self.reassign_field_ids_from {
344            builder = builder.with_reassigned_field_ids(start_from)
345        }
346        builder.build()
347    }
348
349    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
350        let fields = self.convert_fields(fields, &results)?;
351        Ok(Type::Struct(StructType::new(fields)))
352    }
353
354    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
355        let element_field = match list {
356            DataType::List(element_field) => element_field,
357            DataType::LargeList(element_field) => element_field,
358            DataType::FixedSizeList(element_field, _) => element_field,
359            _ => {
360                return Err(Error::new(
361                    ErrorKind::DataInvalid,
362                    "List type must have list data type",
363                ));
364            }
365        };
366
367        let id = self.get_field_id(element_field)?;
368        let doc = get_field_doc(element_field);
369        let mut element_field =
370            NestedField::list_element(id, value.clone(), !element_field.is_nullable());
371        if let Some(doc) = doc {
372            element_field = element_field.with_doc(doc);
373        }
374        let element_field = Arc::new(element_field);
375        Ok(Type::List(ListType { element_field }))
376    }
377
378    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
379        match map {
380            DataType::Map(field, _) => match field.data_type() {
381                DataType::Struct(fields) => {
382                    if fields.len() != 2 {
383                        return Err(Error::new(
384                            ErrorKind::DataInvalid,
385                            "Map field must have exactly 2 fields",
386                        ));
387                    }
388
389                    let key_field = &fields[0];
390                    let value_field = &fields[1];
391
392                    let key_id = self.get_field_id(key_field)?;
393                    let key_doc = get_field_doc(key_field);
394                    let mut key_field = NestedField::map_key_element(key_id, key_value.clone());
395                    if let Some(doc) = key_doc {
396                        key_field = key_field.with_doc(doc);
397                    }
398                    let key_field = Arc::new(key_field);
399
400                    let value_id = self.get_field_id(value_field)?;
401                    let value_doc = get_field_doc(value_field);
402                    let mut value_field = NestedField::map_value_element(
403                        value_id,
404                        value.clone(),
405                        !value_field.is_nullable(),
406                    );
407                    if let Some(doc) = value_doc {
408                        value_field = value_field.with_doc(doc);
409                    }
410                    let value_field = Arc::new(value_field);
411
412                    Ok(Type::Map(MapType {
413                        key_field,
414                        value_field,
415                    }))
416                }
417                _ => Err(Error::new(
418                    ErrorKind::DataInvalid,
419                    "Map field must have struct type",
420                )),
421            },
422            _ => Err(Error::new(
423                ErrorKind::DataInvalid,
424                "Map type must have map data type",
425            )),
426        }
427    }
428
429    fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
430        match p {
431            DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
432            DataType::Int8 | DataType::Int16 | DataType::Int32 => {
433                Ok(Type::Primitive(PrimitiveType::Int))
434            }
435            DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)),
436            DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)),
437            DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
438            DataType::UInt64 => {
439                // Block uint64 - no safe casting option
440                Err(Error::new(
441                    ErrorKind::DataInvalid,
442                    "UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for full uint64 range.",
443                ))
444            }
445            DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
446            DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
447            DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| {
448                Error::new(
449                    ErrorKind::DataInvalid,
450                    "Failed to create decimal type".to_string(),
451                )
452                .with_source(e)
453            }),
454            DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)),
455            DataType::Time64(unit) if unit == &TimeUnit::Microsecond => {
456                Ok(Type::Primitive(PrimitiveType::Time))
457            }
458            DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => {
459                Ok(Type::Primitive(PrimitiveType::Timestamp))
460            }
461            DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => {
462                Ok(Type::Primitive(PrimitiveType::TimestampNs))
463            }
464            DataType::Timestamp(unit, Some(zone))
465                if unit == &TimeUnit::Microsecond
466                    && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
467            {
468                Ok(Type::Primitive(PrimitiveType::Timestamptz))
469            }
470            DataType::Timestamp(unit, Some(zone))
471                if unit == &TimeUnit::Nanosecond
472                    && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
473            {
474                Ok(Type::Primitive(PrimitiveType::TimestamptzNs))
475            }
476            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
477                Ok(Type::Primitive(PrimitiveType::Binary))
478            }
479            DataType::FixedSizeBinary(width) => {
480                Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64)))
481            }
482            DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => {
483                Ok(Type::Primitive(PrimitiveType::String))
484            }
485            _ => Err(Error::new(
486                ErrorKind::DataInvalid,
487                format!("Unsupported Arrow data type: {p}"),
488            )),
489        }
490    }
491}
492
/// Stateless visitor that converts an Iceberg schema or type into its Arrow counterpart.
struct ToArrowSchemaConverter;
494
495enum ArrowSchemaOrFieldOrType {
496    Schema(ArrowSchema),
497    Field(Field),
498    Type(DataType),
499}
500
501impl SchemaVisitor for ToArrowSchemaConverter {
502    type T = ArrowSchemaOrFieldOrType;
503
504    fn schema(
505        &mut self,
506        _schema: &crate::spec::Schema,
507        value: ArrowSchemaOrFieldOrType,
508    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
509        let struct_type = match value {
510            ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)) => fields,
511            _ => unreachable!(),
512        };
513        Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new(
514            struct_type,
515        )))
516    }
517
518    fn field(
519        &mut self,
520        field: &crate::spec::NestedFieldRef,
521        value: ArrowSchemaOrFieldOrType,
522    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
523        let ty = match value {
524            ArrowSchemaOrFieldOrType::Type(ty) => ty,
525            _ => unreachable!(),
526        };
527        let metadata = if let Some(doc) = &field.doc {
528            HashMap::from([
529                (PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()),
530                (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
531            ])
532        } else {
533            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string())])
534        };
535        Ok(ArrowSchemaOrFieldOrType::Field(
536            Field::new(field.name.clone(), ty, !field.required).with_metadata(metadata),
537        ))
538    }
539
540    fn r#struct(
541        &mut self,
542        _: &crate::spec::StructType,
543        results: Vec<ArrowSchemaOrFieldOrType>,
544    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
545        let fields = results
546            .into_iter()
547            .map(|result| match result {
548                ArrowSchemaOrFieldOrType::Field(field) => field,
549                _ => unreachable!(),
550            })
551            .collect();
552        Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)))
553    }
554
555    fn list(
556        &mut self,
557        list: &crate::spec::ListType,
558        value: ArrowSchemaOrFieldOrType,
559    ) -> crate::Result<Self::T> {
560        let field = match self.field(&list.element_field, value)? {
561            ArrowSchemaOrFieldOrType::Field(field) => field,
562            _ => unreachable!(),
563        };
564        let meta = if let Some(doc) = &list.element_field.doc {
565            HashMap::from([
566                (
567                    PARQUET_FIELD_ID_META_KEY.to_string(),
568                    list.element_field.id.to_string(),
569                ),
570                (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
571            ])
572        } else {
573            HashMap::from([(
574                PARQUET_FIELD_ID_META_KEY.to_string(),
575                list.element_field.id.to_string(),
576            )])
577        };
578        let field = field.with_metadata(meta);
579        Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new(
580            field,
581        ))))
582    }
583
584    fn map(
585        &mut self,
586        map: &crate::spec::MapType,
587        key_value: ArrowSchemaOrFieldOrType,
588        value: ArrowSchemaOrFieldOrType,
589    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
590        let key_field = match self.field(&map.key_field, key_value)? {
591            ArrowSchemaOrFieldOrType::Field(field) => field,
592            _ => unreachable!(),
593        };
594        let value_field = match self.field(&map.value_field, value)? {
595            ArrowSchemaOrFieldOrType::Field(field) => field,
596            _ => unreachable!(),
597        };
598        let field = Field::new(
599            DEFAULT_MAP_FIELD_NAME,
600            DataType::Struct(vec![key_field, value_field].into()),
601            // Map field is always not nullable
602            false,
603        );
604
605        Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
606            field.into(),
607            false,
608        )))
609    }
610
611    fn primitive(
612        &mut self,
613        p: &crate::spec::PrimitiveType,
614    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
615        match p {
616            crate::spec::PrimitiveType::Boolean => {
617                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
618            }
619            crate::spec::PrimitiveType::Int => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int32)),
620            crate::spec::PrimitiveType::Long => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int64)),
621            crate::spec::PrimitiveType::Float => {
622                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float32))
623            }
624            crate::spec::PrimitiveType::Double => {
625                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float64))
626            }
627            crate::spec::PrimitiveType::Decimal { precision, scale } => {
628                let (precision, scale) = {
629                    let precision: u8 = precision.to_owned().try_into().map_err(|err| {
630                        Error::new(
631                            crate::ErrorKind::DataInvalid,
632                            "incompatible precision for decimal type convert",
633                        )
634                        .with_source(err)
635                    })?;
636                    let scale = scale.to_owned().try_into().map_err(|err| {
637                        Error::new(
638                            crate::ErrorKind::DataInvalid,
639                            "incompatible scale for decimal type convert",
640                        )
641                        .with_source(err)
642                    })?;
643                    (precision, scale)
644                };
645                validate_decimal_precision_and_scale::<Decimal128Type>(precision, scale).map_err(
646                    |err| {
647                        Error::new(
648                            crate::ErrorKind::DataInvalid,
649                            "incompatible precision and scale for decimal type convert",
650                        )
651                        .with_source(err)
652                    },
653                )?;
654                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Decimal128(
655                    precision, scale,
656                )))
657            }
658            crate::spec::PrimitiveType::Date => {
659                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
660            }
661            crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
662                DataType::Time64(TimeUnit::Microsecond),
663            )),
664            crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
665                DataType::Timestamp(TimeUnit::Microsecond, None),
666            )),
667            crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
668                // Timestampz always stored as UTC
669                DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
670            )),
671            crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
672                DataType::Timestamp(TimeUnit::Nanosecond, None),
673            )),
674            crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
675                // Store timestamptz_ns as UTC
676                DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
677            )),
678            crate::spec::PrimitiveType::String => {
679                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
680            }
681            crate::spec::PrimitiveType::Uuid => Ok(ArrowSchemaOrFieldOrType::Type(
682                DataType::FixedSizeBinary(16),
683            )),
684            crate::spec::PrimitiveType::Fixed(len) => Ok(ArrowSchemaOrFieldOrType::Type(
685                i32::try_from(*len)
686                    .ok()
687                    .map(DataType::FixedSizeBinary)
688                    .unwrap_or(DataType::LargeBinary),
689            )),
690            crate::spec::PrimitiveType::Binary => {
691                Ok(ArrowSchemaOrFieldOrType::Type(DataType::LargeBinary))
692            }
693        }
694    }
695}
696
697/// Convert iceberg schema to an arrow schema.
698pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
699    let mut converter = ToArrowSchemaConverter;
700    match crate::spec::visit_schema(schema, &mut converter)? {
701        ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema),
702        _ => unreachable!(),
703    }
704}
705
706/// Convert iceberg type to an arrow type.
707pub fn type_to_arrow_type(ty: &crate::spec::Type) -> crate::Result<DataType> {
708    let mut converter = ToArrowSchemaConverter;
709    match crate::spec::visit_type(ty, &mut converter)? {
710        ArrowSchemaOrFieldOrType::Type(ty) => Ok(ty),
711        _ => unreachable!(),
712    }
713}
714
715/// Convert Iceberg Datum to Arrow Datum.
716pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send + Sync>> {
717    match (datum.data_type(), datum.literal()) {
718        (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(value)) => {
719            Ok(Arc::new(BooleanArray::new_scalar(*value)))
720        }
721        (PrimitiveType::Int, PrimitiveLiteral::Int(value)) => {
722            Ok(Arc::new(Int32Array::new_scalar(*value)))
723        }
724        (PrimitiveType::Long, PrimitiveLiteral::Long(value)) => {
725            Ok(Arc::new(Int64Array::new_scalar(*value)))
726        }
727        (PrimitiveType::Float, PrimitiveLiteral::Float(value)) => {
728            Ok(Arc::new(Float32Array::new_scalar(value.into_inner())))
729        }
730        (PrimitiveType::Double, PrimitiveLiteral::Double(value)) => {
731            Ok(Arc::new(Float64Array::new_scalar(value.into_inner())))
732        }
733        (PrimitiveType::String, PrimitiveLiteral::String(value)) => {
734            Ok(Arc::new(StringArray::new_scalar(value.as_str())))
735        }
736        (PrimitiveType::Binary, PrimitiveLiteral::Binary(value)) => {
737            Ok(Arc::new(BinaryArray::new_scalar(value.as_slice())))
738        }
739        (PrimitiveType::Date, PrimitiveLiteral::Int(value)) => {
740            Ok(Arc::new(Date32Array::new_scalar(*value)))
741        }
742        (PrimitiveType::Timestamp, PrimitiveLiteral::Long(value)) => {
743            Ok(Arc::new(TimestampMicrosecondArray::new_scalar(*value)))
744        }
745        (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
746            TimestampMicrosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
747        ))),
748        (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(value)) => {
749            Ok(Arc::new(TimestampNanosecondArray::new_scalar(*value)))
750        }
751        (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
752            TimestampNanosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
753        ))),
754        (PrimitiveType::Decimal { precision, scale }, PrimitiveLiteral::Int128(value)) => {
755            let array = Decimal128Array::from_value(*value, 1)
756                .with_precision_and_scale(*precision as _, *scale as _)
757                .unwrap();
758            Ok(Arc::new(Scalar::new(array)))
759        }
760        (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(value)) => {
761            let bytes = Uuid::from_u128(*value).into_bytes();
762            let array = FixedSizeBinaryArray::try_from_iter(vec![bytes].into_iter()).unwrap();
763            Ok(Arc::new(Scalar::new(array)))
764        }
765        (PrimitiveType::Fixed(_), PrimitiveLiteral::Binary(value)) => {
766            let array = FixedSizeBinaryArray::try_from_iter(std::iter::once(value.as_slice()))
767                .map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string()))?;
768            Ok(Arc::new(Scalar::new(array)))
769        }
770
771        (primitive_type, _) => Err(Error::new(
772            ErrorKind::FeatureUnsupported,
773            format!("Converting datum from type {primitive_type:?} to arrow not supported yet."),
774        )),
775    }
776}
777
778pub(crate) fn get_parquet_stat_min_as_datum(
779    primitive_type: &PrimitiveType,
780    stats: &Statistics,
781) -> Result<Option<Datum>> {
782    Ok(match (primitive_type, stats) {
783        (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
784            stats.min_opt().map(|val| Datum::bool(*val))
785        }
786        (PrimitiveType::Int, Statistics::Int32(stats)) => {
787            stats.min_opt().map(|val| Datum::int(*val))
788        }
789        (PrimitiveType::Date, Statistics::Int32(stats)) => {
790            stats.min_opt().map(|val| Datum::date(*val))
791        }
792        (PrimitiveType::Long, Statistics::Int64(stats)) => {
793            stats.min_opt().map(|val| Datum::long(*val))
794        }
795        (PrimitiveType::Time, Statistics::Int64(stats)) => {
796            let Some(val) = stats.min_opt() else {
797                return Ok(None);
798            };
799
800            Some(Datum::time_micros(*val)?)
801        }
802        (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
803            stats.min_opt().map(|val| Datum::timestamp_micros(*val))
804        }
805        (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
806            stats.min_opt().map(|val| Datum::timestamptz_micros(*val))
807        }
808        (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
809            stats.min_opt().map(|val| Datum::timestamp_nanos(*val))
810        }
811        (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
812            stats.min_opt().map(|val| Datum::timestamptz_nanos(*val))
813        }
814        (PrimitiveType::Float, Statistics::Float(stats)) => {
815            stats.min_opt().map(|val| Datum::float(*val))
816        }
817        (PrimitiveType::Double, Statistics::Double(stats)) => {
818            stats.min_opt().map(|val| Datum::double(*val))
819        }
820        (PrimitiveType::String, Statistics::ByteArray(stats)) => {
821            let Some(val) = stats.min_opt() else {
822                return Ok(None);
823            };
824
825            Some(Datum::string(val.as_utf8()?))
826        }
827        (
828            PrimitiveType::Decimal {
829                precision: _,
830                scale: _,
831            },
832            Statistics::ByteArray(stats),
833        ) => {
834            let Some(bytes) = stats.min_bytes_opt() else {
835                return Ok(None);
836            };
837            Some(Datum::new(
838                primitive_type.clone(),
839                PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
840            ))
841        }
842        (
843            PrimitiveType::Decimal {
844                precision: _,
845                scale: _,
846            },
847            Statistics::FixedLenByteArray(stats),
848        ) => {
849            let Some(bytes) = stats.min_bytes_opt() else {
850                return Ok(None);
851            };
852            Some(Datum::new(
853                primitive_type.clone(),
854                PrimitiveLiteral::Int128(i128_from_be_bytes(bytes).ok_or_else(|| {
855                    Error::new(
856                        ErrorKind::DataInvalid,
857                        format!("Can't convert bytes to i128: {bytes:?}"),
858                    )
859                })?),
860            ))
861        }
862        (
863            PrimitiveType::Decimal {
864                precision: _,
865                scale: _,
866            },
867            Statistics::Int32(stats),
868        ) => stats.min_opt().map(|val| {
869            Datum::new(
870                primitive_type.clone(),
871                PrimitiveLiteral::Int128(i128::from(*val)),
872            )
873        }),
874
875        (
876            PrimitiveType::Decimal {
877                precision: _,
878                scale: _,
879            },
880            Statistics::Int64(stats),
881        ) => stats.min_opt().map(|val| {
882            Datum::new(
883                primitive_type.clone(),
884                PrimitiveLiteral::Int128(i128::from(*val)),
885            )
886        }),
887        (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
888            let Some(bytes) = stats.min_bytes_opt() else {
889                return Ok(None);
890            };
891            if bytes.len() != 16 {
892                return Err(Error::new(
893                    ErrorKind::Unexpected,
894                    "Invalid length of uuid bytes.",
895                ));
896            }
897            Some(Datum::uuid(Uuid::from_bytes(
898                bytes[..16].try_into().unwrap(),
899            )))
900        }
901        (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
902            let Some(bytes) = stat.min_bytes_opt() else {
903                return Ok(None);
904            };
905            if bytes.len() != *len as usize {
906                return Err(Error::new(
907                    ErrorKind::Unexpected,
908                    "Invalid length of fixed bytes.",
909                ));
910            }
911            Some(Datum::fixed(bytes.to_vec()))
912        }
913        (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
914            return Ok(stat
915                .min_bytes_opt()
916                .map(|bytes| Datum::binary(bytes.to_vec())));
917        }
918        _ => {
919            return Ok(None);
920        }
921    })
922}
923
924pub(crate) fn get_parquet_stat_max_as_datum(
925    primitive_type: &PrimitiveType,
926    stats: &Statistics,
927) -> Result<Option<Datum>> {
928    Ok(match (primitive_type, stats) {
929        (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
930            stats.max_opt().map(|val| Datum::bool(*val))
931        }
932        (PrimitiveType::Int, Statistics::Int32(stats)) => {
933            stats.max_opt().map(|val| Datum::int(*val))
934        }
935        (PrimitiveType::Date, Statistics::Int32(stats)) => {
936            stats.max_opt().map(|val| Datum::date(*val))
937        }
938        (PrimitiveType::Long, Statistics::Int64(stats)) => {
939            stats.max_opt().map(|val| Datum::long(*val))
940        }
941        (PrimitiveType::Time, Statistics::Int64(stats)) => {
942            let Some(val) = stats.max_opt() else {
943                return Ok(None);
944            };
945
946            Some(Datum::time_micros(*val)?)
947        }
948        (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
949            stats.max_opt().map(|val| Datum::timestamp_micros(*val))
950        }
951        (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
952            stats.max_opt().map(|val| Datum::timestamptz_micros(*val))
953        }
954        (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
955            stats.max_opt().map(|val| Datum::timestamp_nanos(*val))
956        }
957        (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
958            stats.max_opt().map(|val| Datum::timestamptz_nanos(*val))
959        }
960        (PrimitiveType::Float, Statistics::Float(stats)) => {
961            stats.max_opt().map(|val| Datum::float(*val))
962        }
963        (PrimitiveType::Double, Statistics::Double(stats)) => {
964            stats.max_opt().map(|val| Datum::double(*val))
965        }
966        (PrimitiveType::String, Statistics::ByteArray(stats)) => {
967            let Some(val) = stats.max_opt() else {
968                return Ok(None);
969            };
970
971            Some(Datum::string(val.as_utf8()?))
972        }
973        (
974            PrimitiveType::Decimal {
975                precision: _,
976                scale: _,
977            },
978            Statistics::ByteArray(stats),
979        ) => {
980            let Some(bytes) = stats.max_bytes_opt() else {
981                return Ok(None);
982            };
983            Some(Datum::new(
984                primitive_type.clone(),
985                PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
986            ))
987        }
988        (
989            PrimitiveType::Decimal {
990                precision: _,
991                scale: _,
992            },
993            Statistics::FixedLenByteArray(stats),
994        ) => {
995            let Some(bytes) = stats.max_bytes_opt() else {
996                return Ok(None);
997            };
998            Some(Datum::new(
999                primitive_type.clone(),
1000                PrimitiveLiteral::Int128(i128_from_be_bytes(bytes).ok_or_else(|| {
1001                    Error::new(
1002                        ErrorKind::DataInvalid,
1003                        format!("Can't convert bytes to i128: {bytes:?}"),
1004                    )
1005                })?),
1006            ))
1007        }
1008        (
1009            PrimitiveType::Decimal {
1010                precision: _,
1011                scale: _,
1012            },
1013            Statistics::Int32(stats),
1014        ) => stats.max_opt().map(|val| {
1015            Datum::new(
1016                primitive_type.clone(),
1017                PrimitiveLiteral::Int128(i128::from(*val)),
1018            )
1019        }),
1020
1021        (
1022            PrimitiveType::Decimal {
1023                precision: _,
1024                scale: _,
1025            },
1026            Statistics::Int64(stats),
1027        ) => stats.max_opt().map(|val| {
1028            Datum::new(
1029                primitive_type.clone(),
1030                PrimitiveLiteral::Int128(i128::from(*val)),
1031            )
1032        }),
1033        (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
1034            let Some(bytes) = stats.max_bytes_opt() else {
1035                return Ok(None);
1036            };
1037            if bytes.len() != 16 {
1038                return Err(Error::new(
1039                    ErrorKind::Unexpected,
1040                    "Invalid length of uuid bytes.",
1041                ));
1042            }
1043            Some(Datum::uuid(Uuid::from_bytes(
1044                bytes[..16].try_into().unwrap(),
1045            )))
1046        }
1047        (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
1048            let Some(bytes) = stat.max_bytes_opt() else {
1049                return Ok(None);
1050            };
1051            if bytes.len() != *len as usize {
1052                return Err(Error::new(
1053                    ErrorKind::Unexpected,
1054                    "Invalid length of fixed bytes.",
1055                ));
1056            }
1057            Some(Datum::fixed(bytes.to_vec()))
1058        }
1059        (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
1060            return Ok(stat
1061                .max_bytes_opt()
1062                .map(|bytes| Datum::binary(bytes.to_vec())));
1063        }
1064        _ => {
1065            return Ok(None);
1066        }
1067    })
1068}
1069
1070impl TryFrom<&ArrowSchema> for crate::spec::Schema {
1071    type Error = Error;
1072
1073    fn try_from(schema: &ArrowSchema) -> crate::Result<Self> {
1074        arrow_schema_to_schema(schema)
1075    }
1076}
1077
1078impl TryFrom<&crate::spec::Schema> for ArrowSchema {
1079    type Error = Error;
1080
1081    fn try_from(schema: &crate::spec::Schema) -> crate::Result<Self> {
1082        schema_to_arrow_schema(schema)
1083    }
1084}
1085
1086/// Converts a Datum (Iceberg type + primitive literal) to its corresponding Arrow DataType
1087/// with Run-End Encoding (REE).
1088///
1089/// This function is used for constant fields in record batches, where all values are the same.
1090/// Run-End Encoding provides efficient storage for such constant columns.
1091///
1092/// # Arguments
1093/// * `datum` - The Datum to convert, which contains both type and value information
1094///
1095/// # Returns
1096/// Arrow DataType with Run-End Encoding applied
1097///
1098/// # Example
1099/// ```
1100/// use iceberg::arrow::datum_to_arrow_type_with_ree;
1101/// use iceberg::spec::Datum;
1102///
1103/// let datum = Datum::string("test_file.parquet");
1104/// let ree_type = datum_to_arrow_type_with_ree(&datum);
1105/// // Returns: RunEndEncoded(Int32, Utf8)
1106/// ```
1107pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
1108    // Helper to create REE type with the given values type.
1109    // Note: values field is nullable as Arrow expects this when building the
1110    // final Arrow schema with `RunArray::try_new`.
1111    let make_ree = |values_type: DataType| -> DataType {
1112        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
1113        let values_field = Arc::new(Field::new("values", values_type, true));
1114        DataType::RunEndEncoded(run_ends_field, values_field)
1115    };
1116
1117    // Match on the PrimitiveType from the Datum to determine the Arrow type
1118    match datum.data_type() {
1119        PrimitiveType::Boolean => make_ree(DataType::Boolean),
1120        PrimitiveType::Int => make_ree(DataType::Int32),
1121        PrimitiveType::Long => make_ree(DataType::Int64),
1122        PrimitiveType::Float => make_ree(DataType::Float32),
1123        PrimitiveType::Double => make_ree(DataType::Float64),
1124        PrimitiveType::Date => make_ree(DataType::Date32),
1125        PrimitiveType::Time => make_ree(DataType::Int64),
1126        PrimitiveType::Timestamp => make_ree(DataType::Int64),
1127        PrimitiveType::Timestamptz => make_ree(DataType::Int64),
1128        PrimitiveType::TimestampNs => make_ree(DataType::Int64),
1129        PrimitiveType::TimestamptzNs => make_ree(DataType::Int64),
1130        PrimitiveType::String => make_ree(DataType::Utf8),
1131        PrimitiveType::Uuid => make_ree(DataType::Binary),
1132        PrimitiveType::Fixed(_) => make_ree(DataType::Binary),
1133        PrimitiveType::Binary => make_ree(DataType::Binary),
1134        PrimitiveType::Decimal { precision, scale } => {
1135            make_ree(DataType::Decimal128(*precision as u8, *scale as i8))
1136        }
1137    }
1138}
1139
/// A visitor that strips metadata from an Arrow schema.
///
/// Every field this visitor produces is rebuilt with `Field::new` from a name, a data type and
/// a nullability flag only, so the metadata of the visited field is never copied over. This
/// is useful for schema comparison where metadata differences should be ignored.
struct MetadataStripVisitor {
    /// Stack of pending fields. `before_field` pushes a placeholder carrying the visited
    /// field's name and nullability; the `struct`, `list`, `map` and `primitive` callbacks
    /// pop one entry to name the field they rebuild.
    field_stack: Vec<Field>,
}
1149
1150impl MetadataStripVisitor {
1151    fn new() -> Self {
1152        Self {
1153            field_stack: Vec::new(),
1154        }
1155    }
1156}
1157
1158impl ArrowSchemaVisitor for MetadataStripVisitor {
1159    type T = Field;
1160    type U = ArrowSchema;
1161
1162    fn before_field(&mut self, field: &Field) -> Result<()> {
1163        // Store field name and nullability for later reconstruction
1164        self.field_stack.push(Field::new(
1165            field.name(),
1166            DataType::Null, // Placeholder, will be replaced
1167            field.is_nullable(),
1168        ));
1169        Ok(())
1170    }
1171
1172    fn after_field(&mut self, _field: &Field) -> Result<()> {
1173        Ok(())
1174    }
1175
1176    fn schema(&mut self, _schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
1177        Ok(ArrowSchema::new(values))
1178    }
1179
1180    fn r#struct(&mut self, _fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
1181        // Pop the struct field from the stack
1182        let field_info = self
1183            .field_stack
1184            .pop()
1185            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in struct"))?;
1186
1187        // Reconstruct struct field without metadata
1188        Ok(Field::new(
1189            field_info.name(),
1190            DataType::Struct(Fields::from(results)),
1191            field_info.is_nullable(),
1192        ))
1193    }
1194
1195    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
1196        // Pop the list field from the stack
1197        let field_info = self
1198            .field_stack
1199            .pop()
1200            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in list"))?;
1201
1202        // Reconstruct list field without metadata
1203        let list_type = match list {
1204            DataType::List(_) => DataType::List(Arc::new(value)),
1205            DataType::LargeList(_) => DataType::LargeList(Arc::new(value)),
1206            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(Arc::new(value), *size),
1207            _ => {
1208                return Err(Error::new(
1209                    ErrorKind::Unexpected,
1210                    format!("Expected list type, got {list}"),
1211                ));
1212            }
1213        };
1214
1215        Ok(Field::new(
1216            field_info.name(),
1217            list_type,
1218            field_info.is_nullable(),
1219        ))
1220    }
1221
1222    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
1223        // Pop the map field from the stack
1224        let field_info = self
1225            .field_stack
1226            .pop()
1227            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in map"))?;
1228
1229        // Reconstruct the map's struct field (contains key and value)
1230        let struct_field = Field::new(
1231            DEFAULT_MAP_FIELD_NAME,
1232            DataType::Struct(Fields::from(vec![key_value, value])),
1233            false,
1234        );
1235
1236        // Get the sorted flag from the original map type
1237        let sorted = match map {
1238            DataType::Map(_, sorted) => *sorted,
1239            _ => {
1240                return Err(Error::new(
1241                    ErrorKind::Unexpected,
1242                    format!("Expected map type, got {map}"),
1243                ));
1244            }
1245        };
1246
1247        // Reconstruct map field without metadata
1248        Ok(Field::new(
1249            field_info.name(),
1250            DataType::Map(Arc::new(struct_field), sorted),
1251            field_info.is_nullable(),
1252        ))
1253    }
1254
1255    fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
1256        // Pop the primitive field from the stack
1257        let field_info = self.field_stack.pop().ok_or_else(|| {
1258            Error::new(ErrorKind::Unexpected, "Field stack underflow in primitive")
1259        })?;
1260
1261        // Return field without metadata
1262        Ok(Field::new(
1263            field_info.name(),
1264            p.clone(),
1265            field_info.is_nullable(),
1266        ))
1267    }
1268}
1269
1270/// Strips all metadata from an Arrow schema and its nested fields.
1271///
1272/// This function recursively removes metadata from all fields at every level of the schema,
1273/// including nested struct, list, and map fields. This is useful for schema comparison
1274/// where metadata differences should be ignored.
1275///
1276/// # Arguments
1277/// * `schema` - The Arrow schema to strip metadata from
1278///
1279/// # Returns
1280/// A new Arrow schema with all metadata removed, or an error if the schema structure
1281/// is invalid.
1282///
1283/// # Example
1284/// ```
1285/// use std::collections::HashMap;
1286///
1287/// use arrow_schema::{DataType, Field, Schema as ArrowSchema};
1288/// use iceberg::arrow::strip_metadata_from_schema;
1289///
1290/// let mut metadata = HashMap::new();
1291/// metadata.insert("key".to_string(), "value".to_string());
1292///
1293/// let field = Field::new("col1", DataType::Int32, false).with_metadata(metadata);
1294/// let schema = ArrowSchema::new(vec![field]);
1295///
1296/// let stripped = strip_metadata_from_schema(&schema).unwrap();
1297/// assert!(stripped.field(0).metadata().is_empty());
1298/// ```
1299pub fn strip_metadata_from_schema(schema: &ArrowSchema) -> Result<ArrowSchema> {
1300    let mut visitor = MetadataStripVisitor::new();
1301    visit_schema(schema, &mut visitor)
1302}
1303
1304#[cfg(test)]
1305mod tests {
1306    use std::collections::HashMap;
1307    use std::sync::Arc;
1308
1309    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1310
1311    use super::*;
1312    use crate::spec::decimal_utils::decimal_new;
1313    use crate::spec::{Literal, Schema};
1314
1315    /// Create a simple field with metadata.
1316    fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
1317        Field::new(name, ty, nullable).with_metadata(HashMap::from([(
1318            PARQUET_FIELD_ID_META_KEY.to_string(),
1319            value.to_string(),
1320        )]))
1321    }
1322
1323    fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema {
1324        let fields = Fields::from(vec![
1325            simple_field("key", DataType::Int32, false, "28"),
1326            simple_field("value", DataType::Utf8, true, "29"),
1327        ]);
1328
1329        let r#struct = DataType::Struct(fields);
1330        let map = DataType::Map(
1331            Arc::new(simple_field(DEFAULT_MAP_FIELD_NAME, r#struct, false, "17")),
1332            false,
1333        );
1334        let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1335
1336        let fields = Fields::from(vec![
1337            simple_field("aa", DataType::Int32, false, "18"),
1338            simple_field("bb", DataType::Utf8, true, "19"),
1339            simple_field(
1340                "cc",
1341                DataType::Timestamp(TimeUnit::Microsecond, None),
1342                false,
1343                "20",
1344            ),
1345        ]);
1346
1347        let r#struct = DataType::Struct(fields);
1348
1349        ArrowSchema::new(vec![
1350            simple_field("a", DataType::Int32, false, "2"),
1351            simple_field("b", DataType::Int64, false, "1"),
1352            simple_field("c", DataType::Utf8, false, "3"),
1353            simple_field("n", DataType::Utf8, false, "21"),
1354            simple_field(
1355                "d",
1356                DataType::Timestamp(TimeUnit::Microsecond, None),
1357                true,
1358                "4",
1359            ),
1360            simple_field("e", DataType::Boolean, true, "6"),
1361            simple_field("f", DataType::Float32, false, "5"),
1362            simple_field("g", DataType::Float64, false, "7"),
1363            simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1364            simple_field("h", DataType::Date32, false, "8"),
1365            simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1366            simple_field(
1367                "j",
1368                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1369                false,
1370                "10",
1371            ),
1372            simple_field(
1373                "k",
1374                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1375                false,
1376                "12",
1377            ),
1378            simple_field("l", DataType::Binary, false, "13"),
1379            simple_field("o", DataType::LargeBinary, false, "22"),
1380            simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1381            simple_field(
1382                "list",
1383                DataType::List(Arc::new(simple_field(
1384                    "element",
1385                    DataType::Int32,
1386                    false,
1387                    "15",
1388                ))),
1389                true,
1390                "14",
1391            ),
1392            simple_field(
1393                "large_list",
1394                DataType::LargeList(Arc::new(simple_field(
1395                    "element",
1396                    DataType::Utf8,
1397                    false,
1398                    "23",
1399                ))),
1400                true,
1401                "24",
1402            ),
1403            simple_field(
1404                "fixed_list",
1405                DataType::FixedSizeList(
1406                    Arc::new(simple_field("element", DataType::Binary, false, "26")),
1407                    10,
1408                ),
1409                true,
1410                "25",
1411            ),
1412            simple_field("map", map, false, "16"),
1413            simple_field("struct", r#struct, false, "17"),
1414            simple_field("dictionary", dictionary, false, "30"),
1415        ])
1416    }
1417
1418    fn iceberg_schema_for_arrow_schema_to_schema_test() -> Schema {
1419        let schema_json = r#"{
1420            "type":"struct",
1421            "schema-id":0,
1422            "fields":[
1423                {
1424                    "id":2,
1425                    "name":"a",
1426                    "required":true,
1427                    "type":"int"
1428                },
1429                {
1430                    "id":1,
1431                    "name":"b",
1432                    "required":true,
1433                    "type":"long"
1434                },
1435                {
1436                    "id":3,
1437                    "name":"c",
1438                    "required":true,
1439                    "type":"string"
1440                },
1441                {
1442                    "id":21,
1443                    "name":"n",
1444                    "required":true,
1445                    "type":"string"
1446                },
1447                {
1448                    "id":4,
1449                    "name":"d",
1450                    "required":false,
1451                    "type":"timestamp"
1452                },
1453                {
1454                    "id":6,
1455                    "name":"e",
1456                    "required":false,
1457                    "type":"boolean"
1458                },
1459                {
1460                    "id":5,
1461                    "name":"f",
1462                    "required":true,
1463                    "type":"float"
1464                },
1465                {
1466                    "id":7,
1467                    "name":"g",
1468                    "required":true,
1469                    "type":"double"
1470                },
1471                {
1472                    "id":27,
1473                    "name":"p",
1474                    "required":true,
1475                    "type":"decimal(10,2)"
1476                },
1477                {
1478                    "id":8,
1479                    "name":"h",
1480                    "required":true,
1481                    "type":"date"
1482                },
1483                {
1484                    "id":9,
1485                    "name":"i",
1486                    "required":true,
1487                    "type":"time"
1488                },
1489                {
1490                    "id":10,
1491                    "name":"j",
1492                    "required":true,
1493                    "type":"timestamptz"
1494                },
1495                {
1496                    "id":12,
1497                    "name":"k",
1498                    "required":true,
1499                    "type":"timestamptz"
1500                },
1501                {
1502                    "id":13,
1503                    "name":"l",
1504                    "required":true,
1505                    "type":"binary"
1506                },
1507                {
1508                    "id":22,
1509                    "name":"o",
1510                    "required":true,
1511                    "type":"binary"
1512                },
1513                {
1514                    "id":11,
1515                    "name":"m",
1516                    "required":true,
1517                    "type":"fixed[10]"
1518                },
1519                {
1520                    "id":14,
1521                    "name":"list",
1522                    "required": false,
1523                    "type": {
1524                        "type": "list",
1525                        "element-id": 15,
1526                        "element-required": true,
1527                        "element": "int"
1528                    }
1529                },
1530                {
1531                    "id":24,
1532                    "name":"large_list",
1533                    "required": false,
1534                    "type": {
1535                        "type": "list",
1536                        "element-id": 23,
1537                        "element-required": true,
1538                        "element": "string"
1539                    }
1540                },
1541                {
1542                    "id":25,
1543                    "name":"fixed_list",
1544                    "required": false,
1545                    "type": {
1546                        "type": "list",
1547                        "element-id": 26,
1548                        "element-required": true,
1549                        "element": "binary"
1550                    }
1551                },
1552                {
1553                    "id":16,
1554                    "name":"map",
1555                    "required": true,
1556                    "type": {
1557                        "type": "map",
1558                        "key-id": 28,
1559                        "key": "int",
1560                        "value-id": 29,
1561                        "value-required": false,
1562                        "value": "string"
1563                    }
1564                },
1565                {
1566                    "id":17,
1567                    "name":"struct",
1568                    "required": true,
1569                    "type": {
1570                        "type": "struct",
1571                        "fields": [
1572                            {
1573                                "id":18,
1574                                "name":"aa",
1575                                "required":true,
1576                                "type":"int"
1577                            },
1578                            {
1579                                "id":19,
1580                                "name":"bb",
1581                                "required":false,
1582                                "type":"string"
1583                            },
1584                            {
1585                                "id":20,
1586                                "name":"cc",
1587                                "required":true,
1588                                "type":"timestamp"
1589                            }
1590                        ]
1591                    }
1592                },
1593                {
1594                    "id":30,
1595                    "name":"dictionary",
1596                    "required":true,
1597                    "type":"string"
1598                }
1599            ],
1600            "identifier-field-ids":[]
1601        }"#;
1602
1603        let schema: Schema = serde_json::from_str(schema_json).unwrap();
1604        schema
1605    }
1606
1607    #[test]
1608    fn test_arrow_schema_to_schema() {
1609        let arrow_schema = arrow_schema_for_arrow_schema_to_schema_test();
1610        let schema = iceberg_schema_for_arrow_schema_to_schema_test();
1611        let converted_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
1612        pretty_assertions::assert_eq!(converted_schema, schema);
1613    }
1614
1615    fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema {
1616        let fields = Fields::from(vec![
1617            simple_field("key", DataType::Int32, false, "28"),
1618            simple_field("value", DataType::Utf8, true, "29"),
1619        ]);
1620
1621        let r#struct = DataType::Struct(fields);
1622        let map = DataType::Map(
1623            Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
1624            false,
1625        );
1626
1627        let fields = Fields::from(vec![
1628            simple_field("aa", DataType::Int32, false, "18"),
1629            simple_field("bb", DataType::Utf8, true, "19"),
1630            simple_field(
1631                "cc",
1632                DataType::Timestamp(TimeUnit::Microsecond, None),
1633                false,
1634                "20",
1635            ),
1636        ]);
1637
1638        let r#struct = DataType::Struct(fields);
1639
1640        ArrowSchema::new(vec![
1641            simple_field("a", DataType::Int32, false, "2"),
1642            simple_field("b", DataType::Int64, false, "1"),
1643            simple_field("c", DataType::Utf8, false, "3"),
1644            simple_field("n", DataType::Utf8, false, "21"),
1645            simple_field(
1646                "d",
1647                DataType::Timestamp(TimeUnit::Microsecond, None),
1648                true,
1649                "4",
1650            ),
1651            simple_field("e", DataType::Boolean, true, "6"),
1652            simple_field("f", DataType::Float32, false, "5"),
1653            simple_field("g", DataType::Float64, false, "7"),
1654            simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1655            simple_field("h", DataType::Date32, false, "8"),
1656            simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1657            simple_field(
1658                "j",
1659                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1660                false,
1661                "10",
1662            ),
1663            simple_field(
1664                "k",
1665                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1666                false,
1667                "12",
1668            ),
1669            simple_field("l", DataType::LargeBinary, false, "13"),
1670            simple_field("o", DataType::LargeBinary, false, "22"),
1671            simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1672            simple_field(
1673                "list",
1674                DataType::List(Arc::new(simple_field(
1675                    "element",
1676                    DataType::Int32,
1677                    false,
1678                    "15",
1679                ))),
1680                true,
1681                "14",
1682            ),
1683            simple_field(
1684                "large_list",
1685                DataType::List(Arc::new(simple_field(
1686                    "element",
1687                    DataType::Utf8,
1688                    false,
1689                    "23",
1690                ))),
1691                true,
1692                "24",
1693            ),
1694            simple_field(
1695                "fixed_list",
1696                DataType::List(Arc::new(simple_field(
1697                    "element",
1698                    DataType::LargeBinary,
1699                    false,
1700                    "26",
1701                ))),
1702                true,
1703                "25",
1704            ),
1705            simple_field("map", map, false, "16"),
1706            simple_field("struct", r#struct, false, "17"),
1707            simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"),
1708        ])
1709    }
1710
1711    fn iceberg_schema_for_schema_to_arrow_schema() -> Schema {
1712        let schema_json = r#"{
1713            "type":"struct",
1714            "schema-id":0,
1715            "fields":[
1716                {
1717                    "id":2,
1718                    "name":"a",
1719                    "required":true,
1720                    "type":"int"
1721                },
1722                {
1723                    "id":1,
1724                    "name":"b",
1725                    "required":true,
1726                    "type":"long"
1727                },
1728                {
1729                    "id":3,
1730                    "name":"c",
1731                    "required":true,
1732                    "type":"string"
1733                },
1734                {
1735                    "id":21,
1736                    "name":"n",
1737                    "required":true,
1738                    "type":"string"
1739                },
1740                {
1741                    "id":4,
1742                    "name":"d",
1743                    "required":false,
1744                    "type":"timestamp"
1745                },
1746                {
1747                    "id":6,
1748                    "name":"e",
1749                    "required":false,
1750                    "type":"boolean"
1751                },
1752                {
1753                    "id":5,
1754                    "name":"f",
1755                    "required":true,
1756                    "type":"float"
1757                },
1758                {
1759                    "id":7,
1760                    "name":"g",
1761                    "required":true,
1762                    "type":"double"
1763                },
1764                {
1765                    "id":27,
1766                    "name":"p",
1767                    "required":true,
1768                    "type":"decimal(10,2)"
1769                },
1770                {
1771                    "id":8,
1772                    "name":"h",
1773                    "required":true,
1774                    "type":"date"
1775                },
1776                {
1777                    "id":9,
1778                    "name":"i",
1779                    "required":true,
1780                    "type":"time"
1781                },
1782                {
1783                    "id":10,
1784                    "name":"j",
1785                    "required":true,
1786                    "type":"timestamptz"
1787                },
1788                {
1789                    "id":12,
1790                    "name":"k",
1791                    "required":true,
1792                    "type":"timestamptz"
1793                },
1794                {
1795                    "id":13,
1796                    "name":"l",
1797                    "required":true,
1798                    "type":"binary"
1799                },
1800                {
1801                    "id":22,
1802                    "name":"o",
1803                    "required":true,
1804                    "type":"binary"
1805                },
1806                {
1807                    "id":11,
1808                    "name":"m",
1809                    "required":true,
1810                    "type":"fixed[10]"
1811                },
1812                {
1813                    "id":14,
1814                    "name":"list",
1815                    "required": false,
1816                    "type": {
1817                        "type": "list",
1818                        "element-id": 15,
1819                        "element-required": true,
1820                        "element": "int"
1821                    }
1822                },
1823                {
1824                    "id":24,
1825                    "name":"large_list",
1826                    "required": false,
1827                    "type": {
1828                        "type": "list",
1829                        "element-id": 23,
1830                        "element-required": true,
1831                        "element": "string"
1832                    }
1833                },
1834                {
1835                    "id":25,
1836                    "name":"fixed_list",
1837                    "required": false,
1838                    "type": {
1839                        "type": "list",
1840                        "element-id": 26,
1841                        "element-required": true,
1842                        "element": "binary"
1843                    }
1844                },
1845                {
1846                    "id":16,
1847                    "name":"map",
1848                    "required": true,
1849                    "type": {
1850                        "type": "map",
1851                        "key-id": 28,
1852                        "key": "int",
1853                        "value-id": 29,
1854                        "value-required": false,
1855                        "value": "string"
1856                    }
1857                },
1858                {
1859                    "id":17,
1860                    "name":"struct",
1861                    "required": true,
1862                    "type": {
1863                        "type": "struct",
1864                        "fields": [
1865                            {
1866                                "id":18,
1867                                "name":"aa",
1868                                "required":true,
1869                                "type":"int"
1870                            },
1871                            {
1872                                "id":19,
1873                                "name":"bb",
1874                                "required":false,
1875                                "type":"string"
1876                            },
1877                            {
1878                                "id":20,
1879                                "name":"cc",
1880                                "required":true,
1881                                "type":"timestamp"
1882                            }
1883                        ]
1884                    }
1885                },
1886                {
1887                    "id":30,
1888                    "name":"uuid",
1889                    "required":true,
1890                    "type":"uuid"
1891                }
1892            ],
1893            "identifier-field-ids":[]
1894        }"#;
1895
1896        let schema: Schema = serde_json::from_str(schema_json).unwrap();
1897        schema
1898    }
1899
1900    #[test]
1901    fn test_schema_to_arrow_schema() {
1902        let arrow_schema = arrow_schema_for_schema_to_arrow_schema_test();
1903        let schema = iceberg_schema_for_schema_to_arrow_schema();
1904        let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap();
1905        assert_eq!(converted_arrow_schema, arrow_schema);
1906    }
1907
1908    #[test]
1909    fn test_type_conversion() {
1910        // test primitive type
1911        {
1912            let arrow_type = DataType::Int32;
1913            let iceberg_type = Type::Primitive(PrimitiveType::Int);
1914            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1915            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1916        }
1917
1918        // test struct type
1919        {
1920            // no metadata will cause error
1921            let arrow_type = DataType::Struct(Fields::from(vec![
1922                Field::new("a", DataType::Int64, false),
1923                Field::new("b", DataType::Utf8, true),
1924            ]));
1925            assert_eq!(
1926                &arrow_type_to_type(&arrow_type).unwrap_err().to_string(),
1927                "DataInvalid => Field id not found in metadata"
1928            );
1929
1930            let arrow_type = DataType::Struct(Fields::from(vec![
1931                Field::new("a", DataType::Int64, false).with_metadata(HashMap::from_iter([(
1932                    PARQUET_FIELD_ID_META_KEY.to_string(),
1933                    1.to_string(),
1934                )])),
1935                Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from_iter([(
1936                    PARQUET_FIELD_ID_META_KEY.to_string(),
1937                    2.to_string(),
1938                )])),
1939            ]));
1940            let iceberg_type = Type::Struct(StructType::new(vec![
1941                NestedField {
1942                    id: 1,
1943                    doc: None,
1944                    name: "a".to_string(),
1945                    required: true,
1946                    field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1947                    initial_default: None,
1948                    write_default: None,
1949                }
1950                .into(),
1951                NestedField {
1952                    id: 2,
1953                    doc: None,
1954                    name: "b".to_string(),
1955                    required: false,
1956                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1957                    initial_default: None,
1958                    write_default: None,
1959                }
1960                .into(),
1961            ]));
1962            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1963            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1964
1965            // initial_default and write_default is ignored
1966            let iceberg_type = Type::Struct(StructType::new(vec![
1967                NestedField {
1968                    id: 1,
1969                    doc: None,
1970                    name: "a".to_string(),
1971                    required: true,
1972                    field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1973                    initial_default: Some(Literal::Primitive(PrimitiveLiteral::Int(114514))),
1974                    write_default: None,
1975                }
1976                .into(),
1977                NestedField {
1978                    id: 2,
1979                    doc: None,
1980                    name: "b".to_string(),
1981                    required: false,
1982                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1983                    initial_default: None,
1984                    write_default: Some(Literal::Primitive(PrimitiveLiteral::String(
1985                        "514".to_string(),
1986                    ))),
1987                }
1988                .into(),
1989            ]));
1990            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1991        }
1992
1993        // test dictionary type
1994        {
1995            let arrow_type =
1996                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int8));
1997            let iceberg_type = Type::Primitive(PrimitiveType::Int);
1998            assert_eq!(
1999                iceberg_type,
2000                arrow_type_to_type(&arrow_type).unwrap(),
2001                "Expected dictionary conversion to use the contained value"
2002            );
2003
2004            let arrow_type =
2005                DataType::Dictionary(Box::new(DataType::Utf8), Box::new(DataType::Boolean));
2006            let iceberg_type = Type::Primitive(PrimitiveType::Boolean);
2007            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
2008        }
2009    }
2010
2011    #[test]
2012    fn test_unsigned_integer_type_conversion() {
2013        let test_cases = vec![
2014            (DataType::UInt8, PrimitiveType::Int),
2015            (DataType::UInt16, PrimitiveType::Int),
2016            (DataType::UInt32, PrimitiveType::Long),
2017        ];
2018
2019        for (arrow_type, expected_iceberg_type) in test_cases {
2020            let arrow_field = Field::new("test", arrow_type.clone(), false).with_metadata(
2021                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
2022            );
2023            let arrow_schema = ArrowSchema::new(vec![arrow_field]);
2024
2025            let iceberg_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
2026            let iceberg_field = iceberg_schema.as_struct().fields().first().unwrap();
2027
2028            assert!(
2029                matches!(iceberg_field.field_type.as_ref(), Type::Primitive(t) if *t == expected_iceberg_type),
2030                "Expected {arrow_type:?} to map to {expected_iceberg_type:?}"
2031            );
2032        }
2033
2034        // Test UInt64 blocking
2035        {
2036            let arrow_field = Field::new("test", DataType::UInt64, false).with_metadata(
2037                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
2038            );
2039            let arrow_schema = ArrowSchema::new(vec![arrow_field]);
2040
2041            let result = arrow_schema_to_schema(&arrow_schema);
2042            assert!(result.is_err());
2043            assert!(
2044                result
2045                    .unwrap_err()
2046                    .to_string()
2047                    .contains("UInt64 is not supported")
2048            );
2049        }
2050    }
2051
2052    #[test]
2053    fn test_datum_conversion() {
2054        {
2055            let datum = Datum::bool(true);
2056            let arrow_datum = get_arrow_datum(&datum).unwrap();
2057            let (array, is_scalar) = arrow_datum.get();
2058            let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
2059            assert!(is_scalar);
2060            assert!(array.value(0));
2061        }
2062        {
2063            let datum = Datum::int(42);
2064            let arrow_datum = get_arrow_datum(&datum).unwrap();
2065            let (array, is_scalar) = arrow_datum.get();
2066            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
2067            assert!(is_scalar);
2068            assert_eq!(array.value(0), 42);
2069        }
2070        {
2071            let datum = Datum::long(42);
2072            let arrow_datum = get_arrow_datum(&datum).unwrap();
2073            let (array, is_scalar) = arrow_datum.get();
2074            let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
2075            assert!(is_scalar);
2076            assert_eq!(array.value(0), 42);
2077        }
2078        {
2079            let datum = Datum::float(42.42);
2080            let arrow_datum = get_arrow_datum(&datum).unwrap();
2081            let (array, is_scalar) = arrow_datum.get();
2082            let array = array.as_any().downcast_ref::<Float32Array>().unwrap();
2083            assert!(is_scalar);
2084            assert_eq!(array.value(0), 42.42);
2085        }
2086        {
2087            let datum = Datum::double(42.42);
2088            let arrow_datum = get_arrow_datum(&datum).unwrap();
2089            let (array, is_scalar) = arrow_datum.get();
2090            let array = array.as_any().downcast_ref::<Float64Array>().unwrap();
2091            assert!(is_scalar);
2092            assert_eq!(array.value(0), 42.42);
2093        }
2094        {
2095            let datum = Datum::string("abc");
2096            let arrow_datum = get_arrow_datum(&datum).unwrap();
2097            let (array, is_scalar) = arrow_datum.get();
2098            let array = array.as_any().downcast_ref::<StringArray>().unwrap();
2099            assert!(is_scalar);
2100            assert_eq!(array.value(0), "abc");
2101        }
2102        {
2103            let datum = Datum::binary(vec![1, 2, 3, 4]);
2104            let arrow_datum = get_arrow_datum(&datum).unwrap();
2105            let (array, is_scalar) = arrow_datum.get();
2106            let array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
2107            assert!(is_scalar);
2108            assert_eq!(array.value(0), &[1, 2, 3, 4]);
2109        }
2110        {
2111            let datum = Datum::date(42);
2112            let arrow_datum = get_arrow_datum(&datum).unwrap();
2113            let (array, is_scalar) = arrow_datum.get();
2114            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
2115            assert!(is_scalar);
2116            assert_eq!(array.value(0), 42);
2117        }
2118        {
2119            let datum = Datum::timestamp_micros(42);
2120            let arrow_datum = get_arrow_datum(&datum).unwrap();
2121            let (array, is_scalar) = arrow_datum.get();
2122            let array = array
2123                .as_any()
2124                .downcast_ref::<TimestampMicrosecondArray>()
2125                .unwrap();
2126            assert!(is_scalar);
2127            assert_eq!(array.value(0), 42);
2128        }
2129        {
2130            let datum = Datum::timestamptz_micros(42);
2131            let arrow_datum = get_arrow_datum(&datum).unwrap();
2132            let (array, is_scalar) = arrow_datum.get();
2133            let array = array
2134                .as_any()
2135                .downcast_ref::<TimestampMicrosecondArray>()
2136                .unwrap();
2137            assert!(is_scalar);
2138            assert_eq!(array.timezone(), Some("+00:00"));
2139            assert_eq!(array.value(0), 42);
2140        }
2141        {
2142            let datum = Datum::decimal_with_precision(decimal_new(123, 2), 30).unwrap();
2143            let arrow_datum = get_arrow_datum(&datum).unwrap();
2144            let (array, is_scalar) = arrow_datum.get();
2145            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
2146            assert!(is_scalar);
2147            assert_eq!(array.precision(), 30);
2148            assert_eq!(array.scale(), 2);
2149            assert_eq!(array.value(0), 123);
2150        }
2151        {
2152            let datum = Datum::uuid_from_str("42424242-4242-4242-4242-424242424242").unwrap();
2153            let arrow_datum = get_arrow_datum(&datum).unwrap();
2154            let (array, is_scalar) = arrow_datum.get();
2155            let array = array
2156                .as_any()
2157                .downcast_ref::<FixedSizeBinaryArray>()
2158                .unwrap();
2159            assert!(is_scalar);
2160            assert_eq!(array.value(0), [66u8; 16]);
2161        }
2162        {
2163            let datum = Datum::fixed(vec![1u8, 2, 3, 4, 5, 6, 7, 8]);
2164            let arrow_datum = get_arrow_datum(&datum).unwrap();
2165            let (array, is_scalar) = arrow_datum.get();
2166            let array = array
2167                .as_any()
2168                .downcast_ref::<FixedSizeBinaryArray>()
2169                .unwrap();
2170            assert!(is_scalar);
2171            assert_eq!(array.value_length(), 8);
2172            assert_eq!(array.value(0), &[1u8, 2, 3, 4, 5, 6, 7, 8]);
2173        }
2174    }
2175
2176    #[test]
2177    fn test_arrow_schema_to_schema_with_field_id() {
2178        // Create a complex Arrow schema without field ID metadata
2179        // Including: primitives, list, nested struct, map, and nested list of structs
2180        let arrow_schema = ArrowSchema::new(vec![
2181            Field::new("id", DataType::Int64, false),
2182            Field::new("name", DataType::Utf8, true),
2183            Field::new("price", DataType::Decimal128(10, 2), false),
2184            Field::new(
2185                "created_at",
2186                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2187                true,
2188            ),
2189            Field::new(
2190                "tags",
2191                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2192                true,
2193            ),
2194            Field::new(
2195                "address",
2196                DataType::Struct(Fields::from(vec![
2197                    Field::new("street", DataType::Utf8, true),
2198                    Field::new("city", DataType::Utf8, false),
2199                    Field::new("zip", DataType::Int32, true),
2200                ])),
2201                true,
2202            ),
2203            Field::new(
2204                "attributes",
2205                DataType::Map(
2206                    Arc::new(Field::new(
2207                        DEFAULT_MAP_FIELD_NAME,
2208                        DataType::Struct(Fields::from(vec![
2209                            Field::new("key", DataType::Utf8, false),
2210                            Field::new("value", DataType::Utf8, true),
2211                        ])),
2212                        false,
2213                    )),
2214                    false,
2215                ),
2216                true,
2217            ),
2218            Field::new(
2219                "orders",
2220                DataType::List(Arc::new(Field::new(
2221                    "element",
2222                    DataType::Struct(Fields::from(vec![
2223                        Field::new("order_id", DataType::Int64, false),
2224                        Field::new("amount", DataType::Float64, false),
2225                    ])),
2226                    true,
2227                ))),
2228                true,
2229            ),
2230        ]);
2231
2232        let schema = arrow_schema_to_schema_auto_assign_ids(&arrow_schema).unwrap();
2233
2234        // Build expected schema with exact field IDs following level-order assignment:
2235        // Level 0: id=1, name=2, price=3, created_at=4, tags=5, address=6, attributes=7, orders=8
2236        // Level 1: tags.element=9, address.{street=10,city=11,zip=12}, attributes.{key=13,value=14}, orders.element=15
2237        // Level 2: orders.element.{order_id=16,amount=17}
2238        let expected = Schema::builder()
2239            .with_fields(vec![
2240                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
2241                NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
2242                NestedField::required(
2243                    3,
2244                    "price",
2245                    Type::Primitive(PrimitiveType::Decimal {
2246                        precision: 10,
2247                        scale: 2,
2248                    }),
2249                )
2250                .into(),
2251                NestedField::optional(4, "created_at", Type::Primitive(PrimitiveType::Timestamptz))
2252                    .into(),
2253                NestedField::optional(
2254                    5,
2255                    "tags",
2256                    Type::List(ListType {
2257                        element_field: NestedField::list_element(
2258                            9,
2259                            Type::Primitive(PrimitiveType::String),
2260                            false,
2261                        )
2262                        .into(),
2263                    }),
2264                )
2265                .into(),
2266                NestedField::optional(
2267                    6,
2268                    "address",
2269                    Type::Struct(StructType::new(vec![
2270                        NestedField::optional(10, "street", Type::Primitive(PrimitiveType::String))
2271                            .into(),
2272                        NestedField::required(11, "city", Type::Primitive(PrimitiveType::String))
2273                            .into(),
2274                        NestedField::optional(12, "zip", Type::Primitive(PrimitiveType::Int))
2275                            .into(),
2276                    ])),
2277                )
2278                .into(),
2279                NestedField::optional(
2280                    7,
2281                    "attributes",
2282                    Type::Map(MapType {
2283                        key_field: NestedField::map_key_element(
2284                            13,
2285                            Type::Primitive(PrimitiveType::String),
2286                        )
2287                        .into(),
2288                        value_field: NestedField::map_value_element(
2289                            14,
2290                            Type::Primitive(PrimitiveType::String),
2291                            false,
2292                        )
2293                        .into(),
2294                    }),
2295                )
2296                .into(),
2297                NestedField::optional(
2298                    8,
2299                    "orders",
2300                    Type::List(ListType {
2301                        element_field: NestedField::list_element(
2302                            15,
2303                            Type::Struct(StructType::new(vec![
2304                                NestedField::required(
2305                                    16,
2306                                    "order_id",
2307                                    Type::Primitive(PrimitiveType::Long),
2308                                )
2309                                .into(),
2310                                NestedField::required(
2311                                    17,
2312                                    "amount",
2313                                    Type::Primitive(PrimitiveType::Double),
2314                                )
2315                                .into(),
2316                            ])),
2317                            false,
2318                        )
2319                        .into(),
2320                    }),
2321                )
2322                .into(),
2323            ])
2324            .build()
2325            .unwrap();
2326
2327        pretty_assertions::assert_eq!(schema, expected);
2328        assert_eq!(schema.highest_field_id(), 17);
2329    }
2330}