1use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
24use arrow_array::{
25 BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
26 FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
27 TimestampMicrosecondArray, TimestampNanosecondArray,
28};
29use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
30use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
31use parquet::file::statistics::Statistics;
32use uuid::Uuid;
33
34use crate::error::Result;
35use crate::spec::decimal_utils::i128_from_be_bytes;
36use crate::spec::{
37 Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral,
38 PrimitiveType, Schema, SchemaVisitor, StructType, Type,
39};
40use crate::{Error, ErrorKind};
41
/// Name given to the entries struct field of every Arrow `Map` type produced
/// when converting Iceberg maps to Arrow.
pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
/// Arrow timezone string attached to Iceberg `timestamptz` / `timestamptz_ns`
/// columns when converting them to Arrow.
pub const UTC_TIME_ZONE: &str = "+00:00";
46
47pub trait ArrowSchemaVisitor {
51 type T;
53
54 type U;
56
57 fn before_field(&mut self, _field: &Field) -> Result<()> {
59 Ok(())
60 }
61
62 fn after_field(&mut self, _field: &Field) -> Result<()> {
64 Ok(())
65 }
66
67 fn before_list_element(&mut self, _field: &Field) -> Result<()> {
69 Ok(())
70 }
71
72 fn after_list_element(&mut self, _field: &Field) -> Result<()> {
74 Ok(())
75 }
76
77 fn before_map_key(&mut self, _field: &Field) -> Result<()> {
79 Ok(())
80 }
81
82 fn after_map_key(&mut self, _field: &Field) -> Result<()> {
84 Ok(())
85 }
86
87 fn before_map_value(&mut self, _field: &Field) -> Result<()> {
89 Ok(())
90 }
91
92 fn after_map_value(&mut self, _field: &Field) -> Result<()> {
94 Ok(())
95 }
96
97 fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U>;
99
100 fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T>;
102
103 fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;
105
106 fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T>;
108
109 fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
111}
112
113fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Result<V::T> {
115 match r#type {
116 p if p.is_primitive()
117 || matches!(
118 p,
119 DataType::Boolean
120 | DataType::Utf8
121 | DataType::LargeUtf8
122 | DataType::Utf8View
123 | DataType::Binary
124 | DataType::LargeBinary
125 | DataType::BinaryView
126 | DataType::FixedSizeBinary(_)
127 ) =>
128 {
129 visitor.primitive(p)
130 }
131 DataType::List(element_field) => visit_list(r#type, element_field, visitor),
132 DataType::LargeList(element_field) => visit_list(r#type, element_field, visitor),
133 DataType::FixedSizeList(element_field, _) => visit_list(r#type, element_field, visitor),
134 DataType::Map(field, _) => match field.data_type() {
135 DataType::Struct(fields) => {
136 if fields.len() != 2 {
137 return Err(Error::new(
138 ErrorKind::DataInvalid,
139 "Map field must have exactly 2 fields",
140 ));
141 }
142
143 let key_field = &fields[0];
144 let value_field = &fields[1];
145
146 let key_result = {
147 visitor.before_map_key(key_field)?;
148 let ret = visit_type(key_field.data_type(), visitor)?;
149 visitor.after_map_key(key_field)?;
150 ret
151 };
152
153 let value_result = {
154 visitor.before_map_value(value_field)?;
155 let ret = visit_type(value_field.data_type(), visitor)?;
156 visitor.after_map_value(value_field)?;
157 ret
158 };
159
160 visitor.map(r#type, key_result, value_result)
161 }
162 _ => Err(Error::new(
163 ErrorKind::DataInvalid,
164 "Map field must have struct type",
165 )),
166 },
167 DataType::Struct(fields) => visit_struct(fields, visitor),
168 DataType::Dictionary(_key_type, value_type) => visit_type(value_type, visitor),
169 other => Err(Error::new(
170 ErrorKind::DataInvalid,
171 format!("Cannot visit Arrow data type: {other}"),
172 )),
173 }
174}
175
176fn visit_list<V: ArrowSchemaVisitor>(
178 data_type: &DataType,
179 element_field: &Field,
180 visitor: &mut V,
181) -> Result<V::T> {
182 visitor.before_list_element(element_field)?;
183 let value = visit_type(element_field.data_type(), visitor)?;
184 visitor.after_list_element(element_field)?;
185 visitor.list(data_type, value)
186}
187
188fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> Result<V::T> {
190 let mut results = Vec::with_capacity(fields.len());
191 for field in fields {
192 visitor.before_field(field)?;
193 let result = visit_type(field.data_type(), visitor)?;
194 visitor.after_field(field)?;
195 results.push(result);
196 }
197
198 visitor.r#struct(fields, results)
199}
200
201pub(crate) fn visit_schema<V: ArrowSchemaVisitor>(
203 schema: &ArrowSchema,
204 visitor: &mut V,
205) -> Result<V::U> {
206 let mut results = Vec::with_capacity(schema.fields().len());
207 for field in schema.fields() {
208 visitor.before_field(field)?;
209 let result = visit_type(field.data_type(), visitor)?;
210 visitor.after_field(field)?;
211 results.push(result);
212 }
213 visitor.schema(schema, results)
214}
215
216pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
222 let mut visitor = ArrowSchemaConverter::new();
223 visit_schema(schema, &mut visitor)
224}
225
226pub fn arrow_schema_to_schema_auto_assign_ids(schema: &ArrowSchema) -> Result<Schema> {
235 let mut visitor = ArrowSchemaConverter::new_with_field_ids_from(FIRST_FIELD_ID);
236 visit_schema(schema, &mut visitor)
237}
238
239pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
241 let mut visitor = ArrowSchemaConverter::new();
242 visit_type(ty, &mut visitor)
243}
244
/// Arrow field-metadata key under which an Iceberg field's doc string is stored.
const ARROW_FIELD_DOC_KEY: &str = "doc";
246
247pub(super) fn get_field_id_from_metadata(field: &Field) -> Result<i32> {
248 if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
249 return value.parse::<i32>().map_err(|e| {
250 Error::new(
251 ErrorKind::DataInvalid,
252 "Failed to parse field id".to_string(),
253 )
254 .with_context("value", value)
255 .with_source(e)
256 });
257 }
258 Err(Error::new(
259 ErrorKind::DataInvalid,
260 "Field id not found in metadata",
261 ))
262}
263
264fn get_field_doc(field: &Field) -> Option<String> {
265 if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
266 return Some(value.clone());
267 }
268 None
269}
270
/// [`ArrowSchemaVisitor`] that builds an Iceberg [`Schema`] / [`Type`] from Arrow.
struct ArrowSchemaConverter {
    /// `Some(start)`: field-id metadata is ignored. Fields get temporary
    /// sequential ids, and the final schema is rebuilt with ids reassigned
    /// from `start`. `None`: ids are read from each field's metadata.
    reassign_field_ids_from: Option<i32>,
    /// Next temporary id handed out while `reassign_field_ids_from` is set.
    next_field_id: i32,
}
280
281impl ArrowSchemaConverter {
282 fn new() -> Self {
283 Self {
284 reassign_field_ids_from: None,
285 next_field_id: 0,
286 }
287 }
288
289 fn new_with_field_ids_from(start_from: i32) -> Self {
290 Self {
291 reassign_field_ids_from: Some(start_from),
292 next_field_id: 0,
293 }
294 }
295
296 fn get_field_id(&mut self, field: &Field) -> Result<i32> {
297 if self.reassign_field_ids_from.is_some() {
298 let temp_id = self.next_field_id;
302 self.next_field_id += 1;
303 Ok(temp_id)
304 } else {
305 get_field_id_from_metadata(field)
307 }
308 }
309
310 fn convert_fields(
311 &mut self,
312 fields: &Fields,
313 field_results: &[Type],
314 ) -> Result<Vec<NestedFieldRef>> {
315 let mut results = Vec::with_capacity(fields.len());
316 for i in 0..fields.len() {
317 let field = &fields[i];
318 let field_type = &field_results[i];
319 let id = self.get_field_id(field)?;
320 let doc = get_field_doc(field);
321 let nested_field = NestedField {
322 id,
323 doc,
324 name: field.name().clone(),
325 required: !field.is_nullable(),
326 field_type: Box::new(field_type.clone()),
327 initial_default: None,
328 write_default: None,
329 };
330 results.push(Arc::new(nested_field));
331 }
332 Ok(results)
333 }
334}
335
336impl ArrowSchemaVisitor for ArrowSchemaConverter {
337 type T = Type;
338 type U = Schema;
339
340 fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
341 let fields = self.convert_fields(schema.fields(), &values)?;
342 let mut builder = Schema::builder().with_fields(fields);
343 if let Some(start_from) = self.reassign_field_ids_from {
344 builder = builder.with_reassigned_field_ids(start_from)
345 }
346 builder.build()
347 }
348
349 fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
350 let fields = self.convert_fields(fields, &results)?;
351 Ok(Type::Struct(StructType::new(fields)))
352 }
353
354 fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
355 let element_field = match list {
356 DataType::List(element_field) => element_field,
357 DataType::LargeList(element_field) => element_field,
358 DataType::FixedSizeList(element_field, _) => element_field,
359 _ => {
360 return Err(Error::new(
361 ErrorKind::DataInvalid,
362 "List type must have list data type",
363 ));
364 }
365 };
366
367 let id = self.get_field_id(element_field)?;
368 let doc = get_field_doc(element_field);
369 let mut element_field =
370 NestedField::list_element(id, value.clone(), !element_field.is_nullable());
371 if let Some(doc) = doc {
372 element_field = element_field.with_doc(doc);
373 }
374 let element_field = Arc::new(element_field);
375 Ok(Type::List(ListType { element_field }))
376 }
377
378 fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
379 match map {
380 DataType::Map(field, _) => match field.data_type() {
381 DataType::Struct(fields) => {
382 if fields.len() != 2 {
383 return Err(Error::new(
384 ErrorKind::DataInvalid,
385 "Map field must have exactly 2 fields",
386 ));
387 }
388
389 let key_field = &fields[0];
390 let value_field = &fields[1];
391
392 let key_id = self.get_field_id(key_field)?;
393 let key_doc = get_field_doc(key_field);
394 let mut key_field = NestedField::map_key_element(key_id, key_value.clone());
395 if let Some(doc) = key_doc {
396 key_field = key_field.with_doc(doc);
397 }
398 let key_field = Arc::new(key_field);
399
400 let value_id = self.get_field_id(value_field)?;
401 let value_doc = get_field_doc(value_field);
402 let mut value_field = NestedField::map_value_element(
403 value_id,
404 value.clone(),
405 !value_field.is_nullable(),
406 );
407 if let Some(doc) = value_doc {
408 value_field = value_field.with_doc(doc);
409 }
410 let value_field = Arc::new(value_field);
411
412 Ok(Type::Map(MapType {
413 key_field,
414 value_field,
415 }))
416 }
417 _ => Err(Error::new(
418 ErrorKind::DataInvalid,
419 "Map field must have struct type",
420 )),
421 },
422 _ => Err(Error::new(
423 ErrorKind::DataInvalid,
424 "Map type must have map data type",
425 )),
426 }
427 }
428
429 fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
430 match p {
431 DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
432 DataType::Int8 | DataType::Int16 | DataType::Int32 => {
433 Ok(Type::Primitive(PrimitiveType::Int))
434 }
435 DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)),
436 DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)),
437 DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
438 DataType::UInt64 => {
439 Err(Error::new(
441 ErrorKind::DataInvalid,
442 "UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for full uint64 range.",
443 ))
444 }
445 DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
446 DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
447 DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| {
448 Error::new(
449 ErrorKind::DataInvalid,
450 "Failed to create decimal type".to_string(),
451 )
452 .with_source(e)
453 }),
454 DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)),
455 DataType::Time64(unit) if unit == &TimeUnit::Microsecond => {
456 Ok(Type::Primitive(PrimitiveType::Time))
457 }
458 DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => {
459 Ok(Type::Primitive(PrimitiveType::Timestamp))
460 }
461 DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => {
462 Ok(Type::Primitive(PrimitiveType::TimestampNs))
463 }
464 DataType::Timestamp(unit, Some(zone))
465 if unit == &TimeUnit::Microsecond
466 && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
467 {
468 Ok(Type::Primitive(PrimitiveType::Timestamptz))
469 }
470 DataType::Timestamp(unit, Some(zone))
471 if unit == &TimeUnit::Nanosecond
472 && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
473 {
474 Ok(Type::Primitive(PrimitiveType::TimestamptzNs))
475 }
476 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
477 Ok(Type::Primitive(PrimitiveType::Binary))
478 }
479 DataType::FixedSizeBinary(width) => {
480 Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64)))
481 }
482 DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => {
483 Ok(Type::Primitive(PrimitiveType::String))
484 }
485 _ => Err(Error::new(
486 ErrorKind::DataInvalid,
487 format!("Unsupported Arrow data type: {p}"),
488 )),
489 }
490 }
491}
492
/// Stateless schema visitor that converts an Iceberg schema or type into its
/// Arrow equivalent.
struct ToArrowSchemaConverter;
494
495enum ArrowSchemaOrFieldOrType {
496 Schema(ArrowSchema),
497 Field(Field),
498 Type(DataType),
499}
500
501impl SchemaVisitor for ToArrowSchemaConverter {
502 type T = ArrowSchemaOrFieldOrType;
503
504 fn schema(
505 &mut self,
506 _schema: &crate::spec::Schema,
507 value: ArrowSchemaOrFieldOrType,
508 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
509 let struct_type = match value {
510 ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)) => fields,
511 _ => unreachable!(),
512 };
513 Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new(
514 struct_type,
515 )))
516 }
517
518 fn field(
519 &mut self,
520 field: &crate::spec::NestedFieldRef,
521 value: ArrowSchemaOrFieldOrType,
522 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
523 let ty = match value {
524 ArrowSchemaOrFieldOrType::Type(ty) => ty,
525 _ => unreachable!(),
526 };
527 let metadata = if let Some(doc) = &field.doc {
528 HashMap::from([
529 (PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()),
530 (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
531 ])
532 } else {
533 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string())])
534 };
535 Ok(ArrowSchemaOrFieldOrType::Field(
536 Field::new(field.name.clone(), ty, !field.required).with_metadata(metadata),
537 ))
538 }
539
540 fn r#struct(
541 &mut self,
542 _: &crate::spec::StructType,
543 results: Vec<ArrowSchemaOrFieldOrType>,
544 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
545 let fields = results
546 .into_iter()
547 .map(|result| match result {
548 ArrowSchemaOrFieldOrType::Field(field) => field,
549 _ => unreachable!(),
550 })
551 .collect();
552 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)))
553 }
554
555 fn list(
556 &mut self,
557 list: &crate::spec::ListType,
558 value: ArrowSchemaOrFieldOrType,
559 ) -> crate::Result<Self::T> {
560 let field = match self.field(&list.element_field, value)? {
561 ArrowSchemaOrFieldOrType::Field(field) => field,
562 _ => unreachable!(),
563 };
564 let meta = if let Some(doc) = &list.element_field.doc {
565 HashMap::from([
566 (
567 PARQUET_FIELD_ID_META_KEY.to_string(),
568 list.element_field.id.to_string(),
569 ),
570 (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
571 ])
572 } else {
573 HashMap::from([(
574 PARQUET_FIELD_ID_META_KEY.to_string(),
575 list.element_field.id.to_string(),
576 )])
577 };
578 let field = field.with_metadata(meta);
579 Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new(
580 field,
581 ))))
582 }
583
584 fn map(
585 &mut self,
586 map: &crate::spec::MapType,
587 key_value: ArrowSchemaOrFieldOrType,
588 value: ArrowSchemaOrFieldOrType,
589 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
590 let key_field = match self.field(&map.key_field, key_value)? {
591 ArrowSchemaOrFieldOrType::Field(field) => field,
592 _ => unreachable!(),
593 };
594 let value_field = match self.field(&map.value_field, value)? {
595 ArrowSchemaOrFieldOrType::Field(field) => field,
596 _ => unreachable!(),
597 };
598 let field = Field::new(
599 DEFAULT_MAP_FIELD_NAME,
600 DataType::Struct(vec![key_field, value_field].into()),
601 false,
603 );
604
605 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
606 field.into(),
607 false,
608 )))
609 }
610
611 fn primitive(
612 &mut self,
613 p: &crate::spec::PrimitiveType,
614 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
615 match p {
616 crate::spec::PrimitiveType::Boolean => {
617 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
618 }
619 crate::spec::PrimitiveType::Int => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int32)),
620 crate::spec::PrimitiveType::Long => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int64)),
621 crate::spec::PrimitiveType::Float => {
622 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float32))
623 }
624 crate::spec::PrimitiveType::Double => {
625 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float64))
626 }
627 crate::spec::PrimitiveType::Decimal { precision, scale } => {
628 let (precision, scale) = {
629 let precision: u8 = precision.to_owned().try_into().map_err(|err| {
630 Error::new(
631 crate::ErrorKind::DataInvalid,
632 "incompatible precision for decimal type convert",
633 )
634 .with_source(err)
635 })?;
636 let scale = scale.to_owned().try_into().map_err(|err| {
637 Error::new(
638 crate::ErrorKind::DataInvalid,
639 "incompatible scale for decimal type convert",
640 )
641 .with_source(err)
642 })?;
643 (precision, scale)
644 };
645 validate_decimal_precision_and_scale::<Decimal128Type>(precision, scale).map_err(
646 |err| {
647 Error::new(
648 crate::ErrorKind::DataInvalid,
649 "incompatible precision and scale for decimal type convert",
650 )
651 .with_source(err)
652 },
653 )?;
654 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Decimal128(
655 precision, scale,
656 )))
657 }
658 crate::spec::PrimitiveType::Date => {
659 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
660 }
661 crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
662 DataType::Time64(TimeUnit::Microsecond),
663 )),
664 crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
665 DataType::Timestamp(TimeUnit::Microsecond, None),
666 )),
667 crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
668 DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
670 )),
671 crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
672 DataType::Timestamp(TimeUnit::Nanosecond, None),
673 )),
674 crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
675 DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
677 )),
678 crate::spec::PrimitiveType::String => {
679 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
680 }
681 crate::spec::PrimitiveType::Uuid => Ok(ArrowSchemaOrFieldOrType::Type(
682 DataType::FixedSizeBinary(16),
683 )),
684 crate::spec::PrimitiveType::Fixed(len) => Ok(ArrowSchemaOrFieldOrType::Type(
685 i32::try_from(*len)
686 .ok()
687 .map(DataType::FixedSizeBinary)
688 .unwrap_or(DataType::LargeBinary),
689 )),
690 crate::spec::PrimitiveType::Binary => {
691 Ok(ArrowSchemaOrFieldOrType::Type(DataType::LargeBinary))
692 }
693 }
694 }
695}
696
697pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
699 let mut converter = ToArrowSchemaConverter;
700 match crate::spec::visit_schema(schema, &mut converter)? {
701 ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema),
702 _ => unreachable!(),
703 }
704}
705
706pub fn type_to_arrow_type(ty: &crate::spec::Type) -> crate::Result<DataType> {
708 let mut converter = ToArrowSchemaConverter;
709 match crate::spec::visit_type(ty, &mut converter)? {
710 ArrowSchemaOrFieldOrType::Type(ty) => Ok(ty),
711 _ => unreachable!(),
712 }
713}
714
715pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send + Sync>> {
717 match (datum.data_type(), datum.literal()) {
718 (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(value)) => {
719 Ok(Arc::new(BooleanArray::new_scalar(*value)))
720 }
721 (PrimitiveType::Int, PrimitiveLiteral::Int(value)) => {
722 Ok(Arc::new(Int32Array::new_scalar(*value)))
723 }
724 (PrimitiveType::Long, PrimitiveLiteral::Long(value)) => {
725 Ok(Arc::new(Int64Array::new_scalar(*value)))
726 }
727 (PrimitiveType::Float, PrimitiveLiteral::Float(value)) => {
728 Ok(Arc::new(Float32Array::new_scalar(value.into_inner())))
729 }
730 (PrimitiveType::Double, PrimitiveLiteral::Double(value)) => {
731 Ok(Arc::new(Float64Array::new_scalar(value.into_inner())))
732 }
733 (PrimitiveType::String, PrimitiveLiteral::String(value)) => {
734 Ok(Arc::new(StringArray::new_scalar(value.as_str())))
735 }
736 (PrimitiveType::Binary, PrimitiveLiteral::Binary(value)) => {
737 Ok(Arc::new(BinaryArray::new_scalar(value.as_slice())))
738 }
739 (PrimitiveType::Date, PrimitiveLiteral::Int(value)) => {
740 Ok(Arc::new(Date32Array::new_scalar(*value)))
741 }
742 (PrimitiveType::Timestamp, PrimitiveLiteral::Long(value)) => {
743 Ok(Arc::new(TimestampMicrosecondArray::new_scalar(*value)))
744 }
745 (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
746 TimestampMicrosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
747 ))),
748 (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(value)) => {
749 Ok(Arc::new(TimestampNanosecondArray::new_scalar(*value)))
750 }
751 (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
752 TimestampNanosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
753 ))),
754 (PrimitiveType::Decimal { precision, scale }, PrimitiveLiteral::Int128(value)) => {
755 let array = Decimal128Array::from_value(*value, 1)
756 .with_precision_and_scale(*precision as _, *scale as _)
757 .unwrap();
758 Ok(Arc::new(Scalar::new(array)))
759 }
760 (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(value)) => {
761 let bytes = Uuid::from_u128(*value).into_bytes();
762 let array = FixedSizeBinaryArray::try_from_iter(vec![bytes].into_iter()).unwrap();
763 Ok(Arc::new(Scalar::new(array)))
764 }
765 (PrimitiveType::Fixed(_), PrimitiveLiteral::Binary(value)) => {
766 let array = FixedSizeBinaryArray::try_from_iter(std::iter::once(value.as_slice()))
767 .map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string()))?;
768 Ok(Arc::new(Scalar::new(array)))
769 }
770
771 (primitive_type, _) => Err(Error::new(
772 ErrorKind::FeatureUnsupported,
773 format!("Converting datum from type {primitive_type:?} to arrow not supported yet."),
774 )),
775 }
776}
777
778pub(crate) fn get_parquet_stat_min_as_datum(
779 primitive_type: &PrimitiveType,
780 stats: &Statistics,
781) -> Result<Option<Datum>> {
782 Ok(match (primitive_type, stats) {
783 (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
784 stats.min_opt().map(|val| Datum::bool(*val))
785 }
786 (PrimitiveType::Int, Statistics::Int32(stats)) => {
787 stats.min_opt().map(|val| Datum::int(*val))
788 }
789 (PrimitiveType::Date, Statistics::Int32(stats)) => {
790 stats.min_opt().map(|val| Datum::date(*val))
791 }
792 (PrimitiveType::Long, Statistics::Int64(stats)) => {
793 stats.min_opt().map(|val| Datum::long(*val))
794 }
795 (PrimitiveType::Time, Statistics::Int64(stats)) => {
796 let Some(val) = stats.min_opt() else {
797 return Ok(None);
798 };
799
800 Some(Datum::time_micros(*val)?)
801 }
802 (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
803 stats.min_opt().map(|val| Datum::timestamp_micros(*val))
804 }
805 (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
806 stats.min_opt().map(|val| Datum::timestamptz_micros(*val))
807 }
808 (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
809 stats.min_opt().map(|val| Datum::timestamp_nanos(*val))
810 }
811 (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
812 stats.min_opt().map(|val| Datum::timestamptz_nanos(*val))
813 }
814 (PrimitiveType::Float, Statistics::Float(stats)) => {
815 stats.min_opt().map(|val| Datum::float(*val))
816 }
817 (PrimitiveType::Double, Statistics::Double(stats)) => {
818 stats.min_opt().map(|val| Datum::double(*val))
819 }
820 (PrimitiveType::String, Statistics::ByteArray(stats)) => {
821 let Some(val) = stats.min_opt() else {
822 return Ok(None);
823 };
824
825 Some(Datum::string(val.as_utf8()?))
826 }
827 (
828 PrimitiveType::Decimal {
829 precision: _,
830 scale: _,
831 },
832 Statistics::ByteArray(stats),
833 ) => {
834 let Some(bytes) = stats.min_bytes_opt() else {
835 return Ok(None);
836 };
837 Some(Datum::new(
838 primitive_type.clone(),
839 PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
840 ))
841 }
842 (
843 PrimitiveType::Decimal {
844 precision: _,
845 scale: _,
846 },
847 Statistics::FixedLenByteArray(stats),
848 ) => {
849 let Some(bytes) = stats.min_bytes_opt() else {
850 return Ok(None);
851 };
852 Some(Datum::new(
853 primitive_type.clone(),
854 PrimitiveLiteral::Int128(i128_from_be_bytes(bytes).ok_or_else(|| {
855 Error::new(
856 ErrorKind::DataInvalid,
857 format!("Can't convert bytes to i128: {bytes:?}"),
858 )
859 })?),
860 ))
861 }
862 (
863 PrimitiveType::Decimal {
864 precision: _,
865 scale: _,
866 },
867 Statistics::Int32(stats),
868 ) => stats.min_opt().map(|val| {
869 Datum::new(
870 primitive_type.clone(),
871 PrimitiveLiteral::Int128(i128::from(*val)),
872 )
873 }),
874
875 (
876 PrimitiveType::Decimal {
877 precision: _,
878 scale: _,
879 },
880 Statistics::Int64(stats),
881 ) => stats.min_opt().map(|val| {
882 Datum::new(
883 primitive_type.clone(),
884 PrimitiveLiteral::Int128(i128::from(*val)),
885 )
886 }),
887 (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
888 let Some(bytes) = stats.min_bytes_opt() else {
889 return Ok(None);
890 };
891 if bytes.len() != 16 {
892 return Err(Error::new(
893 ErrorKind::Unexpected,
894 "Invalid length of uuid bytes.",
895 ));
896 }
897 Some(Datum::uuid(Uuid::from_bytes(
898 bytes[..16].try_into().unwrap(),
899 )))
900 }
901 (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
902 let Some(bytes) = stat.min_bytes_opt() else {
903 return Ok(None);
904 };
905 if bytes.len() != *len as usize {
906 return Err(Error::new(
907 ErrorKind::Unexpected,
908 "Invalid length of fixed bytes.",
909 ));
910 }
911 Some(Datum::fixed(bytes.to_vec()))
912 }
913 (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
914 return Ok(stat
915 .min_bytes_opt()
916 .map(|bytes| Datum::binary(bytes.to_vec())));
917 }
918 _ => {
919 return Ok(None);
920 }
921 })
922}
923
924pub(crate) fn get_parquet_stat_max_as_datum(
925 primitive_type: &PrimitiveType,
926 stats: &Statistics,
927) -> Result<Option<Datum>> {
928 Ok(match (primitive_type, stats) {
929 (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
930 stats.max_opt().map(|val| Datum::bool(*val))
931 }
932 (PrimitiveType::Int, Statistics::Int32(stats)) => {
933 stats.max_opt().map(|val| Datum::int(*val))
934 }
935 (PrimitiveType::Date, Statistics::Int32(stats)) => {
936 stats.max_opt().map(|val| Datum::date(*val))
937 }
938 (PrimitiveType::Long, Statistics::Int64(stats)) => {
939 stats.max_opt().map(|val| Datum::long(*val))
940 }
941 (PrimitiveType::Time, Statistics::Int64(stats)) => {
942 let Some(val) = stats.max_opt() else {
943 return Ok(None);
944 };
945
946 Some(Datum::time_micros(*val)?)
947 }
948 (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
949 stats.max_opt().map(|val| Datum::timestamp_micros(*val))
950 }
951 (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
952 stats.max_opt().map(|val| Datum::timestamptz_micros(*val))
953 }
954 (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
955 stats.max_opt().map(|val| Datum::timestamp_nanos(*val))
956 }
957 (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
958 stats.max_opt().map(|val| Datum::timestamptz_nanos(*val))
959 }
960 (PrimitiveType::Float, Statistics::Float(stats)) => {
961 stats.max_opt().map(|val| Datum::float(*val))
962 }
963 (PrimitiveType::Double, Statistics::Double(stats)) => {
964 stats.max_opt().map(|val| Datum::double(*val))
965 }
966 (PrimitiveType::String, Statistics::ByteArray(stats)) => {
967 let Some(val) = stats.max_opt() else {
968 return Ok(None);
969 };
970
971 Some(Datum::string(val.as_utf8()?))
972 }
973 (
974 PrimitiveType::Decimal {
975 precision: _,
976 scale: _,
977 },
978 Statistics::ByteArray(stats),
979 ) => {
980 let Some(bytes) = stats.max_bytes_opt() else {
981 return Ok(None);
982 };
983 Some(Datum::new(
984 primitive_type.clone(),
985 PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
986 ))
987 }
988 (
989 PrimitiveType::Decimal {
990 precision: _,
991 scale: _,
992 },
993 Statistics::FixedLenByteArray(stats),
994 ) => {
995 let Some(bytes) = stats.max_bytes_opt() else {
996 return Ok(None);
997 };
998 Some(Datum::new(
999 primitive_type.clone(),
1000 PrimitiveLiteral::Int128(i128_from_be_bytes(bytes).ok_or_else(|| {
1001 Error::new(
1002 ErrorKind::DataInvalid,
1003 format!("Can't convert bytes to i128: {bytes:?}"),
1004 )
1005 })?),
1006 ))
1007 }
1008 (
1009 PrimitiveType::Decimal {
1010 precision: _,
1011 scale: _,
1012 },
1013 Statistics::Int32(stats),
1014 ) => stats.max_opt().map(|val| {
1015 Datum::new(
1016 primitive_type.clone(),
1017 PrimitiveLiteral::Int128(i128::from(*val)),
1018 )
1019 }),
1020
1021 (
1022 PrimitiveType::Decimal {
1023 precision: _,
1024 scale: _,
1025 },
1026 Statistics::Int64(stats),
1027 ) => stats.max_opt().map(|val| {
1028 Datum::new(
1029 primitive_type.clone(),
1030 PrimitiveLiteral::Int128(i128::from(*val)),
1031 )
1032 }),
1033 (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
1034 let Some(bytes) = stats.max_bytes_opt() else {
1035 return Ok(None);
1036 };
1037 if bytes.len() != 16 {
1038 return Err(Error::new(
1039 ErrorKind::Unexpected,
1040 "Invalid length of uuid bytes.",
1041 ));
1042 }
1043 Some(Datum::uuid(Uuid::from_bytes(
1044 bytes[..16].try_into().unwrap(),
1045 )))
1046 }
1047 (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
1048 let Some(bytes) = stat.max_bytes_opt() else {
1049 return Ok(None);
1050 };
1051 if bytes.len() != *len as usize {
1052 return Err(Error::new(
1053 ErrorKind::Unexpected,
1054 "Invalid length of fixed bytes.",
1055 ));
1056 }
1057 Some(Datum::fixed(bytes.to_vec()))
1058 }
1059 (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
1060 return Ok(stat
1061 .max_bytes_opt()
1062 .map(|bytes| Datum::binary(bytes.to_vec())));
1063 }
1064 _ => {
1065 return Ok(None);
1066 }
1067 })
1068}
1069
1070impl TryFrom<&ArrowSchema> for crate::spec::Schema {
1071 type Error = Error;
1072
1073 fn try_from(schema: &ArrowSchema) -> crate::Result<Self> {
1074 arrow_schema_to_schema(schema)
1075 }
1076}
1077
1078impl TryFrom<&crate::spec::Schema> for ArrowSchema {
1079 type Error = Error;
1080
1081 fn try_from(schema: &crate::spec::Schema) -> crate::Result<Self> {
1082 schema_to_arrow_schema(schema)
1083 }
1084}
1085
1086pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
1108 let make_ree = |values_type: DataType| -> DataType {
1112 let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
1113 let values_field = Arc::new(Field::new("values", values_type, true));
1114 DataType::RunEndEncoded(run_ends_field, values_field)
1115 };
1116
1117 match datum.data_type() {
1119 PrimitiveType::Boolean => make_ree(DataType::Boolean),
1120 PrimitiveType::Int => make_ree(DataType::Int32),
1121 PrimitiveType::Long => make_ree(DataType::Int64),
1122 PrimitiveType::Float => make_ree(DataType::Float32),
1123 PrimitiveType::Double => make_ree(DataType::Float64),
1124 PrimitiveType::Date => make_ree(DataType::Date32),
1125 PrimitiveType::Time => make_ree(DataType::Int64),
1126 PrimitiveType::Timestamp => make_ree(DataType::Int64),
1127 PrimitiveType::Timestamptz => make_ree(DataType::Int64),
1128 PrimitiveType::TimestampNs => make_ree(DataType::Int64),
1129 PrimitiveType::TimestamptzNs => make_ree(DataType::Int64),
1130 PrimitiveType::String => make_ree(DataType::Utf8),
1131 PrimitiveType::Uuid => make_ree(DataType::Binary),
1132 PrimitiveType::Fixed(_) => make_ree(DataType::Binary),
1133 PrimitiveType::Binary => make_ree(DataType::Binary),
1134 PrimitiveType::Decimal { precision, scale } => {
1135 make_ree(DataType::Decimal128(*precision as u8, *scale as i8))
1136 }
1137 }
1138}
1139
1140struct MetadataStripVisitor {
1146 field_stack: Vec<Field>,
1148}
1149
1150impl MetadataStripVisitor {
1151 fn new() -> Self {
1152 Self {
1153 field_stack: Vec::new(),
1154 }
1155 }
1156}
1157
1158impl ArrowSchemaVisitor for MetadataStripVisitor {
1159 type T = Field;
1160 type U = ArrowSchema;
1161
1162 fn before_field(&mut self, field: &Field) -> Result<()> {
1163 self.field_stack.push(Field::new(
1165 field.name(),
1166 DataType::Null, field.is_nullable(),
1168 ));
1169 Ok(())
1170 }
1171
1172 fn after_field(&mut self, _field: &Field) -> Result<()> {
1173 Ok(())
1174 }
1175
1176 fn schema(&mut self, _schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
1177 Ok(ArrowSchema::new(values))
1178 }
1179
1180 fn r#struct(&mut self, _fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
1181 let field_info = self
1183 .field_stack
1184 .pop()
1185 .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in struct"))?;
1186
1187 Ok(Field::new(
1189 field_info.name(),
1190 DataType::Struct(Fields::from(results)),
1191 field_info.is_nullable(),
1192 ))
1193 }
1194
1195 fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
1196 let field_info = self
1198 .field_stack
1199 .pop()
1200 .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in list"))?;
1201
1202 let list_type = match list {
1204 DataType::List(_) => DataType::List(Arc::new(value)),
1205 DataType::LargeList(_) => DataType::LargeList(Arc::new(value)),
1206 DataType::FixedSizeList(_, size) => DataType::FixedSizeList(Arc::new(value), *size),
1207 _ => {
1208 return Err(Error::new(
1209 ErrorKind::Unexpected,
1210 format!("Expected list type, got {list}"),
1211 ));
1212 }
1213 };
1214
1215 Ok(Field::new(
1216 field_info.name(),
1217 list_type,
1218 field_info.is_nullable(),
1219 ))
1220 }
1221
1222 fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
1223 let field_info = self
1225 .field_stack
1226 .pop()
1227 .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in map"))?;
1228
1229 let struct_field = Field::new(
1231 DEFAULT_MAP_FIELD_NAME,
1232 DataType::Struct(Fields::from(vec![key_value, value])),
1233 false,
1234 );
1235
1236 let sorted = match map {
1238 DataType::Map(_, sorted) => *sorted,
1239 _ => {
1240 return Err(Error::new(
1241 ErrorKind::Unexpected,
1242 format!("Expected map type, got {map}"),
1243 ));
1244 }
1245 };
1246
1247 Ok(Field::new(
1249 field_info.name(),
1250 DataType::Map(Arc::new(struct_field), sorted),
1251 field_info.is_nullable(),
1252 ))
1253 }
1254
1255 fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
1256 let field_info = self.field_stack.pop().ok_or_else(|| {
1258 Error::new(ErrorKind::Unexpected, "Field stack underflow in primitive")
1259 })?;
1260
1261 Ok(Field::new(
1263 field_info.name(),
1264 p.clone(),
1265 field_info.is_nullable(),
1266 ))
1267 }
1268}
1269
1270pub fn strip_metadata_from_schema(schema: &ArrowSchema) -> Result<ArrowSchema> {
1300 let mut visitor = MetadataStripVisitor::new();
1301 visit_schema(schema, &mut visitor)
1302}
1303
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};

    use super::*;
    use crate::spec::decimal_utils::decimal_new;
    use crate::spec::{Literal, Schema};

    /// Builds an Arrow field tagged with a field id.
    ///
    /// `value` is stored in the field metadata under `PARQUET_FIELD_ID_META_KEY`.
    fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
        Field::new(name, ty, nullable).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            value.to_string(),
        )]))
    }

    /// Arrow input for `test_arrow_schema_to_schema`.
    ///
    /// Its expected Iceberg conversion is `iceberg_schema_for_arrow_schema_to_schema_test`.
    fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema {
        // Map entries: required int key (id 28) and optional string value (id 29).
        let fields = Fields::from(vec![
            simple_field("key", DataType::Int32, false, "28"),
            simple_field("value", DataType::Utf8, true, "29"),
        ]);

        let r#struct = DataType::Struct(fields);
        // NOTE: the entries field is tagged "17", the same id as the `struct` column below.
        // The expected Iceberg schema has no counterpart for it.
        let map = DataType::Map(
            Arc::new(simple_field(DEFAULT_MAP_FIELD_NAME, r#struct, false, "17")),
            false,
        );
        let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

        // Children of the top-level `struct` column (ids 18-20).
        let fields = Fields::from(vec![
            simple_field("aa", DataType::Int32, false, "18"),
            simple_field("bb", DataType::Utf8, true, "19"),
            simple_field(
                "cc",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
                "20",
            ),
        ]);

        let r#struct = DataType::Struct(fields);

        ArrowSchema::new(vec![
            simple_field("a", DataType::Int32, false, "2"),
            simple_field("b", DataType::Int64, false, "1"),
            simple_field("c", DataType::Utf8, false, "3"),
            simple_field("n", DataType::Utf8, false, "21"),
            simple_field(
                "d",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
                "4",
            ),
            simple_field("e", DataType::Boolean, true, "6"),
            simple_field("f", DataType::Float32, false, "5"),
            simple_field("g", DataType::Float64, false, "7"),
            simple_field("p", DataType::Decimal128(10, 2), false, "27"),
            simple_field("h", DataType::Date32, false, "8"),
            simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
            // Both "UTC" and "+00:00" time zones are expected to become `timestamptz`.
            simple_field(
                "j",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
                "10",
            ),
            simple_field(
                "k",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                false,
                "12",
            ),
            simple_field("l", DataType::Binary, false, "13"),
            simple_field("o", DataType::LargeBinary, false, "22"),
            simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
            // List, LargeList and FixedSizeList are all expected to become Iceberg `list`.
            simple_field(
                "list",
                DataType::List(Arc::new(simple_field(
                    "element",
                    DataType::Int32,
                    false,
                    "15",
                ))),
                true,
                "14",
            ),
            simple_field(
                "large_list",
                DataType::LargeList(Arc::new(simple_field(
                    "element",
                    DataType::Utf8,
                    false,
                    "23",
                ))),
                true,
                "24",
            ),
            simple_field(
                "fixed_list",
                DataType::FixedSizeList(
                    Arc::new(simple_field("element", DataType::Binary, false, "26")),
                    10,
                ),
                true,
                "25",
            ),
            simple_field("map", map, false, "16"),
            simple_field("struct", r#struct, false, "17"),
            // Dictionary<Int32, Utf8> is expected to convert to its value type (`string`).
            simple_field("dictionary", dictionary, false, "30"),
        ])
    }

    /// Expected Iceberg schema for `arrow_schema_for_arrow_schema_to_schema_test`.
    fn iceberg_schema_for_arrow_schema_to_schema_test() -> Schema {
        let schema_json = r#"{
            "type":"struct",
            "schema-id":0,
            "fields":[
                {
                    "id":2,
                    "name":"a",
                    "required":true,
                    "type":"int"
                },
                {
                    "id":1,
                    "name":"b",
                    "required":true,
                    "type":"long"
                },
                {
                    "id":3,
                    "name":"c",
                    "required":true,
                    "type":"string"
                },
                {
                    "id":21,
                    "name":"n",
                    "required":true,
                    "type":"string"
                },
                {
                    "id":4,
                    "name":"d",
                    "required":false,
                    "type":"timestamp"
                },
                {
                    "id":6,
                    "name":"e",
                    "required":false,
                    "type":"boolean"
                },
                {
                    "id":5,
                    "name":"f",
                    "required":true,
                    "type":"float"
                },
                {
                    "id":7,
                    "name":"g",
                    "required":true,
                    "type":"double"
                },
                {
                    "id":27,
                    "name":"p",
                    "required":true,
                    "type":"decimal(10,2)"
                },
                {
                    "id":8,
                    "name":"h",
                    "required":true,
                    "type":"date"
                },
                {
                    "id":9,
                    "name":"i",
                    "required":true,
                    "type":"time"
                },
                {
                    "id":10,
                    "name":"j",
                    "required":true,
                    "type":"timestamptz"
                },
                {
                    "id":12,
                    "name":"k",
                    "required":true,
                    "type":"timestamptz"
                },
                {
                    "id":13,
                    "name":"l",
                    "required":true,
                    "type":"binary"
                },
                {
                    "id":22,
                    "name":"o",
                    "required":true,
                    "type":"binary"
                },
                {
                    "id":11,
                    "name":"m",
                    "required":true,
                    "type":"fixed[10]"
                },
                {
                    "id":14,
                    "name":"list",
                    "required": false,
                    "type": {
                        "type": "list",
                        "element-id": 15,
                        "element-required": true,
                        "element": "int"
                    }
                },
                {
                    "id":24,
                    "name":"large_list",
                    "required": false,
                    "type": {
                        "type": "list",
                        "element-id": 23,
                        "element-required": true,
                        "element": "string"
                    }
                },
                {
                    "id":25,
                    "name":"fixed_list",
                    "required": false,
                    "type": {
                        "type": "list",
                        "element-id": 26,
                        "element-required": true,
                        "element": "binary"
                    }
                },
                {
                    "id":16,
                    "name":"map",
                    "required": true,
                    "type": {
                        "type": "map",
                        "key-id": 28,
                        "key": "int",
                        "value-id": 29,
                        "value-required": false,
                        "value": "string"
                    }
                },
                {
                    "id":17,
                    "name":"struct",
                    "required": true,
                    "type": {
                        "type": "struct",
                        "fields": [
                            {
                                "id":18,
                                "name":"aa",
                                "required":true,
                                "type":"int"
                            },
                            {
                                "id":19,
                                "name":"bb",
                                "required":false,
                                "type":"string"
                            },
                            {
                                "id":20,
                                "name":"cc",
                                "required":true,
                                "type":"timestamp"
                            }
                        ]
                    }
                },
                {
                    "id":30,
                    "name":"dictionary",
                    "required":true,
                    "type":"string"
                }
            ],
            "identifier-field-ids":[]
        }"#;

        let schema: Schema = serde_json::from_str(schema_json).unwrap();
        schema
    }

    /// Arrow → Iceberg conversion of a schema covering every supported field kind.
    #[test]
    fn test_arrow_schema_to_schema() {
        let arrow_schema = arrow_schema_for_arrow_schema_to_schema_test();
        let schema = iceberg_schema_for_arrow_schema_to_schema_test();
        let converted_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
        pretty_assertions::assert_eq!(converted_schema, schema);
    }

    /// Expected Arrow output of `test_schema_to_arrow_schema`.
    ///
    /// This differs from the Arrow→Iceberg fixture where the conversion is not symmetric:
    /// * `binary` becomes `LargeBinary`;
    /// * every `list` becomes a plain `List`;
    /// * `timestamptz` carries the "+00:00" time zone;
    /// * `uuid` becomes `FixedSizeBinary(16)`.
    fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema {
        let fields = Fields::from(vec![
            simple_field("key", DataType::Int32, false, "28"),
            simple_field("value", DataType::Utf8, true, "29"),
        ]);

        let r#struct = DataType::Struct(fields);
        // The generated map entries field carries no field-id metadata.
        let map = DataType::Map(
            Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
            false,
        );

        let fields = Fields::from(vec![
            simple_field("aa", DataType::Int32, false, "18"),
            simple_field("bb", DataType::Utf8, true, "19"),
            simple_field(
                "cc",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
                "20",
            ),
        ]);

        let r#struct = DataType::Struct(fields);

        ArrowSchema::new(vec![
            simple_field("a", DataType::Int32, false, "2"),
            simple_field("b", DataType::Int64, false, "1"),
            simple_field("c", DataType::Utf8, false, "3"),
            simple_field("n", DataType::Utf8, false, "21"),
            simple_field(
                "d",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
                "4",
            ),
            simple_field("e", DataType::Boolean, true, "6"),
            simple_field("f", DataType::Float32, false, "5"),
            simple_field("g", DataType::Float64, false, "7"),
            simple_field("p", DataType::Decimal128(10, 2), false, "27"),
            simple_field("h", DataType::Date32, false, "8"),
            simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
            simple_field(
                "j",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                false,
                "10",
            ),
            simple_field(
                "k",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                false,
                "12",
            ),
            simple_field("l", DataType::LargeBinary, false, "13"),
            simple_field("o", DataType::LargeBinary, false, "22"),
            simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
            simple_field(
                "list",
                DataType::List(Arc::new(simple_field(
                    "element",
                    DataType::Int32,
                    false,
                    "15",
                ))),
                true,
                "14",
            ),
            simple_field(
                "large_list",
                DataType::List(Arc::new(simple_field(
                    "element",
                    DataType::Utf8,
                    false,
                    "23",
                ))),
                true,
                "24",
            ),
            simple_field(
                "fixed_list",
                DataType::List(Arc::new(simple_field(
                    "element",
                    DataType::LargeBinary,
                    false,
                    "26",
                ))),
                true,
                "25",
            ),
            simple_field("map", map, false, "16"),
            simple_field("struct", r#struct, false, "17"),
            simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"),
        ])
    }

    /// Iceberg input for `test_schema_to_arrow_schema`.
    fn iceberg_schema_for_schema_to_arrow_schema() -> Schema {
        let schema_json = r#"{
            "type":"struct",
            "schema-id":0,
            "fields":[
                {
                    "id":2,
                    "name":"a",
                    "required":true,
                    "type":"int"
                },
                {
                    "id":1,
                    "name":"b",
                    "required":true,
                    "type":"long"
                },
                {
                    "id":3,
                    "name":"c",
                    "required":true,
                    "type":"string"
                },
                {
                    "id":21,
                    "name":"n",
                    "required":true,
                    "type":"string"
                },
                {
                    "id":4,
                    "name":"d",
                    "required":false,
                    "type":"timestamp"
                },
                {
                    "id":6,
                    "name":"e",
                    "required":false,
                    "type":"boolean"
                },
                {
                    "id":5,
                    "name":"f",
                    "required":true,
                    "type":"float"
                },
                {
                    "id":7,
                    "name":"g",
                    "required":true,
                    "type":"double"
                },
                {
                    "id":27,
                    "name":"p",
                    "required":true,
                    "type":"decimal(10,2)"
                },
                {
                    "id":8,
                    "name":"h",
                    "required":true,
                    "type":"date"
                },
                {
                    "id":9,
                    "name":"i",
                    "required":true,
                    "type":"time"
                },
                {
                    "id":10,
                    "name":"j",
                    "required":true,
                    "type":"timestamptz"
                },
                {
                    "id":12,
                    "name":"k",
                    "required":true,
                    "type":"timestamptz"
                },
                {
                    "id":13,
                    "name":"l",
                    "required":true,
                    "type":"binary"
                },
                {
                    "id":22,
                    "name":"o",
                    "required":true,
                    "type":"binary"
                },
                {
                    "id":11,
                    "name":"m",
                    "required":true,
                    "type":"fixed[10]"
                },
                {
                    "id":14,
                    "name":"list",
                    "required": false,
                    "type": {
                        "type": "list",
                        "element-id": 15,
                        "element-required": true,
                        "element": "int"
                    }
                },
                {
                    "id":24,
                    "name":"large_list",
                    "required": false,
                    "type": {
                        "type": "list",
                        "element-id": 23,
                        "element-required": true,
                        "element": "string"
                    }
                },
                {
                    "id":25,
                    "name":"fixed_list",
                    "required": false,
                    "type": {
                        "type": "list",
                        "element-id": 26,
                        "element-required": true,
                        "element": "binary"
                    }
                },
                {
                    "id":16,
                    "name":"map",
                    "required": true,
                    "type": {
                        "type": "map",
                        "key-id": 28,
                        "key": "int",
                        "value-id": 29,
                        "value-required": false,
                        "value": "string"
                    }
                },
                {
                    "id":17,
                    "name":"struct",
                    "required": true,
                    "type": {
                        "type": "struct",
                        "fields": [
                            {
                                "id":18,
                                "name":"aa",
                                "required":true,
                                "type":"int"
                            },
                            {
                                "id":19,
                                "name":"bb",
                                "required":false,
                                "type":"string"
                            },
                            {
                                "id":20,
                                "name":"cc",
                                "required":true,
                                "type":"timestamp"
                            }
                        ]
                    }
                },
                {
                    "id":30,
                    "name":"uuid",
                    "required":true,
                    "type":"uuid"
                }
            ],
            "identifier-field-ids":[]
        }"#;

        let schema: Schema = serde_json::from_str(schema_json).unwrap();
        schema
    }

    /// Iceberg → Arrow conversion, including field-id metadata on every generated field.
    #[test]
    fn test_schema_to_arrow_schema() {
        let arrow_schema = arrow_schema_for_schema_to_arrow_schema_test();
        let schema = iceberg_schema_for_schema_to_arrow_schema();
        let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap();
        assert_eq!(converted_arrow_schema, arrow_schema);
    }

    /// Type-level conversions in both directions: primitives, structs (with and without
    /// field ids, with defaults) and dictionaries.
    #[test]
    fn test_type_conversion() {
        {
            // A primitive round-trips unchanged.
            let arrow_type = DataType::Int32;
            let iceberg_type = Type::Primitive(PrimitiveType::Int);
            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
        }

        {
            // A struct whose children lack field-id metadata cannot be converted to Iceberg.
            let arrow_type = DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int64, false),
                Field::new("b", DataType::Utf8, true),
            ]));
            assert_eq!(
                &arrow_type_to_type(&arrow_type).unwrap_err().to_string(),
                "DataInvalid => Field id not found in metadata"
            );

            // With field ids present, the struct round-trips in both directions.
            let arrow_type = DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int64, false).with_metadata(HashMap::from_iter([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    1.to_string(),
                )])),
                Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from_iter([(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    2.to_string(),
                )])),
            ]));
            let iceberg_type = Type::Struct(StructType::new(vec![
                NestedField {
                    id: 1,
                    doc: None,
                    name: "a".to_string(),
                    required: true,
                    field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
                    initial_default: None,
                    write_default: None,
                }
                .into(),
                NestedField {
                    id: 2,
                    doc: None,
                    name: "b".to_string(),
                    required: false,
                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
                    initial_default: None,
                    write_default: None,
                }
                .into(),
            ]));
            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());

            // Initial/write defaults are expected not to change the generated Arrow type.
            let iceberg_type = Type::Struct(StructType::new(vec![
                NestedField {
                    id: 1,
                    doc: None,
                    name: "a".to_string(),
                    required: true,
                    field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
                    initial_default: Some(Literal::Primitive(PrimitiveLiteral::Int(114514))),
                    write_default: None,
                }
                .into(),
                NestedField {
                    id: 2,
                    doc: None,
                    name: "b".to_string(),
                    required: false,
                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
                    initial_default: None,
                    write_default: Some(Literal::Primitive(PrimitiveLiteral::String(
                        "514".to_string(),
                    ))),
                }
                .into(),
            ]));
            assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
        }

        {
            // Dictionaries convert to their value type, whatever the key type.
            let arrow_type =
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int8));
            let iceberg_type = Type::Primitive(PrimitiveType::Int);
            assert_eq!(
                iceberg_type,
                arrow_type_to_type(&arrow_type).unwrap(),
                "Expected dictionary conversion to use the contained value"
            );

            let arrow_type =
                DataType::Dictionary(Box::new(DataType::Utf8), Box::new(DataType::Boolean));
            let iceberg_type = Type::Primitive(PrimitiveType::Boolean);
            assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
        }
    }

    /// Unsigned Arrow integers.
    ///
    /// UInt8/UInt16 map to `int` and UInt32 to `long`. UInt64 is rejected.
    #[test]
    fn test_unsigned_integer_type_conversion() {
        let test_cases = vec![
            (DataType::UInt8, PrimitiveType::Int),
            (DataType::UInt16, PrimitiveType::Int),
            (DataType::UInt32, PrimitiveType::Long),
        ];

        for (arrow_type, expected_iceberg_type) in test_cases {
            let arrow_field = Field::new("test", arrow_type.clone(), false).with_metadata(
                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
            );
            let arrow_schema = ArrowSchema::new(vec![arrow_field]);

            let iceberg_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
            let iceberg_field = iceberg_schema.as_struct().fields().first().unwrap();

            assert!(
                matches!(iceberg_field.field_type.as_ref(), Type::Primitive(t) if *t == expected_iceberg_type),
                "Expected {arrow_type:?} to map to {expected_iceberg_type:?}"
            );
        }

        {
            // UInt64 is expected to fail with an "is not supported" error.
            let arrow_field = Field::new("test", DataType::UInt64, false).with_metadata(
                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
            );
            let arrow_schema = ArrowSchema::new(vec![arrow_field]);

            let result = arrow_schema_to_schema(&arrow_schema);
            assert!(result.is_err());
            assert!(
                result
                    .unwrap_err()
                    .to_string()
                    .contains("UInt64 is not supported")
            );
        }
    }

    /// `get_arrow_datum` yields a one-element scalar array of the expected Arrow type.
    #[test]
    fn test_datum_conversion() {
        {
            let datum = Datum::bool(true);
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
            assert!(is_scalar);
            assert!(array.value(0));
        }
        {
            let datum = Datum::int(42);
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
            assert!(is_scalar);
            assert_eq!(array.value(0), 42);
        }
        {
            let datum = Datum::long(42);
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
            assert!(is_scalar);
            assert_eq!(array.value(0), 42);
        }
        {
            let datum = Datum::float(42.42);
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array.as_any().downcast_ref::<Float32Array>().unwrap();
            assert!(is_scalar);
            assert_eq!(array.value(0), 42.42);
        }
        {
            let datum = Datum::double(42.42);
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array.as_any().downcast_ref::<Float64Array>().unwrap();
            assert!(is_scalar);
            assert_eq!(array.value(0), 42.42);
        }
        {
            let datum = Datum::string("abc");
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array.as_any().downcast_ref::<StringArray>().unwrap();
            assert!(is_scalar);
            assert_eq!(array.value(0), "abc");
        }
        {
            let datum = Datum::binary(vec![1, 2, 3, 4]);
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
            assert!(is_scalar);
            assert_eq!(array.value(0), &[1, 2, 3, 4]);
        }
        {
            let datum = Datum::date(42);
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
            assert!(is_scalar);
            assert_eq!(array.value(0), 42);
        }
        {
            let datum = Datum::timestamp_micros(42);
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            assert!(is_scalar);
            assert_eq!(array.value(0), 42);
        }
        {
            // timestamptz values carry the "+00:00" time zone on the Arrow array.
            let datum = Datum::timestamptz_micros(42);
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            assert!(is_scalar);
            assert_eq!(array.timezone(), Some("+00:00"));
            assert_eq!(array.value(0), 42);
        }
        {
            // 1.23 (unscaled 123, scale 2) with precision 30.
            let datum = Datum::decimal_with_precision(decimal_new(123, 2), 30).unwrap();
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            assert!(is_scalar);
            assert_eq!(array.precision(), 30);
            assert_eq!(array.scale(), 2);
            assert_eq!(array.value(0), 123);
        }
        {
            // 0x42 == 66, so every byte of this UUID is 66.
            let datum = Datum::uuid_from_str("42424242-4242-4242-4242-424242424242").unwrap();
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array
                .as_any()
                .downcast_ref::<FixedSizeBinaryArray>()
                .unwrap();
            assert!(is_scalar);
            assert_eq!(array.value(0), [66u8; 16]);
        }
        {
            let datum = Datum::fixed(vec![1u8, 2, 3, 4, 5, 6, 7, 8]);
            let arrow_datum = get_arrow_datum(&datum).unwrap();
            let (array, is_scalar) = arrow_datum.get();
            let array = array
                .as_any()
                .downcast_ref::<FixedSizeBinaryArray>()
                .unwrap();
            assert!(is_scalar);
            assert_eq!(array.value_length(), 8);
            assert_eq!(array.value(0), &[1u8, 2, 3, 4, 5, 6, 7, 8]);
        }
    }

    /// `arrow_schema_to_schema_auto_assign_ids` on fields without field-id metadata.
    ///
    /// Per the expected schema below, the eight top-level columns get ids 1-8 first. Nested
    /// fields then follow in column order:
    /// * `tags` element: 9;
    /// * `address` children: 10-12;
    /// * map key/value: 13-14;
    /// * `orders` element: 15, and its children: 16-17.
    #[test]
    fn test_arrow_schema_to_schema_with_field_id() {
        let arrow_schema = ArrowSchema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(10, 2), false),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                true,
            ),
            Field::new(
                "tags",
                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
                true,
            ),
            Field::new(
                "address",
                DataType::Struct(Fields::from(vec![
                    Field::new("street", DataType::Utf8, true),
                    Field::new("city", DataType::Utf8, false),
                    Field::new("zip", DataType::Int32, true),
                ])),
                true,
            ),
            Field::new(
                "attributes",
                DataType::Map(
                    Arc::new(Field::new(
                        DEFAULT_MAP_FIELD_NAME,
                        DataType::Struct(Fields::from(vec![
                            Field::new("key", DataType::Utf8, false),
                            Field::new("value", DataType::Utf8, true),
                        ])),
                        false,
                    )),
                    false,
                ),
                true,
            ),
            Field::new(
                "orders",
                DataType::List(Arc::new(Field::new(
                    "element",
                    DataType::Struct(Fields::from(vec![
                        Field::new("order_id", DataType::Int64, false),
                        Field::new("amount", DataType::Float64, false),
                    ])),
                    true,
                ))),
                true,
            ),
        ]);

        let schema = arrow_schema_to_schema_auto_assign_ids(&arrow_schema).unwrap();

        let expected = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(
                    3,
                    "price",
                    Type::Primitive(PrimitiveType::Decimal {
                        precision: 10,
                        scale: 2,
                    }),
                )
                .into(),
                NestedField::optional(4, "created_at", Type::Primitive(PrimitiveType::Timestamptz))
                    .into(),
                NestedField::optional(
                    5,
                    "tags",
                    Type::List(ListType {
                        element_field: NestedField::list_element(
                            9,
                            Type::Primitive(PrimitiveType::String),
                            false,
                        )
                        .into(),
                    }),
                )
                .into(),
                NestedField::optional(
                    6,
                    "address",
                    Type::Struct(StructType::new(vec![
                        NestedField::optional(10, "street", Type::Primitive(PrimitiveType::String))
                            .into(),
                        NestedField::required(11, "city", Type::Primitive(PrimitiveType::String))
                            .into(),
                        NestedField::optional(12, "zip", Type::Primitive(PrimitiveType::Int))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::optional(
                    7,
                    "attributes",
                    Type::Map(MapType {
                        key_field: NestedField::map_key_element(
                            13,
                            Type::Primitive(PrimitiveType::String),
                        )
                        .into(),
                        value_field: NestedField::map_value_element(
                            14,
                            Type::Primitive(PrimitiveType::String),
                            false,
                        )
                        .into(),
                    }),
                )
                .into(),
                NestedField::optional(
                    8,
                    "orders",
                    Type::List(ListType {
                        element_field: NestedField::list_element(
                            15,
                            Type::Struct(StructType::new(vec![
                                NestedField::required(
                                    16,
                                    "order_id",
                                    Type::Primitive(PrimitiveType::Long),
                                )
                                .into(),
                                NestedField::required(
                                    17,
                                    "amount",
                                    Type::Primitive(PrimitiveType::Double),
                                )
                                .into(),
                            ])),
                            false,
                        )
                        .into(),
                    }),
                )
                .into(),
            ])
            .build()
            .unwrap();

        pretty_assertions::assert_eq!(schema, expected);
        assert_eq!(schema.highest_field_id(), 17);
    }
}