iceberg/arrow/
value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow_array::{
21    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray,
22    FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray,
23    LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray,
24    Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
25};
26use arrow_buffer::NullBuffer;
27use arrow_schema::{DataType, FieldRef, TimeUnit};
28use uuid::Uuid;
29
30use super::get_field_id_from_metadata;
31use crate::spec::{
32    ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType,
33    SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
34    visit_type_with_partner,
35};
36use crate::{Error, ErrorKind, Result};
37
38struct ArrowArrayToIcebergStructConverter;
39
40impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
41    type T = Vec<Option<Literal>>;
42
43    fn schema(
44        &mut self,
45        _schema: &crate::spec::Schema,
46        _partner: &ArrayRef,
47        value: Vec<Option<Literal>>,
48    ) -> Result<Vec<Option<Literal>>> {
49        Ok(value)
50    }
51
52    fn field(
53        &mut self,
54        field: &crate::spec::NestedFieldRef,
55        _partner: &ArrayRef,
56        value: Vec<Option<Literal>>,
57    ) -> Result<Vec<Option<Literal>>> {
58        // Make there is no null value if the field is required
59        if field.required && value.iter().any(Option::is_none) {
60            return Err(Error::new(
61                ErrorKind::DataInvalid,
62                "The field is required but has null value",
63            )
64            .with_context("field_id", field.id.to_string())
65            .with_context("field_name", &field.name));
66        }
67        Ok(value)
68    }
69
70    fn r#struct(
71        &mut self,
72        _struct: &StructType,
73        array: &ArrayRef,
74        results: Vec<Vec<Option<Literal>>>,
75    ) -> Result<Vec<Option<Literal>>> {
76        let row_len = results.first().map(|column| column.len()).unwrap_or(0);
77        if let Some(col) = results.iter().find(|col| col.len() != row_len) {
78            return Err(Error::new(
79                ErrorKind::DataInvalid,
80                "The struct columns have different row length",
81            )
82            .with_context("first col length", row_len.to_string())
83            .with_context("actual col length", col.len().to_string()));
84        }
85
86        let mut struct_literals = Vec::with_capacity(row_len);
87        let mut columns_iters = results
88            .into_iter()
89            .map(|column| column.into_iter())
90            .collect::<Vec<_>>();
91
92        for i in 0..row_len {
93            let mut literals = Vec::with_capacity(columns_iters.len());
94            for column_iter in columns_iters.iter_mut() {
95                literals.push(column_iter.next().unwrap());
96            }
97            if array.is_null(i) {
98                struct_literals.push(None);
99            } else {
100                struct_literals.push(Some(Literal::Struct(Struct::from_iter(literals))));
101            }
102        }
103
104        Ok(struct_literals)
105    }
106
107    fn list(
108        &mut self,
109        list: &ListType,
110        array: &ArrayRef,
111        elements: Vec<Option<Literal>>,
112    ) -> Result<Vec<Option<Literal>>> {
113        if list.element_field.required && elements.iter().any(Option::is_none) {
114            return Err(Error::new(
115                ErrorKind::DataInvalid,
116                "The list should not have null value",
117            ));
118        }
119        match array.data_type() {
120            DataType::List(_) => {
121                let offset = array
122                    .as_any()
123                    .downcast_ref::<ListArray>()
124                    .ok_or_else(|| {
125                        Error::new(ErrorKind::DataInvalid, "The partner is not a list array")
126                    })?
127                    .offsets();
128                // combine the result according to the offset
129                let mut result = Vec::with_capacity(offset.len() - 1);
130                for i in 0..offset.len() - 1 {
131                    let start = offset[i] as usize;
132                    let end = offset[i + 1] as usize;
133                    result.push(Some(Literal::List(elements[start..end].to_vec())));
134                }
135                Ok(result)
136            }
137            DataType::LargeList(_) => {
138                let offset = array
139                    .as_any()
140                    .downcast_ref::<LargeListArray>()
141                    .ok_or_else(|| {
142                        Error::new(
143                            ErrorKind::DataInvalid,
144                            "The partner is not a large list array",
145                        )
146                    })?
147                    .offsets();
148                // combine the result according to the offset
149                let mut result = Vec::with_capacity(offset.len() - 1);
150                for i in 0..offset.len() - 1 {
151                    let start = offset[i] as usize;
152                    let end = offset[i + 1] as usize;
153                    result.push(Some(Literal::List(elements[start..end].to_vec())));
154                }
155                Ok(result)
156            }
157            DataType::FixedSizeList(_, len) => {
158                let mut result = Vec::with_capacity(elements.len() / *len as usize);
159                for i in 0..elements.len() / *len as usize {
160                    let start = i * *len as usize;
161                    let end = (i + 1) * *len as usize;
162                    result.push(Some(Literal::List(elements[start..end].to_vec())));
163                }
164                Ok(result)
165            }
166            _ => Err(Error::new(
167                ErrorKind::DataInvalid,
168                "The partner is not a list type",
169            )),
170        }
171    }
172
173    fn map(
174        &mut self,
175        _map: &MapType,
176        partner: &ArrayRef,
177        key_values: Vec<Option<Literal>>,
178        values: Vec<Option<Literal>>,
179    ) -> Result<Vec<Option<Literal>>> {
180        // Make sure key_value and value have the same row length
181        if key_values.len() != values.len() {
182            return Err(Error::new(
183                ErrorKind::DataInvalid,
184                "The key value and value of map should have the same row length",
185            ));
186        }
187
188        let offsets = partner
189            .as_any()
190            .downcast_ref::<MapArray>()
191            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "The partner is not a map array"))?
192            .offsets();
193        // combine the result according to the offset
194        let mut result = Vec::with_capacity(offsets.len() - 1);
195        for i in 0..offsets.len() - 1 {
196            let start = offsets[i] as usize;
197            let end = offsets[i + 1] as usize;
198            let mut map = Map::new();
199            for (key, value) in key_values[start..end].iter().zip(values[start..end].iter()) {
200                map.insert(key.clone().unwrap(), value.clone());
201            }
202            result.push(Some(Literal::Map(map)));
203        }
204        Ok(result)
205    }
206
207    fn primitive(&mut self, p: &PrimitiveType, partner: &ArrayRef) -> Result<Vec<Option<Literal>>> {
208        match p {
209            PrimitiveType::Boolean => {
210                let array = partner
211                    .as_any()
212                    .downcast_ref::<BooleanArray>()
213                    .ok_or_else(|| {
214                        Error::new(ErrorKind::DataInvalid, "The partner is not a boolean array")
215                    })?;
216                Ok(array.iter().map(|v| v.map(Literal::bool)).collect())
217            }
218            PrimitiveType::Int => {
219                let array = partner
220                    .as_any()
221                    .downcast_ref::<Int32Array>()
222                    .ok_or_else(|| {
223                        Error::new(ErrorKind::DataInvalid, "The partner is not a int32 array")
224                    })?;
225                Ok(array.iter().map(|v| v.map(Literal::int)).collect())
226            }
227            PrimitiveType::Long => {
228                let array = partner
229                    .as_any()
230                    .downcast_ref::<Int64Array>()
231                    .ok_or_else(|| {
232                        Error::new(ErrorKind::DataInvalid, "The partner is not a int64 array")
233                    })?;
234                Ok(array.iter().map(|v| v.map(Literal::long)).collect())
235            }
236            PrimitiveType::Float => {
237                let array = partner
238                    .as_any()
239                    .downcast_ref::<Float32Array>()
240                    .ok_or_else(|| {
241                        Error::new(ErrorKind::DataInvalid, "The partner is not a float32 array")
242                    })?;
243                Ok(array.iter().map(|v| v.map(Literal::float)).collect())
244            }
245            PrimitiveType::Double => {
246                let array = partner
247                    .as_any()
248                    .downcast_ref::<Float64Array>()
249                    .ok_or_else(|| {
250                        Error::new(ErrorKind::DataInvalid, "The partner is not a float64 array")
251                    })?;
252                Ok(array.iter().map(|v| v.map(Literal::double)).collect())
253            }
254            PrimitiveType::Decimal { precision, scale } => {
255                let array = partner
256                    .as_any()
257                    .downcast_ref::<Decimal128Array>()
258                    .ok_or_else(|| {
259                        Error::new(
260                            ErrorKind::DataInvalid,
261                            "The partner is not a decimal128 array",
262                        )
263                    })?;
264                if let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type()
265                    && (*arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale)
266                {
267                    return Err(Error::new(
268                        ErrorKind::DataInvalid,
269                        format!(
270                            "The precision or scale ({arrow_precision},{arrow_scale}) of arrow decimal128 array is not compatible with iceberg decimal type ({precision},{scale})"
271                        ),
272                    ));
273                }
274                Ok(array.iter().map(|v| v.map(Literal::decimal)).collect())
275            }
276            PrimitiveType::Date => {
277                let array = partner
278                    .as_any()
279                    .downcast_ref::<Date32Array>()
280                    .ok_or_else(|| {
281                        Error::new(ErrorKind::DataInvalid, "The partner is not a date32 array")
282                    })?;
283                Ok(array.iter().map(|v| v.map(Literal::date)).collect())
284            }
285            PrimitiveType::Time => {
286                let array = partner
287                    .as_any()
288                    .downcast_ref::<Time64MicrosecondArray>()
289                    .ok_or_else(|| {
290                        Error::new(ErrorKind::DataInvalid, "The partner is not a time64 array")
291                    })?;
292                Ok(array.iter().map(|v| v.map(Literal::time)).collect())
293            }
294            PrimitiveType::Timestamp => {
295                let array = partner
296                    .as_any()
297                    .downcast_ref::<TimestampMicrosecondArray>()
298                    .ok_or_else(|| {
299                        Error::new(
300                            ErrorKind::DataInvalid,
301                            "The partner is not a timestamp array",
302                        )
303                    })?;
304                Ok(array.iter().map(|v| v.map(Literal::timestamp)).collect())
305            }
306            PrimitiveType::Timestamptz => {
307                let array = partner
308                    .as_any()
309                    .downcast_ref::<TimestampMicrosecondArray>()
310                    .ok_or_else(|| {
311                        Error::new(
312                            ErrorKind::DataInvalid,
313                            "The partner is not a timestamptz array",
314                        )
315                    })?;
316                Ok(array.iter().map(|v| v.map(Literal::timestamptz)).collect())
317            }
318            PrimitiveType::TimestampNs => {
319                let array = partner
320                    .as_any()
321                    .downcast_ref::<TimestampNanosecondArray>()
322                    .ok_or_else(|| {
323                        Error::new(
324                            ErrorKind::DataInvalid,
325                            "The partner is not a timestamp_ns array",
326                        )
327                    })?;
328                Ok(array
329                    .iter()
330                    .map(|v| v.map(Literal::timestamp_nano))
331                    .collect())
332            }
333            PrimitiveType::TimestamptzNs => {
334                let array = partner
335                    .as_any()
336                    .downcast_ref::<TimestampNanosecondArray>()
337                    .ok_or_else(|| {
338                        Error::new(
339                            ErrorKind::DataInvalid,
340                            "The partner is not a timestamptz_ns array",
341                        )
342                    })?;
343                Ok(array
344                    .iter()
345                    .map(|v| v.map(Literal::timestamptz_nano))
346                    .collect())
347            }
348            PrimitiveType::String => {
349                if let Some(array) = partner.as_any().downcast_ref::<LargeStringArray>() {
350                    Ok(array.iter().map(|v| v.map(Literal::string)).collect())
351                } else if let Some(array) = partner.as_any().downcast_ref::<StringArray>() {
352                    Ok(array.iter().map(|v| v.map(Literal::string)).collect())
353                } else {
354                    Err(Error::new(
355                        ErrorKind::DataInvalid,
356                        "The partner is not a string array",
357                    ))
358                }
359            }
360            PrimitiveType::Uuid => {
361                if let Some(array) = partner.as_any().downcast_ref::<FixedSizeBinaryArray>() {
362                    if array.value_length() != 16 {
363                        return Err(Error::new(
364                            ErrorKind::DataInvalid,
365                            "The partner is not a uuid array",
366                        ));
367                    }
368                    Ok(array
369                        .iter()
370                        .map(|v| {
371                            v.map(|v| {
372                                Ok(Literal::uuid(Uuid::from_bytes(v.try_into().map_err(
373                                    |_| {
374                                        Error::new(
375                                            ErrorKind::DataInvalid,
376                                            "Failed to convert binary to uuid",
377                                        )
378                                    },
379                                )?)))
380                            })
381                            .transpose()
382                        })
383                        .collect::<Result<Vec<_>>>()?)
384                } else {
385                    Err(Error::new(
386                        ErrorKind::DataInvalid,
387                        "The partner is not a uuid array",
388                    ))
389                }
390            }
391            PrimitiveType::Fixed(len) => {
392                let array = partner
393                    .as_any()
394                    .downcast_ref::<FixedSizeBinaryArray>()
395                    .ok_or_else(|| {
396                        Error::new(ErrorKind::DataInvalid, "The partner is not a fixed array")
397                    })?;
398                if array.value_length() != *len as i32 {
399                    return Err(Error::new(
400                        ErrorKind::DataInvalid,
401                        "The length of fixed size binary array is not compatible with iceberg fixed type",
402                    ));
403                }
404                Ok(array
405                    .iter()
406                    .map(|v| v.map(|v| Literal::fixed(v.iter().cloned())))
407                    .collect())
408            }
409            PrimitiveType::Binary => {
410                if let Some(array) = partner.as_any().downcast_ref::<LargeBinaryArray>() {
411                    Ok(array
412                        .iter()
413                        .map(|v| v.map(|v| Literal::binary(v.to_vec())))
414                        .collect())
415                } else if let Some(array) = partner.as_any().downcast_ref::<BinaryArray>() {
416                    Ok(array
417                        .iter()
418                        .map(|v| v.map(|v| Literal::binary(v.to_vec())))
419                        .collect())
420                } else {
421                    Err(Error::new(
422                        ErrorKind::DataInvalid,
423                        "The partner is not a binary array",
424                    ))
425                }
426            }
427        }
428    }
429}
430
431/// Defines how Arrow fields are matched with Iceberg fields when converting data.
432///
433/// This enum provides two strategies for matching fields:
434/// - `Id`: Match fields by their ID, which is stored in Arrow field metadata.
435/// - `Name`: Match fields by their name, ignoring the field ID.
436///
437/// The ID matching mode is the default and preferred approach as it's more robust
438/// against schema evolution where field names might change but IDs remain stable.
439/// The name matching mode can be useful in scenarios where field IDs are not available
440/// or when working with systems that don't preserve field IDs.
441#[derive(Clone, Copy, Debug)]
442pub enum FieldMatchMode {
443    /// Match fields by their ID stored in Arrow field metadata
444    Id,
445    /// Match fields by their name, ignoring field IDs
446    Name,
447}
448
449impl FieldMatchMode {
450    /// Determines if an Arrow field matches an Iceberg field based on the matching mode.
451    pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
452        match self {
453            FieldMatchMode::Id => get_field_id_from_metadata(arrow_field)
454                .map(|id| id == iceberg_field.id)
455                .unwrap_or(false),
456            FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name,
457        }
458    }
459}
460
461/// Partner type representing accessing and walking arrow arrays alongside iceberg schema
462pub struct ArrowArrayAccessor {
463    match_mode: FieldMatchMode,
464}
465
466impl ArrowArrayAccessor {
467    /// Creates a new instance of ArrowArrayAccessor with the default ID matching mode
468    pub fn new() -> Self {
469        Self {
470            match_mode: FieldMatchMode::Id,
471        }
472    }
473
474    /// Creates a new instance of ArrowArrayAccessor with the specified matching mode
475    pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
476        Self { match_mode }
477    }
478}
479
480impl Default for ArrowArrayAccessor {
481    fn default() -> Self {
482        Self::new()
483    }
484}
485
486impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
487    fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
488        if !matches!(schema_partner.data_type(), DataType::Struct(_)) {
489            return Err(Error::new(
490                ErrorKind::DataInvalid,
491                "The schema partner is not a struct type",
492            ));
493        }
494
495        Ok(schema_partner)
496    }
497
498    fn field_partner<'a>(
499        &self,
500        struct_partner: &'a ArrayRef,
501        field: &NestedField,
502    ) -> Result<&'a ArrayRef> {
503        let struct_array = struct_partner
504            .as_any()
505            .downcast_ref::<StructArray>()
506            .ok_or_else(|| {
507                Error::new(
508                    ErrorKind::DataInvalid,
509                    format!(
510                        "The struct partner is not a struct array, partner: {struct_partner:?}"
511                    ),
512                )
513            })?;
514
515        let field_pos = struct_array
516            .fields()
517            .iter()
518            .position(|arrow_field| self.match_mode.match_field(arrow_field, field))
519            .ok_or_else(|| {
520                Error::new(
521                    ErrorKind::DataInvalid,
522                    format!("Field id {} not found in struct array", field.id),
523                )
524            })?;
525
526        Ok(struct_array.column(field_pos))
527    }
528
529    fn list_element_partner<'a>(&self, list_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
530        match list_partner.data_type() {
531            DataType::List(_) => {
532                let list_array = list_partner
533                    .as_any()
534                    .downcast_ref::<ListArray>()
535                    .ok_or_else(|| {
536                        Error::new(
537                            ErrorKind::DataInvalid,
538                            "The list partner is not a list array",
539                        )
540                    })?;
541                Ok(list_array.values())
542            }
543            DataType::LargeList(_) => {
544                let list_array = list_partner
545                    .as_any()
546                    .downcast_ref::<LargeListArray>()
547                    .ok_or_else(|| {
548                        Error::new(
549                            ErrorKind::DataInvalid,
550                            "The list partner is not a large list array",
551                        )
552                    })?;
553                Ok(list_array.values())
554            }
555            DataType::FixedSizeList(_, _) => {
556                let list_array = list_partner
557                    .as_any()
558                    .downcast_ref::<FixedSizeListArray>()
559                    .ok_or_else(|| {
560                        Error::new(
561                            ErrorKind::DataInvalid,
562                            "The list partner is not a fixed size list array",
563                        )
564                    })?;
565                Ok(list_array.values())
566            }
567            _ => Err(Error::new(
568                ErrorKind::DataInvalid,
569                "The list partner is not a list type",
570            )),
571        }
572    }
573
574    fn map_key_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
575        let map_array = map_partner
576            .as_any()
577            .downcast_ref::<MapArray>()
578            .ok_or_else(|| {
579                Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
580            })?;
581        Ok(map_array.keys())
582    }
583
584    fn map_value_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
585        let map_array = map_partner
586            .as_any()
587            .downcast_ref::<MapArray>()
588            .ok_or_else(|| {
589                Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
590            })?;
591        Ok(map_array.values())
592    }
593}
594
595/// Convert arrow struct array to iceberg struct value array.
596/// This function will assume the schema of arrow struct array is the same as iceberg struct type.
597pub fn arrow_struct_to_literal(
598    struct_array: &ArrayRef,
599    ty: &StructType,
600) -> Result<Vec<Option<Literal>>> {
601    visit_struct_with_partner(
602        ty,
603        struct_array,
604        &mut ArrowArrayToIcebergStructConverter,
605        &ArrowArrayAccessor::new(),
606    )
607}
608
609/// Convert arrow primitive array to iceberg primitive value array.
610/// This function will assume the schema of arrow struct array is the same as iceberg struct type.
611pub fn arrow_primitive_to_literal(
612    primitive_array: &ArrayRef,
613    ty: &Type,
614) -> Result<Vec<Option<Literal>>> {
615    visit_type_with_partner(
616        ty,
617        primitive_array,
618        &mut ArrowArrayToIcebergStructConverter,
619        &ArrowArrayAccessor::new(),
620    )
621}
622
623/// Create a single-element array from a primitive literal.
624///
625/// This is used for creating constant arrays (Run-End Encoded arrays) where we need
626/// a single value that represents all rows.
627pub(crate) fn create_primitive_array_single_element(
628    data_type: &DataType,
629    prim_lit: &Option<PrimitiveLiteral>,
630) -> Result<ArrayRef> {
631    match (data_type, prim_lit) {
632        (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {
633            Ok(Arc::new(BooleanArray::from(vec![*v])))
634        }
635        (DataType::Boolean, None) => Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))),
636        (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => {
637            Ok(Arc::new(Int32Array::from(vec![*v])))
638        }
639        (DataType::Int32, None) => Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None]))),
640        (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => {
641            Ok(Arc::new(Date32Array::from(vec![*v])))
642        }
643        (DataType::Date32, None) => Ok(Arc::new(Date32Array::from(vec![Option::<i32>::None]))),
644        (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => {
645            Ok(Arc::new(Int64Array::from(vec![*v])))
646        }
647        (DataType::Int64, None) => Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None]))),
648        (DataType::Timestamp(TimeUnit::Microsecond, timezone), Some(PrimitiveLiteral::Long(v))) => {
649            let array = TimestampMicrosecondArray::from(vec![*v]);
650            if let Some(timezone) = timezone {
651                Ok(Arc::new(array.with_timezone(timezone.clone())))
652            } else {
653                Ok(Arc::new(array))
654            }
655        }
656        (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => {
657            let array = TimestampMicrosecondArray::from(vec![Option::<i64>::None]);
658            if let Some(timezone) = timezone {
659                Ok(Arc::new(array.with_timezone(timezone.clone())))
660            } else {
661                Ok(Arc::new(array))
662            }
663        }
664        (DataType::Timestamp(TimeUnit::Nanosecond, timezone), Some(PrimitiveLiteral::Long(v))) => {
665            let array = TimestampNanosecondArray::from(vec![*v]);
666            if let Some(timezone) = timezone {
667                Ok(Arc::new(array.with_timezone(timezone.clone())))
668            } else {
669                Ok(Arc::new(array))
670            }
671        }
672        (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => {
673            let array = TimestampNanosecondArray::from(vec![Option::<i64>::None]);
674            if let Some(timezone) = timezone {
675                Ok(Arc::new(array.with_timezone(timezone.clone())))
676            } else {
677                Ok(Arc::new(array))
678            }
679        }
680        (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => {
681            Ok(Arc::new(Float32Array::from(vec![v.0])))
682        }
683        (DataType::Float32, None) => Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None]))),
684        (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => {
685            Ok(Arc::new(Float64Array::from(vec![v.0])))
686        }
687        (DataType::Float64, None) => Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None]))),
688        (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => {
689            Ok(Arc::new(StringArray::from(vec![v.as_str()])))
690        }
691        (DataType::Utf8, None) => Ok(Arc::new(StringArray::from(vec![Option::<&str>::None]))),
692        (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => {
693            Ok(Arc::new(BinaryArray::from_vec(vec![v.as_slice()])))
694        }
695        (DataType::Binary, None) => Ok(Arc::new(BinaryArray::from_opt_vec(vec![
696            Option::<&[u8]>::None,
697        ]))),
698        (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(v))) => {
699            let array = Decimal128Array::from(vec![{ *v }])
700                .with_precision_and_scale(*precision, *scale)
701                .map_err(|e| {
702                    Error::new(
703                        ErrorKind::DataInvalid,
704                        format!(
705                            "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
706                        ),
707                    )
708                })?;
709            Ok(Arc::new(array))
710        }
711        (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::UInt128(v))) => {
712            let array = Decimal128Array::from(vec![*v as i128])
713                .with_precision_and_scale(*precision, *scale)
714                .map_err(|e| {
715                    Error::new(
716                        ErrorKind::DataInvalid,
717                        format!(
718                            "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
719                        ),
720                    )
721                })?;
722            Ok(Arc::new(array))
723        }
724        (DataType::Decimal128(precision, scale), None) => {
725            let array = Decimal128Array::from(vec![Option::<i128>::None])
726                .with_precision_and_scale(*precision, *scale)
727                .map_err(|e| {
728                    Error::new(
729                        ErrorKind::DataInvalid,
730                        format!(
731                            "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
732                        ),
733                    )
734                })?;
735            Ok(Arc::new(array))
736        }
737        (DataType::Struct(fields), None) => {
738            // Create a single-element StructArray with nulls
739            let null_arrays: Vec<ArrayRef> = fields
740                .iter()
741                .map(|f| {
742                    // Recursively create null arrays for struct fields
743                    // For primitive fields in structs, use simple null arrays (not REE within struct)
744                    match f.data_type() {
745                        DataType::Boolean => {
746                            Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))
747                                as ArrayRef)
748                        }
749                        DataType::Int32 | DataType::Date32 => {
750                            Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None])) as ArrayRef)
751                        }
752                        DataType::Int64 => {
753                            Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None])) as ArrayRef)
754                        }
755                        DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
756                            let array = TimestampMicrosecondArray::from(vec![Option::<i64>::None]);
757                            if let Some(timezone) = timezone {
758                                Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef)
759                            } else {
760                                Ok(Arc::new(array) as ArrayRef)
761                            }
762                        }
763                        DataType::Timestamp(TimeUnit::Nanosecond, timezone) => {
764                            let array = TimestampNanosecondArray::from(vec![Option::<i64>::None]);
765                            if let Some(timezone) = timezone {
766                                Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef)
767                            } else {
768                                Ok(Arc::new(array) as ArrayRef)
769                            }
770                        }
771                        DataType::Float32 => {
772                            Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None])) as ArrayRef)
773                        }
774                        DataType::Float64 => {
775                            Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None])) as ArrayRef)
776                        }
777                        DataType::Utf8 => {
778                            Ok(Arc::new(StringArray::from(vec![Option::<&str>::None])) as ArrayRef)
779                        }
780                        DataType::Binary => {
781                            Ok(
782                                Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None]))
783                                    as ArrayRef,
784                            )
785                        }
786                        _ => Err(Error::new(
787                            ErrorKind::Unexpected,
788                            format!("Unsupported struct field type: {:?}", f.data_type()),
789                        )),
790                    }
791                })
792                .collect::<Result<Vec<_>>>()?;
793            Ok(Arc::new(arrow_array::StructArray::new(
794                fields.clone(),
795                null_arrays,
796                Some(arrow_buffer::NullBuffer::new_null(1)),
797            )))
798        }
799        _ => Err(Error::new(
800            ErrorKind::Unexpected,
801            format!("Unsupported constant type combination: {data_type:?} with {prim_lit:?}"),
802        )),
803    }
804}
805
806/// Create a repeated array from a primitive literal for a given number of rows.
807///
808/// This is used for creating non-constant arrays where we need the same value
809/// repeated for each row.
810pub(crate) fn create_primitive_array_repeated(
811    data_type: &DataType,
812    prim_lit: &Option<PrimitiveLiteral>,
813    num_rows: usize,
814) -> Result<ArrayRef> {
815    Ok(match (data_type, prim_lit) {
816        (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
817            Arc::new(BooleanArray::from(vec![*value; num_rows]))
818        }
819        (DataType::Boolean, None) => {
820            let vals: Vec<Option<bool>> = vec![None; num_rows];
821            Arc::new(BooleanArray::from(vals))
822        }
823        (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
824            Arc::new(Int32Array::from(vec![*value; num_rows]))
825        }
826        (DataType::Int32, None) => {
827            let vals: Vec<Option<i32>> = vec![None; num_rows];
828            Arc::new(Int32Array::from(vals))
829        }
830        (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
831            Arc::new(Date32Array::from(vec![*value; num_rows]))
832        }
833        (DataType::Date32, None) => {
834            let vals: Vec<Option<i32>> = vec![None; num_rows];
835            Arc::new(Date32Array::from(vals))
836        }
837        (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
838            Arc::new(Int64Array::from(vec![*value; num_rows]))
839        }
840        (DataType::Int64, None) => {
841            let vals: Vec<Option<i64>> = vec![None; num_rows];
842            Arc::new(Int64Array::from(vals))
843        }
844        (
845            DataType::Timestamp(TimeUnit::Microsecond, timezone),
846            Some(PrimitiveLiteral::Long(value)),
847        ) => {
848            let array = TimestampMicrosecondArray::from(vec![*value; num_rows]);
849            if let Some(timezone) = timezone {
850                Arc::new(array.with_timezone(timezone.clone()))
851            } else {
852                Arc::new(array)
853            }
854        }
855        (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => {
856            let vals: Vec<Option<i64>> = vec![None; num_rows];
857            let array = TimestampMicrosecondArray::from(vals);
858            if let Some(timezone) = timezone {
859                Arc::new(array.with_timezone(timezone.clone()))
860            } else {
861                Arc::new(array)
862            }
863        }
864        (
865            DataType::Timestamp(TimeUnit::Nanosecond, timezone),
866            Some(PrimitiveLiteral::Long(value)),
867        ) => {
868            let array = TimestampNanosecondArray::from(vec![*value; num_rows]);
869            if let Some(timezone) = timezone {
870                Arc::new(array.with_timezone(timezone.clone()))
871            } else {
872                Arc::new(array)
873            }
874        }
875        (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => {
876            let vals: Vec<Option<i64>> = vec![None; num_rows];
877            let array = TimestampNanosecondArray::from(vals);
878            if let Some(timezone) = timezone {
879                Arc::new(array.with_timezone(timezone.clone()))
880            } else {
881                Arc::new(array)
882            }
883        }
884        (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
885            Arc::new(Float32Array::from(vec![value.0; num_rows]))
886        }
887        (DataType::Float32, None) => {
888            let vals: Vec<Option<f32>> = vec![None; num_rows];
889            Arc::new(Float32Array::from(vals))
890        }
891        (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
892            Arc::new(Float64Array::from(vec![value.0; num_rows]))
893        }
894        (DataType::Float64, None) => {
895            let vals: Vec<Option<f64>> = vec![None; num_rows];
896            Arc::new(Float64Array::from(vals))
897        }
898        (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
899            Arc::new(StringArray::from(vec![value.clone(); num_rows]))
900        }
901        (DataType::Utf8, None) => {
902            let vals: Vec<Option<String>> = vec![None; num_rows];
903            Arc::new(StringArray::from(vals))
904        }
905        (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
906            Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
907        }
908        (DataType::Binary, None) => {
909            let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
910            Arc::new(BinaryArray::from_opt_vec(vals))
911        }
912        (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(value))) => {
913            Arc::new(
914                Decimal128Array::from(vec![*value; num_rows])
915                    .with_precision_and_scale(*precision, *scale)
916                    .map_err(|e| {
917                        Error::new(
918                            ErrorKind::DataInvalid,
919                            format!(
920                                "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
921                            ),
922                        )
923                    })?,
924            )
925        }
926        (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::UInt128(value))) => {
927            Arc::new(
928                Decimal128Array::from(vec![*value as i128; num_rows])
929                    .with_precision_and_scale(*precision, *scale)
930                    .map_err(|e| {
931                        Error::new(
932                            ErrorKind::DataInvalid,
933                            format!(
934                                "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
935                            ),
936                        )
937                    })?,
938            )
939        }
940        (DataType::Decimal128(precision, scale), None) => {
941            let vals: Vec<Option<i128>> = vec![None; num_rows];
942            Arc::new(
943                Decimal128Array::from(vals)
944                    .with_precision_and_scale(*precision, *scale)
945                    .map_err(|e| {
946                        Error::new(
947                            ErrorKind::DataInvalid,
948                            format!(
949                                "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
950                            ),
951                        )
952                    })?,
953            )
954        }
955        (DataType::Struct(fields), None) => {
956            // Create a StructArray filled with nulls
957            let null_arrays: Vec<ArrayRef> = fields
958                .iter()
959                .map(|field| create_primitive_array_repeated(field.data_type(), &None, num_rows))
960                .collect::<Result<Vec<_>>>()?;
961
962            Arc::new(StructArray::new(
963                fields.clone(),
964                null_arrays,
965                Some(NullBuffer::new_null(num_rows)),
966            ))
967        }
968        (DataType::Null, _) => Arc::new(arrow_array::NullArray::new(num_rows)),
969        (dt, _) => {
970            return Err(Error::new(
971                ErrorKind::Unexpected,
972                format!("unexpected target column type {dt}"),
973            ));
974        }
975    })
976}
977
978#[cfg(test)]
979mod test {
980    use std::collections::HashMap;
981    use std::sync::Arc;
982
983    use arrow_array::builder::{Int32Builder, ListBuilder, MapBuilder, StructBuilder};
984    use arrow_array::{
985        ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
986        Float64Array, Int32Array, Int64Array, StringArray, StructArray, Time64MicrosecondArray,
987        TimestampMicrosecondArray, TimestampNanosecondArray,
988    };
989    use arrow_schema::{DataType, Field, Fields, TimeUnit};
990    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
991
992    use super::*;
993    use crate::spec::{ListType, Literal, MapType, NestedField, PrimitiveType, StructType, Type};
994
995    #[test]
996    fn test_arrow_struct_to_iceberg_struct() {
997        let bool_array = BooleanArray::from(vec![Some(true), Some(false), None]);
998        let int32_array = Int32Array::from(vec![Some(3), Some(4), None]);
999        let int64_array = Int64Array::from(vec![Some(5), Some(6), None]);
1000        let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]);
1001        let float64_array = Float64Array::from(vec![Some(3.3), Some(4.4), None]);
1002        let decimal_array = Decimal128Array::from(vec![Some(1000), Some(2000), None])
1003            .with_precision_and_scale(10, 2)
1004            .unwrap();
1005        let date_array = Date32Array::from(vec![Some(18628), Some(18629), None]);
1006        let time_array = Time64MicrosecondArray::from(vec![Some(123456789), Some(987654321), None]);
1007        let timestamp_micro_array = TimestampMicrosecondArray::from(vec![
1008            Some(1622548800000000),
1009            Some(1622635200000000),
1010            None,
1011        ]);
1012        let timestamp_nano_array = TimestampNanosecondArray::from(vec![
1013            Some(1622548800000000000),
1014            Some(1622635200000000000),
1015            None,
1016        ]);
1017        let string_array = StringArray::from(vec![Some("a"), Some("b"), None]);
1018        let binary_array =
1019            BinaryArray::from(vec![Some(b"abc".as_ref()), Some(b"def".as_ref()), None]);
1020
1021        let struct_array = Arc::new(StructArray::from(vec![
1022            (
1023                Arc::new(
1024                    Field::new("bool_field", DataType::Boolean, true).with_metadata(HashMap::from(
1025                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())],
1026                    )),
1027                ),
1028                Arc::new(bool_array) as ArrayRef,
1029            ),
1030            (
1031                Arc::new(
1032                    Field::new("int32_field", DataType::Int32, true).with_metadata(HashMap::from(
1033                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
1034                    )),
1035                ),
1036                Arc::new(int32_array) as ArrayRef,
1037            ),
1038            (
1039                Arc::new(
1040                    Field::new("int64_field", DataType::Int64, true).with_metadata(HashMap::from(
1041                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
1042                    )),
1043                ),
1044                Arc::new(int64_array) as ArrayRef,
1045            ),
1046            (
1047                Arc::new(
1048                    Field::new("float32_field", DataType::Float32, true).with_metadata(
1049                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
1050                    ),
1051                ),
1052                Arc::new(float32_array) as ArrayRef,
1053            ),
1054            (
1055                Arc::new(
1056                    Field::new("float64_field", DataType::Float64, true).with_metadata(
1057                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
1058                    ),
1059                ),
1060                Arc::new(float64_array) as ArrayRef,
1061            ),
1062            (
1063                Arc::new(
1064                    Field::new("decimal_field", DataType::Decimal128(10, 2), true).with_metadata(
1065                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string())]),
1066                    ),
1067                ),
1068                Arc::new(decimal_array) as ArrayRef,
1069            ),
1070            (
1071                Arc::new(
1072                    Field::new("date_field", DataType::Date32, true).with_metadata(HashMap::from(
1073                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
1074                    )),
1075                ),
1076                Arc::new(date_array) as ArrayRef,
1077            ),
1078            (
1079                Arc::new(
1080                    Field::new("time_field", DataType::Time64(TimeUnit::Microsecond), true)
1081                        .with_metadata(HashMap::from([(
1082                            PARQUET_FIELD_ID_META_KEY.to_string(),
1083                            "8".to_string(),
1084                        )])),
1085                ),
1086                Arc::new(time_array) as ArrayRef,
1087            ),
1088            (
1089                Arc::new(
1090                    Field::new(
1091                        "timestamp_micro_field",
1092                        DataType::Timestamp(TimeUnit::Microsecond, None),
1093                        true,
1094                    )
1095                    .with_metadata(HashMap::from([(
1096                        PARQUET_FIELD_ID_META_KEY.to_string(),
1097                        "9".to_string(),
1098                    )])),
1099                ),
1100                Arc::new(timestamp_micro_array) as ArrayRef,
1101            ),
1102            (
1103                Arc::new(
1104                    Field::new(
1105                        "timestamp_nano_field",
1106                        DataType::Timestamp(TimeUnit::Nanosecond, None),
1107                        true,
1108                    )
1109                    .with_metadata(HashMap::from([(
1110                        PARQUET_FIELD_ID_META_KEY.to_string(),
1111                        "10".to_string(),
1112                    )])),
1113                ),
1114                Arc::new(timestamp_nano_array) as ArrayRef,
1115            ),
1116            (
1117                Arc::new(
1118                    Field::new("string_field", DataType::Utf8, true).with_metadata(HashMap::from(
1119                        [(PARQUET_FIELD_ID_META_KEY.to_string(), "11".to_string())],
1120                    )),
1121                ),
1122                Arc::new(string_array) as ArrayRef,
1123            ),
1124            (
1125                Arc::new(
1126                    Field::new("binary_field", DataType::Binary, true).with_metadata(
1127                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "12".to_string())]),
1128                    ),
1129                ),
1130                Arc::new(binary_array) as ArrayRef,
1131            ),
1132        ])) as ArrayRef;
1133
1134        let iceberg_struct_type = StructType::new(vec![
1135            Arc::new(NestedField::optional(
1136                0,
1137                "bool_field",
1138                Type::Primitive(PrimitiveType::Boolean),
1139            )),
1140            Arc::new(NestedField::optional(
1141                2,
1142                "int32_field",
1143                Type::Primitive(PrimitiveType::Int),
1144            )),
1145            Arc::new(NestedField::optional(
1146                3,
1147                "int64_field",
1148                Type::Primitive(PrimitiveType::Long),
1149            )),
1150            Arc::new(NestedField::optional(
1151                4,
1152                "float32_field",
1153                Type::Primitive(PrimitiveType::Float),
1154            )),
1155            Arc::new(NestedField::optional(
1156                5,
1157                "float64_field",
1158                Type::Primitive(PrimitiveType::Double),
1159            )),
1160            Arc::new(NestedField::optional(
1161                6,
1162                "decimal_field",
1163                Type::Primitive(PrimitiveType::Decimal {
1164                    precision: 10,
1165                    scale: 2,
1166                }),
1167            )),
1168            Arc::new(NestedField::optional(
1169                7,
1170                "date_field",
1171                Type::Primitive(PrimitiveType::Date),
1172            )),
1173            Arc::new(NestedField::optional(
1174                8,
1175                "time_field",
1176                Type::Primitive(PrimitiveType::Time),
1177            )),
1178            Arc::new(NestedField::optional(
1179                9,
1180                "timestamp_micro_field",
1181                Type::Primitive(PrimitiveType::Timestamp),
1182            )),
1183            Arc::new(NestedField::optional(
1184                10,
1185                "timestamp_nao_field",
1186                Type::Primitive(PrimitiveType::TimestampNs),
1187            )),
1188            Arc::new(NestedField::optional(
1189                11,
1190                "string_field",
1191                Type::Primitive(PrimitiveType::String),
1192            )),
1193            Arc::new(NestedField::optional(
1194                12,
1195                "binary_field",
1196                Type::Primitive(PrimitiveType::Binary),
1197            )),
1198        ]);
1199
1200        let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
1201
1202        assert_eq!(result, vec![
1203            Some(Literal::Struct(Struct::from_iter(vec![
1204                Some(Literal::bool(true)),
1205                Some(Literal::int(3)),
1206                Some(Literal::long(5)),
1207                Some(Literal::float(1.1)),
1208                Some(Literal::double(3.3)),
1209                Some(Literal::decimal(1000)),
1210                Some(Literal::date(18628)),
1211                Some(Literal::time(123456789)),
1212                Some(Literal::timestamp(1622548800000000)),
1213                Some(Literal::timestamp_nano(1622548800000000000)),
1214                Some(Literal::string("a".to_string())),
1215                Some(Literal::binary(b"abc".to_vec())),
1216            ]))),
1217            Some(Literal::Struct(Struct::from_iter(vec![
1218                Some(Literal::bool(false)),
1219                Some(Literal::int(4)),
1220                Some(Literal::long(6)),
1221                Some(Literal::float(2.2)),
1222                Some(Literal::double(4.4)),
1223                Some(Literal::decimal(2000)),
1224                Some(Literal::date(18629)),
1225                Some(Literal::time(987654321)),
1226                Some(Literal::timestamp(1622635200000000)),
1227                Some(Literal::timestamp_nano(1622635200000000000)),
1228                Some(Literal::string("b".to_string())),
1229                Some(Literal::binary(b"def".to_vec())),
1230            ]))),
1231            Some(Literal::Struct(Struct::from_iter(vec![
1232                None, None, None, None, None, None, None, None, None, None, None, None,
1233            ]))),
1234        ]);
1235    }
1236
1237    #[test]
1238    fn test_nullable_struct() {
1239        // test case that partial columns are null
1240        // [
1241        //   {a: null, b: null} // child column is null
1242        //   {a: 1, b: null},   // partial child column is null
1243        //   null               // parent column is null
1244        // ]
1245        let struct_array = {
1246            let mut builder = StructBuilder::from_fields(
1247                Fields::from(vec![
1248                    Field::new("a", DataType::Int32, true).with_metadata(HashMap::from([(
1249                        PARQUET_FIELD_ID_META_KEY.to_string(),
1250                        "0".to_string(),
1251                    )])),
1252                    Field::new("b", DataType::Int32, true).with_metadata(HashMap::from([(
1253                        PARQUET_FIELD_ID_META_KEY.to_string(),
1254                        "1".to_string(),
1255                    )])),
1256                ]),
1257                3,
1258            );
1259            builder
1260                .field_builder::<Int32Builder>(0)
1261                .unwrap()
1262                .append_null();
1263            builder
1264                .field_builder::<Int32Builder>(1)
1265                .unwrap()
1266                .append_null();
1267            builder.append(true);
1268
1269            builder
1270                .field_builder::<Int32Builder>(0)
1271                .unwrap()
1272                .append_value(1);
1273            builder
1274                .field_builder::<Int32Builder>(1)
1275                .unwrap()
1276                .append_null();
1277            builder.append(true);
1278
1279            builder
1280                .field_builder::<Int32Builder>(0)
1281                .unwrap()
1282                .append_value(1);
1283            builder
1284                .field_builder::<Int32Builder>(1)
1285                .unwrap()
1286                .append_value(1);
1287            builder.append_null();
1288
1289            Arc::new(builder.finish()) as ArrayRef
1290        };
1291
1292        let iceberg_struct_type = StructType::new(vec![
1293            Arc::new(NestedField::optional(
1294                0,
1295                "a",
1296                Type::Primitive(PrimitiveType::Int),
1297            )),
1298            Arc::new(NestedField::optional(
1299                1,
1300                "b",
1301                Type::Primitive(PrimitiveType::Int),
1302            )),
1303        ]);
1304
1305        let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
1306        assert_eq!(result, vec![
1307            Some(Literal::Struct(Struct::from_iter(vec![None, None,]))),
1308            Some(Literal::Struct(Struct::from_iter(vec![
1309                Some(Literal::int(1)),
1310                None,
1311            ]))),
1312            None,
1313        ]);
1314    }
1315
1316    #[test]
1317    fn test_empty_struct() {
1318        let struct_array = Arc::new(StructArray::new_null(Fields::empty(), 3)) as ArrayRef;
1319        let iceberg_struct_type = StructType::new(vec![]);
1320        let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
1321        assert_eq!(result, vec![None; 0]);
1322    }
1323
1324    #[test]
1325    fn test_find_field_by_id() {
1326        // Create Arrow arrays for the nested structure
1327        let field_a_array = Int32Array::from(vec![Some(42), Some(43), None]);
1328        let field_b_array = StringArray::from(vec![Some("value1"), Some("value2"), None]);
1329
1330        // Create the nested struct array with field IDs in metadata
1331        let nested_struct_array =
1332            Arc::new(StructArray::from(vec![
1333                (
1334                    Arc::new(Field::new("field_a", DataType::Int32, true).with_metadata(
1335                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1336                    )),
1337                    Arc::new(field_a_array) as ArrayRef,
1338                ),
1339                (
1340                    Arc::new(Field::new("field_b", DataType::Utf8, true).with_metadata(
1341                        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1342                    )),
1343                    Arc::new(field_b_array) as ArrayRef,
1344                ),
1345            ])) as ArrayRef;
1346
1347        let field_c_array = Int32Array::from(vec![Some(100), Some(200), None]);
1348
1349        // Create the top-level struct array with field IDs in metadata
1350        let struct_array = Arc::new(StructArray::from(vec![
1351            (
1352                Arc::new(
1353                    Field::new(
1354                        "nested_struct",
1355                        DataType::Struct(Fields::from(vec![
1356                            Field::new("field_a", DataType::Int32, true).with_metadata(
1357                                HashMap::from([(
1358                                    PARQUET_FIELD_ID_META_KEY.to_string(),
1359                                    "1".to_string(),
1360                                )]),
1361                            ),
1362                            Field::new("field_b", DataType::Utf8, true).with_metadata(
1363                                HashMap::from([(
1364                                    PARQUET_FIELD_ID_META_KEY.to_string(),
1365                                    "2".to_string(),
1366                                )]),
1367                            ),
1368                        ])),
1369                        true,
1370                    )
1371                    .with_metadata(HashMap::from([(
1372                        PARQUET_FIELD_ID_META_KEY.to_string(),
1373                        "3".to_string(),
1374                    )])),
1375                ),
1376                nested_struct_array,
1377            ),
1378            (
1379                Arc::new(Field::new("field_c", DataType::Int32, true).with_metadata(
1380                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
1381                )),
1382                Arc::new(field_c_array) as ArrayRef,
1383            ),
1384        ])) as ArrayRef;
1385
1386        // Create an ArrowArrayAccessor with ID matching mode
1387        let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id);
1388
1389        // Test finding fields by ID
1390        let nested_field = NestedField::optional(
1391            3,
1392            "nested_struct",
1393            Type::Struct(StructType::new(vec![
1394                Arc::new(NestedField::optional(
1395                    1,
1396                    "field_a",
1397                    Type::Primitive(PrimitiveType::Int),
1398                )),
1399                Arc::new(NestedField::optional(
1400                    2,
1401                    "field_b",
1402                    Type::Primitive(PrimitiveType::String),
1403                )),
1404            ])),
1405        );
1406        let nested_partner = accessor
1407            .field_partner(&struct_array, &nested_field)
1408            .unwrap();
1409
1410        // Verify we can access the nested field
1411        let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
1412        let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();
1413
1414        // Verify the field has the expected value
1415        let int_array = field_a_partner
1416            .as_any()
1417            .downcast_ref::<Int32Array>()
1418            .unwrap();
1419        assert_eq!(int_array.value(0), 42);
1420        assert_eq!(int_array.value(1), 43);
1421        assert!(int_array.is_null(2));
1422    }
1423
1424    #[test]
1425    fn test_find_field_by_name() {
1426        // Create Arrow arrays for the nested structure
1427        let field_a_array = Int32Array::from(vec![Some(42), Some(43), None]);
1428        let field_b_array = StringArray::from(vec![Some("value1"), Some("value2"), None]);
1429
1430        // Create the nested struct array WITHOUT field IDs in metadata
1431        let nested_struct_array = Arc::new(StructArray::from(vec![
1432            (
1433                Arc::new(Field::new("field_a", DataType::Int32, true)),
1434                Arc::new(field_a_array) as ArrayRef,
1435            ),
1436            (
1437                Arc::new(Field::new("field_b", DataType::Utf8, true)),
1438                Arc::new(field_b_array) as ArrayRef,
1439            ),
1440        ])) as ArrayRef;
1441
1442        let field_c_array = Int32Array::from(vec![Some(100), Some(200), None]);
1443
1444        // Create the top-level struct array WITHOUT field IDs in metadata
1445        let struct_array = Arc::new(StructArray::from(vec![
1446            (
1447                Arc::new(Field::new(
1448                    "nested_struct",
1449                    DataType::Struct(Fields::from(vec![
1450                        Field::new("field_a", DataType::Int32, true),
1451                        Field::new("field_b", DataType::Utf8, true),
1452                    ])),
1453                    true,
1454                )),
1455                nested_struct_array,
1456            ),
1457            (
1458                Arc::new(Field::new("field_c", DataType::Int32, true)),
1459                Arc::new(field_c_array) as ArrayRef,
1460            ),
1461        ])) as ArrayRef;
1462
1463        // Create an ArrowArrayAccessor with Name matching mode
1464        let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name);
1465
1466        // Test finding fields by name
1467        let nested_field = NestedField::optional(
1468            3,
1469            "nested_struct",
1470            Type::Struct(StructType::new(vec![
1471                Arc::new(NestedField::optional(
1472                    1,
1473                    "field_a",
1474                    Type::Primitive(PrimitiveType::Int),
1475                )),
1476                Arc::new(NestedField::optional(
1477                    2,
1478                    "field_b",
1479                    Type::Primitive(PrimitiveType::String),
1480                )),
1481            ])),
1482        );
1483        let nested_partner = accessor
1484            .field_partner(&struct_array, &nested_field)
1485            .unwrap();
1486
1487        // Verify we can access the nested field by name
1488        let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
1489        let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();
1490
1491        // Verify the field has the expected value
1492        let int_array = field_a_partner
1493            .as_any()
1494            .downcast_ref::<Int32Array>()
1495            .unwrap();
1496        assert_eq!(int_array.value(0), 42);
1497        assert_eq!(int_array.value(1), 43);
1498        assert!(int_array.is_null(2));
1499    }
1500
1501    #[test]
1502    fn test_complex_nested() {
1503        // complex nested type for test
1504        // <
1505        //   A: list< struct(a1: int, a2: int) >,
1506        //   B: list< map<int, int> >,
1507        //   C: list< list<int> >,
1508        // >
1509        let struct_type = StructType::new(vec![
1510            Arc::new(NestedField::required(
1511                0,
1512                "A",
1513                Type::List(ListType::new(Arc::new(NestedField::required(
1514                    1,
1515                    "item",
1516                    Type::Struct(StructType::new(vec![
1517                        Arc::new(NestedField::required(
1518                            2,
1519                            "a1",
1520                            Type::Primitive(PrimitiveType::Int),
1521                        )),
1522                        Arc::new(NestedField::required(
1523                            3,
1524                            "a2",
1525                            Type::Primitive(PrimitiveType::Int),
1526                        )),
1527                    ])),
1528                )))),
1529            )),
1530            Arc::new(NestedField::required(
1531                4,
1532                "B",
1533                Type::List(ListType::new(Arc::new(NestedField::required(
1534                    5,
1535                    "item",
1536                    Type::Map(MapType::new(
1537                        NestedField::optional(6, "keys", Type::Primitive(PrimitiveType::Int))
1538                            .into(),
1539                        NestedField::optional(7, "values", Type::Primitive(PrimitiveType::Int))
1540                            .into(),
1541                    )),
1542                )))),
1543            )),
1544            Arc::new(NestedField::required(
1545                8,
1546                "C",
1547                Type::List(ListType::new(Arc::new(NestedField::required(
1548                    9,
1549                    "item",
1550                    Type::List(ListType::new(Arc::new(NestedField::optional(
1551                        10,
1552                        "item",
1553                        Type::Primitive(PrimitiveType::Int),
1554                    )))),
1555                )))),
1556            )),
1557        ]);
1558
1559        // Generate a complex nested struct array
1560        // [
1561        //   {A: [{a1: 10, a2: 20}, {a1: 11, a2: 21}], B: [{(1,100),(3,300)},{(2,200)}], C: [[100,101,102], [200,201]]},
1562        //   {A: [{a1: 12, a2: 22}, {a1: 13, a2: 23}], B: [{(3,300)},{(4,400)}], C: [[300,301,302], [400,401]]},
1563        // ]
1564        let struct_array =
1565            {
1566                let a_struct_a1_builder = Int32Builder::new();
1567                let a_struct_a2_builder = Int32Builder::new();
1568                let a_struct_builder =
1569                    StructBuilder::new(
1570                        vec![
1571                            Field::new("a1", DataType::Int32, false).with_metadata(HashMap::from(
1572                                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
1573                            )),
1574                            Field::new("a2", DataType::Int32, false).with_metadata(HashMap::from(
1575                                [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
1576                            )),
1577                        ],
1578                        vec![Box::new(a_struct_a1_builder), Box::new(a_struct_a2_builder)],
1579                    );
1580                let a_builder = ListBuilder::new(a_struct_builder);
1581
1582                let map_key_builder = Int32Builder::new();
1583                let map_value_builder = Int32Builder::new();
1584                let map_builder = MapBuilder::new(None, map_key_builder, map_value_builder);
1585                let b_builder = ListBuilder::new(map_builder);
1586
1587                let inner_list_item_builder = Int32Builder::new();
1588                let inner_list_builder = ListBuilder::new(inner_list_item_builder);
1589                let c_builder = ListBuilder::new(inner_list_builder);
1590
1591                let mut top_struct_builder = {
1592                    let a_struct_type =
1593                        DataType::Struct(Fields::from(vec![
1594                            Field::new("a1", DataType::Int32, false).with_metadata(HashMap::from(
1595                                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
1596                            )),
1597                            Field::new("a2", DataType::Int32, false).with_metadata(HashMap::from(
1598                                [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
1599                            )),
1600                        ]));
1601                    let a_type =
1602                        DataType::List(Arc::new(Field::new("item", a_struct_type.clone(), true)));
1603
1604                    let b_map_entry_struct = Field::new(
1605                        "entries",
1606                        DataType::Struct(Fields::from(vec![
1607                            Field::new("keys", DataType::Int32, false),
1608                            Field::new("values", DataType::Int32, true),
1609                        ])),
1610                        false,
1611                    );
1612                    let b_map_type =
1613                        DataType::Map(Arc::new(b_map_entry_struct), /* sorted_keys = */ false);
1614                    let b_type =
1615                        DataType::List(Arc::new(Field::new("item", b_map_type.clone(), true)));
1616
1617                    let c_inner_list_type =
1618                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
1619                    let c_type = DataType::List(Arc::new(Field::new(
1620                        "item",
1621                        c_inner_list_type.clone(),
1622                        true,
1623                    )));
1624                    StructBuilder::new(
1625                        Fields::from(vec![
1626                            Field::new("A", a_type.clone(), false).with_metadata(HashMap::from([
1627                                (PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string()),
1628                            ])),
1629                            Field::new("B", b_type.clone(), false).with_metadata(HashMap::from([
1630                                (PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string()),
1631                            ])),
1632                            Field::new("C", c_type.clone(), false).with_metadata(HashMap::from([
1633                                (PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string()),
1634                            ])),
1635                        ]),
1636                        vec![
1637                            Box::new(a_builder),
1638                            Box::new(b_builder),
1639                            Box::new(c_builder),
1640                        ],
1641                    )
1642                };
1643
1644                // first row
1645                // {A: [{a1: 10, a2: 20}, {a1: 11, a2: 21}], B: [{(1,100),(3,300)},{(2,200)}], C: [[100,101,102], [200,201]]},
1646                {
1647                    let a_builder = top_struct_builder
1648                        .field_builder::<ListBuilder<StructBuilder>>(0)
1649                        .unwrap();
1650                    let struct_builder = a_builder.values();
1651                    struct_builder
1652                        .field_builder::<Int32Builder>(0)
1653                        .unwrap()
1654                        .append_value(10);
1655                    struct_builder
1656                        .field_builder::<Int32Builder>(1)
1657                        .unwrap()
1658                        .append_value(20);
1659                    struct_builder.append(true);
1660                    let struct_builder = a_builder.values();
1661                    struct_builder
1662                        .field_builder::<Int32Builder>(0)
1663                        .unwrap()
1664                        .append_value(11);
1665                    struct_builder
1666                        .field_builder::<Int32Builder>(1)
1667                        .unwrap()
1668                        .append_value(21);
1669                    struct_builder.append(true);
1670                    a_builder.append(true);
1671                }
1672                {
1673                    let b_builder = top_struct_builder
1674                        .field_builder::<ListBuilder<MapBuilder<Int32Builder, Int32Builder>>>(1)
1675                        .unwrap();
1676                    let map_builder = b_builder.values();
1677                    map_builder.keys().append_value(1);
1678                    map_builder.values().append_value(100);
1679                    map_builder.keys().append_value(3);
1680                    map_builder.values().append_value(300);
1681                    map_builder.append(true).unwrap();
1682
1683                    map_builder.keys().append_value(2);
1684                    map_builder.values().append_value(200);
1685                    map_builder.append(true).unwrap();
1686
1687                    b_builder.append(true);
1688                }
1689                {
1690                    let c_builder = top_struct_builder
1691                        .field_builder::<ListBuilder<ListBuilder<Int32Builder>>>(2)
1692                        .unwrap();
1693                    let inner_list_builder = c_builder.values();
1694                    inner_list_builder.values().append_value(100);
1695                    inner_list_builder.values().append_value(101);
1696                    inner_list_builder.values().append_value(102);
1697                    inner_list_builder.append(true);
1698                    let inner_list_builder = c_builder.values();
1699                    inner_list_builder.values().append_value(200);
1700                    inner_list_builder.values().append_value(201);
1701                    inner_list_builder.append(true);
1702                    c_builder.append(true);
1703                }
1704                top_struct_builder.append(true);
1705
1706                // second row
1707                // {A: [{a1: 12, a2: 22}, {a1: 13, a2: 23}], B: [{(3,300)}], C: [[300,301,302], [400,401]]},
1708                {
1709                    let a_builder = top_struct_builder
1710                        .field_builder::<ListBuilder<StructBuilder>>(0)
1711                        .unwrap();
1712                    let struct_builder = a_builder.values();
1713                    struct_builder
1714                        .field_builder::<Int32Builder>(0)
1715                        .unwrap()
1716                        .append_value(12);
1717                    struct_builder
1718                        .field_builder::<Int32Builder>(1)
1719                        .unwrap()
1720                        .append_value(22);
1721                    struct_builder.append(true);
1722                    let struct_builder = a_builder.values();
1723                    struct_builder
1724                        .field_builder::<Int32Builder>(0)
1725                        .unwrap()
1726                        .append_value(13);
1727                    struct_builder
1728                        .field_builder::<Int32Builder>(1)
1729                        .unwrap()
1730                        .append_value(23);
1731                    struct_builder.append(true);
1732                    a_builder.append(true);
1733                }
1734                {
1735                    let b_builder = top_struct_builder
1736                        .field_builder::<ListBuilder<MapBuilder<Int32Builder, Int32Builder>>>(1)
1737                        .unwrap();
1738                    let map_builder = b_builder.values();
1739                    map_builder.keys().append_value(3);
1740                    map_builder.values().append_value(300);
1741                    map_builder.append(true).unwrap();
1742
1743                    b_builder.append(true);
1744                }
1745                {
1746                    let c_builder = top_struct_builder
1747                        .field_builder::<ListBuilder<ListBuilder<Int32Builder>>>(2)
1748                        .unwrap();
1749                    let inner_list_builder = c_builder.values();
1750                    inner_list_builder.values().append_value(300);
1751                    inner_list_builder.values().append_value(301);
1752                    inner_list_builder.values().append_value(302);
1753                    inner_list_builder.append(true);
1754                    let inner_list_builder = c_builder.values();
1755                    inner_list_builder.values().append_value(400);
1756                    inner_list_builder.values().append_value(401);
1757                    inner_list_builder.append(true);
1758                    c_builder.append(true);
1759                }
1760                top_struct_builder.append(true);
1761
1762                Arc::new(top_struct_builder.finish()) as ArrayRef
1763            };
1764
1765        let result = arrow_struct_to_literal(&struct_array, &struct_type).unwrap();
1766        assert_eq!(result, vec![
1767            Some(Literal::Struct(Struct::from_iter(vec![
1768                Some(Literal::List(vec![
1769                    Some(Literal::Struct(Struct::from_iter(vec![
1770                        Some(Literal::int(10)),
1771                        Some(Literal::int(20)),
1772                    ]))),
1773                    Some(Literal::Struct(Struct::from_iter(vec![
1774                        Some(Literal::int(11)),
1775                        Some(Literal::int(21)),
1776                    ]))),
1777                ])),
1778                Some(Literal::List(vec![
1779                    Some(Literal::Map(Map::from_iter(vec![
1780                        (Literal::int(1), Some(Literal::int(100))),
1781                        (Literal::int(3), Some(Literal::int(300))),
1782                    ]))),
1783                    Some(Literal::Map(Map::from_iter(vec![(
1784                        Literal::int(2),
1785                        Some(Literal::int(200))
1786                    ),]))),
1787                ])),
1788                Some(Literal::List(vec![
1789                    Some(Literal::List(vec![
1790                        Some(Literal::int(100)),
1791                        Some(Literal::int(101)),
1792                        Some(Literal::int(102)),
1793                    ])),
1794                    Some(Literal::List(vec![
1795                        Some(Literal::int(200)),
1796                        Some(Literal::int(201)),
1797                    ])),
1798                ])),
1799            ]))),
1800            Some(Literal::Struct(Struct::from_iter(vec![
1801                Some(Literal::List(vec![
1802                    Some(Literal::Struct(Struct::from_iter(vec![
1803                        Some(Literal::int(12)),
1804                        Some(Literal::int(22)),
1805                    ]))),
1806                    Some(Literal::Struct(Struct::from_iter(vec![
1807                        Some(Literal::int(13)),
1808                        Some(Literal::int(23)),
1809                    ]))),
1810                ])),
1811                Some(Literal::List(vec![Some(Literal::Map(Map::from_iter(
1812                    vec![(Literal::int(3), Some(Literal::int(300))),]
1813                ))),])),
1814                Some(Literal::List(vec![
1815                    Some(Literal::List(vec![
1816                        Some(Literal::int(300)),
1817                        Some(Literal::int(301)),
1818                        Some(Literal::int(302)),
1819                    ])),
1820                    Some(Literal::List(vec![
1821                        Some(Literal::int(400)),
1822                        Some(Literal::int(401)),
1823                    ])),
1824                ])),
1825            ]))),
1826        ]);
1827    }
1828
1829    #[test]
1830    fn test_create_decimal_array_respects_precision() {
1831        // Decimal128Array::from() uses Arrow's default precision (38) instead of the
1832        // target precision, causing RecordBatch construction to fail when schemas don't match.
1833        let target_precision = 18u8;
1834        let target_scale = 10i8;
1835        let target_type = DataType::Decimal128(target_precision, target_scale);
1836        let value = PrimitiveLiteral::Int128(10000000000);
1837
1838        let array = create_primitive_array_single_element(&target_type, &Some(value))
1839            .expect("Failed to create decimal array");
1840
1841        match array.data_type() {
1842            DataType::Decimal128(precision, scale) => {
1843                assert_eq!(*precision, target_precision);
1844                assert_eq!(*scale, target_scale);
1845            }
1846            other => panic!("Expected Decimal128, got {other:?}"),
1847        }
1848    }
1849
1850    #[test]
1851    fn test_create_decimal_array_repeated_respects_precision() {
1852        // Ensure repeated arrays also respect target precision, not Arrow's default.
1853        let target_precision = 18u8;
1854        let target_scale = 10i8;
1855        let target_type = DataType::Decimal128(target_precision, target_scale);
1856        let value = PrimitiveLiteral::Int128(10000000000);
1857        let num_rows = 5;
1858
1859        let array = create_primitive_array_repeated(&target_type, &Some(value), num_rows)
1860            .expect("Failed to create repeated decimal array");
1861
1862        match array.data_type() {
1863            DataType::Decimal128(precision, scale) => {
1864                assert_eq!(*precision, target_precision);
1865                assert_eq!(*scale, target_scale);
1866            }
1867            other => panic!("Expected Decimal128, got {other:?}"),
1868        }
1869
1870        assert_eq!(array.len(), num_rows);
1871    }
1872
1873    #[test]
1874    fn test_create_timestamp_microsecond_array_repeated() {
1875        let target_type = DataType::Timestamp(TimeUnit::Microsecond, None);
1876        let value = PrimitiveLiteral::Long(1_740_600_000_000_000);
1877        let num_rows = 3;
1878
1879        let array = create_primitive_array_repeated(&target_type, &Some(value), num_rows)
1880            .expect("Failed to create repeated timestamp microsecond array");
1881
1882        assert_eq!(array.data_type(), &target_type);
1883        assert_eq!(array.len(), num_rows);
1884    }
1885
1886    #[test]
1887    fn test_create_timestamp_microsecond_with_timezone_array_repeated() {
1888        let target_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));
1889        let value = PrimitiveLiteral::Long(1_740_600_000_000_000);
1890        let num_rows = 2;
1891
1892        let array = create_primitive_array_repeated(&target_type, &Some(value), num_rows)
1893            .expect("Failed to create repeated timestamp microsecond array with timezone");
1894
1895        assert_eq!(array.data_type(), &target_type);
1896        assert_eq!(array.len(), num_rows);
1897    }
1898}