1use arrow_array::{
19 Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray,
20 FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray,
21 LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray,
22 Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
23};
24use arrow_schema::{DataType, FieldRef};
25use uuid::Uuid;
26
27use super::get_field_id;
28use crate::spec::{
29 ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType,
30 SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
31 visit_type_with_partner,
32};
33use crate::{Error, ErrorKind, Result};
34
/// Visitor that walks an Iceberg type together with its partner Arrow array and
/// produces one `Option<Literal>` per row of the visited array.
struct ArrowArrayToIcebergStructConverter;
36
37impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
38 type T = Vec<Option<Literal>>;
39
40 fn schema(
41 &mut self,
42 _schema: &crate::spec::Schema,
43 _partner: &ArrayRef,
44 value: Vec<Option<Literal>>,
45 ) -> Result<Vec<Option<Literal>>> {
46 Ok(value)
47 }
48
49 fn field(
50 &mut self,
51 field: &crate::spec::NestedFieldRef,
52 _partner: &ArrayRef,
53 value: Vec<Option<Literal>>,
54 ) -> Result<Vec<Option<Literal>>> {
55 if field.required && value.iter().any(Option::is_none) {
57 return Err(Error::new(
58 ErrorKind::DataInvalid,
59 "The field is required but has null value",
60 )
61 .with_context("field_id", field.id.to_string())
62 .with_context("field_name", &field.name));
63 }
64 Ok(value)
65 }
66
67 fn r#struct(
68 &mut self,
69 _struct: &StructType,
70 array: &ArrayRef,
71 results: Vec<Vec<Option<Literal>>>,
72 ) -> Result<Vec<Option<Literal>>> {
73 let row_len = results.first().map(|column| column.len()).unwrap_or(0);
74 if let Some(col) = results.iter().find(|col| col.len() != row_len) {
75 return Err(Error::new(
76 ErrorKind::DataInvalid,
77 "The struct columns have different row length",
78 )
79 .with_context("first col length", row_len.to_string())
80 .with_context("actual col length", col.len().to_string()));
81 }
82
83 let mut struct_literals = Vec::with_capacity(row_len);
84 let mut columns_iters = results
85 .into_iter()
86 .map(|column| column.into_iter())
87 .collect::<Vec<_>>();
88
89 for i in 0..row_len {
90 let mut literals = Vec::with_capacity(columns_iters.len());
91 for column_iter in columns_iters.iter_mut() {
92 literals.push(column_iter.next().unwrap());
93 }
94 if array.is_null(i) {
95 struct_literals.push(None);
96 } else {
97 struct_literals.push(Some(Literal::Struct(Struct::from_iter(literals))));
98 }
99 }
100
101 Ok(struct_literals)
102 }
103
104 fn list(
105 &mut self,
106 list: &ListType,
107 array: &ArrayRef,
108 elements: Vec<Option<Literal>>,
109 ) -> Result<Vec<Option<Literal>>> {
110 if list.element_field.required && elements.iter().any(Option::is_none) {
111 return Err(Error::new(
112 ErrorKind::DataInvalid,
113 "The list should not have null value",
114 ));
115 }
116 match array.data_type() {
117 DataType::List(_) => {
118 let offset = array
119 .as_any()
120 .downcast_ref::<ListArray>()
121 .ok_or_else(|| {
122 Error::new(ErrorKind::DataInvalid, "The partner is not a list array")
123 })?
124 .offsets();
125 let mut result = Vec::with_capacity(offset.len() - 1);
127 for i in 0..offset.len() - 1 {
128 let start = offset[i] as usize;
129 let end = offset[i + 1] as usize;
130 result.push(Some(Literal::List(elements[start..end].to_vec())));
131 }
132 Ok(result)
133 }
134 DataType::LargeList(_) => {
135 let offset = array
136 .as_any()
137 .downcast_ref::<LargeListArray>()
138 .ok_or_else(|| {
139 Error::new(
140 ErrorKind::DataInvalid,
141 "The partner is not a large list array",
142 )
143 })?
144 .offsets();
145 let mut result = Vec::with_capacity(offset.len() - 1);
147 for i in 0..offset.len() - 1 {
148 let start = offset[i] as usize;
149 let end = offset[i + 1] as usize;
150 result.push(Some(Literal::List(elements[start..end].to_vec())));
151 }
152 Ok(result)
153 }
154 DataType::FixedSizeList(_, len) => {
155 let mut result = Vec::with_capacity(elements.len() / *len as usize);
156 for i in 0..elements.len() / *len as usize {
157 let start = i * *len as usize;
158 let end = (i + 1) * *len as usize;
159 result.push(Some(Literal::List(elements[start..end].to_vec())));
160 }
161 Ok(result)
162 }
163 _ => Err(Error::new(
164 ErrorKind::DataInvalid,
165 "The partner is not a list type",
166 )),
167 }
168 }
169
170 fn map(
171 &mut self,
172 _map: &MapType,
173 partner: &ArrayRef,
174 key_values: Vec<Option<Literal>>,
175 values: Vec<Option<Literal>>,
176 ) -> Result<Vec<Option<Literal>>> {
177 if key_values.len() != values.len() {
179 return Err(Error::new(
180 ErrorKind::DataInvalid,
181 "The key value and value of map should have the same row length",
182 ));
183 }
184
185 let offsets = partner
186 .as_any()
187 .downcast_ref::<MapArray>()
188 .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "The partner is not a map array"))?
189 .offsets();
190 let mut result = Vec::with_capacity(offsets.len() - 1);
192 for i in 0..offsets.len() - 1 {
193 let start = offsets[i] as usize;
194 let end = offsets[i + 1] as usize;
195 let mut map = Map::new();
196 for (key, value) in key_values[start..end].iter().zip(values[start..end].iter()) {
197 map.insert(key.clone().unwrap(), value.clone());
198 }
199 result.push(Some(Literal::Map(map)));
200 }
201 Ok(result)
202 }
203
204 fn primitive(&mut self, p: &PrimitiveType, partner: &ArrayRef) -> Result<Vec<Option<Literal>>> {
205 match p {
206 PrimitiveType::Boolean => {
207 let array = partner
208 .as_any()
209 .downcast_ref::<BooleanArray>()
210 .ok_or_else(|| {
211 Error::new(ErrorKind::DataInvalid, "The partner is not a boolean array")
212 })?;
213 Ok(array.iter().map(|v| v.map(Literal::bool)).collect())
214 }
215 PrimitiveType::Int => {
216 let array = partner
217 .as_any()
218 .downcast_ref::<Int32Array>()
219 .ok_or_else(|| {
220 Error::new(ErrorKind::DataInvalid, "The partner is not a int32 array")
221 })?;
222 Ok(array.iter().map(|v| v.map(Literal::int)).collect())
223 }
224 PrimitiveType::Long => {
225 let array = partner
226 .as_any()
227 .downcast_ref::<Int64Array>()
228 .ok_or_else(|| {
229 Error::new(ErrorKind::DataInvalid, "The partner is not a int64 array")
230 })?;
231 Ok(array.iter().map(|v| v.map(Literal::long)).collect())
232 }
233 PrimitiveType::Float => {
234 let array = partner
235 .as_any()
236 .downcast_ref::<Float32Array>()
237 .ok_or_else(|| {
238 Error::new(ErrorKind::DataInvalid, "The partner is not a float32 array")
239 })?;
240 Ok(array.iter().map(|v| v.map(Literal::float)).collect())
241 }
242 PrimitiveType::Double => {
243 let array = partner
244 .as_any()
245 .downcast_ref::<Float64Array>()
246 .ok_or_else(|| {
247 Error::new(ErrorKind::DataInvalid, "The partner is not a float64 array")
248 })?;
249 Ok(array.iter().map(|v| v.map(Literal::double)).collect())
250 }
251 PrimitiveType::Decimal { precision, scale } => {
252 let array = partner
253 .as_any()
254 .downcast_ref::<Decimal128Array>()
255 .ok_or_else(|| {
256 Error::new(
257 ErrorKind::DataInvalid,
258 "The partner is not a decimal128 array",
259 )
260 })?;
261 if let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type() {
262 if *arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale {
263 return Err(Error::new(
264 ErrorKind::DataInvalid,
265 format!(
266 "The precision or scale ({arrow_precision},{arrow_scale}) of arrow decimal128 array is not compatible with iceberg decimal type ({precision},{scale})"
267 ),
268 ));
269 }
270 }
271 Ok(array.iter().map(|v| v.map(Literal::decimal)).collect())
272 }
273 PrimitiveType::Date => {
274 let array = partner
275 .as_any()
276 .downcast_ref::<Date32Array>()
277 .ok_or_else(|| {
278 Error::new(ErrorKind::DataInvalid, "The partner is not a date32 array")
279 })?;
280 Ok(array.iter().map(|v| v.map(Literal::date)).collect())
281 }
282 PrimitiveType::Time => {
283 let array = partner
284 .as_any()
285 .downcast_ref::<Time64MicrosecondArray>()
286 .ok_or_else(|| {
287 Error::new(ErrorKind::DataInvalid, "The partner is not a time64 array")
288 })?;
289 Ok(array.iter().map(|v| v.map(Literal::time)).collect())
290 }
291 PrimitiveType::Timestamp => {
292 let array = partner
293 .as_any()
294 .downcast_ref::<TimestampMicrosecondArray>()
295 .ok_or_else(|| {
296 Error::new(
297 ErrorKind::DataInvalid,
298 "The partner is not a timestamp array",
299 )
300 })?;
301 Ok(array.iter().map(|v| v.map(Literal::timestamp)).collect())
302 }
303 PrimitiveType::Timestamptz => {
304 let array = partner
305 .as_any()
306 .downcast_ref::<TimestampMicrosecondArray>()
307 .ok_or_else(|| {
308 Error::new(
309 ErrorKind::DataInvalid,
310 "The partner is not a timestamptz array",
311 )
312 })?;
313 Ok(array.iter().map(|v| v.map(Literal::timestamptz)).collect())
314 }
315 PrimitiveType::TimestampNs => {
316 let array = partner
317 .as_any()
318 .downcast_ref::<TimestampNanosecondArray>()
319 .ok_or_else(|| {
320 Error::new(
321 ErrorKind::DataInvalid,
322 "The partner is not a timestamp_ns array",
323 )
324 })?;
325 Ok(array
326 .iter()
327 .map(|v| v.map(Literal::timestamp_nano))
328 .collect())
329 }
330 PrimitiveType::TimestamptzNs => {
331 let array = partner
332 .as_any()
333 .downcast_ref::<TimestampNanosecondArray>()
334 .ok_or_else(|| {
335 Error::new(
336 ErrorKind::DataInvalid,
337 "The partner is not a timestamptz_ns array",
338 )
339 })?;
340 Ok(array
341 .iter()
342 .map(|v| v.map(Literal::timestamptz_nano))
343 .collect())
344 }
345 PrimitiveType::String => {
346 if let Some(array) = partner.as_any().downcast_ref::<LargeStringArray>() {
347 Ok(array.iter().map(|v| v.map(Literal::string)).collect())
348 } else if let Some(array) = partner.as_any().downcast_ref::<StringArray>() {
349 Ok(array.iter().map(|v| v.map(Literal::string)).collect())
350 } else {
351 return Err(Error::new(
352 ErrorKind::DataInvalid,
353 "The partner is not a string array",
354 ));
355 }
356 }
357 PrimitiveType::Uuid => {
358 if let Some(array) = partner.as_any().downcast_ref::<FixedSizeBinaryArray>() {
359 if array.value_length() != 16 {
360 return Err(Error::new(
361 ErrorKind::DataInvalid,
362 "The partner is not a uuid array",
363 ));
364 }
365 Ok(array
366 .iter()
367 .map(|v| {
368 v.map(|v| {
369 Ok(Literal::uuid(Uuid::from_bytes(v.try_into().map_err(
370 |_| {
371 Error::new(
372 ErrorKind::DataInvalid,
373 "Failed to convert binary to uuid",
374 )
375 },
376 )?)))
377 })
378 .transpose()
379 })
380 .collect::<Result<Vec<_>>>()?)
381 } else {
382 Err(Error::new(
383 ErrorKind::DataInvalid,
384 "The partner is not a uuid array",
385 ))
386 }
387 }
388 PrimitiveType::Fixed(len) => {
389 let array = partner
390 .as_any()
391 .downcast_ref::<FixedSizeBinaryArray>()
392 .ok_or_else(|| {
393 Error::new(ErrorKind::DataInvalid, "The partner is not a fixed array")
394 })?;
395 if array.value_length() != *len as i32 {
396 return Err(Error::new(
397 ErrorKind::DataInvalid,
398 "The length of fixed size binary array is not compatible with iceberg fixed type",
399 ));
400 }
401 Ok(array
402 .iter()
403 .map(|v| v.map(|v| Literal::fixed(v.iter().cloned())))
404 .collect())
405 }
406 PrimitiveType::Binary => {
407 if let Some(array) = partner.as_any().downcast_ref::<LargeBinaryArray>() {
408 Ok(array
409 .iter()
410 .map(|v| v.map(|v| Literal::binary(v.to_vec())))
411 .collect())
412 } else if let Some(array) = partner.as_any().downcast_ref::<BinaryArray>() {
413 Ok(array
414 .iter()
415 .map(|v| v.map(|v| Literal::binary(v.to_vec())))
416 .collect())
417 } else {
418 return Err(Error::new(
419 ErrorKind::DataInvalid,
420 "The partner is not a binary array",
421 ));
422 }
423 }
424 }
425 }
426}
427
/// Strategy used to pair an Arrow field with an Iceberg field.
#[derive(Clone, Copy, Debug)]
pub enum FieldMatchMode {
    /// Pair fields whose Arrow field id (as read by `get_field_id`) equals the Iceberg field id.
    Id,
    /// Pair fields whose Arrow field name equals the Iceberg field name.
    Name,
}
445
446impl FieldMatchMode {
447 pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
449 match self {
450 FieldMatchMode::Id => get_field_id(arrow_field)
451 .map(|id| id == iceberg_field.id)
452 .unwrap_or(false),
453 FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name,
454 }
455 }
456}
457
/// Partner accessor that navigates Arrow arrays alongside an Iceberg type.
pub struct ArrowArrayAccessor {
    // How struct fields are paired with Iceberg fields when looking up a field partner.
    match_mode: FieldMatchMode,
}
462
463impl ArrowArrayAccessor {
464 pub fn new() -> Self {
466 Self {
467 match_mode: FieldMatchMode::Id,
468 }
469 }
470
471 pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
473 Self { match_mode }
474 }
475}
476
477impl Default for ArrowArrayAccessor {
478 fn default() -> Self {
479 Self::new()
480 }
481}
482
483impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
484 fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
485 if !matches!(schema_partner.data_type(), DataType::Struct(_)) {
486 return Err(Error::new(
487 ErrorKind::DataInvalid,
488 "The schema partner is not a struct type",
489 ));
490 }
491
492 Ok(schema_partner)
493 }
494
495 fn field_partner<'a>(
496 &self,
497 struct_partner: &'a ArrayRef,
498 field: &NestedField,
499 ) -> Result<&'a ArrayRef> {
500 let struct_array = struct_partner
501 .as_any()
502 .downcast_ref::<StructArray>()
503 .ok_or_else(|| {
504 Error::new(
505 ErrorKind::DataInvalid,
506 format!(
507 "The struct partner is not a struct array, partner: {struct_partner:?}"
508 ),
509 )
510 })?;
511
512 let field_pos = struct_array
513 .fields()
514 .iter()
515 .position(|arrow_field| self.match_mode.match_field(arrow_field, field))
516 .ok_or_else(|| {
517 Error::new(
518 ErrorKind::DataInvalid,
519 format!("Field id {} not found in struct array", field.id),
520 )
521 })?;
522
523 Ok(struct_array.column(field_pos))
524 }
525
526 fn list_element_partner<'a>(&self, list_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
527 match list_partner.data_type() {
528 DataType::List(_) => {
529 let list_array = list_partner
530 .as_any()
531 .downcast_ref::<ListArray>()
532 .ok_or_else(|| {
533 Error::new(
534 ErrorKind::DataInvalid,
535 "The list partner is not a list array",
536 )
537 })?;
538 Ok(list_array.values())
539 }
540 DataType::LargeList(_) => {
541 let list_array = list_partner
542 .as_any()
543 .downcast_ref::<LargeListArray>()
544 .ok_or_else(|| {
545 Error::new(
546 ErrorKind::DataInvalid,
547 "The list partner is not a large list array",
548 )
549 })?;
550 Ok(list_array.values())
551 }
552 DataType::FixedSizeList(_, _) => {
553 let list_array = list_partner
554 .as_any()
555 .downcast_ref::<FixedSizeListArray>()
556 .ok_or_else(|| {
557 Error::new(
558 ErrorKind::DataInvalid,
559 "The list partner is not a fixed size list array",
560 )
561 })?;
562 Ok(list_array.values())
563 }
564 _ => Err(Error::new(
565 ErrorKind::DataInvalid,
566 "The list partner is not a list type",
567 )),
568 }
569 }
570
571 fn map_key_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
572 let map_array = map_partner
573 .as_any()
574 .downcast_ref::<MapArray>()
575 .ok_or_else(|| {
576 Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
577 })?;
578 Ok(map_array.keys())
579 }
580
581 fn map_value_partner<'a>(&self, map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
582 let map_array = map_partner
583 .as_any()
584 .downcast_ref::<MapArray>()
585 .ok_or_else(|| {
586 Error::new(ErrorKind::DataInvalid, "The map partner is not a map array")
587 })?;
588 Ok(map_array.values())
589 }
590}
591
592pub fn arrow_struct_to_literal(
595 struct_array: &ArrayRef,
596 ty: &StructType,
597) -> Result<Vec<Option<Literal>>> {
598 visit_struct_with_partner(
599 ty,
600 struct_array,
601 &mut ArrowArrayToIcebergStructConverter,
602 &ArrowArrayAccessor::new(),
603 )
604}
605
606pub fn arrow_primitive_to_literal(
609 primitive_array: &ArrayRef,
610 ty: &Type,
611) -> Result<Vec<Option<Literal>>> {
612 visit_type_with_partner(
613 ty,
614 primitive_array,
615 &mut ArrowArrayToIcebergStructConverter,
616 &ArrowArrayAccessor::new(),
617 )
618}
619
620#[cfg(test)]
621mod test {
622 use std::collections::HashMap;
623 use std::sync::Arc;
624
625 use arrow_array::builder::{Int32Builder, ListBuilder, MapBuilder, StructBuilder};
626 use arrow_array::{
627 ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
628 Float64Array, Int32Array, Int64Array, StringArray, StructArray, Time64MicrosecondArray,
629 TimestampMicrosecondArray, TimestampNanosecondArray,
630 };
631 use arrow_schema::{DataType, Field, Fields, TimeUnit};
632 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
633
634 use super::*;
635 use crate::spec::{ListType, Literal, MapType, NestedField, PrimitiveType, StructType, Type};
636
/// Converts a struct array holding one column per supported primitive type and
/// checks every row, including a final row in which all fields are null.
#[test]
fn test_arrow_struct_to_iceberg_struct() {
    // Pairs a nullable Arrow field carrying the given Parquet field id with its column.
    fn column(
        name: &str,
        data_type: DataType,
        field_id: &str,
        values: ArrayRef,
    ) -> (Arc<Field>, ArrayRef) {
        let metadata = HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            field_id.to_string(),
        )]);
        let field = Field::new(name, data_type, true).with_metadata(metadata);
        (Arc::new(field), values)
    }

    // Optional Iceberg field of a primitive type.
    fn optional(id: i32, name: &str, ty: PrimitiveType) -> Arc<NestedField> {
        Arc::new(NestedField::optional(id, name, Type::Primitive(ty)))
    }

    let decimal_array = Decimal128Array::from(vec![Some(1000), Some(2000), None])
        .with_precision_and_scale(10, 2)
        .unwrap();

    let struct_array = Arc::new(StructArray::from(vec![
        column(
            "bool_field",
            DataType::Boolean,
            "0",
            Arc::new(BooleanArray::from(vec![Some(true), Some(false), None])),
        ),
        column(
            "int32_field",
            DataType::Int32,
            "2",
            Arc::new(Int32Array::from(vec![Some(3), Some(4), None])),
        ),
        column(
            "int64_field",
            DataType::Int64,
            "3",
            Arc::new(Int64Array::from(vec![Some(5), Some(6), None])),
        ),
        column(
            "float32_field",
            DataType::Float32,
            "4",
            Arc::new(Float32Array::from(vec![Some(1.1), Some(2.2), None])),
        ),
        column(
            "float64_field",
            DataType::Float64,
            "5",
            Arc::new(Float64Array::from(vec![Some(3.3), Some(4.4), None])),
        ),
        column(
            "decimal_field",
            DataType::Decimal128(10, 2),
            "6",
            Arc::new(decimal_array),
        ),
        column(
            "date_field",
            DataType::Date32,
            "7",
            Arc::new(Date32Array::from(vec![Some(18628), Some(18629), None])),
        ),
        column(
            "time_field",
            DataType::Time64(TimeUnit::Microsecond),
            "8",
            Arc::new(Time64MicrosecondArray::from(vec![
                Some(123456789),
                Some(987654321),
                None,
            ])),
        ),
        column(
            "timestamp_micro_field",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            "9",
            Arc::new(TimestampMicrosecondArray::from(vec![
                Some(1622548800000000),
                Some(1622635200000000),
                None,
            ])),
        ),
        column(
            "timestamp_nano_field",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            "10",
            Arc::new(TimestampNanosecondArray::from(vec![
                Some(1622548800000000000),
                Some(1622635200000000000),
                None,
            ])),
        ),
        column(
            "string_field",
            DataType::Utf8,
            "11",
            Arc::new(StringArray::from(vec![Some("a"), Some("b"), None])),
        ),
        column(
            "binary_field",
            DataType::Binary,
            "12",
            Arc::new(BinaryArray::from(vec![
                Some(b"abc".as_ref()),
                Some(b"def".as_ref()),
                None,
            ])),
        ),
    ])) as ArrayRef;

    let iceberg_struct_type = StructType::new(vec![
        optional(0, "bool_field", PrimitiveType::Boolean),
        optional(2, "int32_field", PrimitiveType::Int),
        optional(3, "int64_field", PrimitiveType::Long),
        optional(4, "float32_field", PrimitiveType::Float),
        optional(5, "float64_field", PrimitiveType::Double),
        optional(6, "decimal_field", PrimitiveType::Decimal {
            precision: 10,
            scale: 2,
        }),
        optional(7, "date_field", PrimitiveType::Date),
        optional(8, "time_field", PrimitiveType::Time),
        optional(9, "timestamp_micro_field", PrimitiveType::Timestamp),
        // Name now agrees with the Arrow field (was misspelled "timestamp_nao_field").
        optional(10, "timestamp_nano_field", PrimitiveType::TimestampNs),
        optional(11, "string_field", PrimitiveType::String),
        optional(12, "binary_field", PrimitiveType::Binary),
    ]);

    let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();

    assert_eq!(result, vec![
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::bool(true)),
            Some(Literal::int(3)),
            Some(Literal::long(5)),
            Some(Literal::float(1.1)),
            Some(Literal::double(3.3)),
            Some(Literal::decimal(1000)),
            Some(Literal::date(18628)),
            Some(Literal::time(123456789)),
            Some(Literal::timestamp(1622548800000000)),
            Some(Literal::timestamp_nano(1622548800000000000)),
            Some(Literal::string("a".to_string())),
            Some(Literal::binary(b"abc".to_vec())),
        ]))),
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::bool(false)),
            Some(Literal::int(4)),
            Some(Literal::long(6)),
            Some(Literal::float(2.2)),
            Some(Literal::double(4.4)),
            Some(Literal::decimal(2000)),
            Some(Literal::date(18629)),
            Some(Literal::time(987654321)),
            Some(Literal::timestamp(1622635200000000)),
            Some(Literal::timestamp_nano(1622635200000000000)),
            Some(Literal::string("b".to_string())),
            Some(Literal::binary(b"def".to_vec())),
        ]))),
        Some(Literal::Struct(Struct::from_iter(vec![
            None, None, None, None, None, None, None, None, None, None, None, None,
        ]))),
    ]);
}
878
/// A struct array with two valid rows (containing null fields) and one null
/// row converts to two struct literals followed by `None`.
#[test]
fn test_nullable_struct() {
    // Nullable Int32 Arrow field tagged with the given Parquet field id.
    fn int_field(name: &str, field_id: &str) -> Field {
        Field::new(name, DataType::Int32, true).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            field_id.to_string(),
        )]))
    }

    // Appends one optional value to the Int32 child builder at `index`.
    fn push(builder: &mut StructBuilder, index: usize, value: Option<i32>) {
        let child = builder.field_builder::<Int32Builder>(index).unwrap();
        match value {
            Some(v) => child.append_value(v),
            None => child.append_null(),
        }
    }

    let struct_array = {
        let fields = Fields::from(vec![int_field("a", "0"), int_field("b", "1")]);
        let mut builder = StructBuilder::from_fields(fields, 3);

        // (a, b, whether the struct row itself is valid)
        let rows = [
            (None, None, true),
            (Some(1), None, true),
            (Some(1), Some(1), false),
        ];
        for (a, b, row_is_valid) in rows {
            push(&mut builder, 0, a);
            push(&mut builder, 1, b);
            if row_is_valid {
                builder.append(true);
            } else {
                builder.append_null();
            }
        }

        Arc::new(builder.finish()) as ArrayRef
    };

    let int_type = || Type::Primitive(PrimitiveType::Int);
    let iceberg_struct_type = StructType::new(vec![
        Arc::new(NestedField::optional(0, "a", int_type())),
        Arc::new(NestedField::optional(1, "b", int_type())),
    ]);

    let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();

    let expected = vec![
        Some(Literal::Struct(Struct::from_iter(vec![None, None]))),
        Some(Literal::Struct(Struct::from_iter(vec![
            Some(Literal::int(1)),
            None,
        ]))),
        None,
    ];
    assert_eq!(result, expected);
}
957
/// A struct without any field converts to an empty result, even though the
/// Arrow array itself has three (null) rows.
#[test]
fn test_empty_struct() {
    let no_fields = Fields::empty();
    let struct_array: ArrayRef = Arc::new(StructArray::new_null(no_fields, 3));
    let iceberg_struct_type = StructType::new(Vec::new());

    let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap();

    let expected: Vec<Option<Literal>> = Vec::new();
    assert_eq!(result, expected);
}
965
/// With `FieldMatchMode::Id`, field partners are resolved through the Parquet
/// field id metadata, both at the top level and inside a nested struct.
#[test]
fn test_find_field_by_id() {
    // Attaches the given Parquet field id to an Arrow field.
    fn tagged(field: Field, field_id: &str) -> Field {
        field.with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            field_id.to_string(),
        )]))
    }

    let arrow_field_a = tagged(Field::new("field_a", DataType::Int32, true), "1");
    let arrow_field_b = tagged(Field::new("field_b", DataType::Utf8, true), "2");
    let arrow_field_c = tagged(Field::new("field_c", DataType::Int32, true), "4");
    let arrow_nested_field = tagged(
        Field::new(
            "nested_struct",
            DataType::Struct(Fields::from(vec![
                arrow_field_a.clone(),
                arrow_field_b.clone(),
            ])),
            true,
        ),
        "3",
    );

    let field_a_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(42), Some(43), None]));
    let field_b_values: ArrayRef = Arc::new(StringArray::from(vec![
        Some("value1"),
        Some("value2"),
        None,
    ]));
    let field_c_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(100), Some(200), None]));

    let nested_struct_array: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_field_a), field_a_values),
        (Arc::new(arrow_field_b), field_b_values),
    ]));
    let struct_array: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_nested_field), nested_struct_array),
        (Arc::new(arrow_field_c), field_c_values),
    ]));

    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id);

    // Resolve the nested struct first, then a field inside it.
    let iceberg_field_a = || NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
    let iceberg_field_b =
        NestedField::optional(2, "field_b", Type::Primitive(PrimitiveType::String));
    let nested_field = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(vec![
            Arc::new(iceberg_field_a()),
            Arc::new(iceberg_field_b),
        ])),
    );
    let nested_partner = accessor
        .field_partner(&struct_array, &nested_field)
        .unwrap();

    let field_a = iceberg_field_a();
    let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();

    let int_array = field_a_partner
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(int_array.value(0), 42);
    assert_eq!(int_array.value(1), 43);
    assert!(int_array.is_null(2));
}
1065
/// With `FieldMatchMode::Name`, field partners are resolved through the field
/// names alone; the Arrow fields carry no field id metadata.
#[test]
fn test_find_field_by_name() {
    let arrow_field_a = Field::new("field_a", DataType::Int32, true);
    let arrow_field_b = Field::new("field_b", DataType::Utf8, true);
    let arrow_field_c = Field::new("field_c", DataType::Int32, true);
    let arrow_nested_field = Field::new(
        "nested_struct",
        DataType::Struct(Fields::from(vec![
            arrow_field_a.clone(),
            arrow_field_b.clone(),
        ])),
        true,
    );

    let field_a_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(42), Some(43), None]));
    let field_b_values: ArrayRef = Arc::new(StringArray::from(vec![
        Some("value1"),
        Some("value2"),
        None,
    ]));
    let field_c_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(100), Some(200), None]));

    let nested_struct_array: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_field_a), field_a_values),
        (Arc::new(arrow_field_b), field_b_values),
    ]));
    let struct_array: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_nested_field), nested_struct_array),
        (Arc::new(arrow_field_c), field_c_values),
    ]));

    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name);

    // Resolve the nested struct first, then a field inside it.
    let iceberg_field_a = || NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
    let iceberg_field_b =
        NestedField::optional(2, "field_b", Type::Primitive(PrimitiveType::String));
    let nested_field = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(vec![
            Arc::new(iceberg_field_a()),
            Arc::new(iceberg_field_b),
        ])),
    );
    let nested_partner = accessor
        .field_partner(&struct_array, &nested_field)
        .unwrap();

    let field_a = iceberg_field_a();
    let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();

    let int_array = field_a_partner
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(int_array.value(0), 42);
    assert_eq!(int_array.value(1), 43);
    assert!(int_array.is_null(2));
}
1142
1143 #[test]
1144 fn test_complex_nested() {
1145 let struct_type = StructType::new(vec![
1152 Arc::new(NestedField::required(
1153 0,
1154 "A",
1155 Type::List(ListType::new(Arc::new(NestedField::required(
1156 1,
1157 "item",
1158 Type::Struct(StructType::new(vec![
1159 Arc::new(NestedField::required(
1160 2,
1161 "a1",
1162 Type::Primitive(PrimitiveType::Int),
1163 )),
1164 Arc::new(NestedField::required(
1165 3,
1166 "a2",
1167 Type::Primitive(PrimitiveType::Int),
1168 )),
1169 ])),
1170 )))),
1171 )),
1172 Arc::new(NestedField::required(
1173 4,
1174 "B",
1175 Type::List(ListType::new(Arc::new(NestedField::required(
1176 5,
1177 "item",
1178 Type::Map(MapType::new(
1179 NestedField::optional(6, "keys", Type::Primitive(PrimitiveType::Int))
1180 .into(),
1181 NestedField::optional(7, "values", Type::Primitive(PrimitiveType::Int))
1182 .into(),
1183 )),
1184 )))),
1185 )),
1186 Arc::new(NestedField::required(
1187 8,
1188 "C",
1189 Type::List(ListType::new(Arc::new(NestedField::required(
1190 9,
1191 "item",
1192 Type::List(ListType::new(Arc::new(NestedField::optional(
1193 10,
1194 "item",
1195 Type::Primitive(PrimitiveType::Int),
1196 )))),
1197 )))),
1198 )),
1199 ]);
1200
1201 let struct_array =
1207 {
1208 let a_struct_a1_builder = Int32Builder::new();
1209 let a_struct_a2_builder = Int32Builder::new();
1210 let a_struct_builder =
1211 StructBuilder::new(
1212 vec![
1213 Field::new("a1", DataType::Int32, false).with_metadata(HashMap::from(
1214 [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
1215 )),
1216 Field::new("a2", DataType::Int32, false).with_metadata(HashMap::from(
1217 [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
1218 )),
1219 ],
1220 vec![Box::new(a_struct_a1_builder), Box::new(a_struct_a2_builder)],
1221 );
1222 let a_builder = ListBuilder::new(a_struct_builder);
1223
1224 let map_key_builder = Int32Builder::new();
1225 let map_value_builder = Int32Builder::new();
1226 let map_builder = MapBuilder::new(None, map_key_builder, map_value_builder);
1227 let b_builder = ListBuilder::new(map_builder);
1228
1229 let inner_list_item_builder = Int32Builder::new();
1230 let inner_list_builder = ListBuilder::new(inner_list_item_builder);
1231 let c_builder = ListBuilder::new(inner_list_builder);
1232
1233 let mut top_struct_builder = {
1234 let a_struct_type =
1235 DataType::Struct(Fields::from(vec![
1236 Field::new("a1", DataType::Int32, false).with_metadata(HashMap::from(
1237 [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
1238 )),
1239 Field::new("a2", DataType::Int32, false).with_metadata(HashMap::from(
1240 [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
1241 )),
1242 ]));
1243 let a_type =
1244 DataType::List(Arc::new(Field::new("item", a_struct_type.clone(), true)));
1245
1246 let b_map_entry_struct = Field::new(
1247 "entries",
1248 DataType::Struct(Fields::from(vec![
1249 Field::new("keys", DataType::Int32, false),
1250 Field::new("values", DataType::Int32, true),
1251 ])),
1252 false,
1253 );
1254 let b_map_type =
1255 DataType::Map(Arc::new(b_map_entry_struct), false);
1256 let b_type =
1257 DataType::List(Arc::new(Field::new("item", b_map_type.clone(), true)));
1258
1259 let c_inner_list_type =
1260 DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
1261 let c_type = DataType::List(Arc::new(Field::new(
1262 "item",
1263 c_inner_list_type.clone(),
1264 true,
1265 )));
1266 StructBuilder::new(
1267 Fields::from(vec![
1268 Field::new("A", a_type.clone(), false).with_metadata(HashMap::from([
1269 (PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string()),
1270 ])),
1271 Field::new("B", b_type.clone(), false).with_metadata(HashMap::from([
1272 (PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string()),
1273 ])),
1274 Field::new("C", c_type.clone(), false).with_metadata(HashMap::from([
1275 (PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string()),
1276 ])),
1277 ]),
1278 vec![
1279 Box::new(a_builder),
1280 Box::new(b_builder),
1281 Box::new(c_builder),
1282 ],
1283 )
1284 };
1285
1286 {
1289 let a_builder = top_struct_builder
1290 .field_builder::<ListBuilder<StructBuilder>>(0)
1291 .unwrap();
1292 let struct_builder = a_builder.values();
1293 struct_builder
1294 .field_builder::<Int32Builder>(0)
1295 .unwrap()
1296 .append_value(10);
1297 struct_builder
1298 .field_builder::<Int32Builder>(1)
1299 .unwrap()
1300 .append_value(20);
1301 struct_builder.append(true);
1302 let struct_builder = a_builder.values();
1303 struct_builder
1304 .field_builder::<Int32Builder>(0)
1305 .unwrap()
1306 .append_value(11);
1307 struct_builder
1308 .field_builder::<Int32Builder>(1)
1309 .unwrap()
1310 .append_value(21);
1311 struct_builder.append(true);
1312 a_builder.append(true);
1313 }
1314 {
1315 let b_builder = top_struct_builder
1316 .field_builder::<ListBuilder<MapBuilder<Int32Builder, Int32Builder>>>(1)
1317 .unwrap();
1318 let map_builder = b_builder.values();
1319 map_builder.keys().append_value(1);
1320 map_builder.values().append_value(100);
1321 map_builder.keys().append_value(3);
1322 map_builder.values().append_value(300);
1323 map_builder.append(true).unwrap();
1324
1325 map_builder.keys().append_value(2);
1326 map_builder.values().append_value(200);
1327 map_builder.append(true).unwrap();
1328
1329 b_builder.append(true);
1330 }
1331 {
1332 let c_builder = top_struct_builder
1333 .field_builder::<ListBuilder<ListBuilder<Int32Builder>>>(2)
1334 .unwrap();
1335 let inner_list_builder = c_builder.values();
1336 inner_list_builder.values().append_value(100);
1337 inner_list_builder.values().append_value(101);
1338 inner_list_builder.values().append_value(102);
1339 inner_list_builder.append(true);
1340 let inner_list_builder = c_builder.values();
1341 inner_list_builder.values().append_value(200);
1342 inner_list_builder.values().append_value(201);
1343 inner_list_builder.append(true);
1344 c_builder.append(true);
1345 }
1346 top_struct_builder.append(true);
1347
1348 {
1351 let a_builder = top_struct_builder
1352 .field_builder::<ListBuilder<StructBuilder>>(0)
1353 .unwrap();
1354 let struct_builder = a_builder.values();
1355 struct_builder
1356 .field_builder::<Int32Builder>(0)
1357 .unwrap()
1358 .append_value(12);
1359 struct_builder
1360 .field_builder::<Int32Builder>(1)
1361 .unwrap()
1362 .append_value(22);
1363 struct_builder.append(true);
1364 let struct_builder = a_builder.values();
1365 struct_builder
1366 .field_builder::<Int32Builder>(0)
1367 .unwrap()
1368 .append_value(13);
1369 struct_builder
1370 .field_builder::<Int32Builder>(1)
1371 .unwrap()
1372 .append_value(23);
1373 struct_builder.append(true);
1374 a_builder.append(true);
1375 }
1376 {
1377 let b_builder = top_struct_builder
1378 .field_builder::<ListBuilder<MapBuilder<Int32Builder, Int32Builder>>>(1)
1379 .unwrap();
1380 let map_builder = b_builder.values();
1381 map_builder.keys().append_value(3);
1382 map_builder.values().append_value(300);
1383 map_builder.append(true).unwrap();
1384
1385 b_builder.append(true);
1386 }
1387 {
1388 let c_builder = top_struct_builder
1389 .field_builder::<ListBuilder<ListBuilder<Int32Builder>>>(2)
1390 .unwrap();
1391 let inner_list_builder = c_builder.values();
1392 inner_list_builder.values().append_value(300);
1393 inner_list_builder.values().append_value(301);
1394 inner_list_builder.values().append_value(302);
1395 inner_list_builder.append(true);
1396 let inner_list_builder = c_builder.values();
1397 inner_list_builder.values().append_value(400);
1398 inner_list_builder.values().append_value(401);
1399 inner_list_builder.append(true);
1400 c_builder.append(true);
1401 }
1402 top_struct_builder.append(true);
1403
1404 Arc::new(top_struct_builder.finish()) as ArrayRef
1405 };
1406
1407 let result = arrow_struct_to_literal(&struct_array, &struct_type).unwrap();
1408 assert_eq!(result, vec![
1409 Some(Literal::Struct(Struct::from_iter(vec![
1410 Some(Literal::List(vec![
1411 Some(Literal::Struct(Struct::from_iter(vec![
1412 Some(Literal::int(10)),
1413 Some(Literal::int(20)),
1414 ]))),
1415 Some(Literal::Struct(Struct::from_iter(vec![
1416 Some(Literal::int(11)),
1417 Some(Literal::int(21)),
1418 ]))),
1419 ])),
1420 Some(Literal::List(vec![
1421 Some(Literal::Map(Map::from_iter(vec![
1422 (Literal::int(1), Some(Literal::int(100))),
1423 (Literal::int(3), Some(Literal::int(300))),
1424 ]))),
1425 Some(Literal::Map(Map::from_iter(vec![(
1426 Literal::int(2),
1427 Some(Literal::int(200))
1428 ),]))),
1429 ])),
1430 Some(Literal::List(vec![
1431 Some(Literal::List(vec![
1432 Some(Literal::int(100)),
1433 Some(Literal::int(101)),
1434 Some(Literal::int(102)),
1435 ])),
1436 Some(Literal::List(vec![
1437 Some(Literal::int(200)),
1438 Some(Literal::int(201)),
1439 ])),
1440 ])),
1441 ]))),
1442 Some(Literal::Struct(Struct::from_iter(vec![
1443 Some(Literal::List(vec![
1444 Some(Literal::Struct(Struct::from_iter(vec![
1445 Some(Literal::int(12)),
1446 Some(Literal::int(22)),
1447 ]))),
1448 Some(Literal::Struct(Struct::from_iter(vec![
1449 Some(Literal::int(13)),
1450 Some(Literal::int(23)),
1451 ]))),
1452 ])),
1453 Some(Literal::List(vec![Some(Literal::Map(Map::from_iter(
1454 vec![(Literal::int(3), Some(Literal::int(300))),]
1455 ))),])),
1456 Some(Literal::List(vec![
1457 Some(Literal::List(vec![
1458 Some(Literal::int(300)),
1459 Some(Literal::int(301)),
1460 Some(Literal::int(302)),
1461 ])),
1462 Some(Literal::List(vec![
1463 Some(Literal::int(400)),
1464 Some(Literal::int(401)),
1465 ])),
1466 ])),
1467 ]))),
1468 ]);
1469 }
1470}