iceberg/arrow/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Conversion between Arrow schema and Iceberg schema.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
24use arrow_array::{
25    BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
26    FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
27    TimestampMicrosecondArray, TimestampNanosecondArray,
28};
29use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
30use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
31use parquet::file::statistics::Statistics;
32use uuid::Uuid;
33
34use crate::error::Result;
35use crate::spec::decimal_utils::i128_from_be_bytes;
36use crate::spec::{
37    Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral,
38    PrimitiveType, Schema, SchemaVisitor, StructType, Type,
39};
40use crate::{Error, ErrorKind};
41
/// Name given to the entries field of an Arrow map type when an Iceberg map type is converted
/// to Arrow: "key_value".
pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
/// UTC time zone for Arrow timestamp type.
pub const UTC_TIME_ZONE: &str = "+00:00";
46
/// A post order arrow schema visitor.
///
/// Children are visited before their parent, so every callback that combines results
/// (`schema`, `struct`, `list`, `map`) receives the already-computed results of its children.
///
/// For order of methods called, please refer to [`visit_schema`].
pub trait ArrowSchemaVisitor {
    /// Return type of this visitor on arrow field.
    type T;

    /// Return type of this visitor on arrow schema.
    type U;

    /// Called before the data type of a struct field or top-level schema field is visited.
    ///
    /// The default implementation does nothing.
    fn before_field(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the data type of a struct field or top-level schema field was visited.
    ///
    /// The default implementation does nothing.
    fn after_field(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before list element.
    ///
    /// The default implementation does nothing.
    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after list element.
    ///
    /// The default implementation does nothing.
    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before map key.
    ///
    /// The default implementation does nothing.
    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after map key.
    ///
    /// The default implementation does nothing.
    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before map value.
    ///
    /// The default implementation does nothing.
    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after map value.
    ///
    /// The default implementation does nothing.
    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after schema's type visited.
    ///
    /// `values` holds one result per top-level field, in field order.
    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U>;

    /// Called after struct's fields visited.
    ///
    /// `results` holds one result per entry of `fields`, in the same order.
    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T>;

    /// Called after list fields visited.
    ///
    /// `list` is the list data type itself and `value` is the result of visiting its element.
    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;

    /// Called after map's key and value fields visited.
    ///
    /// `map` is the map data type itself; `key_value` is the result of visiting the key field
    /// and `value` is the result of visiting the value field.
    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T>;

    /// Called when see a primitive type.
    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
}
112
113/// Visiting a type in post order.
114fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Result<V::T> {
115    match r#type {
116        p if p.is_primitive()
117            || matches!(
118                p,
119                DataType::Boolean
120                    | DataType::Utf8
121                    | DataType::LargeUtf8
122                    | DataType::Utf8View
123                    | DataType::Binary
124                    | DataType::LargeBinary
125                    | DataType::BinaryView
126                    | DataType::FixedSizeBinary(_)
127            ) =>
128        {
129            visitor.primitive(p)
130        }
131        DataType::List(element_field) => visit_list(r#type, element_field, visitor),
132        DataType::LargeList(element_field) => visit_list(r#type, element_field, visitor),
133        DataType::FixedSizeList(element_field, _) => visit_list(r#type, element_field, visitor),
134        DataType::Map(field, _) => match field.data_type() {
135            DataType::Struct(fields) => {
136                if fields.len() != 2 {
137                    return Err(Error::new(
138                        ErrorKind::DataInvalid,
139                        "Map field must have exactly 2 fields",
140                    ));
141                }
142
143                let key_field = &fields[0];
144                let value_field = &fields[1];
145
146                let key_result = {
147                    visitor.before_map_key(key_field)?;
148                    let ret = visit_type(key_field.data_type(), visitor)?;
149                    visitor.after_map_key(key_field)?;
150                    ret
151                };
152
153                let value_result = {
154                    visitor.before_map_value(value_field)?;
155                    let ret = visit_type(value_field.data_type(), visitor)?;
156                    visitor.after_map_value(value_field)?;
157                    ret
158                };
159
160                visitor.map(r#type, key_result, value_result)
161            }
162            _ => Err(Error::new(
163                ErrorKind::DataInvalid,
164                "Map field must have struct type",
165            )),
166        },
167        DataType::Struct(fields) => visit_struct(fields, visitor),
168        DataType::Dictionary(_key_type, value_type) => visit_type(value_type, visitor),
169        other => Err(Error::new(
170            ErrorKind::DataInvalid,
171            format!("Cannot visit Arrow data type: {other}"),
172        )),
173    }
174}
175
176/// Visit list types in post order.
177fn visit_list<V: ArrowSchemaVisitor>(
178    data_type: &DataType,
179    element_field: &Field,
180    visitor: &mut V,
181) -> Result<V::T> {
182    visitor.before_list_element(element_field)?;
183    let value = visit_type(element_field.data_type(), visitor)?;
184    visitor.after_list_element(element_field)?;
185    visitor.list(data_type, value)
186}
187
188/// Visit struct type in post order.
189fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> Result<V::T> {
190    let mut results = Vec::with_capacity(fields.len());
191    for field in fields {
192        visitor.before_field(field)?;
193        let result = visit_type(field.data_type(), visitor)?;
194        visitor.after_field(field)?;
195        results.push(result);
196    }
197
198    visitor.r#struct(fields, results)
199}
200
201/// Visit schema in post order.
202fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) -> Result<V::U> {
203    let mut results = Vec::with_capacity(schema.fields().len());
204    for field in schema.fields() {
205        visitor.before_field(field)?;
206        let result = visit_type(field.data_type(), visitor)?;
207        visitor.after_field(field)?;
208        results.push(result);
209    }
210    visitor.schema(schema, results)
211}
212
213/// Convert Arrow schema to Iceberg schema.
214///
215/// Iceberg schema fields require a unique field id, and this function assumes that each field
216/// in the provided Arrow schema contains a field id in its metadata. If the metadata is missing
217/// or the field id is not set, the conversion will fail
218pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
219    let mut visitor = ArrowSchemaConverter::new();
220    visit_schema(schema, &mut visitor)
221}
222
223/// Convert Arrow schema to Iceberg schema with automatically assigned field IDs.
224///
225/// Unlike [`arrow_schema_to_schema`], this function does not require field IDs in the Arrow
226/// schema metadata. Instead, it automatically assigns unique field IDs starting from 1,
227/// following Iceberg's field ID assignment rules.
228///
229/// This is useful when converting Arrow schemas that don't originate from Iceberg tables,
230/// such as schemas from DataFusion or other Arrow-based systems.
231pub fn arrow_schema_to_schema_auto_assign_ids(schema: &ArrowSchema) -> Result<Schema> {
232    let mut visitor = ArrowSchemaConverter::new_with_field_ids_from(FIRST_FIELD_ID);
233    visit_schema(schema, &mut visitor)
234}
235
236/// Convert Arrow type to iceberg type.
237pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
238    let mut visitor = ArrowSchemaConverter::new();
239    visit_type(ty, &mut visitor)
240}
241
/// Arrow field metadata key under which a field's documentation string is stored.
const ARROW_FIELD_DOC_KEY: &str = "doc";
243
244pub(super) fn get_field_id_from_metadata(field: &Field) -> Result<i32> {
245    if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
246        return value.parse::<i32>().map_err(|e| {
247            Error::new(
248                ErrorKind::DataInvalid,
249                "Failed to parse field id".to_string(),
250            )
251            .with_context("value", value)
252            .with_source(e)
253        });
254    }
255    Err(Error::new(
256        ErrorKind::DataInvalid,
257        "Field id not found in metadata",
258    ))
259}
260
261fn get_field_doc(field: &Field) -> Option<String> {
262    if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
263        return Some(value.clone());
264    }
265    None
266}
267
/// Visitor that converts an Arrow schema or data type into its Iceberg counterpart.
///
/// Field IDs are either read from the Arrow field metadata, or generated as temporary
/// placeholders when the schema builder is asked to reassign them.
struct ArrowSchemaConverter {
    /// When set, the schema builder will reassign field IDs starting from this value
    /// using level-order traversal (breadth-first).
    reassign_field_ids_from: Option<i32>,
    /// Generates unique placeholder IDs for fields before reassignment.
    /// Required because `ReassignFieldIds` builds an old-to-new ID mapping
    /// that expects unique input IDs.
    next_field_id: i32,
}
277
278impl ArrowSchemaConverter {
279    fn new() -> Self {
280        Self {
281            reassign_field_ids_from: None,
282            next_field_id: 0,
283        }
284    }
285
286    fn new_with_field_ids_from(start_from: i32) -> Self {
287        Self {
288            reassign_field_ids_from: Some(start_from),
289            next_field_id: 0,
290        }
291    }
292
293    fn get_field_id(&mut self, field: &Field) -> Result<i32> {
294        if self.reassign_field_ids_from.is_some() {
295            // Field IDs will be reassigned by the schema builder.
296            // We need unique temporary IDs because ReassignFieldIds builds an
297            // old->new ID mapping that requires unique input IDs.
298            let temp_id = self.next_field_id;
299            self.next_field_id += 1;
300            Ok(temp_id)
301        } else {
302            // Get field ID from arrow field metadata
303            get_field_id_from_metadata(field)
304        }
305    }
306
307    fn convert_fields(
308        &mut self,
309        fields: &Fields,
310        field_results: &[Type],
311    ) -> Result<Vec<NestedFieldRef>> {
312        let mut results = Vec::with_capacity(fields.len());
313        for i in 0..fields.len() {
314            let field = &fields[i];
315            let field_type = &field_results[i];
316            let id = self.get_field_id(field)?;
317            let doc = get_field_doc(field);
318            let nested_field = NestedField {
319                id,
320                doc,
321                name: field.name().clone(),
322                required: !field.is_nullable(),
323                field_type: Box::new(field_type.clone()),
324                initial_default: None,
325                write_default: None,
326            };
327            results.push(Arc::new(nested_field));
328        }
329        Ok(results)
330    }
331}
332
333impl ArrowSchemaVisitor for ArrowSchemaConverter {
334    type T = Type;
335    type U = Schema;
336
337    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
338        let fields = self.convert_fields(schema.fields(), &values)?;
339        let mut builder = Schema::builder().with_fields(fields);
340        if let Some(start_from) = self.reassign_field_ids_from {
341            builder = builder.with_reassigned_field_ids(start_from)
342        }
343        builder.build()
344    }
345
346    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
347        let fields = self.convert_fields(fields, &results)?;
348        Ok(Type::Struct(StructType::new(fields)))
349    }
350
351    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
352        let element_field = match list {
353            DataType::List(element_field) => element_field,
354            DataType::LargeList(element_field) => element_field,
355            DataType::FixedSizeList(element_field, _) => element_field,
356            _ => {
357                return Err(Error::new(
358                    ErrorKind::DataInvalid,
359                    "List type must have list data type",
360                ));
361            }
362        };
363
364        let id = self.get_field_id(element_field)?;
365        let doc = get_field_doc(element_field);
366        let mut element_field =
367            NestedField::list_element(id, value.clone(), !element_field.is_nullable());
368        if let Some(doc) = doc {
369            element_field = element_field.with_doc(doc);
370        }
371        let element_field = Arc::new(element_field);
372        Ok(Type::List(ListType { element_field }))
373    }
374
375    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
376        match map {
377            DataType::Map(field, _) => match field.data_type() {
378                DataType::Struct(fields) => {
379                    if fields.len() != 2 {
380                        return Err(Error::new(
381                            ErrorKind::DataInvalid,
382                            "Map field must have exactly 2 fields",
383                        ));
384                    }
385
386                    let key_field = &fields[0];
387                    let value_field = &fields[1];
388
389                    let key_id = self.get_field_id(key_field)?;
390                    let key_doc = get_field_doc(key_field);
391                    let mut key_field = NestedField::map_key_element(key_id, key_value.clone());
392                    if let Some(doc) = key_doc {
393                        key_field = key_field.with_doc(doc);
394                    }
395                    let key_field = Arc::new(key_field);
396
397                    let value_id = self.get_field_id(value_field)?;
398                    let value_doc = get_field_doc(value_field);
399                    let mut value_field = NestedField::map_value_element(
400                        value_id,
401                        value.clone(),
402                        !value_field.is_nullable(),
403                    );
404                    if let Some(doc) = value_doc {
405                        value_field = value_field.with_doc(doc);
406                    }
407                    let value_field = Arc::new(value_field);
408
409                    Ok(Type::Map(MapType {
410                        key_field,
411                        value_field,
412                    }))
413                }
414                _ => Err(Error::new(
415                    ErrorKind::DataInvalid,
416                    "Map field must have struct type",
417                )),
418            },
419            _ => Err(Error::new(
420                ErrorKind::DataInvalid,
421                "Map type must have map data type",
422            )),
423        }
424    }
425
426    fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
427        match p {
428            DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
429            DataType::Int8 | DataType::Int16 | DataType::Int32 => {
430                Ok(Type::Primitive(PrimitiveType::Int))
431            }
432            DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)),
433            DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)),
434            DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
435            DataType::UInt64 => {
436                // Block uint64 - no safe casting option
437                Err(Error::new(
438                    ErrorKind::DataInvalid,
439                    "UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for full uint64 range.",
440                ))
441            }
442            DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
443            DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
444            DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| {
445                Error::new(
446                    ErrorKind::DataInvalid,
447                    "Failed to create decimal type".to_string(),
448                )
449                .with_source(e)
450            }),
451            DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)),
452            DataType::Time64(unit) if unit == &TimeUnit::Microsecond => {
453                Ok(Type::Primitive(PrimitiveType::Time))
454            }
455            DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => {
456                Ok(Type::Primitive(PrimitiveType::Timestamp))
457            }
458            DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => {
459                Ok(Type::Primitive(PrimitiveType::TimestampNs))
460            }
461            DataType::Timestamp(unit, Some(zone))
462                if unit == &TimeUnit::Microsecond
463                    && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
464            {
465                Ok(Type::Primitive(PrimitiveType::Timestamptz))
466            }
467            DataType::Timestamp(unit, Some(zone))
468                if unit == &TimeUnit::Nanosecond
469                    && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
470            {
471                Ok(Type::Primitive(PrimitiveType::TimestamptzNs))
472            }
473            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
474                Ok(Type::Primitive(PrimitiveType::Binary))
475            }
476            DataType::FixedSizeBinary(width) => {
477                Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64)))
478            }
479            DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => {
480                Ok(Type::Primitive(PrimitiveType::String))
481            }
482            _ => Err(Error::new(
483                ErrorKind::DataInvalid,
484                format!("Unsupported Arrow data type: {p}"),
485            )),
486        }
487    }
488}
489
/// Visitor that converts Iceberg schemas and types into their Arrow counterparts.
struct ToArrowSchemaConverter;
491
/// Intermediate result produced by [`ToArrowSchemaConverter`] while walking an Iceberg schema.
enum ArrowSchemaOrFieldOrType {
    /// A complete Arrow schema, produced by the visitor's `schema` callback.
    Schema(ArrowSchema),
    /// A single Arrow field, produced by the visitor's `field` callback.
    Field(Field),
    /// An Arrow data type, produced by the `struct`, `list`, `map` and `primitive` callbacks.
    Type(DataType),
}
497
498impl SchemaVisitor for ToArrowSchemaConverter {
499    type T = ArrowSchemaOrFieldOrType;
500
501    fn schema(
502        &mut self,
503        _schema: &crate::spec::Schema,
504        value: ArrowSchemaOrFieldOrType,
505    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
506        let struct_type = match value {
507            ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)) => fields,
508            _ => unreachable!(),
509        };
510        Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new(
511            struct_type,
512        )))
513    }
514
515    fn field(
516        &mut self,
517        field: &crate::spec::NestedFieldRef,
518        value: ArrowSchemaOrFieldOrType,
519    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
520        let ty = match value {
521            ArrowSchemaOrFieldOrType::Type(ty) => ty,
522            _ => unreachable!(),
523        };
524        let metadata = if let Some(doc) = &field.doc {
525            HashMap::from([
526                (PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()),
527                (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
528            ])
529        } else {
530            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string())])
531        };
532        Ok(ArrowSchemaOrFieldOrType::Field(
533            Field::new(field.name.clone(), ty, !field.required).with_metadata(metadata),
534        ))
535    }
536
537    fn r#struct(
538        &mut self,
539        _: &crate::spec::StructType,
540        results: Vec<ArrowSchemaOrFieldOrType>,
541    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
542        let fields = results
543            .into_iter()
544            .map(|result| match result {
545                ArrowSchemaOrFieldOrType::Field(field) => field,
546                _ => unreachable!(),
547            })
548            .collect();
549        Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)))
550    }
551
552    fn list(
553        &mut self,
554        list: &crate::spec::ListType,
555        value: ArrowSchemaOrFieldOrType,
556    ) -> crate::Result<Self::T> {
557        let field = match self.field(&list.element_field, value)? {
558            ArrowSchemaOrFieldOrType::Field(field) => field,
559            _ => unreachable!(),
560        };
561        let meta = if let Some(doc) = &list.element_field.doc {
562            HashMap::from([
563                (
564                    PARQUET_FIELD_ID_META_KEY.to_string(),
565                    list.element_field.id.to_string(),
566                ),
567                (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
568            ])
569        } else {
570            HashMap::from([(
571                PARQUET_FIELD_ID_META_KEY.to_string(),
572                list.element_field.id.to_string(),
573            )])
574        };
575        let field = field.with_metadata(meta);
576        Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new(
577            field,
578        ))))
579    }
580
581    fn map(
582        &mut self,
583        map: &crate::spec::MapType,
584        key_value: ArrowSchemaOrFieldOrType,
585        value: ArrowSchemaOrFieldOrType,
586    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
587        let key_field = match self.field(&map.key_field, key_value)? {
588            ArrowSchemaOrFieldOrType::Field(field) => field,
589            _ => unreachable!(),
590        };
591        let value_field = match self.field(&map.value_field, value)? {
592            ArrowSchemaOrFieldOrType::Field(field) => field,
593            _ => unreachable!(),
594        };
595        let field = Field::new(
596            DEFAULT_MAP_FIELD_NAME,
597            DataType::Struct(vec![key_field, value_field].into()),
598            // Map field is always not nullable
599            false,
600        );
601
602        Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
603            field.into(),
604            false,
605        )))
606    }
607
608    fn primitive(
609        &mut self,
610        p: &crate::spec::PrimitiveType,
611    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
612        match p {
613            crate::spec::PrimitiveType::Boolean => {
614                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
615            }
616            crate::spec::PrimitiveType::Int => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int32)),
617            crate::spec::PrimitiveType::Long => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int64)),
618            crate::spec::PrimitiveType::Float => {
619                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float32))
620            }
621            crate::spec::PrimitiveType::Double => {
622                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float64))
623            }
624            crate::spec::PrimitiveType::Decimal { precision, scale } => {
625                let (precision, scale) = {
626                    let precision: u8 = precision.to_owned().try_into().map_err(|err| {
627                        Error::new(
628                            crate::ErrorKind::DataInvalid,
629                            "incompatible precision for decimal type convert",
630                        )
631                        .with_source(err)
632                    })?;
633                    let scale = scale.to_owned().try_into().map_err(|err| {
634                        Error::new(
635                            crate::ErrorKind::DataInvalid,
636                            "incompatible scale for decimal type convert",
637                        )
638                        .with_source(err)
639                    })?;
640                    (precision, scale)
641                };
642                validate_decimal_precision_and_scale::<Decimal128Type>(precision, scale).map_err(
643                    |err| {
644                        Error::new(
645                            crate::ErrorKind::DataInvalid,
646                            "incompatible precision and scale for decimal type convert",
647                        )
648                        .with_source(err)
649                    },
650                )?;
651                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Decimal128(
652                    precision, scale,
653                )))
654            }
655            crate::spec::PrimitiveType::Date => {
656                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
657            }
658            crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
659                DataType::Time64(TimeUnit::Microsecond),
660            )),
661            crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
662                DataType::Timestamp(TimeUnit::Microsecond, None),
663            )),
664            crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
665                // Timestampz always stored as UTC
666                DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
667            )),
668            crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
669                DataType::Timestamp(TimeUnit::Nanosecond, None),
670            )),
671            crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
672                // Store timestamptz_ns as UTC
673                DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
674            )),
675            crate::spec::PrimitiveType::String => {
676                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
677            }
678            crate::spec::PrimitiveType::Uuid => Ok(ArrowSchemaOrFieldOrType::Type(
679                DataType::FixedSizeBinary(16),
680            )),
681            crate::spec::PrimitiveType::Fixed(len) => Ok(ArrowSchemaOrFieldOrType::Type(
682                i32::try_from(*len)
683                    .ok()
684                    .map(DataType::FixedSizeBinary)
685                    .unwrap_or(DataType::LargeBinary),
686            )),
687            crate::spec::PrimitiveType::Binary => {
688                Ok(ArrowSchemaOrFieldOrType::Type(DataType::LargeBinary))
689            }
690        }
691    }
692}
693
694/// Convert iceberg schema to an arrow schema.
695pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
696    let mut converter = ToArrowSchemaConverter;
697    match crate::spec::visit_schema(schema, &mut converter)? {
698        ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema),
699        _ => unreachable!(),
700    }
701}
702
703/// Convert iceberg type to an arrow type.
704pub fn type_to_arrow_type(ty: &crate::spec::Type) -> crate::Result<DataType> {
705    let mut converter = ToArrowSchemaConverter;
706    match crate::spec::visit_type(ty, &mut converter)? {
707        ArrowSchemaOrFieldOrType::Type(ty) => Ok(ty),
708        _ => unreachable!(),
709    }
710}
711
712/// Convert Iceberg Datum to Arrow Datum.
713pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send + Sync>> {
714    match (datum.data_type(), datum.literal()) {
715        (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(value)) => {
716            Ok(Arc::new(BooleanArray::new_scalar(*value)))
717        }
718        (PrimitiveType::Int, PrimitiveLiteral::Int(value)) => {
719            Ok(Arc::new(Int32Array::new_scalar(*value)))
720        }
721        (PrimitiveType::Long, PrimitiveLiteral::Long(value)) => {
722            Ok(Arc::new(Int64Array::new_scalar(*value)))
723        }
724        (PrimitiveType::Float, PrimitiveLiteral::Float(value)) => {
725            Ok(Arc::new(Float32Array::new_scalar(value.into_inner())))
726        }
727        (PrimitiveType::Double, PrimitiveLiteral::Double(value)) => {
728            Ok(Arc::new(Float64Array::new_scalar(value.into_inner())))
729        }
730        (PrimitiveType::String, PrimitiveLiteral::String(value)) => {
731            Ok(Arc::new(StringArray::new_scalar(value.as_str())))
732        }
733        (PrimitiveType::Binary, PrimitiveLiteral::Binary(value)) => {
734            Ok(Arc::new(BinaryArray::new_scalar(value.as_slice())))
735        }
736        (PrimitiveType::Date, PrimitiveLiteral::Int(value)) => {
737            Ok(Arc::new(Date32Array::new_scalar(*value)))
738        }
739        (PrimitiveType::Timestamp, PrimitiveLiteral::Long(value)) => {
740            Ok(Arc::new(TimestampMicrosecondArray::new_scalar(*value)))
741        }
742        (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
743            TimestampMicrosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
744        ))),
745        (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(value)) => {
746            Ok(Arc::new(TimestampNanosecondArray::new_scalar(*value)))
747        }
748        (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
749            TimestampNanosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
750        ))),
751        (PrimitiveType::Decimal { precision, scale }, PrimitiveLiteral::Int128(value)) => {
752            let array = Decimal128Array::from_value(*value, 1)
753                .with_precision_and_scale(*precision as _, *scale as _)
754                .unwrap();
755            Ok(Arc::new(Scalar::new(array)))
756        }
757        (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(value)) => {
758            let bytes = Uuid::from_u128(*value).into_bytes();
759            let array = FixedSizeBinaryArray::try_from_iter(vec![bytes].into_iter()).unwrap();
760            Ok(Arc::new(Scalar::new(array)))
761        }
762
763        (primitive_type, _) => Err(Error::new(
764            ErrorKind::FeatureUnsupported,
765            format!("Converting datum from type {primitive_type:?} to arrow not supported yet."),
766        )),
767    }
768}
769
770pub(crate) fn get_parquet_stat_min_as_datum(
771    primitive_type: &PrimitiveType,
772    stats: &Statistics,
773) -> Result<Option<Datum>> {
774    Ok(match (primitive_type, stats) {
775        (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
776            stats.min_opt().map(|val| Datum::bool(*val))
777        }
778        (PrimitiveType::Int, Statistics::Int32(stats)) => {
779            stats.min_opt().map(|val| Datum::int(*val))
780        }
781        (PrimitiveType::Date, Statistics::Int32(stats)) => {
782            stats.min_opt().map(|val| Datum::date(*val))
783        }
784        (PrimitiveType::Long, Statistics::Int64(stats)) => {
785            stats.min_opt().map(|val| Datum::long(*val))
786        }
787        (PrimitiveType::Time, Statistics::Int64(stats)) => {
788            let Some(val) = stats.min_opt() else {
789                return Ok(None);
790            };
791
792            Some(Datum::time_micros(*val)?)
793        }
794        (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
795            stats.min_opt().map(|val| Datum::timestamp_micros(*val))
796        }
797        (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
798            stats.min_opt().map(|val| Datum::timestamptz_micros(*val))
799        }
800        (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
801            stats.min_opt().map(|val| Datum::timestamp_nanos(*val))
802        }
803        (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
804            stats.min_opt().map(|val| Datum::timestamptz_nanos(*val))
805        }
806        (PrimitiveType::Float, Statistics::Float(stats)) => {
807            stats.min_opt().map(|val| Datum::float(*val))
808        }
809        (PrimitiveType::Double, Statistics::Double(stats)) => {
810            stats.min_opt().map(|val| Datum::double(*val))
811        }
812        (PrimitiveType::String, Statistics::ByteArray(stats)) => {
813            let Some(val) = stats.min_opt() else {
814                return Ok(None);
815            };
816
817            Some(Datum::string(val.as_utf8()?))
818        }
819        (
820            PrimitiveType::Decimal {
821                precision: _,
822                scale: _,
823            },
824            Statistics::ByteArray(stats),
825        ) => {
826            let Some(bytes) = stats.min_bytes_opt() else {
827                return Ok(None);
828            };
829            Some(Datum::new(
830                primitive_type.clone(),
831                PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
832            ))
833        }
834        (
835            PrimitiveType::Decimal {
836                precision: _,
837                scale: _,
838            },
839            Statistics::FixedLenByteArray(stats),
840        ) => {
841            let Some(bytes) = stats.min_bytes_opt() else {
842                return Ok(None);
843            };
844            Some(Datum::new(
845                primitive_type.clone(),
846                PrimitiveLiteral::Int128(i128_from_be_bytes(bytes).ok_or_else(|| {
847                    Error::new(
848                        ErrorKind::DataInvalid,
849                        format!("Can't convert bytes to i128: {bytes:?}"),
850                    )
851                })?),
852            ))
853        }
854        (
855            PrimitiveType::Decimal {
856                precision: _,
857                scale: _,
858            },
859            Statistics::Int32(stats),
860        ) => stats.min_opt().map(|val| {
861            Datum::new(
862                primitive_type.clone(),
863                PrimitiveLiteral::Int128(i128::from(*val)),
864            )
865        }),
866
867        (
868            PrimitiveType::Decimal {
869                precision: _,
870                scale: _,
871            },
872            Statistics::Int64(stats),
873        ) => stats.min_opt().map(|val| {
874            Datum::new(
875                primitive_type.clone(),
876                PrimitiveLiteral::Int128(i128::from(*val)),
877            )
878        }),
879        (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
880            let Some(bytes) = stats.min_bytes_opt() else {
881                return Ok(None);
882            };
883            if bytes.len() != 16 {
884                return Err(Error::new(
885                    ErrorKind::Unexpected,
886                    "Invalid length of uuid bytes.",
887                ));
888            }
889            Some(Datum::uuid(Uuid::from_bytes(
890                bytes[..16].try_into().unwrap(),
891            )))
892        }
893        (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
894            let Some(bytes) = stat.min_bytes_opt() else {
895                return Ok(None);
896            };
897            if bytes.len() != *len as usize {
898                return Err(Error::new(
899                    ErrorKind::Unexpected,
900                    "Invalid length of fixed bytes.",
901                ));
902            }
903            Some(Datum::fixed(bytes.to_vec()))
904        }
905        (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
906            return Ok(stat
907                .min_bytes_opt()
908                .map(|bytes| Datum::binary(bytes.to_vec())));
909        }
910        _ => {
911            return Ok(None);
912        }
913    })
914}
915
916pub(crate) fn get_parquet_stat_max_as_datum(
917    primitive_type: &PrimitiveType,
918    stats: &Statistics,
919) -> Result<Option<Datum>> {
920    Ok(match (primitive_type, stats) {
921        (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
922            stats.max_opt().map(|val| Datum::bool(*val))
923        }
924        (PrimitiveType::Int, Statistics::Int32(stats)) => {
925            stats.max_opt().map(|val| Datum::int(*val))
926        }
927        (PrimitiveType::Date, Statistics::Int32(stats)) => {
928            stats.max_opt().map(|val| Datum::date(*val))
929        }
930        (PrimitiveType::Long, Statistics::Int64(stats)) => {
931            stats.max_opt().map(|val| Datum::long(*val))
932        }
933        (PrimitiveType::Time, Statistics::Int64(stats)) => {
934            let Some(val) = stats.max_opt() else {
935                return Ok(None);
936            };
937
938            Some(Datum::time_micros(*val)?)
939        }
940        (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
941            stats.max_opt().map(|val| Datum::timestamp_micros(*val))
942        }
943        (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
944            stats.max_opt().map(|val| Datum::timestamptz_micros(*val))
945        }
946        (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
947            stats.max_opt().map(|val| Datum::timestamp_nanos(*val))
948        }
949        (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
950            stats.max_opt().map(|val| Datum::timestamptz_nanos(*val))
951        }
952        (PrimitiveType::Float, Statistics::Float(stats)) => {
953            stats.max_opt().map(|val| Datum::float(*val))
954        }
955        (PrimitiveType::Double, Statistics::Double(stats)) => {
956            stats.max_opt().map(|val| Datum::double(*val))
957        }
958        (PrimitiveType::String, Statistics::ByteArray(stats)) => {
959            let Some(val) = stats.max_opt() else {
960                return Ok(None);
961            };
962
963            Some(Datum::string(val.as_utf8()?))
964        }
965        (
966            PrimitiveType::Decimal {
967                precision: _,
968                scale: _,
969            },
970            Statistics::ByteArray(stats),
971        ) => {
972            let Some(bytes) = stats.max_bytes_opt() else {
973                return Ok(None);
974            };
975            Some(Datum::new(
976                primitive_type.clone(),
977                PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
978            ))
979        }
980        (
981            PrimitiveType::Decimal {
982                precision: _,
983                scale: _,
984            },
985            Statistics::FixedLenByteArray(stats),
986        ) => {
987            let Some(bytes) = stats.max_bytes_opt() else {
988                return Ok(None);
989            };
990            Some(Datum::new(
991                primitive_type.clone(),
992                PrimitiveLiteral::Int128(i128_from_be_bytes(bytes).ok_or_else(|| {
993                    Error::new(
994                        ErrorKind::DataInvalid,
995                        format!("Can't convert bytes to i128: {bytes:?}"),
996                    )
997                })?),
998            ))
999        }
1000        (
1001            PrimitiveType::Decimal {
1002                precision: _,
1003                scale: _,
1004            },
1005            Statistics::Int32(stats),
1006        ) => stats.max_opt().map(|val| {
1007            Datum::new(
1008                primitive_type.clone(),
1009                PrimitiveLiteral::Int128(i128::from(*val)),
1010            )
1011        }),
1012
1013        (
1014            PrimitiveType::Decimal {
1015                precision: _,
1016                scale: _,
1017            },
1018            Statistics::Int64(stats),
1019        ) => stats.max_opt().map(|val| {
1020            Datum::new(
1021                primitive_type.clone(),
1022                PrimitiveLiteral::Int128(i128::from(*val)),
1023            )
1024        }),
1025        (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
1026            let Some(bytes) = stats.max_bytes_opt() else {
1027                return Ok(None);
1028            };
1029            if bytes.len() != 16 {
1030                return Err(Error::new(
1031                    ErrorKind::Unexpected,
1032                    "Invalid length of uuid bytes.",
1033                ));
1034            }
1035            Some(Datum::uuid(Uuid::from_bytes(
1036                bytes[..16].try_into().unwrap(),
1037            )))
1038        }
1039        (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
1040            let Some(bytes) = stat.max_bytes_opt() else {
1041                return Ok(None);
1042            };
1043            if bytes.len() != *len as usize {
1044                return Err(Error::new(
1045                    ErrorKind::Unexpected,
1046                    "Invalid length of fixed bytes.",
1047                ));
1048            }
1049            Some(Datum::fixed(bytes.to_vec()))
1050        }
1051        (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
1052            return Ok(stat
1053                .max_bytes_opt()
1054                .map(|bytes| Datum::binary(bytes.to_vec())));
1055        }
1056        _ => {
1057            return Ok(None);
1058        }
1059    })
1060}
1061
1062impl TryFrom<&ArrowSchema> for crate::spec::Schema {
1063    type Error = Error;
1064
1065    fn try_from(schema: &ArrowSchema) -> crate::Result<Self> {
1066        arrow_schema_to_schema(schema)
1067    }
1068}
1069
1070impl TryFrom<&crate::spec::Schema> for ArrowSchema {
1071    type Error = Error;
1072
1073    fn try_from(schema: &crate::spec::Schema) -> crate::Result<Self> {
1074        schema_to_arrow_schema(schema)
1075    }
1076}
1077
1078/// Converts a Datum (Iceberg type + primitive literal) to its corresponding Arrow DataType
1079/// with Run-End Encoding (REE).
1080///
1081/// This function is used for constant fields in record batches, where all values are the same.
1082/// Run-End Encoding provides efficient storage for such constant columns.
1083///
1084/// # Arguments
1085/// * `datum` - The Datum to convert, which contains both type and value information
1086///
1087/// # Returns
1088/// Arrow DataType with Run-End Encoding applied
1089///
1090/// # Example
1091/// ```
1092/// use iceberg::arrow::datum_to_arrow_type_with_ree;
1093/// use iceberg::spec::Datum;
1094///
1095/// let datum = Datum::string("test_file.parquet");
1096/// let ree_type = datum_to_arrow_type_with_ree(&datum);
1097/// // Returns: RunEndEncoded(Int32, Utf8)
1098/// ```
1099pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
1100    // Helper to create REE type with the given values type.
1101    // Note: values field is nullable as Arrow expects this when building the
1102    // final Arrow schema with `RunArray::try_new`.
1103    let make_ree = |values_type: DataType| -> DataType {
1104        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
1105        let values_field = Arc::new(Field::new("values", values_type, true));
1106        DataType::RunEndEncoded(run_ends_field, values_field)
1107    };
1108
1109    // Match on the PrimitiveType from the Datum to determine the Arrow type
1110    match datum.data_type() {
1111        PrimitiveType::Boolean => make_ree(DataType::Boolean),
1112        PrimitiveType::Int => make_ree(DataType::Int32),
1113        PrimitiveType::Long => make_ree(DataType::Int64),
1114        PrimitiveType::Float => make_ree(DataType::Float32),
1115        PrimitiveType::Double => make_ree(DataType::Float64),
1116        PrimitiveType::Date => make_ree(DataType::Date32),
1117        PrimitiveType::Time => make_ree(DataType::Int64),
1118        PrimitiveType::Timestamp => make_ree(DataType::Int64),
1119        PrimitiveType::Timestamptz => make_ree(DataType::Int64),
1120        PrimitiveType::TimestampNs => make_ree(DataType::Int64),
1121        PrimitiveType::TimestamptzNs => make_ree(DataType::Int64),
1122        PrimitiveType::String => make_ree(DataType::Utf8),
1123        PrimitiveType::Uuid => make_ree(DataType::Binary),
1124        PrimitiveType::Fixed(_) => make_ree(DataType::Binary),
1125        PrimitiveType::Binary => make_ree(DataType::Binary),
1126        PrimitiveType::Decimal { precision, scale } => {
1127            make_ree(DataType::Decimal128(*precision as u8, *scale as i8))
1128        }
1129    }
1130}
1131
1132/// A visitor that strips metadata from an Arrow schema.
1133///
1134/// This visitor recursively removes all metadata from fields at every level of the schema,
1135/// including nested struct, list, and map fields. This is useful for schema comparison
1136/// where metadata differences should be ignored.
1137struct MetadataStripVisitor {
1138    /// Stack to track field information during traversal
1139    field_stack: Vec<Field>,
1140}
1141
1142impl MetadataStripVisitor {
1143    fn new() -> Self {
1144        Self {
1145            field_stack: Vec::new(),
1146        }
1147    }
1148}
1149
1150impl ArrowSchemaVisitor for MetadataStripVisitor {
1151    type T = Field;
1152    type U = ArrowSchema;
1153
1154    fn before_field(&mut self, field: &Field) -> Result<()> {
1155        // Store field name and nullability for later reconstruction
1156        self.field_stack.push(Field::new(
1157            field.name(),
1158            DataType::Null, // Placeholder, will be replaced
1159            field.is_nullable(),
1160        ));
1161        Ok(())
1162    }
1163
1164    fn after_field(&mut self, _field: &Field) -> Result<()> {
1165        Ok(())
1166    }
1167
1168    fn schema(&mut self, _schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
1169        Ok(ArrowSchema::new(values))
1170    }
1171
1172    fn r#struct(&mut self, _fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
1173        // Pop the struct field from the stack
1174        let field_info = self
1175            .field_stack
1176            .pop()
1177            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in struct"))?;
1178
1179        // Reconstruct struct field without metadata
1180        Ok(Field::new(
1181            field_info.name(),
1182            DataType::Struct(Fields::from(results)),
1183            field_info.is_nullable(),
1184        ))
1185    }
1186
1187    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
1188        // Pop the list field from the stack
1189        let field_info = self
1190            .field_stack
1191            .pop()
1192            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in list"))?;
1193
1194        // Reconstruct list field without metadata
1195        let list_type = match list {
1196            DataType::List(_) => DataType::List(Arc::new(value)),
1197            DataType::LargeList(_) => DataType::LargeList(Arc::new(value)),
1198            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(Arc::new(value), *size),
1199            _ => {
1200                return Err(Error::new(
1201                    ErrorKind::Unexpected,
1202                    format!("Expected list type, got {list}"),
1203                ));
1204            }
1205        };
1206
1207        Ok(Field::new(
1208            field_info.name(),
1209            list_type,
1210            field_info.is_nullable(),
1211        ))
1212    }
1213
1214    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
1215        // Pop the map field from the stack
1216        let field_info = self
1217            .field_stack
1218            .pop()
1219            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in map"))?;
1220
1221        // Reconstruct the map's struct field (contains key and value)
1222        let struct_field = Field::new(
1223            DEFAULT_MAP_FIELD_NAME,
1224            DataType::Struct(Fields::from(vec![key_value, value])),
1225            false,
1226        );
1227
1228        // Get the sorted flag from the original map type
1229        let sorted = match map {
1230            DataType::Map(_, sorted) => *sorted,
1231            _ => {
1232                return Err(Error::new(
1233                    ErrorKind::Unexpected,
1234                    format!("Expected map type, got {map}"),
1235                ));
1236            }
1237        };
1238
1239        // Reconstruct map field without metadata
1240        Ok(Field::new(
1241            field_info.name(),
1242            DataType::Map(Arc::new(struct_field), sorted),
1243            field_info.is_nullable(),
1244        ))
1245    }
1246
1247    fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
1248        // Pop the primitive field from the stack
1249        let field_info = self.field_stack.pop().ok_or_else(|| {
1250            Error::new(ErrorKind::Unexpected, "Field stack underflow in primitive")
1251        })?;
1252
1253        // Return field without metadata
1254        Ok(Field::new(
1255            field_info.name(),
1256            p.clone(),
1257            field_info.is_nullable(),
1258        ))
1259    }
1260}
1261
1262/// Strips all metadata from an Arrow schema and its nested fields.
1263///
1264/// This function recursively removes metadata from all fields at every level of the schema,
1265/// including nested struct, list, and map fields. This is useful for schema comparison
1266/// where metadata differences should be ignored.
1267///
1268/// # Arguments
1269/// * `schema` - The Arrow schema to strip metadata from
1270///
1271/// # Returns
1272/// A new Arrow schema with all metadata removed, or an error if the schema structure
1273/// is invalid.
1274///
1275/// # Example
1276/// ```
1277/// use std::collections::HashMap;
1278///
1279/// use arrow_schema::{DataType, Field, Schema as ArrowSchema};
1280/// use iceberg::arrow::strip_metadata_from_schema;
1281///
1282/// let mut metadata = HashMap::new();
1283/// metadata.insert("key".to_string(), "value".to_string());
1284///
1285/// let field = Field::new("col1", DataType::Int32, false).with_metadata(metadata);
1286/// let schema = ArrowSchema::new(vec![field]);
1287///
1288/// let stripped = strip_metadata_from_schema(&schema).unwrap();
1289/// assert!(stripped.field(0).metadata().is_empty());
1290/// ```
1291pub fn strip_metadata_from_schema(schema: &ArrowSchema) -> Result<ArrowSchema> {
1292    let mut visitor = MetadataStripVisitor::new();
1293    visit_schema(schema, &mut visitor)
1294}
1295
1296#[cfg(test)]
1297mod tests {
1298    use std::collections::HashMap;
1299    use std::sync::Arc;
1300
1301    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1302
1303    use super::*;
1304    use crate::spec::decimal_utils::decimal_new;
1305    use crate::spec::{Literal, Schema};
1306
1307    /// Create a simple field with metadata.
1308    fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
1309        Field::new(name, ty, nullable).with_metadata(HashMap::from([(
1310            PARQUET_FIELD_ID_META_KEY.to_string(),
1311            value.to_string(),
1312        )]))
1313    }
1314
1315    fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema {
1316        let fields = Fields::from(vec![
1317            simple_field("key", DataType::Int32, false, "28"),
1318            simple_field("value", DataType::Utf8, true, "29"),
1319        ]);
1320
1321        let r#struct = DataType::Struct(fields);
1322        let map = DataType::Map(
1323            Arc::new(simple_field(DEFAULT_MAP_FIELD_NAME, r#struct, false, "17")),
1324            false,
1325        );
1326        let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1327
1328        let fields = Fields::from(vec![
1329            simple_field("aa", DataType::Int32, false, "18"),
1330            simple_field("bb", DataType::Utf8, true, "19"),
1331            simple_field(
1332                "cc",
1333                DataType::Timestamp(TimeUnit::Microsecond, None),
1334                false,
1335                "20",
1336            ),
1337        ]);
1338
1339        let r#struct = DataType::Struct(fields);
1340
1341        ArrowSchema::new(vec![
1342            simple_field("a", DataType::Int32, false, "2"),
1343            simple_field("b", DataType::Int64, false, "1"),
1344            simple_field("c", DataType::Utf8, false, "3"),
1345            simple_field("n", DataType::Utf8, false, "21"),
1346            simple_field(
1347                "d",
1348                DataType::Timestamp(TimeUnit::Microsecond, None),
1349                true,
1350                "4",
1351            ),
1352            simple_field("e", DataType::Boolean, true, "6"),
1353            simple_field("f", DataType::Float32, false, "5"),
1354            simple_field("g", DataType::Float64, false, "7"),
1355            simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1356            simple_field("h", DataType::Date32, false, "8"),
1357            simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1358            simple_field(
1359                "j",
1360                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1361                false,
1362                "10",
1363            ),
1364            simple_field(
1365                "k",
1366                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1367                false,
1368                "12",
1369            ),
1370            simple_field("l", DataType::Binary, false, "13"),
1371            simple_field("o", DataType::LargeBinary, false, "22"),
1372            simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1373            simple_field(
1374                "list",
1375                DataType::List(Arc::new(simple_field(
1376                    "element",
1377                    DataType::Int32,
1378                    false,
1379                    "15",
1380                ))),
1381                true,
1382                "14",
1383            ),
1384            simple_field(
1385                "large_list",
1386                DataType::LargeList(Arc::new(simple_field(
1387                    "element",
1388                    DataType::Utf8,
1389                    false,
1390                    "23",
1391                ))),
1392                true,
1393                "24",
1394            ),
1395            simple_field(
1396                "fixed_list",
1397                DataType::FixedSizeList(
1398                    Arc::new(simple_field("element", DataType::Binary, false, "26")),
1399                    10,
1400                ),
1401                true,
1402                "25",
1403            ),
1404            simple_field("map", map, false, "16"),
1405            simple_field("struct", r#struct, false, "17"),
1406            simple_field("dictionary", dictionary, false, "30"),
1407        ])
1408    }
1409
1410    fn iceberg_schema_for_arrow_schema_to_schema_test() -> Schema {
1411        let schema_json = r#"{
1412            "type":"struct",
1413            "schema-id":0,
1414            "fields":[
1415                {
1416                    "id":2,
1417                    "name":"a",
1418                    "required":true,
1419                    "type":"int"
1420                },
1421                {
1422                    "id":1,
1423                    "name":"b",
1424                    "required":true,
1425                    "type":"long"
1426                },
1427                {
1428                    "id":3,
1429                    "name":"c",
1430                    "required":true,
1431                    "type":"string"
1432                },
1433                {
1434                    "id":21,
1435                    "name":"n",
1436                    "required":true,
1437                    "type":"string"
1438                },
1439                {
1440                    "id":4,
1441                    "name":"d",
1442                    "required":false,
1443                    "type":"timestamp"
1444                },
1445                {
1446                    "id":6,
1447                    "name":"e",
1448                    "required":false,
1449                    "type":"boolean"
1450                },
1451                {
1452                    "id":5,
1453                    "name":"f",
1454                    "required":true,
1455                    "type":"float"
1456                },
1457                {
1458                    "id":7,
1459                    "name":"g",
1460                    "required":true,
1461                    "type":"double"
1462                },
1463                {
1464                    "id":27,
1465                    "name":"p",
1466                    "required":true,
1467                    "type":"decimal(10,2)"
1468                },
1469                {
1470                    "id":8,
1471                    "name":"h",
1472                    "required":true,
1473                    "type":"date"
1474                },
1475                {
1476                    "id":9,
1477                    "name":"i",
1478                    "required":true,
1479                    "type":"time"
1480                },
1481                {
1482                    "id":10,
1483                    "name":"j",
1484                    "required":true,
1485                    "type":"timestamptz"
1486                },
1487                {
1488                    "id":12,
1489                    "name":"k",
1490                    "required":true,
1491                    "type":"timestamptz"
1492                },
1493                {
1494                    "id":13,
1495                    "name":"l",
1496                    "required":true,
1497                    "type":"binary"
1498                },
1499                {
1500                    "id":22,
1501                    "name":"o",
1502                    "required":true,
1503                    "type":"binary"
1504                },
1505                {
1506                    "id":11,
1507                    "name":"m",
1508                    "required":true,
1509                    "type":"fixed[10]"
1510                },
1511                {
1512                    "id":14,
1513                    "name":"list",
1514                    "required": false,
1515                    "type": {
1516                        "type": "list",
1517                        "element-id": 15,
1518                        "element-required": true,
1519                        "element": "int"
1520                    }
1521                },
1522                {
1523                    "id":24,
1524                    "name":"large_list",
1525                    "required": false,
1526                    "type": {
1527                        "type": "list",
1528                        "element-id": 23,
1529                        "element-required": true,
1530                        "element": "string"
1531                    }
1532                },
1533                {
1534                    "id":25,
1535                    "name":"fixed_list",
1536                    "required": false,
1537                    "type": {
1538                        "type": "list",
1539                        "element-id": 26,
1540                        "element-required": true,
1541                        "element": "binary"
1542                    }
1543                },
1544                {
1545                    "id":16,
1546                    "name":"map",
1547                    "required": true,
1548                    "type": {
1549                        "type": "map",
1550                        "key-id": 28,
1551                        "key": "int",
1552                        "value-id": 29,
1553                        "value-required": false,
1554                        "value": "string"
1555                    }
1556                },
1557                {
1558                    "id":17,
1559                    "name":"struct",
1560                    "required": true,
1561                    "type": {
1562                        "type": "struct",
1563                        "fields": [
1564                            {
1565                                "id":18,
1566                                "name":"aa",
1567                                "required":true,
1568                                "type":"int"
1569                            },
1570                            {
1571                                "id":19,
1572                                "name":"bb",
1573                                "required":false,
1574                                "type":"string"
1575                            },
1576                            {
1577                                "id":20,
1578                                "name":"cc",
1579                                "required":true,
1580                                "type":"timestamp"
1581                            }
1582                        ]
1583                    }
1584                },
1585                {
1586                    "id":30,
1587                    "name":"dictionary",
1588                    "required":true,
1589                    "type":"string"
1590                }
1591            ],
1592            "identifier-field-ids":[]
1593        }"#;
1594
1595        let schema: Schema = serde_json::from_str(schema_json).unwrap();
1596        schema
1597    }
1598
1599    #[test]
1600    fn test_arrow_schema_to_schema() {
1601        let arrow_schema = arrow_schema_for_arrow_schema_to_schema_test();
1602        let schema = iceberg_schema_for_arrow_schema_to_schema_test();
1603        let converted_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
1604        pretty_assertions::assert_eq!(converted_schema, schema);
1605    }
1606
1607    fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema {
1608        let fields = Fields::from(vec![
1609            simple_field("key", DataType::Int32, false, "28"),
1610            simple_field("value", DataType::Utf8, true, "29"),
1611        ]);
1612
1613        let r#struct = DataType::Struct(fields);
1614        let map = DataType::Map(
1615            Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
1616            false,
1617        );
1618
1619        let fields = Fields::from(vec![
1620            simple_field("aa", DataType::Int32, false, "18"),
1621            simple_field("bb", DataType::Utf8, true, "19"),
1622            simple_field(
1623                "cc",
1624                DataType::Timestamp(TimeUnit::Microsecond, None),
1625                false,
1626                "20",
1627            ),
1628        ]);
1629
1630        let r#struct = DataType::Struct(fields);
1631
1632        ArrowSchema::new(vec![
1633            simple_field("a", DataType::Int32, false, "2"),
1634            simple_field("b", DataType::Int64, false, "1"),
1635            simple_field("c", DataType::Utf8, false, "3"),
1636            simple_field("n", DataType::Utf8, false, "21"),
1637            simple_field(
1638                "d",
1639                DataType::Timestamp(TimeUnit::Microsecond, None),
1640                true,
1641                "4",
1642            ),
1643            simple_field("e", DataType::Boolean, true, "6"),
1644            simple_field("f", DataType::Float32, false, "5"),
1645            simple_field("g", DataType::Float64, false, "7"),
1646            simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1647            simple_field("h", DataType::Date32, false, "8"),
1648            simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1649            simple_field(
1650                "j",
1651                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1652                false,
1653                "10",
1654            ),
1655            simple_field(
1656                "k",
1657                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1658                false,
1659                "12",
1660            ),
1661            simple_field("l", DataType::LargeBinary, false, "13"),
1662            simple_field("o", DataType::LargeBinary, false, "22"),
1663            simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1664            simple_field(
1665                "list",
1666                DataType::List(Arc::new(simple_field(
1667                    "element",
1668                    DataType::Int32,
1669                    false,
1670                    "15",
1671                ))),
1672                true,
1673                "14",
1674            ),
1675            simple_field(
1676                "large_list",
1677                DataType::List(Arc::new(simple_field(
1678                    "element",
1679                    DataType::Utf8,
1680                    false,
1681                    "23",
1682                ))),
1683                true,
1684                "24",
1685            ),
1686            simple_field(
1687                "fixed_list",
1688                DataType::List(Arc::new(simple_field(
1689                    "element",
1690                    DataType::LargeBinary,
1691                    false,
1692                    "26",
1693                ))),
1694                true,
1695                "25",
1696            ),
1697            simple_field("map", map, false, "16"),
1698            simple_field("struct", r#struct, false, "17"),
1699            simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"),
1700        ])
1701    }
1702
1703    fn iceberg_schema_for_schema_to_arrow_schema() -> Schema {
1704        let schema_json = r#"{
1705            "type":"struct",
1706            "schema-id":0,
1707            "fields":[
1708                {
1709                    "id":2,
1710                    "name":"a",
1711                    "required":true,
1712                    "type":"int"
1713                },
1714                {
1715                    "id":1,
1716                    "name":"b",
1717                    "required":true,
1718                    "type":"long"
1719                },
1720                {
1721                    "id":3,
1722                    "name":"c",
1723                    "required":true,
1724                    "type":"string"
1725                },
1726                {
1727                    "id":21,
1728                    "name":"n",
1729                    "required":true,
1730                    "type":"string"
1731                },
1732                {
1733                    "id":4,
1734                    "name":"d",
1735                    "required":false,
1736                    "type":"timestamp"
1737                },
1738                {
1739                    "id":6,
1740                    "name":"e",
1741                    "required":false,
1742                    "type":"boolean"
1743                },
1744                {
1745                    "id":5,
1746                    "name":"f",
1747                    "required":true,
1748                    "type":"float"
1749                },
1750                {
1751                    "id":7,
1752                    "name":"g",
1753                    "required":true,
1754                    "type":"double"
1755                },
1756                {
1757                    "id":27,
1758                    "name":"p",
1759                    "required":true,
1760                    "type":"decimal(10,2)"
1761                },
1762                {
1763                    "id":8,
1764                    "name":"h",
1765                    "required":true,
1766                    "type":"date"
1767                },
1768                {
1769                    "id":9,
1770                    "name":"i",
1771                    "required":true,
1772                    "type":"time"
1773                },
1774                {
1775                    "id":10,
1776                    "name":"j",
1777                    "required":true,
1778                    "type":"timestamptz"
1779                },
1780                {
1781                    "id":12,
1782                    "name":"k",
1783                    "required":true,
1784                    "type":"timestamptz"
1785                },
1786                {
1787                    "id":13,
1788                    "name":"l",
1789                    "required":true,
1790                    "type":"binary"
1791                },
1792                {
1793                    "id":22,
1794                    "name":"o",
1795                    "required":true,
1796                    "type":"binary"
1797                },
1798                {
1799                    "id":11,
1800                    "name":"m",
1801                    "required":true,
1802                    "type":"fixed[10]"
1803                },
1804                {
1805                    "id":14,
1806                    "name":"list",
1807                    "required": false,
1808                    "type": {
1809                        "type": "list",
1810                        "element-id": 15,
1811                        "element-required": true,
1812                        "element": "int"
1813                    }
1814                },
1815                {
1816                    "id":24,
1817                    "name":"large_list",
1818                    "required": false,
1819                    "type": {
1820                        "type": "list",
1821                        "element-id": 23,
1822                        "element-required": true,
1823                        "element": "string"
1824                    }
1825                },
1826                {
1827                    "id":25,
1828                    "name":"fixed_list",
1829                    "required": false,
1830                    "type": {
1831                        "type": "list",
1832                        "element-id": 26,
1833                        "element-required": true,
1834                        "element": "binary"
1835                    }
1836                },
1837                {
1838                    "id":16,
1839                    "name":"map",
1840                    "required": true,
1841                    "type": {
1842                        "type": "map",
1843                        "key-id": 28,
1844                        "key": "int",
1845                        "value-id": 29,
1846                        "value-required": false,
1847                        "value": "string"
1848                    }
1849                },
1850                {
1851                    "id":17,
1852                    "name":"struct",
1853                    "required": true,
1854                    "type": {
1855                        "type": "struct",
1856                        "fields": [
1857                            {
1858                                "id":18,
1859                                "name":"aa",
1860                                "required":true,
1861                                "type":"int"
1862                            },
1863                            {
1864                                "id":19,
1865                                "name":"bb",
1866                                "required":false,
1867                                "type":"string"
1868                            },
1869                            {
1870                                "id":20,
1871                                "name":"cc",
1872                                "required":true,
1873                                "type":"timestamp"
1874                            }
1875                        ]
1876                    }
1877                },
1878                {
1879                    "id":30,
1880                    "name":"uuid",
1881                    "required":true,
1882                    "type":"uuid"
1883                }
1884            ],
1885            "identifier-field-ids":[]
1886        }"#;
1887
1888        let schema: Schema = serde_json::from_str(schema_json).unwrap();
1889        schema
1890    }
1891
1892    #[test]
1893    fn test_schema_to_arrow_schema() {
1894        let arrow_schema = arrow_schema_for_schema_to_arrow_schema_test();
1895        let schema = iceberg_schema_for_schema_to_arrow_schema();
1896        let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap();
1897        assert_eq!(converted_arrow_schema, arrow_schema);
1898    }
1899
1900    #[test]
1901    fn test_type_conversion() {
1902        // test primitive type
1903        {
1904            let arrow_type = DataType::Int32;
1905            let iceberg_type = Type::Primitive(PrimitiveType::Int);
1906            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1907            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1908        }
1909
1910        // test struct type
1911        {
1912            // no metadata will cause error
1913            let arrow_type = DataType::Struct(Fields::from(vec![
1914                Field::new("a", DataType::Int64, false),
1915                Field::new("b", DataType::Utf8, true),
1916            ]));
1917            assert_eq!(
1918                &arrow_type_to_type(&arrow_type).unwrap_err().to_string(),
1919                "DataInvalid => Field id not found in metadata"
1920            );
1921
1922            let arrow_type = DataType::Struct(Fields::from(vec![
1923                Field::new("a", DataType::Int64, false).with_metadata(HashMap::from_iter([(
1924                    PARQUET_FIELD_ID_META_KEY.to_string(),
1925                    1.to_string(),
1926                )])),
1927                Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from_iter([(
1928                    PARQUET_FIELD_ID_META_KEY.to_string(),
1929                    2.to_string(),
1930                )])),
1931            ]));
1932            let iceberg_type = Type::Struct(StructType::new(vec![
1933                NestedField {
1934                    id: 1,
1935                    doc: None,
1936                    name: "a".to_string(),
1937                    required: true,
1938                    field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1939                    initial_default: None,
1940                    write_default: None,
1941                }
1942                .into(),
1943                NestedField {
1944                    id: 2,
1945                    doc: None,
1946                    name: "b".to_string(),
1947                    required: false,
1948                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1949                    initial_default: None,
1950                    write_default: None,
1951                }
1952                .into(),
1953            ]));
1954            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1955            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1956
1957            // initial_default and write_default is ignored
1958            let iceberg_type = Type::Struct(StructType::new(vec![
1959                NestedField {
1960                    id: 1,
1961                    doc: None,
1962                    name: "a".to_string(),
1963                    required: true,
1964                    field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1965                    initial_default: Some(Literal::Primitive(PrimitiveLiteral::Int(114514))),
1966                    write_default: None,
1967                }
1968                .into(),
1969                NestedField {
1970                    id: 2,
1971                    doc: None,
1972                    name: "b".to_string(),
1973                    required: false,
1974                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1975                    initial_default: None,
1976                    write_default: Some(Literal::Primitive(PrimitiveLiteral::String(
1977                        "514".to_string(),
1978                    ))),
1979                }
1980                .into(),
1981            ]));
1982            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1983        }
1984
1985        // test dictionary type
1986        {
1987            let arrow_type =
1988                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int8));
1989            let iceberg_type = Type::Primitive(PrimitiveType::Int);
1990            assert_eq!(
1991                iceberg_type,
1992                arrow_type_to_type(&arrow_type).unwrap(),
1993                "Expected dictionary conversion to use the contained value"
1994            );
1995
1996            let arrow_type =
1997                DataType::Dictionary(Box::new(DataType::Utf8), Box::new(DataType::Boolean));
1998            let iceberg_type = Type::Primitive(PrimitiveType::Boolean);
1999            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
2000        }
2001    }
2002
2003    #[test]
2004    fn test_unsigned_integer_type_conversion() {
2005        let test_cases = vec![
2006            (DataType::UInt8, PrimitiveType::Int),
2007            (DataType::UInt16, PrimitiveType::Int),
2008            (DataType::UInt32, PrimitiveType::Long),
2009        ];
2010
2011        for (arrow_type, expected_iceberg_type) in test_cases {
2012            let arrow_field = Field::new("test", arrow_type.clone(), false).with_metadata(
2013                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
2014            );
2015            let arrow_schema = ArrowSchema::new(vec![arrow_field]);
2016
2017            let iceberg_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
2018            let iceberg_field = iceberg_schema.as_struct().fields().first().unwrap();
2019
2020            assert!(
2021                matches!(iceberg_field.field_type.as_ref(), Type::Primitive(t) if *t == expected_iceberg_type),
2022                "Expected {arrow_type:?} to map to {expected_iceberg_type:?}"
2023            );
2024        }
2025
2026        // Test UInt64 blocking
2027        {
2028            let arrow_field = Field::new("test", DataType::UInt64, false).with_metadata(
2029                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
2030            );
2031            let arrow_schema = ArrowSchema::new(vec![arrow_field]);
2032
2033            let result = arrow_schema_to_schema(&arrow_schema);
2034            assert!(result.is_err());
2035            assert!(
2036                result
2037                    .unwrap_err()
2038                    .to_string()
2039                    .contains("UInt64 is not supported")
2040            );
2041        }
2042    }
2043
2044    #[test]
2045    fn test_datum_conversion() {
2046        {
2047            let datum = Datum::bool(true);
2048            let arrow_datum = get_arrow_datum(&datum).unwrap();
2049            let (array, is_scalar) = arrow_datum.get();
2050            let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
2051            assert!(is_scalar);
2052            assert!(array.value(0));
2053        }
2054        {
2055            let datum = Datum::int(42);
2056            let arrow_datum = get_arrow_datum(&datum).unwrap();
2057            let (array, is_scalar) = arrow_datum.get();
2058            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
2059            assert!(is_scalar);
2060            assert_eq!(array.value(0), 42);
2061        }
2062        {
2063            let datum = Datum::long(42);
2064            let arrow_datum = get_arrow_datum(&datum).unwrap();
2065            let (array, is_scalar) = arrow_datum.get();
2066            let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
2067            assert!(is_scalar);
2068            assert_eq!(array.value(0), 42);
2069        }
2070        {
2071            let datum = Datum::float(42.42);
2072            let arrow_datum = get_arrow_datum(&datum).unwrap();
2073            let (array, is_scalar) = arrow_datum.get();
2074            let array = array.as_any().downcast_ref::<Float32Array>().unwrap();
2075            assert!(is_scalar);
2076            assert_eq!(array.value(0), 42.42);
2077        }
2078        {
2079            let datum = Datum::double(42.42);
2080            let arrow_datum = get_arrow_datum(&datum).unwrap();
2081            let (array, is_scalar) = arrow_datum.get();
2082            let array = array.as_any().downcast_ref::<Float64Array>().unwrap();
2083            assert!(is_scalar);
2084            assert_eq!(array.value(0), 42.42);
2085        }
2086        {
2087            let datum = Datum::string("abc");
2088            let arrow_datum = get_arrow_datum(&datum).unwrap();
2089            let (array, is_scalar) = arrow_datum.get();
2090            let array = array.as_any().downcast_ref::<StringArray>().unwrap();
2091            assert!(is_scalar);
2092            assert_eq!(array.value(0), "abc");
2093        }
2094        {
2095            let datum = Datum::binary(vec![1, 2, 3, 4]);
2096            let arrow_datum = get_arrow_datum(&datum).unwrap();
2097            let (array, is_scalar) = arrow_datum.get();
2098            let array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
2099            assert!(is_scalar);
2100            assert_eq!(array.value(0), &[1, 2, 3, 4]);
2101        }
2102        {
2103            let datum = Datum::date(42);
2104            let arrow_datum = get_arrow_datum(&datum).unwrap();
2105            let (array, is_scalar) = arrow_datum.get();
2106            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
2107            assert!(is_scalar);
2108            assert_eq!(array.value(0), 42);
2109        }
2110        {
2111            let datum = Datum::timestamp_micros(42);
2112            let arrow_datum = get_arrow_datum(&datum).unwrap();
2113            let (array, is_scalar) = arrow_datum.get();
2114            let array = array
2115                .as_any()
2116                .downcast_ref::<TimestampMicrosecondArray>()
2117                .unwrap();
2118            assert!(is_scalar);
2119            assert_eq!(array.value(0), 42);
2120        }
2121        {
2122            let datum = Datum::timestamptz_micros(42);
2123            let arrow_datum = get_arrow_datum(&datum).unwrap();
2124            let (array, is_scalar) = arrow_datum.get();
2125            let array = array
2126                .as_any()
2127                .downcast_ref::<TimestampMicrosecondArray>()
2128                .unwrap();
2129            assert!(is_scalar);
2130            assert_eq!(array.timezone(), Some("+00:00"));
2131            assert_eq!(array.value(0), 42);
2132        }
2133        {
2134            let datum = Datum::decimal_with_precision(decimal_new(123, 2), 30).unwrap();
2135            let arrow_datum = get_arrow_datum(&datum).unwrap();
2136            let (array, is_scalar) = arrow_datum.get();
2137            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
2138            assert!(is_scalar);
2139            assert_eq!(array.precision(), 30);
2140            assert_eq!(array.scale(), 2);
2141            assert_eq!(array.value(0), 123);
2142        }
2143        {
2144            let datum = Datum::uuid_from_str("42424242-4242-4242-4242-424242424242").unwrap();
2145            let arrow_datum = get_arrow_datum(&datum).unwrap();
2146            let (array, is_scalar) = arrow_datum.get();
2147            let array = array
2148                .as_any()
2149                .downcast_ref::<FixedSizeBinaryArray>()
2150                .unwrap();
2151            assert!(is_scalar);
2152            assert_eq!(array.value(0), [66u8; 16]);
2153        }
2154    }
2155
2156    #[test]
2157    fn test_arrow_schema_to_schema_with_field_id() {
2158        // Create a complex Arrow schema without field ID metadata
2159        // Including: primitives, list, nested struct, map, and nested list of structs
2160        let arrow_schema = ArrowSchema::new(vec![
2161            Field::new("id", DataType::Int64, false),
2162            Field::new("name", DataType::Utf8, true),
2163            Field::new("price", DataType::Decimal128(10, 2), false),
2164            Field::new(
2165                "created_at",
2166                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2167                true,
2168            ),
2169            Field::new(
2170                "tags",
2171                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2172                true,
2173            ),
2174            Field::new(
2175                "address",
2176                DataType::Struct(Fields::from(vec![
2177                    Field::new("street", DataType::Utf8, true),
2178                    Field::new("city", DataType::Utf8, false),
2179                    Field::new("zip", DataType::Int32, true),
2180                ])),
2181                true,
2182            ),
2183            Field::new(
2184                "attributes",
2185                DataType::Map(
2186                    Arc::new(Field::new(
2187                        DEFAULT_MAP_FIELD_NAME,
2188                        DataType::Struct(Fields::from(vec![
2189                            Field::new("key", DataType::Utf8, false),
2190                            Field::new("value", DataType::Utf8, true),
2191                        ])),
2192                        false,
2193                    )),
2194                    false,
2195                ),
2196                true,
2197            ),
2198            Field::new(
2199                "orders",
2200                DataType::List(Arc::new(Field::new(
2201                    "element",
2202                    DataType::Struct(Fields::from(vec![
2203                        Field::new("order_id", DataType::Int64, false),
2204                        Field::new("amount", DataType::Float64, false),
2205                    ])),
2206                    true,
2207                ))),
2208                true,
2209            ),
2210        ]);
2211
2212        let schema = arrow_schema_to_schema_auto_assign_ids(&arrow_schema).unwrap();
2213
2214        // Build expected schema with exact field IDs following level-order assignment:
2215        // Level 0: id=1, name=2, price=3, created_at=4, tags=5, address=6, attributes=7, orders=8
2216        // Level 1: tags.element=9, address.{street=10,city=11,zip=12}, attributes.{key=13,value=14}, orders.element=15
2217        // Level 2: orders.element.{order_id=16,amount=17}
2218        let expected = Schema::builder()
2219            .with_fields(vec![
2220                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
2221                NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
2222                NestedField::required(
2223                    3,
2224                    "price",
2225                    Type::Primitive(PrimitiveType::Decimal {
2226                        precision: 10,
2227                        scale: 2,
2228                    }),
2229                )
2230                .into(),
2231                NestedField::optional(4, "created_at", Type::Primitive(PrimitiveType::Timestamptz))
2232                    .into(),
2233                NestedField::optional(
2234                    5,
2235                    "tags",
2236                    Type::List(ListType {
2237                        element_field: NestedField::list_element(
2238                            9,
2239                            Type::Primitive(PrimitiveType::String),
2240                            false,
2241                        )
2242                        .into(),
2243                    }),
2244                )
2245                .into(),
2246                NestedField::optional(
2247                    6,
2248                    "address",
2249                    Type::Struct(StructType::new(vec![
2250                        NestedField::optional(10, "street", Type::Primitive(PrimitiveType::String))
2251                            .into(),
2252                        NestedField::required(11, "city", Type::Primitive(PrimitiveType::String))
2253                            .into(),
2254                        NestedField::optional(12, "zip", Type::Primitive(PrimitiveType::Int))
2255                            .into(),
2256                    ])),
2257                )
2258                .into(),
2259                NestedField::optional(
2260                    7,
2261                    "attributes",
2262                    Type::Map(MapType {
2263                        key_field: NestedField::map_key_element(
2264                            13,
2265                            Type::Primitive(PrimitiveType::String),
2266                        )
2267                        .into(),
2268                        value_field: NestedField::map_value_element(
2269                            14,
2270                            Type::Primitive(PrimitiveType::String),
2271                            false,
2272                        )
2273                        .into(),
2274                    }),
2275                )
2276                .into(),
2277                NestedField::optional(
2278                    8,
2279                    "orders",
2280                    Type::List(ListType {
2281                        element_field: NestedField::list_element(
2282                            15,
2283                            Type::Struct(StructType::new(vec![
2284                                NestedField::required(
2285                                    16,
2286                                    "order_id",
2287                                    Type::Primitive(PrimitiveType::Long),
2288                                )
2289                                .into(),
2290                                NestedField::required(
2291                                    17,
2292                                    "amount",
2293                                    Type::Primitive(PrimitiveType::Double),
2294                                )
2295                                .into(),
2296                            ])),
2297                            false,
2298                        )
2299                        .into(),
2300                    }),
2301                )
2302                .into(),
2303            ])
2304            .build()
2305            .unwrap();
2306
2307        pretty_assertions::assert_eq!(schema, expected);
2308        assert_eq!(schema.highest_field_id(), 17);
2309    }
2310}