1use std::sync::Arc;
19
20use arrow_array::{
21 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray,
22 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray,
23 LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray,
24 Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
25};
26use arrow_buffer::NullBuffer;
27use arrow_schema::{DataType, FieldRef, TimeUnit};
28use uuid::Uuid;
29
30use super::get_field_id_from_metadata;
31use crate::spec::{
32 ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType,
33 SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
34 visit_type_with_partner,
35};
36use crate::{Error, ErrorKind, Result};
37
/// Stateless visitor that walks an Iceberg type together with its partner Arrow array
/// and produces one `Option<Literal>` per row (`None` standing for a null row).
struct ArrowArrayToIcebergStructConverter;
39
40impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
41 type T = Vec<Option<Literal>>;
42
43 fn schema(
44 &mut self,
45 _schema: &crate::spec::Schema,
46 _partner: &ArrayRef,
47 value: Vec<Option<Literal>>,
48 ) -> Result<Vec<Option<Literal>>> {
49 Ok(value)
50 }
51
52 fn field(
53 &mut self,
54 field: &crate::spec::NestedFieldRef,
55 _partner: &ArrayRef,
56 value: Vec<Option<Literal>>,
57 ) -> Result<Vec<Option<Literal>>> {
58 if field.required && value.iter().any(Option::is_none) {
60 return Err(Error::new(
61 ErrorKind::DataInvalid,
62 "The field is required but has null value",
63 )
64 .with_context("field_id", field.id.to_string())
65 .with_context("field_name", &field.name));
66 }
67 Ok(value)
68 }
69
70 fn r#struct(
71 &mut self,
72 _struct: &StructType,
73 array: &ArrayRef,
74 results: Vec<Vec<Option<Literal>>>,
75 ) -> Result<Vec<Option<Literal>>> {
76 let row_len = results.first().map(|column| column.len()).unwrap_or(0);
77 if let Some(col) = results.iter().find(|col| col.len() != row_len) {
78 return Err(Error::new(
79 ErrorKind::DataInvalid,
80 "The struct columns have different row length",
81 )
82 .with_context("first col length", row_len.to_string())
83 .with_context("actual col length", col.len().to_string()));
84 }
85
86 let mut struct_literals = Vec::with_capacity(row_len);
87 let mut columns_iters = results
88 .into_iter()
89 .map(|column| column.into_iter())
90 .collect::<Vec<_>>();
91
92 for i in 0..row_len {
93 let mut literals = Vec::with_capacity(columns_iters.len());
94 for column_iter in columns_iters.iter_mut() {
95 literals.push(column_iter.next().unwrap());
96 }
97 if array.is_null(i) {
98 struct_literals.push(None);
99 } else {
100 struct_literals.push(Some(Literal::Struct(Struct::from_iter(literals))));
101 }
102 }
103
104 Ok(struct_literals)
105 }
106
107 fn list(
108 &mut self,
109 list: &ListType,
110 array: &ArrayRef,
111 elements: Vec<Option<Literal>>,
112 ) -> Result<Vec<Option<Literal>>> {
113 if list.element_field.required && elements.iter().any(Option::is_none) {
114 return Err(Error::new(
115 ErrorKind::DataInvalid,
116 "The list should not have null value",
117 ));
118 }
119 match array.data_type() {
120 DataType::List(_) => {
121 let offset = array
122 .as_any()
123 .downcast_ref::<ListArray>()
124 .ok_or_else(|| {
125 Error::new(ErrorKind::DataInvalid, "The partner is not a list array")
126 })?
127 .offsets();
128 let mut result = Vec::with_capacity(offset.len() - 1);
130 for i in 0..offset.len() - 1 {
131 let start = offset[i] as usize;
132 let end = offset[i + 1] as usize;
133 result.push(Some(Literal::List(elements[start..end].to_vec())));
134 }
135 Ok(result)
136 }
137 DataType::LargeList(_) => {
138 let offset = array
139 .as_any()
140 .downcast_ref::<LargeListArray>()
141 .ok_or_else(|| {
142 Error::new(
143 ErrorKind::DataInvalid,
144 "The partner is not a large list array",
145 )
146 })?
147 .offsets();
148 let mut result = Vec::with_capacity(offset.len() - 1);
150 for i in 0..offset.len() - 1 {
151 let start = offset[i] as usize;
152 let end = offset[i + 1] as usize;
153 result.push(Some(Literal::List(elements[start..end].to_vec())));
154 }
155 Ok(result)
156 }
157 DataType::FixedSizeList(_, len) => {
158 let mut result = Vec::with_capacity(elements.len() / *len as usize);
159 for i in 0..elements.len() / *len as usize {
160 let start = i * *len as usize;
161 let end = (i + 1) * *len as usize;
162 result.push(Some(Literal::List(elements[start..end].to_vec())));
163 }
164 Ok(result)
165 }
166 _ => Err(Error::new(
167 ErrorKind::DataInvalid,
168 "The partner is not a list type",
169 )),
170 }
171 }
172
173 fn map(
174 &mut self,
175 _map: &MapType,
176 partner: &ArrayRef,
177 key_values: Vec<Option<Literal>>,
178 values: Vec<Option<Literal>>,
179 ) -> Result<Vec<Option<Literal>>> {
180 if key_values.len() != values.len() {
182 return Err(Error::new(
183 ErrorKind::DataInvalid,
184 "The key value and value of map should have the same row length",
185 ));
186 }
187
188 let offsets = partner
189 .as_any()
190 .downcast_ref::<MapArray>()
191 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "The partner is not a map array"))?
192 .offsets();
193 let mut result = Vec::with_capacity(offsets.len() - 1);
195 for i in 0..offsets.len() - 1 {
196 let start = offsets[i] as usize;
197 let end = offsets[i + 1] as usize;
198 let mut map = Map::new();
199 for (key, value) in key_values[start..end].iter().zip(values[start..end].iter()) {
200 map.insert(key.clone().unwrap(), value.clone());
201 }
202 result.push(Some(Literal::Map(map)));
203 }
204 Ok(result)
205 }
206
207 fn primitive(&mut self, p: &PrimitiveType, partner: &ArrayRef) -> Result<Vec<Option<Literal>>> {
208 match p {
209 PrimitiveType::Boolean => {
210 let array = partner
211 .as_any()
212 .downcast_ref::<BooleanArray>()
213 .ok_or_else(|| {
214 Error::new(ErrorKind::DataInvalid, "The partner is not a boolean array")
215 })?;
216 Ok(array.iter().map(|v| v.map(Literal::bool)).collect())
217 }
218 PrimitiveType::Int => {
219 let array = partner
220 .as_any()
221 .downcast_ref::<Int32Array>()
222 .ok_or_else(|| {
223 Error::new(ErrorKind::DataInvalid, "The partner is not a int32 array")
224 })?;
225 Ok(array.iter().map(|v| v.map(Literal::int)).collect())
226 }
227 PrimitiveType::Long => {
228 let array = partner
229 .as_any()
230 .downcast_ref::<Int64Array>()
231 .ok_or_else(|| {
232 Error::new(ErrorKind::DataInvalid, "The partner is not a int64 array")
233 })?;
234 Ok(array.iter().map(|v| v.map(Literal::long)).collect())
235 }
236 PrimitiveType::Float => {
237 let array = partner
238 .as_any()
239 .downcast_ref::<Float32Array>()
240 .ok_or_else(|| {
241 Error::new(ErrorKind::DataInvalid, "The partner is not a float32 array")
242 })?;
243 Ok(array.iter().map(|v| v.map(Literal::float)).collect())
244 }
245 PrimitiveType::Double => {
246 let array = partner
247 .as_any()
248 .downcast_ref::<Float64Array>()
249 .ok_or_else(|| {
250 Error::new(ErrorKind::DataInvalid, "The partner is not a float64 array")
251 })?;
252 Ok(array.iter().map(|v| v.map(Literal::double)).collect())
253 }
254 PrimitiveType::Decimal { precision, scale } => {
255 let array = partner
256 .as_any()
257 .downcast_ref::<Decimal128Array>()
258 .ok_or_else(|| {
259 Error::new(
260 ErrorKind::DataInvalid,
261 "The partner is not a decimal128 array",
262 )
263 })?;
264 if let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type()
265 && (*arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale)
266 {
267 return Err(Error::new(
268 ErrorKind::DataInvalid,
269 format!(
270 "The precision or scale ({arrow_precision},{arrow_scale}) of arrow decimal128 array is not compatible with iceberg decimal type ({precision},{scale})"
271 ),
272 ));
273 }
274 Ok(array.iter().map(|v| v.map(Literal::decimal)).collect())
275 }
276 PrimitiveType::Date => {
277 let array = partner
278 .as_any()
279 .downcast_ref::<Date32Array>()
280 .ok_or_else(|| {
281 Error::new(ErrorKind::DataInvalid, "The partner is not a date32 array")
282 })?;
283 Ok(array.iter().map(|v| v.map(Literal::date)).collect())
284 }
285 PrimitiveType::Time => {
286 let array = partner
287 .as_any()
288 .downcast_ref::<Time64MicrosecondArray>()
289 .ok_or_else(|| {
290 Error::new(ErrorKind::DataInvalid, "The partner is not a time64 array")
291 })?;
292 Ok(array.iter().map(|v| v.map(Literal::time)).collect())
293 }
294 PrimitiveType::Timestamp => {
295 let array = partner
296 .as_any()
297 .downcast_ref::<TimestampMicrosecondArray>()
298 .ok_or_else(|| {
299 Error::new(
300 ErrorKind::DataInvalid,
301 "The partner is not a timestamp array",
302 )
303 })?;
304 Ok(array.iter().map(|v| v.map(Literal::timestamp)).collect())
305 }
306 PrimitiveType::Timestamptz => {
307 let array = partner
308 .as_any()
309 .downcast_ref::<TimestampMicrosecondArray>()
310 .ok_or_else(|| {
311 Error::new(
312 ErrorKind::DataInvalid,
313 "The partner is not a timestamptz array",
314 )
315 })?;
316 Ok(array.iter().map(|v| v.map(Literal::timestamptz)).collect())
317 }
318 PrimitiveType::TimestampNs => {
319 let array = partner
320 .as_any()
321 .downcast_ref::<TimestampNanosecondArray>()
322 .ok_or_else(|| {
323 Error::new(
324 ErrorKind::DataInvalid,
325 "The partner is not a timestamp_ns array",
326 )
327 })?;
328 Ok(array
329 .iter()
330 .map(|v| v.map(Literal::timestamp_nano))
331 .collect())
332 }
333 PrimitiveType::TimestamptzNs => {
334 let array = partner
335 .as_any()
336 .downcast_ref::<TimestampNanosecondArray>()
337 .ok_or_else(|| {
338 Error::new(
339 ErrorKind::DataInvalid,
340 "The partner is not a timestamptz_ns array",
341 )
342 })?;
343 Ok(array
344 .iter()
345 .map(|v| v.map(Literal::timestamptz_nano))
346 .collect())
347 }
348 PrimitiveType::String => {
349 if let Some(array) = partner.as_any().downcast_ref::<LargeStringArray>() {
350 Ok(array.iter().map(|v| v.map(Literal::string)).collect())
351 } else if let Some(array) = partner.as_any().downcast_ref::<StringArray>() {
352 Ok(array.iter().map(|v| v.map(Literal::string)).collect())
353 } else {
354 Err(Error::new(
355 ErrorKind::DataInvalid,
356 "The partner is not a string array",
357 ))
358 }
359 }
360 PrimitiveType::Uuid => {
361 if let Some(array) = partner.as_any().downcast_ref::<FixedSizeBinaryArray>() {
362 if array.value_length() != 16 {
363 return Err(Error::new(
364 ErrorKind::DataInvalid,
365 "The partner is not a uuid array",
366 ));
367 }
368 Ok(array
369 .iter()
370 .map(|v| {
371 v.map(|v| {
372 Ok(Literal::uuid(Uuid::from_bytes(v.try_into().map_err(
373 |_| {
374 Error::new(
375 ErrorKind::DataInvalid,
376 "Failed to convert binary to uuid",
377 )
378 },
379 )?)))
380 })
381 .transpose()
382 })
383 .collect::<Result<Vec<_>>>()?)
384 } else {
385 Err(Error::new(
386 ErrorKind::DataInvalid,
387 "The partner is not a uuid array",
388 ))
389 }
390 }
391 PrimitiveType::Fixed(len) => {
392 let array = partner
393 .as_any()
394 .downcast_ref::<FixedSizeBinaryArray>()
395 .ok_or_else(|| {
396 Error::new(ErrorKind::DataInvalid, "The partner is not a fixed array")
397 })?;
398 if array.value_length() != *len as i32 {
399 return Err(Error::new(
400 ErrorKind::DataInvalid,
401 "The length of fixed size binary array is not compatible with iceberg fixed type",
402 ));
403 }
404 Ok(array
405 .iter()
406 .map(|v| v.map(|v| Literal::fixed(v.iter().cloned())))
407 .collect())
408 }
409 PrimitiveType::Binary => {
410 if let Some(array) = partner.as_any().downcast_ref::<LargeBinaryArray>() {
411 Ok(array
412 .iter()
413 .map(|v| v.map(|v| Literal::binary(v.to_vec())))
414 .collect())
415 } else if let Some(array) = partner.as_any().downcast_ref::<BinaryArray>() {
416 Ok(array
417 .iter()
418 .map(|v| v.map(|v| Literal::binary(v.to_vec())))
419 .collect())
420 } else {
421 Err(Error::new(
422 ErrorKind::DataInvalid,
423 "The partner is not a binary array",
424 ))
425 }
426 }
427 }
428 }
429}
430
/// How a field of an Arrow struct array is paired with an Iceberg field.
#[derive(Clone, Copy, Debug)]
pub enum FieldMatchMode {
    /// Pair fields whose id, read from the Arrow field via `get_field_id_from_metadata`,
    /// equals the Iceberg field id.
    Id,
    /// Pair fields whose names are equal.
    Name,
}
448
449impl FieldMatchMode {
450 pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
452 match self {
453 FieldMatchMode::Id => get_field_id_from_metadata(arrow_field)
454 .map(|id| id == iceberg_field.id)
455 .unwrap_or(false),
456 FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name,
457 }
458 }
459}
460
/// Partner accessor that navigates Arrow arrays (struct fields, list elements,
/// map keys and values) alongside an Iceberg type.
pub struct ArrowArrayAccessor {
    /// Rule used to find the Arrow struct field that corresponds to an Iceberg field.
    match_mode: FieldMatchMode,
}
465
466impl ArrowArrayAccessor {
467 pub fn new() -> Self {
469 Self {
470 match_mode: FieldMatchMode::Id,
471 }
472 }
473
474 pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
476 Self { match_mode }
477 }
478}
479
480impl Default for ArrowArrayAccessor {
481 fn default() -> Self {
482 Self::new()
483 }
484}
485
486impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
487 fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
488 if !matches!(schema_partner.data_type(), DataType::Struct(_)) {
489 return Err(Error::new(
490 ErrorKind::DataInvalid,
491 "The schema partner is not a struct type",
492 ));
493 }
494
495 Ok(schema_partner)
496 }
497
498 fn field_partner<'a>(
499 &self,
500 struct_partner: &'a ArrayRef,
501 field: &NestedField,
502 ) -> Result<&'a ArrayRef> {
503 let struct_array = struct_partner
504 .as_any()
505 .downcast_ref::<StructArray>()
506 .ok_or_else(|| {
507 Error::new(
508 ErrorKind::DataInvalid,
509 format!(
510 "The struct partner is not a struct array, partner: {struct_partner:?}"
511 ),
512 )
513 })?;
514
515 let field_pos = struct_array
516 .fields()
517 .iter()
518 .position(|arrow_field| self.match_mode.match_field(arrow_field, field))
519 .ok_or_else(|| {
520 Error::new(
521 ErrorKind::DataInvalid,
522 format!("Field id {} not found in struct array", field.id),
523 )
524 })?;
525
526 Ok(struct_array.column(field_pos))
527 }
528
529 fn list_element_partner<'a>(&self, list_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
530 match list_partner.data_type() {
531 DataType::List(_) => {
532 let list_array = list_partner
533 .as_any()
534 .downcast_ref::<ListArray>()
535 .ok_or_else(|| {
536 Error::new(
537 ErrorKind::DataInvalid,
538 "The list partner is not a list array",
539 )
540 })?;
541 Ok(list_array.values())
542 }
543 DataType::LargeList(_) => {
544 let list_array = list_partner
545 .as_any()
546 .downcast_ref::<LargeListArray>()
547 .ok_or_else(|| {
548 Error::new(
549 ErrorKind::DataInvalid,
550 "The list partner is not a large list array",
551 )
552 })?;
553 Ok(list_array.values())
554 }
555 DataType::FixedSizeList(_, _) => {
556 let list_array = list_partner
557 .as_any()
558 .downcast_ref::<FixedSizeListArray>()
559 .ok_or_else(|| {
560 Error::new(
561 ErrorKind::DataInvalid,
562 "The list partner is not a fixed size list array",
563 )
564 })?;
565 Ok(list_array.values())
566 }
567 _ => Err(Error::new(
568 ErrorKind::DataInvalid,
569 "The list partner is not a list type",
570 )),
571 }
572 }
573
574 fn map_key_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
575 let map_array = map_partner
576 .as_any()
577 .downcast_ref::<MapArray>()
578 .ok_or_else(|| {
579 Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
580 })?;
581 Ok(map_array.keys())
582 }
583
584 fn map_value_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
585 let map_array = map_partner
586 .as_any()
587 .downcast_ref::<MapArray>()
588 .ok_or_else(|| {
589 Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
590 })?;
591 Ok(map_array.values())
592 }
593}
594
595pub fn arrow_struct_to_literal(
598 struct_array: &ArrayRef,
599 ty: &StructType,
600) -> Result<Vec<Option<Literal>>> {
601 visit_struct_with_partner(
602 ty,
603 struct_array,
604 &mut ArrowArrayToIcebergStructConverter,
605 &ArrowArrayAccessor::new(),
606 )
607}
608
609pub fn arrow_primitive_to_literal(
612 primitive_array: &ArrayRef,
613 ty: &Type,
614) -> Result<Vec<Option<Literal>>> {
615 visit_type_with_partner(
616 ty,
617 primitive_array,
618 &mut ArrowArrayToIcebergStructConverter,
619 &ArrowArrayAccessor::new(),
620 )
621}
622
623pub(crate) fn create_primitive_array_single_element(
628 data_type: &DataType,
629 prim_lit: &Option<PrimitiveLiteral>,
630) -> Result<ArrayRef> {
631 match (data_type, prim_lit) {
632 (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {
633 Ok(Arc::new(BooleanArray::from(vec![*v])))
634 }
635 (DataType::Boolean, None) => Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))),
636 (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => {
637 Ok(Arc::new(Int32Array::from(vec![*v])))
638 }
639 (DataType::Int32, None) => Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None]))),
640 (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => {
641 Ok(Arc::new(Date32Array::from(vec![*v])))
642 }
643 (DataType::Date32, None) => Ok(Arc::new(Date32Array::from(vec![Option::<i32>::None]))),
644 (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => {
645 Ok(Arc::new(Int64Array::from(vec![*v])))
646 }
647 (DataType::Int64, None) => Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None]))),
648 (DataType::Timestamp(TimeUnit::Microsecond, timezone), Some(PrimitiveLiteral::Long(v))) => {
649 let array = TimestampMicrosecondArray::from(vec![*v]);
650 if let Some(timezone) = timezone {
651 Ok(Arc::new(array.with_timezone(timezone.clone())))
652 } else {
653 Ok(Arc::new(array))
654 }
655 }
656 (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => {
657 let array = TimestampMicrosecondArray::from(vec![Option::<i64>::None]);
658 if let Some(timezone) = timezone {
659 Ok(Arc::new(array.with_timezone(timezone.clone())))
660 } else {
661 Ok(Arc::new(array))
662 }
663 }
664 (DataType::Timestamp(TimeUnit::Nanosecond, timezone), Some(PrimitiveLiteral::Long(v))) => {
665 let array = TimestampNanosecondArray::from(vec![*v]);
666 if let Some(timezone) = timezone {
667 Ok(Arc::new(array.with_timezone(timezone.clone())))
668 } else {
669 Ok(Arc::new(array))
670 }
671 }
672 (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => {
673 let array = TimestampNanosecondArray::from(vec![Option::<i64>::None]);
674 if let Some(timezone) = timezone {
675 Ok(Arc::new(array.with_timezone(timezone.clone())))
676 } else {
677 Ok(Arc::new(array))
678 }
679 }
680 (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => {
681 Ok(Arc::new(Float32Array::from(vec![v.0])))
682 }
683 (DataType::Float32, None) => Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None]))),
684 (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => {
685 Ok(Arc::new(Float64Array::from(vec![v.0])))
686 }
687 (DataType::Float64, None) => Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None]))),
688 (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => {
689 Ok(Arc::new(StringArray::from(vec![v.as_str()])))
690 }
691 (DataType::Utf8, None) => Ok(Arc::new(StringArray::from(vec![Option::<&str>::None]))),
692 (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => {
693 Ok(Arc::new(BinaryArray::from_vec(vec![v.as_slice()])))
694 }
695 (DataType::Binary, None) => Ok(Arc::new(BinaryArray::from_opt_vec(vec![
696 Option::<&[u8]>::None,
697 ]))),
698 (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(v))) => {
699 let array = Decimal128Array::from(vec![{ *v }])
700 .with_precision_and_scale(*precision, *scale)
701 .map_err(|e| {
702 Error::new(
703 ErrorKind::DataInvalid,
704 format!(
705 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
706 ),
707 )
708 })?;
709 Ok(Arc::new(array))
710 }
711 (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::UInt128(v))) => {
712 let array = Decimal128Array::from(vec![*v as i128])
713 .with_precision_and_scale(*precision, *scale)
714 .map_err(|e| {
715 Error::new(
716 ErrorKind::DataInvalid,
717 format!(
718 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
719 ),
720 )
721 })?;
722 Ok(Arc::new(array))
723 }
724 (DataType::Decimal128(precision, scale), None) => {
725 let array = Decimal128Array::from(vec![Option::<i128>::None])
726 .with_precision_and_scale(*precision, *scale)
727 .map_err(|e| {
728 Error::new(
729 ErrorKind::DataInvalid,
730 format!(
731 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
732 ),
733 )
734 })?;
735 Ok(Arc::new(array))
736 }
737 (DataType::Struct(fields), None) => {
738 let null_arrays: Vec<ArrayRef> = fields
740 .iter()
741 .map(|f| {
742 match f.data_type() {
745 DataType::Boolean => {
746 Ok(Arc::new(BooleanArray::from(vec![Option::<bool>::None]))
747 as ArrayRef)
748 }
749 DataType::Int32 | DataType::Date32 => {
750 Ok(Arc::new(Int32Array::from(vec![Option::<i32>::None])) as ArrayRef)
751 }
752 DataType::Int64 => {
753 Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None])) as ArrayRef)
754 }
755 DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
756 let array = TimestampMicrosecondArray::from(vec![Option::<i64>::None]);
757 if let Some(timezone) = timezone {
758 Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef)
759 } else {
760 Ok(Arc::new(array) as ArrayRef)
761 }
762 }
763 DataType::Timestamp(TimeUnit::Nanosecond, timezone) => {
764 let array = TimestampNanosecondArray::from(vec![Option::<i64>::None]);
765 if let Some(timezone) = timezone {
766 Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef)
767 } else {
768 Ok(Arc::new(array) as ArrayRef)
769 }
770 }
771 DataType::Float32 => {
772 Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None])) as ArrayRef)
773 }
774 DataType::Float64 => {
775 Ok(Arc::new(Float64Array::from(vec![Option::<f64>::None])) as ArrayRef)
776 }
777 DataType::Utf8 => {
778 Ok(Arc::new(StringArray::from(vec![Option::<&str>::None])) as ArrayRef)
779 }
780 DataType::Binary => {
781 Ok(
782 Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None]))
783 as ArrayRef,
784 )
785 }
786 _ => Err(Error::new(
787 ErrorKind::Unexpected,
788 format!("Unsupported struct field type: {:?}", f.data_type()),
789 )),
790 }
791 })
792 .collect::<Result<Vec<_>>>()?;
793 Ok(Arc::new(arrow_array::StructArray::new(
794 fields.clone(),
795 null_arrays,
796 Some(arrow_buffer::NullBuffer::new_null(1)),
797 )))
798 }
799 _ => Err(Error::new(
800 ErrorKind::Unexpected,
801 format!("Unsupported constant type combination: {data_type:?} with {prim_lit:?}"),
802 )),
803 }
804}
805
806pub(crate) fn create_primitive_array_repeated(
811 data_type: &DataType,
812 prim_lit: &Option<PrimitiveLiteral>,
813 num_rows: usize,
814) -> Result<ArrayRef> {
815 Ok(match (data_type, prim_lit) {
816 (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
817 Arc::new(BooleanArray::from(vec![*value; num_rows]))
818 }
819 (DataType::Boolean, None) => {
820 let vals: Vec<Option<bool>> = vec![None; num_rows];
821 Arc::new(BooleanArray::from(vals))
822 }
823 (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
824 Arc::new(Int32Array::from(vec![*value; num_rows]))
825 }
826 (DataType::Int32, None) => {
827 let vals: Vec<Option<i32>> = vec![None; num_rows];
828 Arc::new(Int32Array::from(vals))
829 }
830 (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
831 Arc::new(Date32Array::from(vec![*value; num_rows]))
832 }
833 (DataType::Date32, None) => {
834 let vals: Vec<Option<i32>> = vec![None; num_rows];
835 Arc::new(Date32Array::from(vals))
836 }
837 (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
838 Arc::new(Int64Array::from(vec![*value; num_rows]))
839 }
840 (DataType::Int64, None) => {
841 let vals: Vec<Option<i64>> = vec![None; num_rows];
842 Arc::new(Int64Array::from(vals))
843 }
844 (
845 DataType::Timestamp(TimeUnit::Microsecond, timezone),
846 Some(PrimitiveLiteral::Long(value)),
847 ) => {
848 let array = TimestampMicrosecondArray::from(vec![*value; num_rows]);
849 if let Some(timezone) = timezone {
850 Arc::new(array.with_timezone(timezone.clone()))
851 } else {
852 Arc::new(array)
853 }
854 }
855 (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => {
856 let vals: Vec<Option<i64>> = vec![None; num_rows];
857 let array = TimestampMicrosecondArray::from(vals);
858 if let Some(timezone) = timezone {
859 Arc::new(array.with_timezone(timezone.clone()))
860 } else {
861 Arc::new(array)
862 }
863 }
864 (
865 DataType::Timestamp(TimeUnit::Nanosecond, timezone),
866 Some(PrimitiveLiteral::Long(value)),
867 ) => {
868 let array = TimestampNanosecondArray::from(vec![*value; num_rows]);
869 if let Some(timezone) = timezone {
870 Arc::new(array.with_timezone(timezone.clone()))
871 } else {
872 Arc::new(array)
873 }
874 }
875 (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => {
876 let vals: Vec<Option<i64>> = vec![None; num_rows];
877 let array = TimestampNanosecondArray::from(vals);
878 if let Some(timezone) = timezone {
879 Arc::new(array.with_timezone(timezone.clone()))
880 } else {
881 Arc::new(array)
882 }
883 }
884 (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
885 Arc::new(Float32Array::from(vec![value.0; num_rows]))
886 }
887 (DataType::Float32, None) => {
888 let vals: Vec<Option<f32>> = vec![None; num_rows];
889 Arc::new(Float32Array::from(vals))
890 }
891 (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
892 Arc::new(Float64Array::from(vec![value.0; num_rows]))
893 }
894 (DataType::Float64, None) => {
895 let vals: Vec<Option<f64>> = vec![None; num_rows];
896 Arc::new(Float64Array::from(vals))
897 }
898 (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
899 Arc::new(StringArray::from(vec![value.clone(); num_rows]))
900 }
901 (DataType::Utf8, None) => {
902 let vals: Vec<Option<String>> = vec![None; num_rows];
903 Arc::new(StringArray::from(vals))
904 }
905 (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
906 Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
907 }
908 (DataType::Binary, None) => {
909 let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
910 Arc::new(BinaryArray::from_opt_vec(vals))
911 }
912 (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(value))) => {
913 Arc::new(
914 Decimal128Array::from(vec![*value; num_rows])
915 .with_precision_and_scale(*precision, *scale)
916 .map_err(|e| {
917 Error::new(
918 ErrorKind::DataInvalid,
919 format!(
920 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
921 ),
922 )
923 })?,
924 )
925 }
926 (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::UInt128(value))) => {
927 Arc::new(
928 Decimal128Array::from(vec![*value as i128; num_rows])
929 .with_precision_and_scale(*precision, *scale)
930 .map_err(|e| {
931 Error::new(
932 ErrorKind::DataInvalid,
933 format!(
934 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
935 ),
936 )
937 })?,
938 )
939 }
940 (DataType::Decimal128(precision, scale), None) => {
941 let vals: Vec<Option<i128>> = vec![None; num_rows];
942 Arc::new(
943 Decimal128Array::from(vals)
944 .with_precision_and_scale(*precision, *scale)
945 .map_err(|e| {
946 Error::new(
947 ErrorKind::DataInvalid,
948 format!(
949 "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}"
950 ),
951 )
952 })?,
953 )
954 }
955 (DataType::Struct(fields), None) => {
956 let null_arrays: Vec<ArrayRef> = fields
958 .iter()
959 .map(|field| create_primitive_array_repeated(field.data_type(), &None, num_rows))
960 .collect::<Result<Vec<_>>>()?;
961
962 Arc::new(StructArray::new(
963 fields.clone(),
964 null_arrays,
965 Some(NullBuffer::new_null(num_rows)),
966 ))
967 }
968 (DataType::Null, _) => Arc::new(arrow_array::NullArray::new(num_rows)),
969 (dt, _) => {
970 return Err(Error::new(
971 ErrorKind::Unexpected,
972 format!("unexpected target column type {dt}"),
973 ));
974 }
975 })
976}
977
978#[cfg(test)]
979mod test {
980 use std::collections::HashMap;
981 use std::sync::Arc;
982
983 use arrow_array::builder::{Int32Builder, ListBuilder, MapBuilder, StructBuilder};
984 use arrow_array::{
985 ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
986 Float64Array, Int32Array, Int64Array, StringArray, StructArray, Time64MicrosecondArray,
987 TimestampMicrosecondArray, TimestampNanosecondArray,
988 };
989 use arrow_schema::{DataType, Field, Fields, TimeUnit};
990 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
991
992 use super::*;
993 use crate::spec::{ListType, Literal, MapType, NestedField, PrimitiveType, StructType, Type};
994
995 #[test]
996 fn test_arrow_struct_to_iceberg_struct() {
997 let bool_array = BooleanArray::from(vec![Some(true), Some(false), None]);
998 let int32_array = Int32Array::from(vec![Some(3), Some(4), None]);
999 let int64_array = Int64Array::from(vec![Some(5), Some(6), None]);
1000 let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]);
1001 let float64_array = Float64Array::from(vec![Some(3.3), Some(4.4), None]);
1002 let decimal_array = Decimal128Array::from(vec![Some(1000), Some(2000), None])
1003 .with_precision_and_scale(10, 2)
1004 .unwrap();
1005 let date_array = Date32Array::from(vec![Some(18628), Some(18629), None]);
1006 let time_array = Time64MicrosecondArray::from(vec![Some(123456789), Some(987654321), None]);
1007 let timestamp_micro_array = TimestampMicrosecondArray::from(vec![
1008 Some(1622548800000000),
1009 Some(1622635200000000),
1010 None,
1011 ]);
1012 let timestamp_nano_array = TimestampNanosecondArray::from(vec![
1013 Some(1622548800000000000),
1014 Some(1622635200000000000),
1015 None,
1016 ]);
1017 let string_array = StringArray::from(vec![Some("a"), Some("b"), None]);
1018 let binary_array =
1019 BinaryArray::from(vec![Some(b"abc".as_ref()), Some(b"def".as_ref()), None]);
1020
1021 let struct_array = Arc::new(StructArray::from(vec![
1022 (
1023 Arc::new(
1024 Field::new("bool_field", DataType::Boolean, true).with_metadata(HashMap::from(
1025 [(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())],
1026 )),
1027 ),
1028 Arc::new(bool_array) as ArrayRef,
1029 ),
1030 (
1031 Arc::new(
1032 Field::new("int32_field", DataType::Int32, true).with_metadata(HashMap::from(
1033 [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
1034 )),
1035 ),
1036 Arc::new(int32_array) as ArrayRef,
1037 ),
1038 (
1039 Arc::new(
1040 Field::new("int64_field", DataType::Int64, true).with_metadata(HashMap::from(
1041 [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
1042 )),
1043 ),
1044 Arc::new(int64_array) as ArrayRef,
1045 ),
1046 (
1047 Arc::new(
1048 Field::new("float32_field", DataType::Float32, true).with_metadata(
1049 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
1050 ),
1051 ),
1052 Arc::new(float32_array) as ArrayRef,
1053 ),
1054 (
1055 Arc::new(
1056 Field::new("float64_field", DataType::Float64, true).with_metadata(
1057 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
1058 ),
1059 ),
1060 Arc::new(float64_array) as ArrayRef,
1061 ),
1062 (
1063 Arc::new(
1064 Field::new("decimal_field", DataType::Decimal128(10, 2), true).with_metadata(
1065 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string())]),
1066 ),
1067 ),
1068 Arc::new(decimal_array) as ArrayRef,
1069 ),
1070 (
1071 Arc::new(
1072 Field::new("date_field", DataType::Date32, true).with_metadata(HashMap::from(
1073 [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
1074 )),
1075 ),
1076 Arc::new(date_array) as ArrayRef,
1077 ),
1078 (
1079 Arc::new(
1080 Field::new("time_field", DataType::Time64(TimeUnit::Microsecond), true)
1081 .with_metadata(HashMap::from([(
1082 PARQUET_FIELD_ID_META_KEY.to_string(),
1083 "8".to_string(),
1084 )])),
1085 ),
1086 Arc::new(time_array) as ArrayRef,
1087 ),
1088 (
1089 Arc::new(
1090 Field::new(
1091 "timestamp_micro_field",
1092 DataType::Timestamp(TimeUnit::Microsecond, None),
1093 true,
1094 )
1095 .with_metadata(HashMap::from([(
1096 PARQUET_FIELD_ID_META_KEY.to_string(),
1097 "9".to_string(),
1098 )])),
1099 ),
1100 Arc::new(timestamp_micro_array) as ArrayRef,
1101 ),
1102 (
1103 Arc::new(
1104 Field::new(
1105 "timestamp_nano_field",
1106 DataType::Timestamp(TimeUnit::Nanosecond, None),
1107 true,
1108 )
1109 .with_metadata(HashMap::from([(
1110 PARQUET_FIELD_ID_META_KEY.to_string(),
1111 "10".to_string(),
1112 )])),
1113 ),
1114 Arc::new(timestamp_nano_array) as ArrayRef,
1115 ),
1116 (
1117 Arc::new(
1118 Field::new("string_field", DataType::Utf8, true).with_metadata(HashMap::from(
1119 [(PARQUET_FIELD_ID_META_KEY.to_string(), "11".to_string())],
1120 )),
1121 ),
1122 Arc::new(string_array) as ArrayRef,
1123 ),
1124 (
1125 Arc::new(
1126 Field::new("binary_field", DataType::Binary, true).with_metadata(
1127 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "12".to_string())]),
1128 ),
1129 ),
1130 Arc::new(binary_array) as ArrayRef,
1131 ),
1132 ])) as ArrayRef;
1133
1134 let iceberg_struct_type = StructType::new(vec![
1135 Arc::new(NestedField::optional(
1136 0,
1137 "bool_field",
1138 Type::Primitive(PrimitiveType::Boolean),
1139 )),
1140 Arc::new(NestedField::optional(
1141 2,
1142 "int32_field",
1143 Type::Primitive(PrimitiveType::Int),
1144 )),
1145 Arc::new(NestedField::optional(
1146 3,
1147 "int64_field",
1148 Type::Primitive(PrimitiveType::Long),
1149 )),
1150 Arc::new(NestedField::optional(
1151 4,
1152 "float32_field",
1153 Type::Primitive(PrimitiveType::Float),
1154 )),
1155 Arc::new(NestedField::optional(
1156 5,
1157 "float64_field",
1158 Type::Primitive(PrimitiveType::Double),
1159 )),
1160 Arc::new(NestedField::optional(
1161 6,
1162 "decimal_field",
1163 Type::Primitive(PrimitiveType::Decimal {
1164 precision: 10,
1165 scale: 2,
1166 }),
1167 )),
1168 Arc::new(NestedField::optional(
1169 7,
1170 "date_field",
1171 Type::Primitive(PrimitiveType::Date),
1172 )),
1173 Arc::new(NestedField::optional(
1174 8,
1175 "time_field",
1176 Type::Primitive(PrimitiveType::Time),
1177 )),
1178 Arc::new(NestedField::optional(
1179 9,
1180 "timestamp_micro_field",
1181 Type::Primitive(PrimitiveType::Timestamp),
1182 )),
1183 Arc::new(NestedField::optional(
1184 10,
1185 "timestamp_nao_field",
1186 Type::Primitive(PrimitiveType::TimestampNs),
1187 )),
1188 Arc::new(NestedField::optional(
1189 11,
1190 "string_field",
1191 Type::Primitive(PrimitiveType::String),
1192 )),
1193 Arc::new(NestedField::optional(
1194 12,
1195 "binary_field",
1196 Type::Primitive(PrimitiveType::Binary),
1197 )),
1198 ]);
1199
1200 let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
1201
1202 assert_eq!(result, vec![
1203 Some(Literal::Struct(Struct::from_iter(vec![
1204 Some(Literal::bool(true)),
1205 Some(Literal::int(3)),
1206 Some(Literal::long(5)),
1207 Some(Literal::float(1.1)),
1208 Some(Literal::double(3.3)),
1209 Some(Literal::decimal(1000)),
1210 Some(Literal::date(18628)),
1211 Some(Literal::time(123456789)),
1212 Some(Literal::timestamp(1622548800000000)),
1213 Some(Literal::timestamp_nano(1622548800000000000)),
1214 Some(Literal::string("a".to_string())),
1215 Some(Literal::binary(b"abc".to_vec())),
1216 ]))),
1217 Some(Literal::Struct(Struct::from_iter(vec![
1218 Some(Literal::bool(false)),
1219 Some(Literal::int(4)),
1220 Some(Literal::long(6)),
1221 Some(Literal::float(2.2)),
1222 Some(Literal::double(4.4)),
1223 Some(Literal::decimal(2000)),
1224 Some(Literal::date(18629)),
1225 Some(Literal::time(987654321)),
1226 Some(Literal::timestamp(1622635200000000)),
1227 Some(Literal::timestamp_nano(1622635200000000000)),
1228 Some(Literal::string("b".to_string())),
1229 Some(Literal::binary(b"def".to_vec())),
1230 ]))),
1231 Some(Literal::Struct(Struct::from_iter(vec![
1232 None, None, None, None, None, None, None, None, None, None, None, None,
1233 ]))),
1234 ]);
1235 }
1236
/// Converts a struct array that mixes null children with a null struct slot.
///
/// Three rows are built: `{a: null, b: null}` and `{a: 1, b: null}` with a
/// valid struct slot, then a row whose children are both `1` but whose
/// struct slot is null. The last row is expected to convert to `None`.
#[test]
fn test_nullable_struct() {
    // One entry per row: value of `a`, value of `b`, validity of the struct slot.
    let rows = [
        (None, None, true),
        (Some(1), None, true),
        (Some(1), Some(1), false),
    ];

    // Field-id metadata attached to each Arrow field.
    let id_meta = |id: &str| {
        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())])
    };
    let arrow_fields = Fields::from(vec![
        Field::new("a", DataType::Int32, true).with_metadata(id_meta("0")),
        Field::new("b", DataType::Int32, true).with_metadata(id_meta("1")),
    ]);

    let mut builder = StructBuilder::from_fields(arrow_fields, 3);
    for (a, b, is_valid) in rows {
        builder
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        builder
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_option(b);
        builder.append(is_valid);
    }
    let struct_array = Arc::new(builder.finish()) as ArrayRef;

    let int_type = || Type::Primitive(PrimitiveType::Int);
    let iceberg_struct_type = StructType::new(vec![
        Arc::new(NestedField::optional(0, "a", int_type())),
        Arc::new(NestedField::optional(1, "b", int_type())),
    ]);

    let expected = vec![
        Some(Literal::Struct(Struct::from_iter(vec![None, None]))),
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::int(1)),
            None,
        ]))),
        None,
    ];

    let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
    assert_eq!(result, expected);
}
1315
/// A struct array without child fields converts to an empty list of literals,
/// even though the array itself is created with three (null) rows.
#[test]
fn test_empty_struct() {
    let iceberg_struct_type = StructType::new(vec![]);
    let struct_array = Arc::new(StructArray::new_null(Fields::empty(), 3)) as ArrayRef;

    let expected: Vec<Option<Literal>> = Vec::new();
    let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();
    assert_eq!(result, expected);
}
1323
/// Looks up a nested struct column and then one of its children through an
/// `ArrowArrayAccessor` created with `FieldMatchMode::Id`.
///
/// Every Arrow field carries field-id metadata equal to the id of the
/// corresponding Iceberg field.
#[test]
fn test_find_field_by_id() {
    // Field-id metadata attached to each Arrow field.
    let id_meta = |id: &str| {
        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())])
    };

    // Inner struct: { field_a: int32 (id 1), field_b: utf8 (id 2) }.
    let nested_struct_array = Arc::new(StructArray::from(vec![
        (
            Arc::new(Field::new("field_a", DataType::Int32, true).with_metadata(id_meta("1"))),
            Arc::new(Int32Array::from(vec![Some(42), Some(43), None])) as ArrayRef,
        ),
        (
            Arc::new(Field::new("field_b", DataType::Utf8, true).with_metadata(id_meta("2"))),
            Arc::new(StringArray::from(vec![
                Some("value1"),
                Some("value2"),
                None,
            ])) as ArrayRef,
        ),
    ])) as ArrayRef;

    // The outer field must declare exactly the data type of the inner array,
    // so take it from the array instead of spelling it out a second time.
    let nested_struct_type = nested_struct_array.data_type().clone();

    // Outer struct: { nested_struct (id 3), field_c: int32 (id 4) }.
    let struct_array = Arc::new(StructArray::from(vec![
        (
            Arc::new(
                Field::new("nested_struct", nested_struct_type, true)
                    .with_metadata(id_meta("3")),
            ),
            nested_struct_array,
        ),
        (
            Arc::new(Field::new("field_c", DataType::Int32, true).with_metadata(id_meta("4"))),
            Arc::new(Int32Array::from(vec![Some(100), Some(200), None])) as ArrayRef,
        ),
    ])) as ArrayRef;

    // Iceberg-side description of the fields being looked up.
    let nested_field = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(vec![
            Arc::new(NestedField::optional(
                1,
                "field_a",
                Type::Primitive(PrimitiveType::Int),
            )),
            Arc::new(NestedField::optional(
                2,
                "field_b",
                Type::Primitive(PrimitiveType::String),
            )),
        ])),
    );
    let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));

    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id);

    // First resolve the nested struct column, then `field_a` inside it.
    let nested_partner = accessor
        .field_partner(&struct_array, &nested_field)
        .unwrap();
    let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();

    let resolved = field_a_partner
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(resolved.value(0), 42);
    assert_eq!(resolved.value(1), 43);
    assert!(resolved.is_null(2));
}
1423
/// Looks up a nested struct column and then one of its children through an
/// `ArrowArrayAccessor` created with `FieldMatchMode::Name`.
///
/// None of the Arrow fields carries field-id metadata here; only the field
/// names line up with the Iceberg fields.
#[test]
fn test_find_field_by_name() {
    // Inner struct: { field_a: int32, field_b: utf8 }.
    let nested_struct_array = Arc::new(StructArray::from(vec![
        (
            Arc::new(Field::new("field_a", DataType::Int32, true)),
            Arc::new(Int32Array::from(vec![Some(42), Some(43), None])) as ArrayRef,
        ),
        (
            Arc::new(Field::new("field_b", DataType::Utf8, true)),
            Arc::new(StringArray::from(vec![
                Some("value1"),
                Some("value2"),
                None,
            ])) as ArrayRef,
        ),
    ])) as ArrayRef;

    // The outer field must declare exactly the data type of the inner array,
    // so take it from the array instead of spelling it out a second time.
    let nested_struct_type = nested_struct_array.data_type().clone();

    // Outer struct: { nested_struct, field_c: int32 }.
    let struct_array = Arc::new(StructArray::from(vec![
        (
            Arc::new(Field::new("nested_struct", nested_struct_type, true)),
            nested_struct_array,
        ),
        (
            Arc::new(Field::new("field_c", DataType::Int32, true)),
            Arc::new(Int32Array::from(vec![Some(100), Some(200), None])) as ArrayRef,
        ),
    ])) as ArrayRef;

    // Iceberg-side description of the fields being looked up.
    let nested_field = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(vec![
            Arc::new(NestedField::optional(
                1,
                "field_a",
                Type::Primitive(PrimitiveType::Int),
            )),
            Arc::new(NestedField::optional(
                2,
                "field_b",
                Type::Primitive(PrimitiveType::String),
            )),
        ])),
    );
    let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));

    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name);

    // First resolve the nested struct column, then `field_a` inside it.
    let nested_partner = accessor
        .field_partner(&struct_array, &nested_field)
        .unwrap();
    let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();

    let resolved = field_a_partner
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(resolved.value(0), 42);
    assert_eq!(resolved.value(1), 43);
    assert!(resolved.is_null(2));
}
1500
/// Converts a two-row struct array with three nested list columns.
///
/// Columns, as declared on the Iceberg side:
/// - `A`: list of structs `{a1: int, a2: int}`
/// - `B`: list of maps from `int` to `int`
/// - `C`: list of lists of `int`
#[test]
fn test_complex_nested() {
    /// Appends one row to column `A` (index 0): a list with one struct per `(a1, a2)` pair.
    fn append_a_row(top: &mut StructBuilder, items: &[(i32, i32)]) {
        let list = top
            .field_builder::<ListBuilder<StructBuilder>>(0)
            .unwrap();
        for &(a1, a2) in items {
            let item = list.values();
            item.field_builder::<Int32Builder>(0)
                .unwrap()
                .append_value(a1);
            item.field_builder::<Int32Builder>(1)
                .unwrap()
                .append_value(a2);
            item.append(true);
        }
        list.append(true);
    }

    /// Appends one row to column `B` (index 1): a list with one map per entry slice.
    fn append_b_row(top: &mut StructBuilder, maps: &[&[(i32, i32)]]) {
        let list = top
            .field_builder::<ListBuilder<MapBuilder<Int32Builder, Int32Builder>>>(1)
            .unwrap();
        for entries in maps {
            let map = list.values();
            for &(key, value) in entries.iter() {
                map.keys().append_value(key);
                map.values().append_value(value);
            }
            map.append(true).unwrap();
        }
        list.append(true);
    }

    /// Appends one row to column `C` (index 2): a list with one inner list per value slice.
    fn append_c_row(top: &mut StructBuilder, lists: &[&[i32]]) {
        let outer = top
            .field_builder::<ListBuilder<ListBuilder<Int32Builder>>>(2)
            .unwrap();
        for values in lists {
            let inner = outer.values();
            for &value in values.iter() {
                inner.values().append_value(value);
            }
            inner.append(true);
        }
        outer.append(true);
    }

    /// Expected literal for one `{a1, a2}` struct.
    fn int_pair(a1: i32, a2: i32) -> Option<Literal> {
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::int(a1)),
            Some(Literal::int(a2)),
        ])))
    }

    /// Expected literal for one `int -> int` map.
    fn int_map(entries: &[(i32, i32)]) -> Option<Literal> {
        let pairs: Vec<_> = entries
            .iter()
            .map(|&(key, value)| (Literal::int(key), Some(Literal::int(value))))
            .collect();
        Some(Literal::Map(Map::from_iter(pairs)))
    }

    /// Expected literal for one list of ints.
    fn int_list(values: &[i32]) -> Option<Literal> {
        Some(Literal::List(
            values.iter().map(|&value| Some(Literal::int(value))).collect(),
        ))
    }

    // ---- Iceberg schema -------------------------------------------------
    let int_type = || Type::Primitive(PrimitiveType::Int);
    // A required list whose required element field has the given id and type.
    let required_list = |element_id: i32, element: Type| {
        Type::List(ListType::new(Arc::new(NestedField::required(
            element_id, "item", element,
        ))))
    };

    let a_element = Type::Struct(StructType::new(vec![
        Arc::new(NestedField::required(2, "a1", int_type())),
        Arc::new(NestedField::required(3, "a2", int_type())),
    ]));
    let b_element = Type::Map(MapType::new(
        Arc::new(NestedField::optional(6, "keys", int_type())),
        Arc::new(NestedField::optional(7, "values", int_type())),
    ));
    let c_element = Type::List(ListType::new(Arc::new(NestedField::optional(
        10,
        "item",
        int_type(),
    ))));

    let struct_type = StructType::new(vec![
        Arc::new(NestedField::required(0, "A", required_list(1, a_element))),
        Arc::new(NestedField::required(4, "B", required_list(5, b_element))),
        Arc::new(NestedField::required(8, "C", required_list(9, c_element))),
    ]);

    // ---- Arrow array ----------------------------------------------------
    // Field-id metadata attached to an Arrow field.
    let id_meta = |id: &str| {
        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())])
    };
    // Arrow list type with a nullable element field named "item".
    let list_type =
        |element: DataType| DataType::List(Arc::new(Field::new("item", element, true)));

    let a_struct_fields = Fields::from(vec![
        Field::new("a1", DataType::Int32, false).with_metadata(id_meta("2")),
        Field::new("a2", DataType::Int32, false).with_metadata(id_meta("3")),
    ]);
    let map_entries = Field::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            Field::new("keys", DataType::Int32, false),
            Field::new("values", DataType::Int32, true),
        ])),
        false,
    );

    let a_type = list_type(DataType::Struct(a_struct_fields.clone()));
    let b_type = list_type(DataType::Map(Arc::new(map_entries), false));
    let c_type = list_type(list_type(DataType::Int32));

    let a_builder = ListBuilder::new(StructBuilder::new(a_struct_fields, vec![
        Box::new(Int32Builder::new()),
        Box::new(Int32Builder::new()),
    ]));
    let b_builder = ListBuilder::new(MapBuilder::new(
        None,
        Int32Builder::new(),
        Int32Builder::new(),
    ));
    let c_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

    let mut top_struct_builder = StructBuilder::new(
        Fields::from(vec![
            Field::new("A", a_type, false).with_metadata(id_meta("0")),
            Field::new("B", b_type, false).with_metadata(id_meta("4")),
            Field::new("C", c_type, false).with_metadata(id_meta("8")),
        ]),
        vec![
            Box::new(a_builder),
            Box::new(b_builder),
            Box::new(c_builder),
        ],
    );

    // First row.
    append_a_row(&mut top_struct_builder, &[(10, 20), (11, 21)]);
    append_b_row(&mut top_struct_builder, &[&[(1, 100), (3, 300)], &[(2, 200)]]);
    append_c_row(&mut top_struct_builder, &[&[100, 101, 102], &[200, 201]]);
    top_struct_builder.append(true);

    // Second row.
    append_a_row(&mut top_struct_builder, &[(12, 22), (13, 23)]);
    append_b_row(&mut top_struct_builder, &[&[(3, 300)]]);
    append_c_row(&mut top_struct_builder, &[&[300, 301, 302], &[400, 401]]);
    top_struct_builder.append(true);

    let struct_array = Arc::new(top_struct_builder.finish()) as ArrayRef;

    // ---- Conversion and expectation --------------------------------------
    let expected = vec![
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::List(vec![int_pair(10, 20), int_pair(11, 21)])),
            Some(Literal::List(vec![
                int_map(&[(1, 100), (3, 300)]),
                int_map(&[(2, 200)]),
            ])),
            Some(Literal::List(vec![
                int_list(&[100, 101, 102]),
                int_list(&[200, 201]),
            ])),
        ]))),
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::List(vec![int_pair(12, 22), int_pair(13, 23)])),
            Some(Literal::List(vec![int_map(&[(3, 300)])])),
            Some(Literal::List(vec![
                int_list(&[300, 301, 302]),
                int_list(&[400, 401]),
            ])),
        ]))),
    ];

    let result = arrow_struct_to_literal(&struct_array, &struct_type).unwrap();
    assert_eq!(result, expected);
}
1828
/// The single-element array built for a `Decimal128(18, 10)` target must
/// report that same precision and scale in its data type.
#[test]
fn test_create_decimal_array_respects_precision() {
    let target_precision = 18u8;
    let target_scale = 10i8;
    let target_type = DataType::Decimal128(target_precision, target_scale);
    let literal = Some(PrimitiveLiteral::Int128(10000000000));

    let array = create_primitive_array_single_element(&target_type, &literal)
        .expect("Failed to create decimal array");

    // Pull precision and scale out of the resulting type; any other type fails the test.
    let (precision, scale) = match array.data_type() {
        DataType::Decimal128(precision, scale) => (*precision, *scale),
        other => panic!("Expected Decimal128, got {other:?}"),
    };
    assert_eq!(precision, target_precision);
    assert_eq!(scale, target_scale);
}
1849
/// The repeated array built for a `Decimal128(18, 10)` target must report
/// that same precision and scale, and contain the requested number of rows.
#[test]
fn test_create_decimal_array_repeated_respects_precision() {
    let target_precision = 18u8;
    let target_scale = 10i8;
    let target_type = DataType::Decimal128(target_precision, target_scale);
    let literal = Some(PrimitiveLiteral::Int128(10000000000));
    let num_rows = 5;

    let array = create_primitive_array_repeated(&target_type, &literal, num_rows)
        .expect("Failed to create repeated decimal array");

    // Pull precision and scale out of the resulting type; any other type fails the test.
    let (precision, scale) = match array.data_type() {
        DataType::Decimal128(precision, scale) => (*precision, *scale),
        other => panic!("Expected Decimal128, got {other:?}"),
    };
    assert_eq!(precision, target_precision);
    assert_eq!(scale, target_scale);

    assert_eq!(array.len(), num_rows);
}
1872
/// A repeated array for a timezone-less microsecond timestamp target keeps
/// the requested data type and row count.
#[test]
fn test_create_timestamp_microsecond_array_repeated() {
    let num_rows = 3;
    let target_type = DataType::Timestamp(TimeUnit::Microsecond, None);
    let literal = Some(PrimitiveLiteral::Long(1_740_600_000_000_000));

    let array = create_primitive_array_repeated(&target_type, &literal, num_rows)
        .expect("Failed to create repeated timestamp microsecond array");

    assert_eq!(array.data_type(), &target_type);
    assert_eq!(array.len(), num_rows);
}
1885
/// A repeated array for a microsecond timestamp target with the `"UTC"`
/// timezone keeps the requested data type (timezone included) and row count.
#[test]
fn test_create_timestamp_microsecond_with_timezone_array_repeated() {
    let num_rows = 2;
    let target_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));
    let literal = Some(PrimitiveLiteral::Long(1_740_600_000_000_000));

    let array = create_primitive_array_repeated(&target_type, &literal, num_rows)
        .expect("Failed to create repeated timestamp microsecond array with timezone");

    assert_eq!(array.data_type(), &target_type);
    assert_eq!(array.len(), num_rows);
}
1898}