iceberg/arrow/
schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Conversion between Arrow schema and Iceberg schema.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
24use arrow_array::{
25    BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
26    FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
27    TimestampMicrosecondArray,
28};
29use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
30use num_bigint::BigInt;
31use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
32use parquet::file::statistics::Statistics;
33use rust_decimal::prelude::ToPrimitive;
34use uuid::Uuid;
35
36use crate::error::Result;
37use crate::spec::{
38    Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral,
39    PrimitiveType, Schema, SchemaVisitor, StructType, Type,
40};
41use crate::{Error, ErrorKind};
42
43/// When iceberg map type convert to Arrow map type, the default map field name is "key_value".
44pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
45/// UTC time zone for Arrow timestamp type.
46pub const UTC_TIME_ZONE: &str = "+00:00";
47
48/// A post order arrow schema visitor.
49///
50/// For order of methods called, please refer to [`visit_schema`].
51pub trait ArrowSchemaVisitor {
52    /// Return type of this visitor on arrow field.
53    type T;
54
55    /// Return type of this visitor on arrow schema.
56    type U;
57
58    /// Called before struct/list/map field.
59    fn before_field(&mut self, _field: &Field) -> Result<()> {
60        Ok(())
61    }
62
63    /// Called after struct/list/map field.
64    fn after_field(&mut self, _field: &Field) -> Result<()> {
65        Ok(())
66    }
67
68    /// Called before list element.
69    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
70        Ok(())
71    }
72
73    /// Called after list element.
74    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
75        Ok(())
76    }
77
78    /// Called before map key.
79    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
80        Ok(())
81    }
82
83    /// Called after map key.
84    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
85        Ok(())
86    }
87
88    /// Called before map value.
89    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
90        Ok(())
91    }
92
93    /// Called after map value.
94    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
95        Ok(())
96    }
97
98    /// Called after schema's type visited.
99    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U>;
100
101    /// Called after struct's fields visited.
102    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T>;
103
104    /// Called after list fields visited.
105    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;
106
107    /// Called after map's key and value fields visited.
108    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T>;
109
110    /// Called when see a primitive type.
111    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
112}
113
114/// Visiting a type in post order.
115fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Result<V::T> {
116    match r#type {
117        p if p.is_primitive()
118            || matches!(
119                p,
120                DataType::Boolean
121                    | DataType::Utf8
122                    | DataType::LargeUtf8
123                    | DataType::Utf8View
124                    | DataType::Binary
125                    | DataType::LargeBinary
126                    | DataType::BinaryView
127                    | DataType::FixedSizeBinary(_)
128            ) =>
129        {
130            visitor.primitive(p)
131        }
132        DataType::List(element_field) => visit_list(r#type, element_field, visitor),
133        DataType::LargeList(element_field) => visit_list(r#type, element_field, visitor),
134        DataType::FixedSizeList(element_field, _) => visit_list(r#type, element_field, visitor),
135        DataType::Map(field, _) => match field.data_type() {
136            DataType::Struct(fields) => {
137                if fields.len() != 2 {
138                    return Err(Error::new(
139                        ErrorKind::DataInvalid,
140                        "Map field must have exactly 2 fields",
141                    ));
142                }
143
144                let key_field = &fields[0];
145                let value_field = &fields[1];
146
147                let key_result = {
148                    visitor.before_map_key(key_field)?;
149                    let ret = visit_type(key_field.data_type(), visitor)?;
150                    visitor.after_map_key(key_field)?;
151                    ret
152                };
153
154                let value_result = {
155                    visitor.before_map_value(value_field)?;
156                    let ret = visit_type(value_field.data_type(), visitor)?;
157                    visitor.after_map_value(value_field)?;
158                    ret
159                };
160
161                visitor.map(r#type, key_result, value_result)
162            }
163            _ => Err(Error::new(
164                ErrorKind::DataInvalid,
165                "Map field must have struct type",
166            )),
167        },
168        DataType::Struct(fields) => visit_struct(fields, visitor),
169        DataType::Dictionary(_key_type, value_type) => visit_type(value_type, visitor),
170        other => Err(Error::new(
171            ErrorKind::DataInvalid,
172            format!("Cannot visit Arrow data type: {other}"),
173        )),
174    }
175}
176
177/// Visit list types in post order.
178fn visit_list<V: ArrowSchemaVisitor>(
179    data_type: &DataType,
180    element_field: &Field,
181    visitor: &mut V,
182) -> Result<V::T> {
183    visitor.before_list_element(element_field)?;
184    let value = visit_type(element_field.data_type(), visitor)?;
185    visitor.after_list_element(element_field)?;
186    visitor.list(data_type, value)
187}
188
189/// Visit struct type in post order.
190fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> Result<V::T> {
191    let mut results = Vec::with_capacity(fields.len());
192    for field in fields {
193        visitor.before_field(field)?;
194        let result = visit_type(field.data_type(), visitor)?;
195        visitor.after_field(field)?;
196        results.push(result);
197    }
198
199    visitor.r#struct(fields, results)
200}
201
202/// Visit schema in post order.
203fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) -> Result<V::U> {
204    let mut results = Vec::with_capacity(schema.fields().len());
205    for field in schema.fields() {
206        visitor.before_field(field)?;
207        let result = visit_type(field.data_type(), visitor)?;
208        visitor.after_field(field)?;
209        results.push(result);
210    }
211    visitor.schema(schema, results)
212}
213
214/// Convert Arrow schema to Iceberg schema.
215///
216/// Iceberg schema fields require a unique field id, and this function assumes that each field
217/// in the provided Arrow schema contains a field id in its metadata. If the metadata is missing
218/// or the field id is not set, the conversion will fail
219pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
220    let mut visitor = ArrowSchemaConverter::new();
221    visit_schema(schema, &mut visitor)
222}
223
224/// Convert Arrow schema to Iceberg schema with automatically assigned field IDs.
225///
226/// Unlike [`arrow_schema_to_schema`], this function does not require field IDs in the Arrow
227/// schema metadata. Instead, it automatically assigns unique field IDs starting from 1,
228/// following Iceberg's field ID assignment rules.
229///
230/// This is useful when converting Arrow schemas that don't originate from Iceberg tables,
231/// such as schemas from DataFusion or other Arrow-based systems.
232pub fn arrow_schema_to_schema_auto_assign_ids(schema: &ArrowSchema) -> Result<Schema> {
233    let mut visitor = ArrowSchemaConverter::new_with_field_ids_from(FIRST_FIELD_ID);
234    visit_schema(schema, &mut visitor)
235}
236
237/// Convert Arrow type to iceberg type.
238pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
239    let mut visitor = ArrowSchemaConverter::new();
240    visit_type(ty, &mut visitor)
241}
242
243const ARROW_FIELD_DOC_KEY: &str = "doc";
244
245pub(super) fn get_field_id_from_metadata(field: &Field) -> Result<i32> {
246    if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
247        return value.parse::<i32>().map_err(|e| {
248            Error::new(
249                ErrorKind::DataInvalid,
250                "Failed to parse field id".to_string(),
251            )
252            .with_context("value", value)
253            .with_source(e)
254        });
255    }
256    Err(Error::new(
257        ErrorKind::DataInvalid,
258        "Field id not found in metadata",
259    ))
260}
261
262fn get_field_doc(field: &Field) -> Option<String> {
263    if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
264        return Some(value.clone());
265    }
266    None
267}
268
269struct ArrowSchemaConverter {
270    /// When set, the schema builder will reassign field IDs starting from this value
271    /// using level-order traversal (breadth-first).
272    reassign_field_ids_from: Option<i32>,
273    /// Generates unique placeholder IDs for fields before reassignment.
274    /// Required because `ReassignFieldIds` builds an old-to-new ID mapping
275    /// that expects unique input IDs.
276    next_field_id: i32,
277}
278
279impl ArrowSchemaConverter {
280    fn new() -> Self {
281        Self {
282            reassign_field_ids_from: None,
283            next_field_id: 0,
284        }
285    }
286
287    fn new_with_field_ids_from(start_from: i32) -> Self {
288        Self {
289            reassign_field_ids_from: Some(start_from),
290            next_field_id: 0,
291        }
292    }
293
294    fn get_field_id(&mut self, field: &Field) -> Result<i32> {
295        if self.reassign_field_ids_from.is_some() {
296            // Field IDs will be reassigned by the schema builder.
297            // We need unique temporary IDs because ReassignFieldIds builds an
298            // old->new ID mapping that requires unique input IDs.
299            let temp_id = self.next_field_id;
300            self.next_field_id += 1;
301            Ok(temp_id)
302        } else {
303            // Get field ID from arrow field metadata
304            get_field_id_from_metadata(field)
305        }
306    }
307
308    fn convert_fields(
309        &mut self,
310        fields: &Fields,
311        field_results: &[Type],
312    ) -> Result<Vec<NestedFieldRef>> {
313        let mut results = Vec::with_capacity(fields.len());
314        for i in 0..fields.len() {
315            let field = &fields[i];
316            let field_type = &field_results[i];
317            let id = self.get_field_id(field)?;
318            let doc = get_field_doc(field);
319            let nested_field = NestedField {
320                id,
321                doc,
322                name: field.name().clone(),
323                required: !field.is_nullable(),
324                field_type: Box::new(field_type.clone()),
325                initial_default: None,
326                write_default: None,
327            };
328            results.push(Arc::new(nested_field));
329        }
330        Ok(results)
331    }
332}
333
334impl ArrowSchemaVisitor for ArrowSchemaConverter {
335    type T = Type;
336    type U = Schema;
337
338    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
339        let fields = self.convert_fields(schema.fields(), &values)?;
340        let mut builder = Schema::builder().with_fields(fields);
341        if let Some(start_from) = self.reassign_field_ids_from {
342            builder = builder.with_reassigned_field_ids(start_from)
343        }
344        builder.build()
345    }
346
347    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
348        let fields = self.convert_fields(fields, &results)?;
349        Ok(Type::Struct(StructType::new(fields)))
350    }
351
352    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
353        let element_field = match list {
354            DataType::List(element_field) => element_field,
355            DataType::LargeList(element_field) => element_field,
356            DataType::FixedSizeList(element_field, _) => element_field,
357            _ => {
358                return Err(Error::new(
359                    ErrorKind::DataInvalid,
360                    "List type must have list data type",
361                ));
362            }
363        };
364
365        let id = self.get_field_id(element_field)?;
366        let doc = get_field_doc(element_field);
367        let mut element_field =
368            NestedField::list_element(id, value.clone(), !element_field.is_nullable());
369        if let Some(doc) = doc {
370            element_field = element_field.with_doc(doc);
371        }
372        let element_field = Arc::new(element_field);
373        Ok(Type::List(ListType { element_field }))
374    }
375
376    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
377        match map {
378            DataType::Map(field, _) => match field.data_type() {
379                DataType::Struct(fields) => {
380                    if fields.len() != 2 {
381                        return Err(Error::new(
382                            ErrorKind::DataInvalid,
383                            "Map field must have exactly 2 fields",
384                        ));
385                    }
386
387                    let key_field = &fields[0];
388                    let value_field = &fields[1];
389
390                    let key_id = self.get_field_id(key_field)?;
391                    let key_doc = get_field_doc(key_field);
392                    let mut key_field = NestedField::map_key_element(key_id, key_value.clone());
393                    if let Some(doc) = key_doc {
394                        key_field = key_field.with_doc(doc);
395                    }
396                    let key_field = Arc::new(key_field);
397
398                    let value_id = self.get_field_id(value_field)?;
399                    let value_doc = get_field_doc(value_field);
400                    let mut value_field = NestedField::map_value_element(
401                        value_id,
402                        value.clone(),
403                        !value_field.is_nullable(),
404                    );
405                    if let Some(doc) = value_doc {
406                        value_field = value_field.with_doc(doc);
407                    }
408                    let value_field = Arc::new(value_field);
409
410                    Ok(Type::Map(MapType {
411                        key_field,
412                        value_field,
413                    }))
414                }
415                _ => Err(Error::new(
416                    ErrorKind::DataInvalid,
417                    "Map field must have struct type",
418                )),
419            },
420            _ => Err(Error::new(
421                ErrorKind::DataInvalid,
422                "Map type must have map data type",
423            )),
424        }
425    }
426
427    fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
428        match p {
429            DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
430            DataType::Int8 | DataType::Int16 | DataType::Int32 => {
431                Ok(Type::Primitive(PrimitiveType::Int))
432            }
433            DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)),
434            DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)),
435            DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
436            DataType::UInt64 => {
437                // Block uint64 - no safe casting option
438                Err(Error::new(
439                    ErrorKind::DataInvalid,
440                    "UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for full uint64 range.",
441                ))
442            }
443            DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
444            DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
445            DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| {
446                Error::new(
447                    ErrorKind::DataInvalid,
448                    "Failed to create decimal type".to_string(),
449                )
450                .with_source(e)
451            }),
452            DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)),
453            DataType::Time64(unit) if unit == &TimeUnit::Microsecond => {
454                Ok(Type::Primitive(PrimitiveType::Time))
455            }
456            DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => {
457                Ok(Type::Primitive(PrimitiveType::Timestamp))
458            }
459            DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => {
460                Ok(Type::Primitive(PrimitiveType::TimestampNs))
461            }
462            DataType::Timestamp(unit, Some(zone))
463                if unit == &TimeUnit::Microsecond
464                    && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
465            {
466                Ok(Type::Primitive(PrimitiveType::Timestamptz))
467            }
468            DataType::Timestamp(unit, Some(zone))
469                if unit == &TimeUnit::Nanosecond
470                    && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
471            {
472                Ok(Type::Primitive(PrimitiveType::TimestamptzNs))
473            }
474            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
475                Ok(Type::Primitive(PrimitiveType::Binary))
476            }
477            DataType::FixedSizeBinary(width) => {
478                Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64)))
479            }
480            DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => {
481                Ok(Type::Primitive(PrimitiveType::String))
482            }
483            _ => Err(Error::new(
484                ErrorKind::DataInvalid,
485                format!("Unsupported Arrow data type: {p}"),
486            )),
487        }
488    }
489}
490
491struct ToArrowSchemaConverter;
492
493enum ArrowSchemaOrFieldOrType {
494    Schema(ArrowSchema),
495    Field(Field),
496    Type(DataType),
497}
498
499impl SchemaVisitor for ToArrowSchemaConverter {
500    type T = ArrowSchemaOrFieldOrType;
501
502    fn schema(
503        &mut self,
504        _schema: &crate::spec::Schema,
505        value: ArrowSchemaOrFieldOrType,
506    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
507        let struct_type = match value {
508            ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)) => fields,
509            _ => unreachable!(),
510        };
511        Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new(
512            struct_type,
513        )))
514    }
515
516    fn field(
517        &mut self,
518        field: &crate::spec::NestedFieldRef,
519        value: ArrowSchemaOrFieldOrType,
520    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
521        let ty = match value {
522            ArrowSchemaOrFieldOrType::Type(ty) => ty,
523            _ => unreachable!(),
524        };
525        let metadata = if let Some(doc) = &field.doc {
526            HashMap::from([
527                (PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()),
528                (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
529            ])
530        } else {
531            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string())])
532        };
533        Ok(ArrowSchemaOrFieldOrType::Field(
534            Field::new(field.name.clone(), ty, !field.required).with_metadata(metadata),
535        ))
536    }
537
538    fn r#struct(
539        &mut self,
540        _: &crate::spec::StructType,
541        results: Vec<ArrowSchemaOrFieldOrType>,
542    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
543        let fields = results
544            .into_iter()
545            .map(|result| match result {
546                ArrowSchemaOrFieldOrType::Field(field) => field,
547                _ => unreachable!(),
548            })
549            .collect();
550        Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)))
551    }
552
553    fn list(
554        &mut self,
555        list: &crate::spec::ListType,
556        value: ArrowSchemaOrFieldOrType,
557    ) -> crate::Result<Self::T> {
558        let field = match self.field(&list.element_field, value)? {
559            ArrowSchemaOrFieldOrType::Field(field) => field,
560            _ => unreachable!(),
561        };
562        let meta = if let Some(doc) = &list.element_field.doc {
563            HashMap::from([
564                (
565                    PARQUET_FIELD_ID_META_KEY.to_string(),
566                    list.element_field.id.to_string(),
567                ),
568                (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
569            ])
570        } else {
571            HashMap::from([(
572                PARQUET_FIELD_ID_META_KEY.to_string(),
573                list.element_field.id.to_string(),
574            )])
575        };
576        let field = field.with_metadata(meta);
577        Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new(
578            field,
579        ))))
580    }
581
582    fn map(
583        &mut self,
584        map: &crate::spec::MapType,
585        key_value: ArrowSchemaOrFieldOrType,
586        value: ArrowSchemaOrFieldOrType,
587    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
588        let key_field = match self.field(&map.key_field, key_value)? {
589            ArrowSchemaOrFieldOrType::Field(field) => field,
590            _ => unreachable!(),
591        };
592        let value_field = match self.field(&map.value_field, value)? {
593            ArrowSchemaOrFieldOrType::Field(field) => field,
594            _ => unreachable!(),
595        };
596        let field = Field::new(
597            DEFAULT_MAP_FIELD_NAME,
598            DataType::Struct(vec![key_field, value_field].into()),
599            // Map field is always not nullable
600            false,
601        );
602
603        Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
604            field.into(),
605            false,
606        )))
607    }
608
609    fn primitive(
610        &mut self,
611        p: &crate::spec::PrimitiveType,
612    ) -> crate::Result<ArrowSchemaOrFieldOrType> {
613        match p {
614            crate::spec::PrimitiveType::Boolean => {
615                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
616            }
617            crate::spec::PrimitiveType::Int => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int32)),
618            crate::spec::PrimitiveType::Long => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int64)),
619            crate::spec::PrimitiveType::Float => {
620                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float32))
621            }
622            crate::spec::PrimitiveType::Double => {
623                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float64))
624            }
625            crate::spec::PrimitiveType::Decimal { precision, scale } => {
626                let (precision, scale) = {
627                    let precision: u8 = precision.to_owned().try_into().map_err(|err| {
628                        Error::new(
629                            crate::ErrorKind::DataInvalid,
630                            "incompatible precision for decimal type convert",
631                        )
632                        .with_source(err)
633                    })?;
634                    let scale = scale.to_owned().try_into().map_err(|err| {
635                        Error::new(
636                            crate::ErrorKind::DataInvalid,
637                            "incompatible scale for decimal type convert",
638                        )
639                        .with_source(err)
640                    })?;
641                    (precision, scale)
642                };
643                validate_decimal_precision_and_scale::<Decimal128Type>(precision, scale).map_err(
644                    |err| {
645                        Error::new(
646                            crate::ErrorKind::DataInvalid,
647                            "incompatible precision and scale for decimal type convert",
648                        )
649                        .with_source(err)
650                    },
651                )?;
652                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Decimal128(
653                    precision, scale,
654                )))
655            }
656            crate::spec::PrimitiveType::Date => {
657                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
658            }
659            crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
660                DataType::Time64(TimeUnit::Microsecond),
661            )),
662            crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
663                DataType::Timestamp(TimeUnit::Microsecond, None),
664            )),
665            crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
666                // Timestampz always stored as UTC
667                DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
668            )),
669            crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
670                DataType::Timestamp(TimeUnit::Nanosecond, None),
671            )),
672            crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
673                // Store timestamptz_ns as UTC
674                DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
675            )),
676            crate::spec::PrimitiveType::String => {
677                Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
678            }
679            crate::spec::PrimitiveType::Uuid => Ok(ArrowSchemaOrFieldOrType::Type(
680                DataType::FixedSizeBinary(16),
681            )),
682            crate::spec::PrimitiveType::Fixed(len) => Ok(ArrowSchemaOrFieldOrType::Type(
683                len.to_i32()
684                    .map(DataType::FixedSizeBinary)
685                    .unwrap_or(DataType::LargeBinary),
686            )),
687            crate::spec::PrimitiveType::Binary => {
688                Ok(ArrowSchemaOrFieldOrType::Type(DataType::LargeBinary))
689            }
690        }
691    }
692}
693
694/// Convert iceberg schema to an arrow schema.
695pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
696    let mut converter = ToArrowSchemaConverter;
697    match crate::spec::visit_schema(schema, &mut converter)? {
698        ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema),
699        _ => unreachable!(),
700    }
701}
702
703/// Convert iceberg type to an arrow type.
704pub fn type_to_arrow_type(ty: &crate::spec::Type) -> crate::Result<DataType> {
705    let mut converter = ToArrowSchemaConverter;
706    match crate::spec::visit_type(ty, &mut converter)? {
707        ArrowSchemaOrFieldOrType::Type(ty) => Ok(ty),
708        _ => unreachable!(),
709    }
710}
711
712/// Convert Iceberg Datum to Arrow Datum.
713pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send + Sync>> {
714    match (datum.data_type(), datum.literal()) {
715        (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(value)) => {
716            Ok(Arc::new(BooleanArray::new_scalar(*value)))
717        }
718        (PrimitiveType::Int, PrimitiveLiteral::Int(value)) => {
719            Ok(Arc::new(Int32Array::new_scalar(*value)))
720        }
721        (PrimitiveType::Long, PrimitiveLiteral::Long(value)) => {
722            Ok(Arc::new(Int64Array::new_scalar(*value)))
723        }
724        (PrimitiveType::Float, PrimitiveLiteral::Float(value)) => {
725            Ok(Arc::new(Float32Array::new_scalar(value.to_f32().unwrap())))
726        }
727        (PrimitiveType::Double, PrimitiveLiteral::Double(value)) => {
728            Ok(Arc::new(Float64Array::new_scalar(value.to_f64().unwrap())))
729        }
730        (PrimitiveType::String, PrimitiveLiteral::String(value)) => {
731            Ok(Arc::new(StringArray::new_scalar(value.as_str())))
732        }
733        (PrimitiveType::Binary, PrimitiveLiteral::Binary(value)) => {
734            Ok(Arc::new(BinaryArray::new_scalar(value.as_slice())))
735        }
736        (PrimitiveType::Date, PrimitiveLiteral::Int(value)) => {
737            Ok(Arc::new(Date32Array::new_scalar(*value)))
738        }
739        (PrimitiveType::Timestamp, PrimitiveLiteral::Long(value)) => {
740            Ok(Arc::new(TimestampMicrosecondArray::new_scalar(*value)))
741        }
742        (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
743            TimestampMicrosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
744        ))),
745        (PrimitiveType::Decimal { precision, scale }, PrimitiveLiteral::Int128(value)) => {
746            let array = Decimal128Array::from_value(*value, 1)
747                .with_precision_and_scale(*precision as _, *scale as _)
748                .unwrap();
749            Ok(Arc::new(Scalar::new(array)))
750        }
751        (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(value)) => {
752            let bytes = Uuid::from_u128(*value).into_bytes();
753            let array = FixedSizeBinaryArray::try_from_iter(vec![bytes].into_iter()).unwrap();
754            Ok(Arc::new(Scalar::new(array)))
755        }
756
757        (primitive_type, _) => Err(Error::new(
758            ErrorKind::FeatureUnsupported,
759            format!("Converting datum from type {primitive_type:?} to arrow not supported yet."),
760        )),
761    }
762}
763
764pub(crate) fn get_parquet_stat_min_as_datum(
765    primitive_type: &PrimitiveType,
766    stats: &Statistics,
767) -> Result<Option<Datum>> {
768    Ok(match (primitive_type, stats) {
769        (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
770            stats.min_opt().map(|val| Datum::bool(*val))
771        }
772        (PrimitiveType::Int, Statistics::Int32(stats)) => {
773            stats.min_opt().map(|val| Datum::int(*val))
774        }
775        (PrimitiveType::Date, Statistics::Int32(stats)) => {
776            stats.min_opt().map(|val| Datum::date(*val))
777        }
778        (PrimitiveType::Long, Statistics::Int64(stats)) => {
779            stats.min_opt().map(|val| Datum::long(*val))
780        }
781        (PrimitiveType::Time, Statistics::Int64(stats)) => {
782            let Some(val) = stats.min_opt() else {
783                return Ok(None);
784            };
785
786            Some(Datum::time_micros(*val)?)
787        }
788        (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
789            stats.min_opt().map(|val| Datum::timestamp_micros(*val))
790        }
791        (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
792            stats.min_opt().map(|val| Datum::timestamptz_micros(*val))
793        }
794        (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
795            stats.min_opt().map(|val| Datum::timestamp_nanos(*val))
796        }
797        (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
798            stats.min_opt().map(|val| Datum::timestamptz_nanos(*val))
799        }
800        (PrimitiveType::Float, Statistics::Float(stats)) => {
801            stats.min_opt().map(|val| Datum::float(*val))
802        }
803        (PrimitiveType::Double, Statistics::Double(stats)) => {
804            stats.min_opt().map(|val| Datum::double(*val))
805        }
806        (PrimitiveType::String, Statistics::ByteArray(stats)) => {
807            let Some(val) = stats.min_opt() else {
808                return Ok(None);
809            };
810
811            Some(Datum::string(val.as_utf8()?))
812        }
813        (
814            PrimitiveType::Decimal {
815                precision: _,
816                scale: _,
817            },
818            Statistics::ByteArray(stats),
819        ) => {
820            let Some(bytes) = stats.min_bytes_opt() else {
821                return Ok(None);
822            };
823            Some(Datum::new(
824                primitive_type.clone(),
825                PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
826            ))
827        }
828        (
829            PrimitiveType::Decimal {
830                precision: _,
831                scale: _,
832            },
833            Statistics::FixedLenByteArray(stats),
834        ) => {
835            let Some(bytes) = stats.min_bytes_opt() else {
836                return Ok(None);
837            };
838            let unscaled_value = BigInt::from_signed_bytes_be(bytes);
839            Some(Datum::new(
840                primitive_type.clone(),
841                PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
842                    Error::new(
843                        ErrorKind::DataInvalid,
844                        format!("Can't convert bytes to i128: {bytes:?}"),
845                    )
846                })?),
847            ))
848        }
849        (
850            PrimitiveType::Decimal {
851                precision: _,
852                scale: _,
853            },
854            Statistics::Int32(stats),
855        ) => stats.min_opt().map(|val| {
856            Datum::new(
857                primitive_type.clone(),
858                PrimitiveLiteral::Int128(i128::from(*val)),
859            )
860        }),
861
862        (
863            PrimitiveType::Decimal {
864                precision: _,
865                scale: _,
866            },
867            Statistics::Int64(stats),
868        ) => stats.min_opt().map(|val| {
869            Datum::new(
870                primitive_type.clone(),
871                PrimitiveLiteral::Int128(i128::from(*val)),
872            )
873        }),
874        (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
875            let Some(bytes) = stats.min_bytes_opt() else {
876                return Ok(None);
877            };
878            if bytes.len() != 16 {
879                return Err(Error::new(
880                    ErrorKind::Unexpected,
881                    "Invalid length of uuid bytes.",
882                ));
883            }
884            Some(Datum::uuid(Uuid::from_bytes(
885                bytes[..16].try_into().unwrap(),
886            )))
887        }
888        (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
889            let Some(bytes) = stat.min_bytes_opt() else {
890                return Ok(None);
891            };
892            if bytes.len() != *len as usize {
893                return Err(Error::new(
894                    ErrorKind::Unexpected,
895                    "Invalid length of fixed bytes.",
896                ));
897            }
898            Some(Datum::fixed(bytes.to_vec()))
899        }
900        (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
901            return Ok(stat
902                .min_bytes_opt()
903                .map(|bytes| Datum::binary(bytes.to_vec())));
904        }
905        _ => {
906            return Ok(None);
907        }
908    })
909}
910
911pub(crate) fn get_parquet_stat_max_as_datum(
912    primitive_type: &PrimitiveType,
913    stats: &Statistics,
914) -> Result<Option<Datum>> {
915    Ok(match (primitive_type, stats) {
916        (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
917            stats.max_opt().map(|val| Datum::bool(*val))
918        }
919        (PrimitiveType::Int, Statistics::Int32(stats)) => {
920            stats.max_opt().map(|val| Datum::int(*val))
921        }
922        (PrimitiveType::Date, Statistics::Int32(stats)) => {
923            stats.max_opt().map(|val| Datum::date(*val))
924        }
925        (PrimitiveType::Long, Statistics::Int64(stats)) => {
926            stats.max_opt().map(|val| Datum::long(*val))
927        }
928        (PrimitiveType::Time, Statistics::Int64(stats)) => {
929            let Some(val) = stats.max_opt() else {
930                return Ok(None);
931            };
932
933            Some(Datum::time_micros(*val)?)
934        }
935        (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
936            stats.max_opt().map(|val| Datum::timestamp_micros(*val))
937        }
938        (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
939            stats.max_opt().map(|val| Datum::timestamptz_micros(*val))
940        }
941        (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
942            stats.max_opt().map(|val| Datum::timestamp_nanos(*val))
943        }
944        (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
945            stats.max_opt().map(|val| Datum::timestamptz_nanos(*val))
946        }
947        (PrimitiveType::Float, Statistics::Float(stats)) => {
948            stats.max_opt().map(|val| Datum::float(*val))
949        }
950        (PrimitiveType::Double, Statistics::Double(stats)) => {
951            stats.max_opt().map(|val| Datum::double(*val))
952        }
953        (PrimitiveType::String, Statistics::ByteArray(stats)) => {
954            let Some(val) = stats.max_opt() else {
955                return Ok(None);
956            };
957
958            Some(Datum::string(val.as_utf8()?))
959        }
960        (
961            PrimitiveType::Decimal {
962                precision: _,
963                scale: _,
964            },
965            Statistics::ByteArray(stats),
966        ) => {
967            let Some(bytes) = stats.max_bytes_opt() else {
968                return Ok(None);
969            };
970            Some(Datum::new(
971                primitive_type.clone(),
972                PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
973            ))
974        }
975        (
976            PrimitiveType::Decimal {
977                precision: _,
978                scale: _,
979            },
980            Statistics::FixedLenByteArray(stats),
981        ) => {
982            let Some(bytes) = stats.max_bytes_opt() else {
983                return Ok(None);
984            };
985            let unscaled_value = BigInt::from_signed_bytes_be(bytes);
986            Some(Datum::new(
987                primitive_type.clone(),
988                PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
989                    Error::new(
990                        ErrorKind::DataInvalid,
991                        format!("Can't convert bytes to i128: {bytes:?}"),
992                    )
993                })?),
994            ))
995        }
996        (
997            PrimitiveType::Decimal {
998                precision: _,
999                scale: _,
1000            },
1001            Statistics::Int32(stats),
1002        ) => stats.max_opt().map(|val| {
1003            Datum::new(
1004                primitive_type.clone(),
1005                PrimitiveLiteral::Int128(i128::from(*val)),
1006            )
1007        }),
1008
1009        (
1010            PrimitiveType::Decimal {
1011                precision: _,
1012                scale: _,
1013            },
1014            Statistics::Int64(stats),
1015        ) => stats.max_opt().map(|val| {
1016            Datum::new(
1017                primitive_type.clone(),
1018                PrimitiveLiteral::Int128(i128::from(*val)),
1019            )
1020        }),
1021        (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
1022            let Some(bytes) = stats.max_bytes_opt() else {
1023                return Ok(None);
1024            };
1025            if bytes.len() != 16 {
1026                return Err(Error::new(
1027                    ErrorKind::Unexpected,
1028                    "Invalid length of uuid bytes.",
1029                ));
1030            }
1031            Some(Datum::uuid(Uuid::from_bytes(
1032                bytes[..16].try_into().unwrap(),
1033            )))
1034        }
1035        (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
1036            let Some(bytes) = stat.max_bytes_opt() else {
1037                return Ok(None);
1038            };
1039            if bytes.len() != *len as usize {
1040                return Err(Error::new(
1041                    ErrorKind::Unexpected,
1042                    "Invalid length of fixed bytes.",
1043                ));
1044            }
1045            Some(Datum::fixed(bytes.to_vec()))
1046        }
1047        (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
1048            return Ok(stat
1049                .max_bytes_opt()
1050                .map(|bytes| Datum::binary(bytes.to_vec())));
1051        }
1052        _ => {
1053            return Ok(None);
1054        }
1055    })
1056}
1057
1058impl TryFrom<&ArrowSchema> for crate::spec::Schema {
1059    type Error = Error;
1060
1061    fn try_from(schema: &ArrowSchema) -> crate::Result<Self> {
1062        arrow_schema_to_schema(schema)
1063    }
1064}
1065
1066impl TryFrom<&crate::spec::Schema> for ArrowSchema {
1067    type Error = Error;
1068
1069    fn try_from(schema: &crate::spec::Schema) -> crate::Result<Self> {
1070        schema_to_arrow_schema(schema)
1071    }
1072}
1073
1074/// Converts a Datum (Iceberg type + primitive literal) to its corresponding Arrow DataType
1075/// with Run-End Encoding (REE).
1076///
1077/// This function is used for constant fields in record batches, where all values are the same.
1078/// Run-End Encoding provides efficient storage for such constant columns.
1079///
1080/// # Arguments
1081/// * `datum` - The Datum to convert, which contains both type and value information
1082///
1083/// # Returns
1084/// Arrow DataType with Run-End Encoding applied
1085///
1086/// # Example
1087/// ```
1088/// use iceberg::arrow::datum_to_arrow_type_with_ree;
1089/// use iceberg::spec::Datum;
1090///
1091/// let datum = Datum::string("test_file.parquet");
1092/// let ree_type = datum_to_arrow_type_with_ree(&datum);
1093/// // Returns: RunEndEncoded(Int32, Utf8)
1094/// ```
1095pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
1096    // Helper to create REE type with the given values type.
1097    // Note: values field is nullable as Arrow expects this when building the
1098    // final Arrow schema with `RunArray::try_new`.
1099    let make_ree = |values_type: DataType| -> DataType {
1100        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
1101        let values_field = Arc::new(Field::new("values", values_type, true));
1102        DataType::RunEndEncoded(run_ends_field, values_field)
1103    };
1104
1105    // Match on the PrimitiveType from the Datum to determine the Arrow type
1106    match datum.data_type() {
1107        PrimitiveType::Boolean => make_ree(DataType::Boolean),
1108        PrimitiveType::Int => make_ree(DataType::Int32),
1109        PrimitiveType::Long => make_ree(DataType::Int64),
1110        PrimitiveType::Float => make_ree(DataType::Float32),
1111        PrimitiveType::Double => make_ree(DataType::Float64),
1112        PrimitiveType::Date => make_ree(DataType::Date32),
1113        PrimitiveType::Time => make_ree(DataType::Int64),
1114        PrimitiveType::Timestamp => make_ree(DataType::Int64),
1115        PrimitiveType::Timestamptz => make_ree(DataType::Int64),
1116        PrimitiveType::TimestampNs => make_ree(DataType::Int64),
1117        PrimitiveType::TimestamptzNs => make_ree(DataType::Int64),
1118        PrimitiveType::String => make_ree(DataType::Utf8),
1119        PrimitiveType::Uuid => make_ree(DataType::Binary),
1120        PrimitiveType::Fixed(_) => make_ree(DataType::Binary),
1121        PrimitiveType::Binary => make_ree(DataType::Binary),
1122        PrimitiveType::Decimal { precision, scale } => {
1123            make_ree(DataType::Decimal128(*precision as u8, *scale as i8))
1124        }
1125    }
1126}
1127
1128/// A visitor that strips metadata from an Arrow schema.
1129///
1130/// This visitor recursively removes all metadata from fields at every level of the schema,
1131/// including nested struct, list, and map fields. This is useful for schema comparison
1132/// where metadata differences should be ignored.
1133struct MetadataStripVisitor {
1134    /// Stack to track field information during traversal
1135    field_stack: Vec<Field>,
1136}
1137
1138impl MetadataStripVisitor {
1139    fn new() -> Self {
1140        Self {
1141            field_stack: Vec::new(),
1142        }
1143    }
1144}
1145
1146impl ArrowSchemaVisitor for MetadataStripVisitor {
1147    type T = Field;
1148    type U = ArrowSchema;
1149
1150    fn before_field(&mut self, field: &Field) -> Result<()> {
1151        // Store field name and nullability for later reconstruction
1152        self.field_stack.push(Field::new(
1153            field.name(),
1154            DataType::Null, // Placeholder, will be replaced
1155            field.is_nullable(),
1156        ));
1157        Ok(())
1158    }
1159
1160    fn after_field(&mut self, _field: &Field) -> Result<()> {
1161        Ok(())
1162    }
1163
1164    fn schema(&mut self, _schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
1165        Ok(ArrowSchema::new(values))
1166    }
1167
1168    fn r#struct(&mut self, _fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
1169        // Pop the struct field from the stack
1170        let field_info = self
1171            .field_stack
1172            .pop()
1173            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in struct"))?;
1174
1175        // Reconstruct struct field without metadata
1176        Ok(Field::new(
1177            field_info.name(),
1178            DataType::Struct(Fields::from(results)),
1179            field_info.is_nullable(),
1180        ))
1181    }
1182
1183    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
1184        // Pop the list field from the stack
1185        let field_info = self
1186            .field_stack
1187            .pop()
1188            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in list"))?;
1189
1190        // Reconstruct list field without metadata
1191        let list_type = match list {
1192            DataType::List(_) => DataType::List(Arc::new(value)),
1193            DataType::LargeList(_) => DataType::LargeList(Arc::new(value)),
1194            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(Arc::new(value), *size),
1195            _ => {
1196                return Err(Error::new(
1197                    ErrorKind::Unexpected,
1198                    format!("Expected list type, got {list}"),
1199                ));
1200            }
1201        };
1202
1203        Ok(Field::new(
1204            field_info.name(),
1205            list_type,
1206            field_info.is_nullable(),
1207        ))
1208    }
1209
1210    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
1211        // Pop the map field from the stack
1212        let field_info = self
1213            .field_stack
1214            .pop()
1215            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in map"))?;
1216
1217        // Reconstruct the map's struct field (contains key and value)
1218        let struct_field = Field::new(
1219            DEFAULT_MAP_FIELD_NAME,
1220            DataType::Struct(Fields::from(vec![key_value, value])),
1221            false,
1222        );
1223
1224        // Get the sorted flag from the original map type
1225        let sorted = match map {
1226            DataType::Map(_, sorted) => *sorted,
1227            _ => {
1228                return Err(Error::new(
1229                    ErrorKind::Unexpected,
1230                    format!("Expected map type, got {map}"),
1231                ));
1232            }
1233        };
1234
1235        // Reconstruct map field without metadata
1236        Ok(Field::new(
1237            field_info.name(),
1238            DataType::Map(Arc::new(struct_field), sorted),
1239            field_info.is_nullable(),
1240        ))
1241    }
1242
1243    fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
1244        // Pop the primitive field from the stack
1245        let field_info = self.field_stack.pop().ok_or_else(|| {
1246            Error::new(ErrorKind::Unexpected, "Field stack underflow in primitive")
1247        })?;
1248
1249        // Return field without metadata
1250        Ok(Field::new(
1251            field_info.name(),
1252            p.clone(),
1253            field_info.is_nullable(),
1254        ))
1255    }
1256}
1257
1258/// Strips all metadata from an Arrow schema and its nested fields.
1259///
1260/// This function recursively removes metadata from all fields at every level of the schema,
1261/// including nested struct, list, and map fields. This is useful for schema comparison
1262/// where metadata differences should be ignored.
1263///
1264/// # Arguments
1265/// * `schema` - The Arrow schema to strip metadata from
1266///
1267/// # Returns
1268/// A new Arrow schema with all metadata removed, or an error if the schema structure
1269/// is invalid.
1270///
1271/// # Example
1272/// ```
1273/// use std::collections::HashMap;
1274///
1275/// use arrow_schema::{DataType, Field, Schema as ArrowSchema};
1276/// use iceberg::arrow::strip_metadata_from_schema;
1277///
1278/// let mut metadata = HashMap::new();
1279/// metadata.insert("key".to_string(), "value".to_string());
1280///
1281/// let field = Field::new("col1", DataType::Int32, false).with_metadata(metadata);
1282/// let schema = ArrowSchema::new(vec![field]);
1283///
1284/// let stripped = strip_metadata_from_schema(&schema).unwrap();
1285/// assert!(stripped.field(0).metadata().is_empty());
1286/// ```
1287pub fn strip_metadata_from_schema(schema: &ArrowSchema) -> Result<ArrowSchema> {
1288    let mut visitor = MetadataStripVisitor::new();
1289    visit_schema(schema, &mut visitor)
1290}
1291
1292#[cfg(test)]
1293mod tests {
1294    use std::collections::HashMap;
1295    use std::sync::Arc;
1296
1297    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1298    use rust_decimal::Decimal;
1299
1300    use super::*;
1301    use crate::spec::{Literal, Schema};
1302
1303    /// Create a simple field with metadata.
1304    fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
1305        Field::new(name, ty, nullable).with_metadata(HashMap::from([(
1306            PARQUET_FIELD_ID_META_KEY.to_string(),
1307            value.to_string(),
1308        )]))
1309    }
1310
1311    fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema {
1312        let fields = Fields::from(vec![
1313            simple_field("key", DataType::Int32, false, "28"),
1314            simple_field("value", DataType::Utf8, true, "29"),
1315        ]);
1316
1317        let r#struct = DataType::Struct(fields);
1318        let map = DataType::Map(
1319            Arc::new(simple_field(DEFAULT_MAP_FIELD_NAME, r#struct, false, "17")),
1320            false,
1321        );
1322        let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1323
1324        let fields = Fields::from(vec![
1325            simple_field("aa", DataType::Int32, false, "18"),
1326            simple_field("bb", DataType::Utf8, true, "19"),
1327            simple_field(
1328                "cc",
1329                DataType::Timestamp(TimeUnit::Microsecond, None),
1330                false,
1331                "20",
1332            ),
1333        ]);
1334
1335        let r#struct = DataType::Struct(fields);
1336
1337        ArrowSchema::new(vec![
1338            simple_field("a", DataType::Int32, false, "2"),
1339            simple_field("b", DataType::Int64, false, "1"),
1340            simple_field("c", DataType::Utf8, false, "3"),
1341            simple_field("n", DataType::Utf8, false, "21"),
1342            simple_field(
1343                "d",
1344                DataType::Timestamp(TimeUnit::Microsecond, None),
1345                true,
1346                "4",
1347            ),
1348            simple_field("e", DataType::Boolean, true, "6"),
1349            simple_field("f", DataType::Float32, false, "5"),
1350            simple_field("g", DataType::Float64, false, "7"),
1351            simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1352            simple_field("h", DataType::Date32, false, "8"),
1353            simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1354            simple_field(
1355                "j",
1356                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1357                false,
1358                "10",
1359            ),
1360            simple_field(
1361                "k",
1362                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1363                false,
1364                "12",
1365            ),
1366            simple_field("l", DataType::Binary, false, "13"),
1367            simple_field("o", DataType::LargeBinary, false, "22"),
1368            simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1369            simple_field(
1370                "list",
1371                DataType::List(Arc::new(simple_field(
1372                    "element",
1373                    DataType::Int32,
1374                    false,
1375                    "15",
1376                ))),
1377                true,
1378                "14",
1379            ),
1380            simple_field(
1381                "large_list",
1382                DataType::LargeList(Arc::new(simple_field(
1383                    "element",
1384                    DataType::Utf8,
1385                    false,
1386                    "23",
1387                ))),
1388                true,
1389                "24",
1390            ),
1391            simple_field(
1392                "fixed_list",
1393                DataType::FixedSizeList(
1394                    Arc::new(simple_field("element", DataType::Binary, false, "26")),
1395                    10,
1396                ),
1397                true,
1398                "25",
1399            ),
1400            simple_field("map", map, false, "16"),
1401            simple_field("struct", r#struct, false, "17"),
1402            simple_field("dictionary", dictionary, false, "30"),
1403        ])
1404    }
1405
1406    fn iceberg_schema_for_arrow_schema_to_schema_test() -> Schema {
1407        let schema_json = r#"{
1408            "type":"struct",
1409            "schema-id":0,
1410            "fields":[
1411                {
1412                    "id":2,
1413                    "name":"a",
1414                    "required":true,
1415                    "type":"int"
1416                },
1417                {
1418                    "id":1,
1419                    "name":"b",
1420                    "required":true,
1421                    "type":"long"
1422                },
1423                {
1424                    "id":3,
1425                    "name":"c",
1426                    "required":true,
1427                    "type":"string"
1428                },
1429                {
1430                    "id":21,
1431                    "name":"n",
1432                    "required":true,
1433                    "type":"string"
1434                },
1435                {
1436                    "id":4,
1437                    "name":"d",
1438                    "required":false,
1439                    "type":"timestamp"
1440                },
1441                {
1442                    "id":6,
1443                    "name":"e",
1444                    "required":false,
1445                    "type":"boolean"
1446                },
1447                {
1448                    "id":5,
1449                    "name":"f",
1450                    "required":true,
1451                    "type":"float"
1452                },
1453                {
1454                    "id":7,
1455                    "name":"g",
1456                    "required":true,
1457                    "type":"double"
1458                },
1459                {
1460                    "id":27,
1461                    "name":"p",
1462                    "required":true,
1463                    "type":"decimal(10,2)"
1464                },
1465                {
1466                    "id":8,
1467                    "name":"h",
1468                    "required":true,
1469                    "type":"date"
1470                },
1471                {
1472                    "id":9,
1473                    "name":"i",
1474                    "required":true,
1475                    "type":"time"
1476                },
1477                {
1478                    "id":10,
1479                    "name":"j",
1480                    "required":true,
1481                    "type":"timestamptz"
1482                },
1483                {
1484                    "id":12,
1485                    "name":"k",
1486                    "required":true,
1487                    "type":"timestamptz"
1488                },
1489                {
1490                    "id":13,
1491                    "name":"l",
1492                    "required":true,
1493                    "type":"binary"
1494                },
1495                {
1496                    "id":22,
1497                    "name":"o",
1498                    "required":true,
1499                    "type":"binary"
1500                },
1501                {
1502                    "id":11,
1503                    "name":"m",
1504                    "required":true,
1505                    "type":"fixed[10]"
1506                },
1507                {
1508                    "id":14,
1509                    "name":"list",
1510                    "required": false,
1511                    "type": {
1512                        "type": "list",
1513                        "element-id": 15,
1514                        "element-required": true,
1515                        "element": "int"
1516                    }
1517                },
1518                {
1519                    "id":24,
1520                    "name":"large_list",
1521                    "required": false,
1522                    "type": {
1523                        "type": "list",
1524                        "element-id": 23,
1525                        "element-required": true,
1526                        "element": "string"
1527                    }
1528                },
1529                {
1530                    "id":25,
1531                    "name":"fixed_list",
1532                    "required": false,
1533                    "type": {
1534                        "type": "list",
1535                        "element-id": 26,
1536                        "element-required": true,
1537                        "element": "binary"
1538                    }
1539                },
1540                {
1541                    "id":16,
1542                    "name":"map",
1543                    "required": true,
1544                    "type": {
1545                        "type": "map",
1546                        "key-id": 28,
1547                        "key": "int",
1548                        "value-id": 29,
1549                        "value-required": false,
1550                        "value": "string"
1551                    }
1552                },
1553                {
1554                    "id":17,
1555                    "name":"struct",
1556                    "required": true,
1557                    "type": {
1558                        "type": "struct",
1559                        "fields": [
1560                            {
1561                                "id":18,
1562                                "name":"aa",
1563                                "required":true,
1564                                "type":"int"
1565                            },
1566                            {
1567                                "id":19,
1568                                "name":"bb",
1569                                "required":false,
1570                                "type":"string"
1571                            },
1572                            {
1573                                "id":20,
1574                                "name":"cc",
1575                                "required":true,
1576                                "type":"timestamp"
1577                            }
1578                        ]
1579                    }
1580                },
1581                {
1582                    "id":30,
1583                    "name":"dictionary",
1584                    "required":true,
1585                    "type":"string"
1586                }
1587            ],
1588            "identifier-field-ids":[]
1589        }"#;
1590
1591        let schema: Schema = serde_json::from_str(schema_json).unwrap();
1592        schema
1593    }
1594
1595    #[test]
1596    fn test_arrow_schema_to_schema() {
1597        let arrow_schema = arrow_schema_for_arrow_schema_to_schema_test();
1598        let schema = iceberg_schema_for_arrow_schema_to_schema_test();
1599        let converted_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
1600        pretty_assertions::assert_eq!(converted_schema, schema);
1601    }
1602
1603    fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema {
1604        let fields = Fields::from(vec![
1605            simple_field("key", DataType::Int32, false, "28"),
1606            simple_field("value", DataType::Utf8, true, "29"),
1607        ]);
1608
1609        let r#struct = DataType::Struct(fields);
1610        let map = DataType::Map(
1611            Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
1612            false,
1613        );
1614
1615        let fields = Fields::from(vec![
1616            simple_field("aa", DataType::Int32, false, "18"),
1617            simple_field("bb", DataType::Utf8, true, "19"),
1618            simple_field(
1619                "cc",
1620                DataType::Timestamp(TimeUnit::Microsecond, None),
1621                false,
1622                "20",
1623            ),
1624        ]);
1625
1626        let r#struct = DataType::Struct(fields);
1627
1628        ArrowSchema::new(vec![
1629            simple_field("a", DataType::Int32, false, "2"),
1630            simple_field("b", DataType::Int64, false, "1"),
1631            simple_field("c", DataType::Utf8, false, "3"),
1632            simple_field("n", DataType::Utf8, false, "21"),
1633            simple_field(
1634                "d",
1635                DataType::Timestamp(TimeUnit::Microsecond, None),
1636                true,
1637                "4",
1638            ),
1639            simple_field("e", DataType::Boolean, true, "6"),
1640            simple_field("f", DataType::Float32, false, "5"),
1641            simple_field("g", DataType::Float64, false, "7"),
1642            simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1643            simple_field("h", DataType::Date32, false, "8"),
1644            simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1645            simple_field(
1646                "j",
1647                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1648                false,
1649                "10",
1650            ),
1651            simple_field(
1652                "k",
1653                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1654                false,
1655                "12",
1656            ),
1657            simple_field("l", DataType::LargeBinary, false, "13"),
1658            simple_field("o", DataType::LargeBinary, false, "22"),
1659            simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1660            simple_field(
1661                "list",
1662                DataType::List(Arc::new(simple_field(
1663                    "element",
1664                    DataType::Int32,
1665                    false,
1666                    "15",
1667                ))),
1668                true,
1669                "14",
1670            ),
1671            simple_field(
1672                "large_list",
1673                DataType::List(Arc::new(simple_field(
1674                    "element",
1675                    DataType::Utf8,
1676                    false,
1677                    "23",
1678                ))),
1679                true,
1680                "24",
1681            ),
1682            simple_field(
1683                "fixed_list",
1684                DataType::List(Arc::new(simple_field(
1685                    "element",
1686                    DataType::LargeBinary,
1687                    false,
1688                    "26",
1689                ))),
1690                true,
1691                "25",
1692            ),
1693            simple_field("map", map, false, "16"),
1694            simple_field("struct", r#struct, false, "17"),
1695            simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"),
1696        ])
1697    }
1698
1699    fn iceberg_schema_for_schema_to_arrow_schema() -> Schema {
1700        let schema_json = r#"{
1701            "type":"struct",
1702            "schema-id":0,
1703            "fields":[
1704                {
1705                    "id":2,
1706                    "name":"a",
1707                    "required":true,
1708                    "type":"int"
1709                },
1710                {
1711                    "id":1,
1712                    "name":"b",
1713                    "required":true,
1714                    "type":"long"
1715                },
1716                {
1717                    "id":3,
1718                    "name":"c",
1719                    "required":true,
1720                    "type":"string"
1721                },
1722                {
1723                    "id":21,
1724                    "name":"n",
1725                    "required":true,
1726                    "type":"string"
1727                },
1728                {
1729                    "id":4,
1730                    "name":"d",
1731                    "required":false,
1732                    "type":"timestamp"
1733                },
1734                {
1735                    "id":6,
1736                    "name":"e",
1737                    "required":false,
1738                    "type":"boolean"
1739                },
1740                {
1741                    "id":5,
1742                    "name":"f",
1743                    "required":true,
1744                    "type":"float"
1745                },
1746                {
1747                    "id":7,
1748                    "name":"g",
1749                    "required":true,
1750                    "type":"double"
1751                },
1752                {
1753                    "id":27,
1754                    "name":"p",
1755                    "required":true,
1756                    "type":"decimal(10,2)"
1757                },
1758                {
1759                    "id":8,
1760                    "name":"h",
1761                    "required":true,
1762                    "type":"date"
1763                },
1764                {
1765                    "id":9,
1766                    "name":"i",
1767                    "required":true,
1768                    "type":"time"
1769                },
1770                {
1771                    "id":10,
1772                    "name":"j",
1773                    "required":true,
1774                    "type":"timestamptz"
1775                },
1776                {
1777                    "id":12,
1778                    "name":"k",
1779                    "required":true,
1780                    "type":"timestamptz"
1781                },
1782                {
1783                    "id":13,
1784                    "name":"l",
1785                    "required":true,
1786                    "type":"binary"
1787                },
1788                {
1789                    "id":22,
1790                    "name":"o",
1791                    "required":true,
1792                    "type":"binary"
1793                },
1794                {
1795                    "id":11,
1796                    "name":"m",
1797                    "required":true,
1798                    "type":"fixed[10]"
1799                },
1800                {
1801                    "id":14,
1802                    "name":"list",
1803                    "required": false,
1804                    "type": {
1805                        "type": "list",
1806                        "element-id": 15,
1807                        "element-required": true,
1808                        "element": "int"
1809                    }
1810                },
1811                {
1812                    "id":24,
1813                    "name":"large_list",
1814                    "required": false,
1815                    "type": {
1816                        "type": "list",
1817                        "element-id": 23,
1818                        "element-required": true,
1819                        "element": "string"
1820                    }
1821                },
1822                {
1823                    "id":25,
1824                    "name":"fixed_list",
1825                    "required": false,
1826                    "type": {
1827                        "type": "list",
1828                        "element-id": 26,
1829                        "element-required": true,
1830                        "element": "binary"
1831                    }
1832                },
1833                {
1834                    "id":16,
1835                    "name":"map",
1836                    "required": true,
1837                    "type": {
1838                        "type": "map",
1839                        "key-id": 28,
1840                        "key": "int",
1841                        "value-id": 29,
1842                        "value-required": false,
1843                        "value": "string"
1844                    }
1845                },
1846                {
1847                    "id":17,
1848                    "name":"struct",
1849                    "required": true,
1850                    "type": {
1851                        "type": "struct",
1852                        "fields": [
1853                            {
1854                                "id":18,
1855                                "name":"aa",
1856                                "required":true,
1857                                "type":"int"
1858                            },
1859                            {
1860                                "id":19,
1861                                "name":"bb",
1862                                "required":false,
1863                                "type":"string"
1864                            },
1865                            {
1866                                "id":20,
1867                                "name":"cc",
1868                                "required":true,
1869                                "type":"timestamp"
1870                            }
1871                        ]
1872                    }
1873                },
1874                {
1875                    "id":30,
1876                    "name":"uuid",
1877                    "required":true,
1878                    "type":"uuid"
1879                }
1880            ],
1881            "identifier-field-ids":[]
1882        }"#;
1883
1884        let schema: Schema = serde_json::from_str(schema_json).unwrap();
1885        schema
1886    }
1887
1888    #[test]
1889    fn test_schema_to_arrow_schema() {
1890        let arrow_schema = arrow_schema_for_schema_to_arrow_schema_test();
1891        let schema = iceberg_schema_for_schema_to_arrow_schema();
1892        let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap();
1893        assert_eq!(converted_arrow_schema, arrow_schema);
1894    }
1895
1896    #[test]
1897    fn test_type_conversion() {
1898        // test primitive type
1899        {
1900            let arrow_type = DataType::Int32;
1901            let iceberg_type = Type::Primitive(PrimitiveType::Int);
1902            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1903            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1904        }
1905
1906        // test struct type
1907        {
1908            // no metadata will cause error
1909            let arrow_type = DataType::Struct(Fields::from(vec![
1910                Field::new("a", DataType::Int64, false),
1911                Field::new("b", DataType::Utf8, true),
1912            ]));
1913            assert_eq!(
1914                &arrow_type_to_type(&arrow_type).unwrap_err().to_string(),
1915                "DataInvalid => Field id not found in metadata"
1916            );
1917
1918            let arrow_type = DataType::Struct(Fields::from(vec![
1919                Field::new("a", DataType::Int64, false).with_metadata(HashMap::from_iter([(
1920                    PARQUET_FIELD_ID_META_KEY.to_string(),
1921                    1.to_string(),
1922                )])),
1923                Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from_iter([(
1924                    PARQUET_FIELD_ID_META_KEY.to_string(),
1925                    2.to_string(),
1926                )])),
1927            ]));
1928            let iceberg_type = Type::Struct(StructType::new(vec![
1929                NestedField {
1930                    id: 1,
1931                    doc: None,
1932                    name: "a".to_string(),
1933                    required: true,
1934                    field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1935                    initial_default: None,
1936                    write_default: None,
1937                }
1938                .into(),
1939                NestedField {
1940                    id: 2,
1941                    doc: None,
1942                    name: "b".to_string(),
1943                    required: false,
1944                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1945                    initial_default: None,
1946                    write_default: None,
1947                }
1948                .into(),
1949            ]));
1950            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1951            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1952
1953            // initial_default and write_default is ignored
1954            let iceberg_type = Type::Struct(StructType::new(vec![
1955                NestedField {
1956                    id: 1,
1957                    doc: None,
1958                    name: "a".to_string(),
1959                    required: true,
1960                    field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1961                    initial_default: Some(Literal::Primitive(PrimitiveLiteral::Int(114514))),
1962                    write_default: None,
1963                }
1964                .into(),
1965                NestedField {
1966                    id: 2,
1967                    doc: None,
1968                    name: "b".to_string(),
1969                    required: false,
1970                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1971                    initial_default: None,
1972                    write_default: Some(Literal::Primitive(PrimitiveLiteral::String(
1973                        "514".to_string(),
1974                    ))),
1975                }
1976                .into(),
1977            ]));
1978            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1979        }
1980
1981        // test dictionary type
1982        {
1983            let arrow_type =
1984                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int8));
1985            let iceberg_type = Type::Primitive(PrimitiveType::Int);
1986            assert_eq!(
1987                iceberg_type,
1988                arrow_type_to_type(&arrow_type).unwrap(),
1989                "Expected dictionary conversion to use the contained value"
1990            );
1991
1992            let arrow_type =
1993                DataType::Dictionary(Box::new(DataType::Utf8), Box::new(DataType::Boolean));
1994            let iceberg_type = Type::Primitive(PrimitiveType::Boolean);
1995            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1996        }
1997    }
1998
1999    #[test]
2000    fn test_unsigned_integer_type_conversion() {
2001        let test_cases = vec![
2002            (DataType::UInt8, PrimitiveType::Int),
2003            (DataType::UInt16, PrimitiveType::Int),
2004            (DataType::UInt32, PrimitiveType::Long),
2005        ];
2006
2007        for (arrow_type, expected_iceberg_type) in test_cases {
2008            let arrow_field = Field::new("test", arrow_type.clone(), false).with_metadata(
2009                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
2010            );
2011            let arrow_schema = ArrowSchema::new(vec![arrow_field]);
2012
2013            let iceberg_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
2014            let iceberg_field = iceberg_schema.as_struct().fields().first().unwrap();
2015
2016            assert!(
2017                matches!(iceberg_field.field_type.as_ref(), Type::Primitive(t) if *t == expected_iceberg_type),
2018                "Expected {arrow_type:?} to map to {expected_iceberg_type:?}"
2019            );
2020        }
2021
2022        // Test UInt64 blocking
2023        {
2024            let arrow_field = Field::new("test", DataType::UInt64, false).with_metadata(
2025                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
2026            );
2027            let arrow_schema = ArrowSchema::new(vec![arrow_field]);
2028
2029            let result = arrow_schema_to_schema(&arrow_schema);
2030            assert!(result.is_err());
2031            assert!(
2032                result
2033                    .unwrap_err()
2034                    .to_string()
2035                    .contains("UInt64 is not supported")
2036            );
2037        }
2038    }
2039
2040    #[test]
2041    fn test_datum_conversion() {
2042        {
2043            let datum = Datum::bool(true);
2044            let arrow_datum = get_arrow_datum(&datum).unwrap();
2045            let (array, is_scalar) = arrow_datum.get();
2046            let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
2047            assert!(is_scalar);
2048            assert!(array.value(0));
2049        }
2050        {
2051            let datum = Datum::int(42);
2052            let arrow_datum = get_arrow_datum(&datum).unwrap();
2053            let (array, is_scalar) = arrow_datum.get();
2054            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
2055            assert!(is_scalar);
2056            assert_eq!(array.value(0), 42);
2057        }
2058        {
2059            let datum = Datum::long(42);
2060            let arrow_datum = get_arrow_datum(&datum).unwrap();
2061            let (array, is_scalar) = arrow_datum.get();
2062            let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
2063            assert!(is_scalar);
2064            assert_eq!(array.value(0), 42);
2065        }
2066        {
2067            let datum = Datum::float(42.42);
2068            let arrow_datum = get_arrow_datum(&datum).unwrap();
2069            let (array, is_scalar) = arrow_datum.get();
2070            let array = array.as_any().downcast_ref::<Float32Array>().unwrap();
2071            assert!(is_scalar);
2072            assert_eq!(array.value(0), 42.42);
2073        }
2074        {
2075            let datum = Datum::double(42.42);
2076            let arrow_datum = get_arrow_datum(&datum).unwrap();
2077            let (array, is_scalar) = arrow_datum.get();
2078            let array = array.as_any().downcast_ref::<Float64Array>().unwrap();
2079            assert!(is_scalar);
2080            assert_eq!(array.value(0), 42.42);
2081        }
2082        {
2083            let datum = Datum::string("abc");
2084            let arrow_datum = get_arrow_datum(&datum).unwrap();
2085            let (array, is_scalar) = arrow_datum.get();
2086            let array = array.as_any().downcast_ref::<StringArray>().unwrap();
2087            assert!(is_scalar);
2088            assert_eq!(array.value(0), "abc");
2089        }
2090        {
2091            let datum = Datum::binary(vec![1, 2, 3, 4]);
2092            let arrow_datum = get_arrow_datum(&datum).unwrap();
2093            let (array, is_scalar) = arrow_datum.get();
2094            let array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
2095            assert!(is_scalar);
2096            assert_eq!(array.value(0), &[1, 2, 3, 4]);
2097        }
2098        {
2099            let datum = Datum::date(42);
2100            let arrow_datum = get_arrow_datum(&datum).unwrap();
2101            let (array, is_scalar) = arrow_datum.get();
2102            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
2103            assert!(is_scalar);
2104            assert_eq!(array.value(0), 42);
2105        }
2106        {
2107            let datum = Datum::timestamp_micros(42);
2108            let arrow_datum = get_arrow_datum(&datum).unwrap();
2109            let (array, is_scalar) = arrow_datum.get();
2110            let array = array
2111                .as_any()
2112                .downcast_ref::<TimestampMicrosecondArray>()
2113                .unwrap();
2114            assert!(is_scalar);
2115            assert_eq!(array.value(0), 42);
2116        }
2117        {
2118            let datum = Datum::timestamptz_micros(42);
2119            let arrow_datum = get_arrow_datum(&datum).unwrap();
2120            let (array, is_scalar) = arrow_datum.get();
2121            let array = array
2122                .as_any()
2123                .downcast_ref::<TimestampMicrosecondArray>()
2124                .unwrap();
2125            assert!(is_scalar);
2126            assert_eq!(array.timezone(), Some("+00:00"));
2127            assert_eq!(array.value(0), 42);
2128        }
2129        {
2130            let datum = Datum::decimal_with_precision(Decimal::new(123, 2), 30).unwrap();
2131            let arrow_datum = get_arrow_datum(&datum).unwrap();
2132            let (array, is_scalar) = arrow_datum.get();
2133            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
2134            assert!(is_scalar);
2135            assert_eq!(array.precision(), 30);
2136            assert_eq!(array.scale(), 2);
2137            assert_eq!(array.value(0), 123);
2138        }
2139        {
2140            let datum = Datum::uuid_from_str("42424242-4242-4242-4242-424242424242").unwrap();
2141            let arrow_datum = get_arrow_datum(&datum).unwrap();
2142            let (array, is_scalar) = arrow_datum.get();
2143            let array = array
2144                .as_any()
2145                .downcast_ref::<FixedSizeBinaryArray>()
2146                .unwrap();
2147            assert!(is_scalar);
2148            assert_eq!(array.value(0), [66u8; 16]);
2149        }
2150    }
2151
2152    #[test]
2153    fn test_arrow_schema_to_schema_with_field_id() {
2154        // Create a complex Arrow schema without field ID metadata
2155        // Including: primitives, list, nested struct, map, and nested list of structs
2156        let arrow_schema = ArrowSchema::new(vec![
2157            Field::new("id", DataType::Int64, false),
2158            Field::new("name", DataType::Utf8, true),
2159            Field::new("price", DataType::Decimal128(10, 2), false),
2160            Field::new(
2161                "created_at",
2162                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2163                true,
2164            ),
2165            Field::new(
2166                "tags",
2167                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2168                true,
2169            ),
2170            Field::new(
2171                "address",
2172                DataType::Struct(Fields::from(vec![
2173                    Field::new("street", DataType::Utf8, true),
2174                    Field::new("city", DataType::Utf8, false),
2175                    Field::new("zip", DataType::Int32, true),
2176                ])),
2177                true,
2178            ),
2179            Field::new(
2180                "attributes",
2181                DataType::Map(
2182                    Arc::new(Field::new(
2183                        DEFAULT_MAP_FIELD_NAME,
2184                        DataType::Struct(Fields::from(vec![
2185                            Field::new("key", DataType::Utf8, false),
2186                            Field::new("value", DataType::Utf8, true),
2187                        ])),
2188                        false,
2189                    )),
2190                    false,
2191                ),
2192                true,
2193            ),
2194            Field::new(
2195                "orders",
2196                DataType::List(Arc::new(Field::new(
2197                    "element",
2198                    DataType::Struct(Fields::from(vec![
2199                        Field::new("order_id", DataType::Int64, false),
2200                        Field::new("amount", DataType::Float64, false),
2201                    ])),
2202                    true,
2203                ))),
2204                true,
2205            ),
2206        ]);
2207
2208        let schema = arrow_schema_to_schema_auto_assign_ids(&arrow_schema).unwrap();
2209
2210        // Build expected schema with exact field IDs following level-order assignment:
2211        // Level 0: id=1, name=2, price=3, created_at=4, tags=5, address=6, attributes=7, orders=8
2212        // Level 1: tags.element=9, address.{street=10,city=11,zip=12}, attributes.{key=13,value=14}, orders.element=15
2213        // Level 2: orders.element.{order_id=16,amount=17}
2214        let expected = Schema::builder()
2215            .with_fields(vec![
2216                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
2217                NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
2218                NestedField::required(
2219                    3,
2220                    "price",
2221                    Type::Primitive(PrimitiveType::Decimal {
2222                        precision: 10,
2223                        scale: 2,
2224                    }),
2225                )
2226                .into(),
2227                NestedField::optional(4, "created_at", Type::Primitive(PrimitiveType::Timestamptz))
2228                    .into(),
2229                NestedField::optional(
2230                    5,
2231                    "tags",
2232                    Type::List(ListType {
2233                        element_field: NestedField::list_element(
2234                            9,
2235                            Type::Primitive(PrimitiveType::String),
2236                            false,
2237                        )
2238                        .into(),
2239                    }),
2240                )
2241                .into(),
2242                NestedField::optional(
2243                    6,
2244                    "address",
2245                    Type::Struct(StructType::new(vec![
2246                        NestedField::optional(10, "street", Type::Primitive(PrimitiveType::String))
2247                            .into(),
2248                        NestedField::required(11, "city", Type::Primitive(PrimitiveType::String))
2249                            .into(),
2250                        NestedField::optional(12, "zip", Type::Primitive(PrimitiveType::Int))
2251                            .into(),
2252                    ])),
2253                )
2254                .into(),
2255                NestedField::optional(
2256                    7,
2257                    "attributes",
2258                    Type::Map(MapType {
2259                        key_field: NestedField::map_key_element(
2260                            13,
2261                            Type::Primitive(PrimitiveType::String),
2262                        )
2263                        .into(),
2264                        value_field: NestedField::map_value_element(
2265                            14,
2266                            Type::Primitive(PrimitiveType::String),
2267                            false,
2268                        )
2269                        .into(),
2270                    }),
2271                )
2272                .into(),
2273                NestedField::optional(
2274                    8,
2275                    "orders",
2276                    Type::List(ListType {
2277                        element_field: NestedField::list_element(
2278                            15,
2279                            Type::Struct(StructType::new(vec![
2280                                NestedField::required(
2281                                    16,
2282                                    "order_id",
2283                                    Type::Primitive(PrimitiveType::Long),
2284                                )
2285                                .into(),
2286                                NestedField::required(
2287                                    17,
2288                                    "amount",
2289                                    Type::Primitive(PrimitiveType::Double),
2290                                )
2291                                .into(),
2292                            ])),
2293                            false,
2294                        )
2295                        .into(),
2296                    }),
2297                )
2298                .into(),
2299            ])
2300            .build()
2301            .unwrap();
2302
2303        pretty_assertions::assert_eq!(schema, expected);
2304        assert_eq!(schema.highest_field_id(), 17);
2305    }
2306}