1use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
24use arrow_array::{
25 BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
26 FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
27 TimestampMicrosecondArray, TimestampNanosecondArray,
28};
29use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
30use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
31use parquet::file::statistics::Statistics;
32use uuid::Uuid;
33
34use crate::error::Result;
35use crate::spec::decimal_utils::i128_from_be_bytes;
36use crate::spec::{
37 Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral,
38 PrimitiveType, Schema, SchemaVisitor, StructType, Type,
39};
40use crate::{Error, ErrorKind};
41
/// Name given to the struct field that holds the key/value entries when an Iceberg map
/// is converted into an Arrow `Map` data type.
pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
/// Time zone string attached to Arrow timestamps produced for the Iceberg
/// `timestamptz` / `timestamptz_ns` types.
pub const UTC_TIME_ZONE: &str = "+00:00";
46
/// A post-order visitor over an Arrow schema.
///
/// The `visit_*` functions in this module drive the traversal: they call the
/// `before_*` / `after_*` hooks around each child and then hand the children's
/// results to the callback for the enclosing type.
pub trait ArrowSchemaVisitor {
    /// Result produced for every visited data type (primitive, struct, list, map).
    type T;

    /// Result produced for the schema as a whole.
    type U;

    /// Called before the data type of a struct or schema field is visited.
    /// The default implementation does nothing.
    fn before_field(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the data type of a struct or schema field has been visited.
    /// The default implementation does nothing.
    fn after_field(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before the element field of a list is visited.
    /// The default implementation does nothing.
    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the element field of a list has been visited.
    /// The default implementation does nothing.
    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before the key field of a map is visited.
    /// The default implementation does nothing.
    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the key field of a map has been visited.
    /// The default implementation does nothing.
    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before the value field of a map is visited.
    /// The default implementation does nothing.
    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after the value field of a map has been visited.
    /// The default implementation does nothing.
    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called once all top-level fields of `schema` have been visited; `values` holds
    /// one result per field, in field order.
    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U>;

    /// Called once all fields of a struct have been visited; `results` holds one result
    /// per field, in field order.
    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T>;

    /// Called once the element of a list type has been visited. `list` is the full list
    /// data type and `value` the result for its element.
    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;

    /// Called once both the key and the value of a map type have been visited. `map` is
    /// the full map data type.
    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T>;

    /// Called for every leaf (non-nested) data type.
    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
}
112
113fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Result<V::T> {
115 match r#type {
116 p if p.is_primitive()
117 || matches!(
118 p,
119 DataType::Boolean
120 | DataType::Utf8
121 | DataType::LargeUtf8
122 | DataType::Utf8View
123 | DataType::Binary
124 | DataType::LargeBinary
125 | DataType::BinaryView
126 | DataType::FixedSizeBinary(_)
127 ) =>
128 {
129 visitor.primitive(p)
130 }
131 DataType::List(element_field) => visit_list(r#type, element_field, visitor),
132 DataType::LargeList(element_field) => visit_list(r#type, element_field, visitor),
133 DataType::FixedSizeList(element_field, _) => visit_list(r#type, element_field, visitor),
134 DataType::Map(field, _) => match field.data_type() {
135 DataType::Struct(fields) => {
136 if fields.len() != 2 {
137 return Err(Error::new(
138 ErrorKind::DataInvalid,
139 "Map field must have exactly 2 fields",
140 ));
141 }
142
143 let key_field = &fields[0];
144 let value_field = &fields[1];
145
146 let key_result = {
147 visitor.before_map_key(key_field)?;
148 let ret = visit_type(key_field.data_type(), visitor)?;
149 visitor.after_map_key(key_field)?;
150 ret
151 };
152
153 let value_result = {
154 visitor.before_map_value(value_field)?;
155 let ret = visit_type(value_field.data_type(), visitor)?;
156 visitor.after_map_value(value_field)?;
157 ret
158 };
159
160 visitor.map(r#type, key_result, value_result)
161 }
162 _ => Err(Error::new(
163 ErrorKind::DataInvalid,
164 "Map field must have struct type",
165 )),
166 },
167 DataType::Struct(fields) => visit_struct(fields, visitor),
168 DataType::Dictionary(_key_type, value_type) => visit_type(value_type, visitor),
169 other => Err(Error::new(
170 ErrorKind::DataInvalid,
171 format!("Cannot visit Arrow data type: {other}"),
172 )),
173 }
174}
175
176fn visit_list<V: ArrowSchemaVisitor>(
178 data_type: &DataType,
179 element_field: &Field,
180 visitor: &mut V,
181) -> Result<V::T> {
182 visitor.before_list_element(element_field)?;
183 let value = visit_type(element_field.data_type(), visitor)?;
184 visitor.after_list_element(element_field)?;
185 visitor.list(data_type, value)
186}
187
188fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> Result<V::T> {
190 let mut results = Vec::with_capacity(fields.len());
191 for field in fields {
192 visitor.before_field(field)?;
193 let result = visit_type(field.data_type(), visitor)?;
194 visitor.after_field(field)?;
195 results.push(result);
196 }
197
198 visitor.r#struct(fields, results)
199}
200
201fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) -> Result<V::U> {
203 let mut results = Vec::with_capacity(schema.fields().len());
204 for field in schema.fields() {
205 visitor.before_field(field)?;
206 let result = visit_type(field.data_type(), visitor)?;
207 visitor.after_field(field)?;
208 results.push(result);
209 }
210 visitor.schema(schema, results)
211}
212
213pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
219 let mut visitor = ArrowSchemaConverter::new();
220 visit_schema(schema, &mut visitor)
221}
222
223pub fn arrow_schema_to_schema_auto_assign_ids(schema: &ArrowSchema) -> Result<Schema> {
232 let mut visitor = ArrowSchemaConverter::new_with_field_ids_from(FIRST_FIELD_ID);
233 visit_schema(schema, &mut visitor)
234}
235
236pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
238 let mut visitor = ArrowSchemaConverter::new();
239 visit_type(ty, &mut visitor)
240}
241
/// Arrow field metadata key under which an Iceberg field's doc string is stored.
const ARROW_FIELD_DOC_KEY: &str = "doc";
243
244pub(super) fn get_field_id_from_metadata(field: &Field) -> Result<i32> {
245 if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
246 return value.parse::<i32>().map_err(|e| {
247 Error::new(
248 ErrorKind::DataInvalid,
249 "Failed to parse field id".to_string(),
250 )
251 .with_context("value", value)
252 .with_source(e)
253 });
254 }
255 Err(Error::new(
256 ErrorKind::DataInvalid,
257 "Field id not found in metadata",
258 ))
259}
260
261fn get_field_doc(field: &Field) -> Option<String> {
262 if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
263 return Some(value.clone());
264 }
265 None
266}
267
/// Visitor that builds an Iceberg schema / type from an Arrow schema / data type.
struct ArrowSchemaConverter {
    /// When set, field ids found in Arrow metadata are ignored and the schema builder is
    /// asked to reassign ids starting from this value.
    reassign_field_ids_from: Option<i32>,
    /// Counter handing out distinct placeholder ids while reassignment is enabled.
    next_field_id: i32,
}
277
278impl ArrowSchemaConverter {
279 fn new() -> Self {
280 Self {
281 reassign_field_ids_from: None,
282 next_field_id: 0,
283 }
284 }
285
286 fn new_with_field_ids_from(start_from: i32) -> Self {
287 Self {
288 reassign_field_ids_from: Some(start_from),
289 next_field_id: 0,
290 }
291 }
292
293 fn get_field_id(&mut self, field: &Field) -> Result<i32> {
294 if self.reassign_field_ids_from.is_some() {
295 let temp_id = self.next_field_id;
299 self.next_field_id += 1;
300 Ok(temp_id)
301 } else {
302 get_field_id_from_metadata(field)
304 }
305 }
306
307 fn convert_fields(
308 &mut self,
309 fields: &Fields,
310 field_results: &[Type],
311 ) -> Result<Vec<NestedFieldRef>> {
312 let mut results = Vec::with_capacity(fields.len());
313 for i in 0..fields.len() {
314 let field = &fields[i];
315 let field_type = &field_results[i];
316 let id = self.get_field_id(field)?;
317 let doc = get_field_doc(field);
318 let nested_field = NestedField {
319 id,
320 doc,
321 name: field.name().clone(),
322 required: !field.is_nullable(),
323 field_type: Box::new(field_type.clone()),
324 initial_default: None,
325 write_default: None,
326 };
327 results.push(Arc::new(nested_field));
328 }
329 Ok(results)
330 }
331}
332
333impl ArrowSchemaVisitor for ArrowSchemaConverter {
334 type T = Type;
335 type U = Schema;
336
337 fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
338 let fields = self.convert_fields(schema.fields(), &values)?;
339 let mut builder = Schema::builder().with_fields(fields);
340 if let Some(start_from) = self.reassign_field_ids_from {
341 builder = builder.with_reassigned_field_ids(start_from)
342 }
343 builder.build()
344 }
345
346 fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
347 let fields = self.convert_fields(fields, &results)?;
348 Ok(Type::Struct(StructType::new(fields)))
349 }
350
351 fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
352 let element_field = match list {
353 DataType::List(element_field) => element_field,
354 DataType::LargeList(element_field) => element_field,
355 DataType::FixedSizeList(element_field, _) => element_field,
356 _ => {
357 return Err(Error::new(
358 ErrorKind::DataInvalid,
359 "List type must have list data type",
360 ));
361 }
362 };
363
364 let id = self.get_field_id(element_field)?;
365 let doc = get_field_doc(element_field);
366 let mut element_field =
367 NestedField::list_element(id, value.clone(), !element_field.is_nullable());
368 if let Some(doc) = doc {
369 element_field = element_field.with_doc(doc);
370 }
371 let element_field = Arc::new(element_field);
372 Ok(Type::List(ListType { element_field }))
373 }
374
375 fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
376 match map {
377 DataType::Map(field, _) => match field.data_type() {
378 DataType::Struct(fields) => {
379 if fields.len() != 2 {
380 return Err(Error::new(
381 ErrorKind::DataInvalid,
382 "Map field must have exactly 2 fields",
383 ));
384 }
385
386 let key_field = &fields[0];
387 let value_field = &fields[1];
388
389 let key_id = self.get_field_id(key_field)?;
390 let key_doc = get_field_doc(key_field);
391 let mut key_field = NestedField::map_key_element(key_id, key_value.clone());
392 if let Some(doc) = key_doc {
393 key_field = key_field.with_doc(doc);
394 }
395 let key_field = Arc::new(key_field);
396
397 let value_id = self.get_field_id(value_field)?;
398 let value_doc = get_field_doc(value_field);
399 let mut value_field = NestedField::map_value_element(
400 value_id,
401 value.clone(),
402 !value_field.is_nullable(),
403 );
404 if let Some(doc) = value_doc {
405 value_field = value_field.with_doc(doc);
406 }
407 let value_field = Arc::new(value_field);
408
409 Ok(Type::Map(MapType {
410 key_field,
411 value_field,
412 }))
413 }
414 _ => Err(Error::new(
415 ErrorKind::DataInvalid,
416 "Map field must have struct type",
417 )),
418 },
419 _ => Err(Error::new(
420 ErrorKind::DataInvalid,
421 "Map type must have map data type",
422 )),
423 }
424 }
425
426 fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
427 match p {
428 DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
429 DataType::Int8 | DataType::Int16 | DataType::Int32 => {
430 Ok(Type::Primitive(PrimitiveType::Int))
431 }
432 DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)),
433 DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)),
434 DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
435 DataType::UInt64 => {
436 Err(Error::new(
438 ErrorKind::DataInvalid,
439 "UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for full uint64 range.",
440 ))
441 }
442 DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
443 DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
444 DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| {
445 Error::new(
446 ErrorKind::DataInvalid,
447 "Failed to create decimal type".to_string(),
448 )
449 .with_source(e)
450 }),
451 DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)),
452 DataType::Time64(unit) if unit == &TimeUnit::Microsecond => {
453 Ok(Type::Primitive(PrimitiveType::Time))
454 }
455 DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => {
456 Ok(Type::Primitive(PrimitiveType::Timestamp))
457 }
458 DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => {
459 Ok(Type::Primitive(PrimitiveType::TimestampNs))
460 }
461 DataType::Timestamp(unit, Some(zone))
462 if unit == &TimeUnit::Microsecond
463 && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
464 {
465 Ok(Type::Primitive(PrimitiveType::Timestamptz))
466 }
467 DataType::Timestamp(unit, Some(zone))
468 if unit == &TimeUnit::Nanosecond
469 && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
470 {
471 Ok(Type::Primitive(PrimitiveType::TimestamptzNs))
472 }
473 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
474 Ok(Type::Primitive(PrimitiveType::Binary))
475 }
476 DataType::FixedSizeBinary(width) => {
477 Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64)))
478 }
479 DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => {
480 Ok(Type::Primitive(PrimitiveType::String))
481 }
482 _ => Err(Error::new(
483 ErrorKind::DataInvalid,
484 format!("Unsupported Arrow data type: {p}"),
485 )),
486 }
487 }
488}
489
/// Stateless visitor that converts an Iceberg schema / type into its Arrow counterpart.
struct ToArrowSchemaConverter;
491
/// Intermediate result of [`ToArrowSchemaConverter`]: each visitor callback yields
/// exactly one of these, depending on what it was called for.
enum ArrowSchemaOrFieldOrType {
    /// A complete Arrow schema (produced for the Iceberg schema).
    Schema(ArrowSchema),
    /// An Arrow field (produced for an Iceberg nested field).
    Field(Field),
    /// An Arrow data type (produced for struct, list, map and primitive types).
    Type(DataType),
}
497
498impl SchemaVisitor for ToArrowSchemaConverter {
499 type T = ArrowSchemaOrFieldOrType;
500
501 fn schema(
502 &mut self,
503 _schema: &crate::spec::Schema,
504 value: ArrowSchemaOrFieldOrType,
505 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
506 let struct_type = match value {
507 ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)) => fields,
508 _ => unreachable!(),
509 };
510 Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new(
511 struct_type,
512 )))
513 }
514
515 fn field(
516 &mut self,
517 field: &crate::spec::NestedFieldRef,
518 value: ArrowSchemaOrFieldOrType,
519 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
520 let ty = match value {
521 ArrowSchemaOrFieldOrType::Type(ty) => ty,
522 _ => unreachable!(),
523 };
524 let metadata = if let Some(doc) = &field.doc {
525 HashMap::from([
526 (PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()),
527 (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
528 ])
529 } else {
530 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string())])
531 };
532 Ok(ArrowSchemaOrFieldOrType::Field(
533 Field::new(field.name.clone(), ty, !field.required).with_metadata(metadata),
534 ))
535 }
536
537 fn r#struct(
538 &mut self,
539 _: &crate::spec::StructType,
540 results: Vec<ArrowSchemaOrFieldOrType>,
541 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
542 let fields = results
543 .into_iter()
544 .map(|result| match result {
545 ArrowSchemaOrFieldOrType::Field(field) => field,
546 _ => unreachable!(),
547 })
548 .collect();
549 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)))
550 }
551
552 fn list(
553 &mut self,
554 list: &crate::spec::ListType,
555 value: ArrowSchemaOrFieldOrType,
556 ) -> crate::Result<Self::T> {
557 let field = match self.field(&list.element_field, value)? {
558 ArrowSchemaOrFieldOrType::Field(field) => field,
559 _ => unreachable!(),
560 };
561 let meta = if let Some(doc) = &list.element_field.doc {
562 HashMap::from([
563 (
564 PARQUET_FIELD_ID_META_KEY.to_string(),
565 list.element_field.id.to_string(),
566 ),
567 (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
568 ])
569 } else {
570 HashMap::from([(
571 PARQUET_FIELD_ID_META_KEY.to_string(),
572 list.element_field.id.to_string(),
573 )])
574 };
575 let field = field.with_metadata(meta);
576 Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new(
577 field,
578 ))))
579 }
580
581 fn map(
582 &mut self,
583 map: &crate::spec::MapType,
584 key_value: ArrowSchemaOrFieldOrType,
585 value: ArrowSchemaOrFieldOrType,
586 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
587 let key_field = match self.field(&map.key_field, key_value)? {
588 ArrowSchemaOrFieldOrType::Field(field) => field,
589 _ => unreachable!(),
590 };
591 let value_field = match self.field(&map.value_field, value)? {
592 ArrowSchemaOrFieldOrType::Field(field) => field,
593 _ => unreachable!(),
594 };
595 let field = Field::new(
596 DEFAULT_MAP_FIELD_NAME,
597 DataType::Struct(vec![key_field, value_field].into()),
598 false,
600 );
601
602 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
603 field.into(),
604 false,
605 )))
606 }
607
608 fn primitive(
609 &mut self,
610 p: &crate::spec::PrimitiveType,
611 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
612 match p {
613 crate::spec::PrimitiveType::Boolean => {
614 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
615 }
616 crate::spec::PrimitiveType::Int => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int32)),
617 crate::spec::PrimitiveType::Long => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int64)),
618 crate::spec::PrimitiveType::Float => {
619 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float32))
620 }
621 crate::spec::PrimitiveType::Double => {
622 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float64))
623 }
624 crate::spec::PrimitiveType::Decimal { precision, scale } => {
625 let (precision, scale) = {
626 let precision: u8 = precision.to_owned().try_into().map_err(|err| {
627 Error::new(
628 crate::ErrorKind::DataInvalid,
629 "incompatible precision for decimal type convert",
630 )
631 .with_source(err)
632 })?;
633 let scale = scale.to_owned().try_into().map_err(|err| {
634 Error::new(
635 crate::ErrorKind::DataInvalid,
636 "incompatible scale for decimal type convert",
637 )
638 .with_source(err)
639 })?;
640 (precision, scale)
641 };
642 validate_decimal_precision_and_scale::<Decimal128Type>(precision, scale).map_err(
643 |err| {
644 Error::new(
645 crate::ErrorKind::DataInvalid,
646 "incompatible precision and scale for decimal type convert",
647 )
648 .with_source(err)
649 },
650 )?;
651 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Decimal128(
652 precision, scale,
653 )))
654 }
655 crate::spec::PrimitiveType::Date => {
656 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
657 }
658 crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
659 DataType::Time64(TimeUnit::Microsecond),
660 )),
661 crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
662 DataType::Timestamp(TimeUnit::Microsecond, None),
663 )),
664 crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
665 DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
667 )),
668 crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
669 DataType::Timestamp(TimeUnit::Nanosecond, None),
670 )),
671 crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
672 DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
674 )),
675 crate::spec::PrimitiveType::String => {
676 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
677 }
678 crate::spec::PrimitiveType::Uuid => Ok(ArrowSchemaOrFieldOrType::Type(
679 DataType::FixedSizeBinary(16),
680 )),
681 crate::spec::PrimitiveType::Fixed(len) => Ok(ArrowSchemaOrFieldOrType::Type(
682 i32::try_from(*len)
683 .ok()
684 .map(DataType::FixedSizeBinary)
685 .unwrap_or(DataType::LargeBinary),
686 )),
687 crate::spec::PrimitiveType::Binary => {
688 Ok(ArrowSchemaOrFieldOrType::Type(DataType::LargeBinary))
689 }
690 }
691 }
692}
693
694pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
696 let mut converter = ToArrowSchemaConverter;
697 match crate::spec::visit_schema(schema, &mut converter)? {
698 ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema),
699 _ => unreachable!(),
700 }
701}
702
703pub fn type_to_arrow_type(ty: &crate::spec::Type) -> crate::Result<DataType> {
705 let mut converter = ToArrowSchemaConverter;
706 match crate::spec::visit_type(ty, &mut converter)? {
707 ArrowSchemaOrFieldOrType::Type(ty) => Ok(ty),
708 _ => unreachable!(),
709 }
710}
711
712pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send + Sync>> {
714 match (datum.data_type(), datum.literal()) {
715 (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(value)) => {
716 Ok(Arc::new(BooleanArray::new_scalar(*value)))
717 }
718 (PrimitiveType::Int, PrimitiveLiteral::Int(value)) => {
719 Ok(Arc::new(Int32Array::new_scalar(*value)))
720 }
721 (PrimitiveType::Long, PrimitiveLiteral::Long(value)) => {
722 Ok(Arc::new(Int64Array::new_scalar(*value)))
723 }
724 (PrimitiveType::Float, PrimitiveLiteral::Float(value)) => {
725 Ok(Arc::new(Float32Array::new_scalar(value.into_inner())))
726 }
727 (PrimitiveType::Double, PrimitiveLiteral::Double(value)) => {
728 Ok(Arc::new(Float64Array::new_scalar(value.into_inner())))
729 }
730 (PrimitiveType::String, PrimitiveLiteral::String(value)) => {
731 Ok(Arc::new(StringArray::new_scalar(value.as_str())))
732 }
733 (PrimitiveType::Binary, PrimitiveLiteral::Binary(value)) => {
734 Ok(Arc::new(BinaryArray::new_scalar(value.as_slice())))
735 }
736 (PrimitiveType::Date, PrimitiveLiteral::Int(value)) => {
737 Ok(Arc::new(Date32Array::new_scalar(*value)))
738 }
739 (PrimitiveType::Timestamp, PrimitiveLiteral::Long(value)) => {
740 Ok(Arc::new(TimestampMicrosecondArray::new_scalar(*value)))
741 }
742 (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
743 TimestampMicrosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
744 ))),
745 (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(value)) => {
746 Ok(Arc::new(TimestampNanosecondArray::new_scalar(*value)))
747 }
748 (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
749 TimestampNanosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
750 ))),
751 (PrimitiveType::Decimal { precision, scale }, PrimitiveLiteral::Int128(value)) => {
752 let array = Decimal128Array::from_value(*value, 1)
753 .with_precision_and_scale(*precision as _, *scale as _)
754 .unwrap();
755 Ok(Arc::new(Scalar::new(array)))
756 }
757 (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(value)) => {
758 let bytes = Uuid::from_u128(*value).into_bytes();
759 let array = FixedSizeBinaryArray::try_from_iter(vec![bytes].into_iter()).unwrap();
760 Ok(Arc::new(Scalar::new(array)))
761 }
762
763 (primitive_type, _) => Err(Error::new(
764 ErrorKind::FeatureUnsupported,
765 format!("Converting datum from type {primitive_type:?} to arrow not supported yet."),
766 )),
767 }
768}
769
770pub(crate) fn get_parquet_stat_min_as_datum(
771 primitive_type: &PrimitiveType,
772 stats: &Statistics,
773) -> Result<Option<Datum>> {
774 Ok(match (primitive_type, stats) {
775 (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
776 stats.min_opt().map(|val| Datum::bool(*val))
777 }
778 (PrimitiveType::Int, Statistics::Int32(stats)) => {
779 stats.min_opt().map(|val| Datum::int(*val))
780 }
781 (PrimitiveType::Date, Statistics::Int32(stats)) => {
782 stats.min_opt().map(|val| Datum::date(*val))
783 }
784 (PrimitiveType::Long, Statistics::Int64(stats)) => {
785 stats.min_opt().map(|val| Datum::long(*val))
786 }
787 (PrimitiveType::Time, Statistics::Int64(stats)) => {
788 let Some(val) = stats.min_opt() else {
789 return Ok(None);
790 };
791
792 Some(Datum::time_micros(*val)?)
793 }
794 (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
795 stats.min_opt().map(|val| Datum::timestamp_micros(*val))
796 }
797 (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
798 stats.min_opt().map(|val| Datum::timestamptz_micros(*val))
799 }
800 (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
801 stats.min_opt().map(|val| Datum::timestamp_nanos(*val))
802 }
803 (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
804 stats.min_opt().map(|val| Datum::timestamptz_nanos(*val))
805 }
806 (PrimitiveType::Float, Statistics::Float(stats)) => {
807 stats.min_opt().map(|val| Datum::float(*val))
808 }
809 (PrimitiveType::Double, Statistics::Double(stats)) => {
810 stats.min_opt().map(|val| Datum::double(*val))
811 }
812 (PrimitiveType::String, Statistics::ByteArray(stats)) => {
813 let Some(val) = stats.min_opt() else {
814 return Ok(None);
815 };
816
817 Some(Datum::string(val.as_utf8()?))
818 }
819 (
820 PrimitiveType::Decimal {
821 precision: _,
822 scale: _,
823 },
824 Statistics::ByteArray(stats),
825 ) => {
826 let Some(bytes) = stats.min_bytes_opt() else {
827 return Ok(None);
828 };
829 Some(Datum::new(
830 primitive_type.clone(),
831 PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
832 ))
833 }
834 (
835 PrimitiveType::Decimal {
836 precision: _,
837 scale: _,
838 },
839 Statistics::FixedLenByteArray(stats),
840 ) => {
841 let Some(bytes) = stats.min_bytes_opt() else {
842 return Ok(None);
843 };
844 Some(Datum::new(
845 primitive_type.clone(),
846 PrimitiveLiteral::Int128(i128_from_be_bytes(bytes).ok_or_else(|| {
847 Error::new(
848 ErrorKind::DataInvalid,
849 format!("Can't convert bytes to i128: {bytes:?}"),
850 )
851 })?),
852 ))
853 }
854 (
855 PrimitiveType::Decimal {
856 precision: _,
857 scale: _,
858 },
859 Statistics::Int32(stats),
860 ) => stats.min_opt().map(|val| {
861 Datum::new(
862 primitive_type.clone(),
863 PrimitiveLiteral::Int128(i128::from(*val)),
864 )
865 }),
866
867 (
868 PrimitiveType::Decimal {
869 precision: _,
870 scale: _,
871 },
872 Statistics::Int64(stats),
873 ) => stats.min_opt().map(|val| {
874 Datum::new(
875 primitive_type.clone(),
876 PrimitiveLiteral::Int128(i128::from(*val)),
877 )
878 }),
879 (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
880 let Some(bytes) = stats.min_bytes_opt() else {
881 return Ok(None);
882 };
883 if bytes.len() != 16 {
884 return Err(Error::new(
885 ErrorKind::Unexpected,
886 "Invalid length of uuid bytes.",
887 ));
888 }
889 Some(Datum::uuid(Uuid::from_bytes(
890 bytes[..16].try_into().unwrap(),
891 )))
892 }
893 (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
894 let Some(bytes) = stat.min_bytes_opt() else {
895 return Ok(None);
896 };
897 if bytes.len() != *len as usize {
898 return Err(Error::new(
899 ErrorKind::Unexpected,
900 "Invalid length of fixed bytes.",
901 ));
902 }
903 Some(Datum::fixed(bytes.to_vec()))
904 }
905 (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
906 return Ok(stat
907 .min_bytes_opt()
908 .map(|bytes| Datum::binary(bytes.to_vec())));
909 }
910 _ => {
911 return Ok(None);
912 }
913 })
914}
915
916pub(crate) fn get_parquet_stat_max_as_datum(
917 primitive_type: &PrimitiveType,
918 stats: &Statistics,
919) -> Result<Option<Datum>> {
920 Ok(match (primitive_type, stats) {
921 (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
922 stats.max_opt().map(|val| Datum::bool(*val))
923 }
924 (PrimitiveType::Int, Statistics::Int32(stats)) => {
925 stats.max_opt().map(|val| Datum::int(*val))
926 }
927 (PrimitiveType::Date, Statistics::Int32(stats)) => {
928 stats.max_opt().map(|val| Datum::date(*val))
929 }
930 (PrimitiveType::Long, Statistics::Int64(stats)) => {
931 stats.max_opt().map(|val| Datum::long(*val))
932 }
933 (PrimitiveType::Time, Statistics::Int64(stats)) => {
934 let Some(val) = stats.max_opt() else {
935 return Ok(None);
936 };
937
938 Some(Datum::time_micros(*val)?)
939 }
940 (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
941 stats.max_opt().map(|val| Datum::timestamp_micros(*val))
942 }
943 (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
944 stats.max_opt().map(|val| Datum::timestamptz_micros(*val))
945 }
946 (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
947 stats.max_opt().map(|val| Datum::timestamp_nanos(*val))
948 }
949 (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
950 stats.max_opt().map(|val| Datum::timestamptz_nanos(*val))
951 }
952 (PrimitiveType::Float, Statistics::Float(stats)) => {
953 stats.max_opt().map(|val| Datum::float(*val))
954 }
955 (PrimitiveType::Double, Statistics::Double(stats)) => {
956 stats.max_opt().map(|val| Datum::double(*val))
957 }
958 (PrimitiveType::String, Statistics::ByteArray(stats)) => {
959 let Some(val) = stats.max_opt() else {
960 return Ok(None);
961 };
962
963 Some(Datum::string(val.as_utf8()?))
964 }
965 (
966 PrimitiveType::Decimal {
967 precision: _,
968 scale: _,
969 },
970 Statistics::ByteArray(stats),
971 ) => {
972 let Some(bytes) = stats.max_bytes_opt() else {
973 return Ok(None);
974 };
975 Some(Datum::new(
976 primitive_type.clone(),
977 PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
978 ))
979 }
980 (
981 PrimitiveType::Decimal {
982 precision: _,
983 scale: _,
984 },
985 Statistics::FixedLenByteArray(stats),
986 ) => {
987 let Some(bytes) = stats.max_bytes_opt() else {
988 return Ok(None);
989 };
990 Some(Datum::new(
991 primitive_type.clone(),
992 PrimitiveLiteral::Int128(i128_from_be_bytes(bytes).ok_or_else(|| {
993 Error::new(
994 ErrorKind::DataInvalid,
995 format!("Can't convert bytes to i128: {bytes:?}"),
996 )
997 })?),
998 ))
999 }
1000 (
1001 PrimitiveType::Decimal {
1002 precision: _,
1003 scale: _,
1004 },
1005 Statistics::Int32(stats),
1006 ) => stats.max_opt().map(|val| {
1007 Datum::new(
1008 primitive_type.clone(),
1009 PrimitiveLiteral::Int128(i128::from(*val)),
1010 )
1011 }),
1012
1013 (
1014 PrimitiveType::Decimal {
1015 precision: _,
1016 scale: _,
1017 },
1018 Statistics::Int64(stats),
1019 ) => stats.max_opt().map(|val| {
1020 Datum::new(
1021 primitive_type.clone(),
1022 PrimitiveLiteral::Int128(i128::from(*val)),
1023 )
1024 }),
1025 (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
1026 let Some(bytes) = stats.max_bytes_opt() else {
1027 return Ok(None);
1028 };
1029 if bytes.len() != 16 {
1030 return Err(Error::new(
1031 ErrorKind::Unexpected,
1032 "Invalid length of uuid bytes.",
1033 ));
1034 }
1035 Some(Datum::uuid(Uuid::from_bytes(
1036 bytes[..16].try_into().unwrap(),
1037 )))
1038 }
1039 (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
1040 let Some(bytes) = stat.max_bytes_opt() else {
1041 return Ok(None);
1042 };
1043 if bytes.len() != *len as usize {
1044 return Err(Error::new(
1045 ErrorKind::Unexpected,
1046 "Invalid length of fixed bytes.",
1047 ));
1048 }
1049 Some(Datum::fixed(bytes.to_vec()))
1050 }
1051 (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
1052 return Ok(stat
1053 .max_bytes_opt()
1054 .map(|bytes| Datum::binary(bytes.to_vec())));
1055 }
1056 _ => {
1057 return Ok(None);
1058 }
1059 })
1060}
1061
/// Fallible conversion from an Arrow schema; delegates to [`arrow_schema_to_schema`].
impl TryFrom<&ArrowSchema> for crate::spec::Schema {
    type Error = Error;

    fn try_from(schema: &ArrowSchema) -> crate::Result<Self> {
        arrow_schema_to_schema(schema)
    }
}
1069
/// Fallible conversion from an Iceberg schema; delegates to [`schema_to_arrow_schema`].
impl TryFrom<&crate::spec::Schema> for ArrowSchema {
    type Error = Error;

    fn try_from(schema: &crate::spec::Schema) -> crate::Result<Self> {
        schema_to_arrow_schema(schema)
    }
}
1077
1078pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
1100 let make_ree = |values_type: DataType| -> DataType {
1104 let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
1105 let values_field = Arc::new(Field::new("values", values_type, true));
1106 DataType::RunEndEncoded(run_ends_field, values_field)
1107 };
1108
1109 match datum.data_type() {
1111 PrimitiveType::Boolean => make_ree(DataType::Boolean),
1112 PrimitiveType::Int => make_ree(DataType::Int32),
1113 PrimitiveType::Long => make_ree(DataType::Int64),
1114 PrimitiveType::Float => make_ree(DataType::Float32),
1115 PrimitiveType::Double => make_ree(DataType::Float64),
1116 PrimitiveType::Date => make_ree(DataType::Date32),
1117 PrimitiveType::Time => make_ree(DataType::Int64),
1118 PrimitiveType::Timestamp => make_ree(DataType::Int64),
1119 PrimitiveType::Timestamptz => make_ree(DataType::Int64),
1120 PrimitiveType::TimestampNs => make_ree(DataType::Int64),
1121 PrimitiveType::TimestamptzNs => make_ree(DataType::Int64),
1122 PrimitiveType::String => make_ree(DataType::Utf8),
1123 PrimitiveType::Uuid => make_ree(DataType::Binary),
1124 PrimitiveType::Fixed(_) => make_ree(DataType::Binary),
1125 PrimitiveType::Binary => make_ree(DataType::Binary),
1126 PrimitiveType::Decimal { precision, scale } => {
1127 make_ree(DataType::Decimal128(*precision as u8, *scale as i8))
1128 }
1129 }
1130}
1131
/// Visitor that rebuilds an Arrow schema from fresh `Field`s, so the rebuilt fields carry
/// no metadata.
struct MetadataStripVisitor {
    /// Placeholder fields (name and nullability only) pushed in `before_field` and popped
    /// by the struct/list/map callbacks when they build the final field.
    field_stack: Vec<Field>,
}
1141
1142impl MetadataStripVisitor {
1143 fn new() -> Self {
1144 Self {
1145 field_stack: Vec::new(),
1146 }
1147 }
1148}
1149
1150impl ArrowSchemaVisitor for MetadataStripVisitor {
1151 type T = Field;
1152 type U = ArrowSchema;
1153
1154 fn before_field(&mut self, field: &Field) -> Result<()> {
1155 self.field_stack.push(Field::new(
1157 field.name(),
1158 DataType::Null, field.is_nullable(),
1160 ));
1161 Ok(())
1162 }
1163
1164 fn after_field(&mut self, _field: &Field) -> Result<()> {
1165 Ok(())
1166 }
1167
1168 fn schema(&mut self, _schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
1169 Ok(ArrowSchema::new(values))
1170 }
1171
1172 fn r#struct(&mut self, _fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
1173 let field_info = self
1175 .field_stack
1176 .pop()
1177 .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in struct"))?;
1178
1179 Ok(Field::new(
1181 field_info.name(),
1182 DataType::Struct(Fields::from(results)),
1183 field_info.is_nullable(),
1184 ))
1185 }
1186
1187 fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
1188 let field_info = self
1190 .field_stack
1191 .pop()
1192 .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in list"))?;
1193
1194 let list_type = match list {
1196 DataType::List(_) => DataType::List(Arc::new(value)),
1197 DataType::LargeList(_) => DataType::LargeList(Arc::new(value)),
1198 DataType::FixedSizeList(_, size) => DataType::FixedSizeList(Arc::new(value), *size),
1199 _ => {
1200 return Err(Error::new(
1201 ErrorKind::Unexpected,
1202 format!("Expected list type, got {list}"),
1203 ));
1204 }
1205 };
1206
1207 Ok(Field::new(
1208 field_info.name(),
1209 list_type,
1210 field_info.is_nullable(),
1211 ))
1212 }
1213
1214 fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
1215 let field_info = self
1217 .field_stack
1218 .pop()
1219 .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Field stack underflow in map"))?;
1220
1221 let struct_field = Field::new(
1223 DEFAULT_MAP_FIELD_NAME,
1224 DataType::Struct(Fields::from(vec![key_value, value])),
1225 false,
1226 );
1227
1228 let sorted = match map {
1230 DataType::Map(_, sorted) => *sorted,
1231 _ => {
1232 return Err(Error::new(
1233 ErrorKind::Unexpected,
1234 format!("Expected map type, got {map}"),
1235 ));
1236 }
1237 };
1238
1239 Ok(Field::new(
1241 field_info.name(),
1242 DataType::Map(Arc::new(struct_field), sorted),
1243 field_info.is_nullable(),
1244 ))
1245 }
1246
1247 fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
1248 let field_info = self.field_stack.pop().ok_or_else(|| {
1250 Error::new(ErrorKind::Unexpected, "Field stack underflow in primitive")
1251 })?;
1252
1253 Ok(Field::new(
1255 field_info.name(),
1256 p.clone(),
1257 field_info.is_nullable(),
1258 ))
1259 }
1260}
1261
1262pub fn strip_metadata_from_schema(schema: &ArrowSchema) -> Result<ArrowSchema> {
1292 let mut visitor = MetadataStripVisitor::new();
1293 visit_schema(schema, &mut visitor)
1294}
1295
1296#[cfg(test)]
1297mod tests {
1298 use std::collections::HashMap;
1299 use std::sync::Arc;
1300
1301 use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1302
1303 use super::*;
1304 use crate::spec::decimal_utils::decimal_new;
1305 use crate::spec::{Literal, Schema};
1306
1307 fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
1309 Field::new(name, ty, nullable).with_metadata(HashMap::from([(
1310 PARQUET_FIELD_ID_META_KEY.to_string(),
1311 value.to_string(),
1312 )]))
1313 }
1314
1315 fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema {
1316 let fields = Fields::from(vec![
1317 simple_field("key", DataType::Int32, false, "28"),
1318 simple_field("value", DataType::Utf8, true, "29"),
1319 ]);
1320
1321 let r#struct = DataType::Struct(fields);
1322 let map = DataType::Map(
1323 Arc::new(simple_field(DEFAULT_MAP_FIELD_NAME, r#struct, false, "17")),
1324 false,
1325 );
1326 let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1327
1328 let fields = Fields::from(vec![
1329 simple_field("aa", DataType::Int32, false, "18"),
1330 simple_field("bb", DataType::Utf8, true, "19"),
1331 simple_field(
1332 "cc",
1333 DataType::Timestamp(TimeUnit::Microsecond, None),
1334 false,
1335 "20",
1336 ),
1337 ]);
1338
1339 let r#struct = DataType::Struct(fields);
1340
1341 ArrowSchema::new(vec![
1342 simple_field("a", DataType::Int32, false, "2"),
1343 simple_field("b", DataType::Int64, false, "1"),
1344 simple_field("c", DataType::Utf8, false, "3"),
1345 simple_field("n", DataType::Utf8, false, "21"),
1346 simple_field(
1347 "d",
1348 DataType::Timestamp(TimeUnit::Microsecond, None),
1349 true,
1350 "4",
1351 ),
1352 simple_field("e", DataType::Boolean, true, "6"),
1353 simple_field("f", DataType::Float32, false, "5"),
1354 simple_field("g", DataType::Float64, false, "7"),
1355 simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1356 simple_field("h", DataType::Date32, false, "8"),
1357 simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1358 simple_field(
1359 "j",
1360 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1361 false,
1362 "10",
1363 ),
1364 simple_field(
1365 "k",
1366 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1367 false,
1368 "12",
1369 ),
1370 simple_field("l", DataType::Binary, false, "13"),
1371 simple_field("o", DataType::LargeBinary, false, "22"),
1372 simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1373 simple_field(
1374 "list",
1375 DataType::List(Arc::new(simple_field(
1376 "element",
1377 DataType::Int32,
1378 false,
1379 "15",
1380 ))),
1381 true,
1382 "14",
1383 ),
1384 simple_field(
1385 "large_list",
1386 DataType::LargeList(Arc::new(simple_field(
1387 "element",
1388 DataType::Utf8,
1389 false,
1390 "23",
1391 ))),
1392 true,
1393 "24",
1394 ),
1395 simple_field(
1396 "fixed_list",
1397 DataType::FixedSizeList(
1398 Arc::new(simple_field("element", DataType::Binary, false, "26")),
1399 10,
1400 ),
1401 true,
1402 "25",
1403 ),
1404 simple_field("map", map, false, "16"),
1405 simple_field("struct", r#struct, false, "17"),
1406 simple_field("dictionary", dictionary, false, "30"),
1407 ])
1408 }
1409
1410 fn iceberg_schema_for_arrow_schema_to_schema_test() -> Schema {
1411 let schema_json = r#"{
1412 "type":"struct",
1413 "schema-id":0,
1414 "fields":[
1415 {
1416 "id":2,
1417 "name":"a",
1418 "required":true,
1419 "type":"int"
1420 },
1421 {
1422 "id":1,
1423 "name":"b",
1424 "required":true,
1425 "type":"long"
1426 },
1427 {
1428 "id":3,
1429 "name":"c",
1430 "required":true,
1431 "type":"string"
1432 },
1433 {
1434 "id":21,
1435 "name":"n",
1436 "required":true,
1437 "type":"string"
1438 },
1439 {
1440 "id":4,
1441 "name":"d",
1442 "required":false,
1443 "type":"timestamp"
1444 },
1445 {
1446 "id":6,
1447 "name":"e",
1448 "required":false,
1449 "type":"boolean"
1450 },
1451 {
1452 "id":5,
1453 "name":"f",
1454 "required":true,
1455 "type":"float"
1456 },
1457 {
1458 "id":7,
1459 "name":"g",
1460 "required":true,
1461 "type":"double"
1462 },
1463 {
1464 "id":27,
1465 "name":"p",
1466 "required":true,
1467 "type":"decimal(10,2)"
1468 },
1469 {
1470 "id":8,
1471 "name":"h",
1472 "required":true,
1473 "type":"date"
1474 },
1475 {
1476 "id":9,
1477 "name":"i",
1478 "required":true,
1479 "type":"time"
1480 },
1481 {
1482 "id":10,
1483 "name":"j",
1484 "required":true,
1485 "type":"timestamptz"
1486 },
1487 {
1488 "id":12,
1489 "name":"k",
1490 "required":true,
1491 "type":"timestamptz"
1492 },
1493 {
1494 "id":13,
1495 "name":"l",
1496 "required":true,
1497 "type":"binary"
1498 },
1499 {
1500 "id":22,
1501 "name":"o",
1502 "required":true,
1503 "type":"binary"
1504 },
1505 {
1506 "id":11,
1507 "name":"m",
1508 "required":true,
1509 "type":"fixed[10]"
1510 },
1511 {
1512 "id":14,
1513 "name":"list",
1514 "required": false,
1515 "type": {
1516 "type": "list",
1517 "element-id": 15,
1518 "element-required": true,
1519 "element": "int"
1520 }
1521 },
1522 {
1523 "id":24,
1524 "name":"large_list",
1525 "required": false,
1526 "type": {
1527 "type": "list",
1528 "element-id": 23,
1529 "element-required": true,
1530 "element": "string"
1531 }
1532 },
1533 {
1534 "id":25,
1535 "name":"fixed_list",
1536 "required": false,
1537 "type": {
1538 "type": "list",
1539 "element-id": 26,
1540 "element-required": true,
1541 "element": "binary"
1542 }
1543 },
1544 {
1545 "id":16,
1546 "name":"map",
1547 "required": true,
1548 "type": {
1549 "type": "map",
1550 "key-id": 28,
1551 "key": "int",
1552 "value-id": 29,
1553 "value-required": false,
1554 "value": "string"
1555 }
1556 },
1557 {
1558 "id":17,
1559 "name":"struct",
1560 "required": true,
1561 "type": {
1562 "type": "struct",
1563 "fields": [
1564 {
1565 "id":18,
1566 "name":"aa",
1567 "required":true,
1568 "type":"int"
1569 },
1570 {
1571 "id":19,
1572 "name":"bb",
1573 "required":false,
1574 "type":"string"
1575 },
1576 {
1577 "id":20,
1578 "name":"cc",
1579 "required":true,
1580 "type":"timestamp"
1581 }
1582 ]
1583 }
1584 },
1585 {
1586 "id":30,
1587 "name":"dictionary",
1588 "required":true,
1589 "type":"string"
1590 }
1591 ],
1592 "identifier-field-ids":[]
1593 }"#;
1594
1595 let schema: Schema = serde_json::from_str(schema_json).unwrap();
1596 schema
1597 }
1598
1599 #[test]
1600 fn test_arrow_schema_to_schema() {
1601 let arrow_schema = arrow_schema_for_arrow_schema_to_schema_test();
1602 let schema = iceberg_schema_for_arrow_schema_to_schema_test();
1603 let converted_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
1604 pretty_assertions::assert_eq!(converted_schema, schema);
1605 }
1606
1607 fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema {
1608 let fields = Fields::from(vec![
1609 simple_field("key", DataType::Int32, false, "28"),
1610 simple_field("value", DataType::Utf8, true, "29"),
1611 ]);
1612
1613 let r#struct = DataType::Struct(fields);
1614 let map = DataType::Map(
1615 Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
1616 false,
1617 );
1618
1619 let fields = Fields::from(vec![
1620 simple_field("aa", DataType::Int32, false, "18"),
1621 simple_field("bb", DataType::Utf8, true, "19"),
1622 simple_field(
1623 "cc",
1624 DataType::Timestamp(TimeUnit::Microsecond, None),
1625 false,
1626 "20",
1627 ),
1628 ]);
1629
1630 let r#struct = DataType::Struct(fields);
1631
1632 ArrowSchema::new(vec![
1633 simple_field("a", DataType::Int32, false, "2"),
1634 simple_field("b", DataType::Int64, false, "1"),
1635 simple_field("c", DataType::Utf8, false, "3"),
1636 simple_field("n", DataType::Utf8, false, "21"),
1637 simple_field(
1638 "d",
1639 DataType::Timestamp(TimeUnit::Microsecond, None),
1640 true,
1641 "4",
1642 ),
1643 simple_field("e", DataType::Boolean, true, "6"),
1644 simple_field("f", DataType::Float32, false, "5"),
1645 simple_field("g", DataType::Float64, false, "7"),
1646 simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1647 simple_field("h", DataType::Date32, false, "8"),
1648 simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1649 simple_field(
1650 "j",
1651 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1652 false,
1653 "10",
1654 ),
1655 simple_field(
1656 "k",
1657 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1658 false,
1659 "12",
1660 ),
1661 simple_field("l", DataType::LargeBinary, false, "13"),
1662 simple_field("o", DataType::LargeBinary, false, "22"),
1663 simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1664 simple_field(
1665 "list",
1666 DataType::List(Arc::new(simple_field(
1667 "element",
1668 DataType::Int32,
1669 false,
1670 "15",
1671 ))),
1672 true,
1673 "14",
1674 ),
1675 simple_field(
1676 "large_list",
1677 DataType::List(Arc::new(simple_field(
1678 "element",
1679 DataType::Utf8,
1680 false,
1681 "23",
1682 ))),
1683 true,
1684 "24",
1685 ),
1686 simple_field(
1687 "fixed_list",
1688 DataType::List(Arc::new(simple_field(
1689 "element",
1690 DataType::LargeBinary,
1691 false,
1692 "26",
1693 ))),
1694 true,
1695 "25",
1696 ),
1697 simple_field("map", map, false, "16"),
1698 simple_field("struct", r#struct, false, "17"),
1699 simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"),
1700 ])
1701 }
1702
1703 fn iceberg_schema_for_schema_to_arrow_schema() -> Schema {
1704 let schema_json = r#"{
1705 "type":"struct",
1706 "schema-id":0,
1707 "fields":[
1708 {
1709 "id":2,
1710 "name":"a",
1711 "required":true,
1712 "type":"int"
1713 },
1714 {
1715 "id":1,
1716 "name":"b",
1717 "required":true,
1718 "type":"long"
1719 },
1720 {
1721 "id":3,
1722 "name":"c",
1723 "required":true,
1724 "type":"string"
1725 },
1726 {
1727 "id":21,
1728 "name":"n",
1729 "required":true,
1730 "type":"string"
1731 },
1732 {
1733 "id":4,
1734 "name":"d",
1735 "required":false,
1736 "type":"timestamp"
1737 },
1738 {
1739 "id":6,
1740 "name":"e",
1741 "required":false,
1742 "type":"boolean"
1743 },
1744 {
1745 "id":5,
1746 "name":"f",
1747 "required":true,
1748 "type":"float"
1749 },
1750 {
1751 "id":7,
1752 "name":"g",
1753 "required":true,
1754 "type":"double"
1755 },
1756 {
1757 "id":27,
1758 "name":"p",
1759 "required":true,
1760 "type":"decimal(10,2)"
1761 },
1762 {
1763 "id":8,
1764 "name":"h",
1765 "required":true,
1766 "type":"date"
1767 },
1768 {
1769 "id":9,
1770 "name":"i",
1771 "required":true,
1772 "type":"time"
1773 },
1774 {
1775 "id":10,
1776 "name":"j",
1777 "required":true,
1778 "type":"timestamptz"
1779 },
1780 {
1781 "id":12,
1782 "name":"k",
1783 "required":true,
1784 "type":"timestamptz"
1785 },
1786 {
1787 "id":13,
1788 "name":"l",
1789 "required":true,
1790 "type":"binary"
1791 },
1792 {
1793 "id":22,
1794 "name":"o",
1795 "required":true,
1796 "type":"binary"
1797 },
1798 {
1799 "id":11,
1800 "name":"m",
1801 "required":true,
1802 "type":"fixed[10]"
1803 },
1804 {
1805 "id":14,
1806 "name":"list",
1807 "required": false,
1808 "type": {
1809 "type": "list",
1810 "element-id": 15,
1811 "element-required": true,
1812 "element": "int"
1813 }
1814 },
1815 {
1816 "id":24,
1817 "name":"large_list",
1818 "required": false,
1819 "type": {
1820 "type": "list",
1821 "element-id": 23,
1822 "element-required": true,
1823 "element": "string"
1824 }
1825 },
1826 {
1827 "id":25,
1828 "name":"fixed_list",
1829 "required": false,
1830 "type": {
1831 "type": "list",
1832 "element-id": 26,
1833 "element-required": true,
1834 "element": "binary"
1835 }
1836 },
1837 {
1838 "id":16,
1839 "name":"map",
1840 "required": true,
1841 "type": {
1842 "type": "map",
1843 "key-id": 28,
1844 "key": "int",
1845 "value-id": 29,
1846 "value-required": false,
1847 "value": "string"
1848 }
1849 },
1850 {
1851 "id":17,
1852 "name":"struct",
1853 "required": true,
1854 "type": {
1855 "type": "struct",
1856 "fields": [
1857 {
1858 "id":18,
1859 "name":"aa",
1860 "required":true,
1861 "type":"int"
1862 },
1863 {
1864 "id":19,
1865 "name":"bb",
1866 "required":false,
1867 "type":"string"
1868 },
1869 {
1870 "id":20,
1871 "name":"cc",
1872 "required":true,
1873 "type":"timestamp"
1874 }
1875 ]
1876 }
1877 },
1878 {
1879 "id":30,
1880 "name":"uuid",
1881 "required":true,
1882 "type":"uuid"
1883 }
1884 ],
1885 "identifier-field-ids":[]
1886 }"#;
1887
1888 let schema: Schema = serde_json::from_str(schema_json).unwrap();
1889 schema
1890 }
1891
1892 #[test]
1893 fn test_schema_to_arrow_schema() {
1894 let arrow_schema = arrow_schema_for_schema_to_arrow_schema_test();
1895 let schema = iceberg_schema_for_schema_to_arrow_schema();
1896 let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap();
1897 assert_eq!(converted_arrow_schema, arrow_schema);
1898 }
1899
1900 #[test]
1901 fn test_type_conversion() {
1902 {
1904 let arrow_type = DataType::Int32;
1905 let iceberg_type = Type::Primitive(PrimitiveType::Int);
1906 assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1907 assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1908 }
1909
1910 {
1912 let arrow_type = DataType::Struct(Fields::from(vec![
1914 Field::new("a", DataType::Int64, false),
1915 Field::new("b", DataType::Utf8, true),
1916 ]));
1917 assert_eq!(
1918 &arrow_type_to_type(&arrow_type).unwrap_err().to_string(),
1919 "DataInvalid => Field id not found in metadata"
1920 );
1921
1922 let arrow_type = DataType::Struct(Fields::from(vec![
1923 Field::new("a", DataType::Int64, false).with_metadata(HashMap::from_iter([(
1924 PARQUET_FIELD_ID_META_KEY.to_string(),
1925 1.to_string(),
1926 )])),
1927 Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from_iter([(
1928 PARQUET_FIELD_ID_META_KEY.to_string(),
1929 2.to_string(),
1930 )])),
1931 ]));
1932 let iceberg_type = Type::Struct(StructType::new(vec![
1933 NestedField {
1934 id: 1,
1935 doc: None,
1936 name: "a".to_string(),
1937 required: true,
1938 field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1939 initial_default: None,
1940 write_default: None,
1941 }
1942 .into(),
1943 NestedField {
1944 id: 2,
1945 doc: None,
1946 name: "b".to_string(),
1947 required: false,
1948 field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1949 initial_default: None,
1950 write_default: None,
1951 }
1952 .into(),
1953 ]));
1954 assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1955 assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1956
1957 let iceberg_type = Type::Struct(StructType::new(vec![
1959 NestedField {
1960 id: 1,
1961 doc: None,
1962 name: "a".to_string(),
1963 required: true,
1964 field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1965 initial_default: Some(Literal::Primitive(PrimitiveLiteral::Int(114514))),
1966 write_default: None,
1967 }
1968 .into(),
1969 NestedField {
1970 id: 2,
1971 doc: None,
1972 name: "b".to_string(),
1973 required: false,
1974 field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1975 initial_default: None,
1976 write_default: Some(Literal::Primitive(PrimitiveLiteral::String(
1977 "514".to_string(),
1978 ))),
1979 }
1980 .into(),
1981 ]));
1982 assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1983 }
1984
1985 {
1987 let arrow_type =
1988 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int8));
1989 let iceberg_type = Type::Primitive(PrimitiveType::Int);
1990 assert_eq!(
1991 iceberg_type,
1992 arrow_type_to_type(&arrow_type).unwrap(),
1993 "Expected dictionary conversion to use the contained value"
1994 );
1995
1996 let arrow_type =
1997 DataType::Dictionary(Box::new(DataType::Utf8), Box::new(DataType::Boolean));
1998 let iceberg_type = Type::Primitive(PrimitiveType::Boolean);
1999 assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
2000 }
2001 }
2002
2003 #[test]
2004 fn test_unsigned_integer_type_conversion() {
2005 let test_cases = vec![
2006 (DataType::UInt8, PrimitiveType::Int),
2007 (DataType::UInt16, PrimitiveType::Int),
2008 (DataType::UInt32, PrimitiveType::Long),
2009 ];
2010
2011 for (arrow_type, expected_iceberg_type) in test_cases {
2012 let arrow_field = Field::new("test", arrow_type.clone(), false).with_metadata(
2013 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
2014 );
2015 let arrow_schema = ArrowSchema::new(vec![arrow_field]);
2016
2017 let iceberg_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
2018 let iceberg_field = iceberg_schema.as_struct().fields().first().unwrap();
2019
2020 assert!(
2021 matches!(iceberg_field.field_type.as_ref(), Type::Primitive(t) if *t == expected_iceberg_type),
2022 "Expected {arrow_type:?} to map to {expected_iceberg_type:?}"
2023 );
2024 }
2025
2026 {
2028 let arrow_field = Field::new("test", DataType::UInt64, false).with_metadata(
2029 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
2030 );
2031 let arrow_schema = ArrowSchema::new(vec![arrow_field]);
2032
2033 let result = arrow_schema_to_schema(&arrow_schema);
2034 assert!(result.is_err());
2035 assert!(
2036 result
2037 .unwrap_err()
2038 .to_string()
2039 .contains("UInt64 is not supported")
2040 );
2041 }
2042 }
2043
2044 #[test]
2045 fn test_datum_conversion() {
2046 {
2047 let datum = Datum::bool(true);
2048 let arrow_datum = get_arrow_datum(&datum).unwrap();
2049 let (array, is_scalar) = arrow_datum.get();
2050 let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
2051 assert!(is_scalar);
2052 assert!(array.value(0));
2053 }
2054 {
2055 let datum = Datum::int(42);
2056 let arrow_datum = get_arrow_datum(&datum).unwrap();
2057 let (array, is_scalar) = arrow_datum.get();
2058 let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
2059 assert!(is_scalar);
2060 assert_eq!(array.value(0), 42);
2061 }
2062 {
2063 let datum = Datum::long(42);
2064 let arrow_datum = get_arrow_datum(&datum).unwrap();
2065 let (array, is_scalar) = arrow_datum.get();
2066 let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
2067 assert!(is_scalar);
2068 assert_eq!(array.value(0), 42);
2069 }
2070 {
2071 let datum = Datum::float(42.42);
2072 let arrow_datum = get_arrow_datum(&datum).unwrap();
2073 let (array, is_scalar) = arrow_datum.get();
2074 let array = array.as_any().downcast_ref::<Float32Array>().unwrap();
2075 assert!(is_scalar);
2076 assert_eq!(array.value(0), 42.42);
2077 }
2078 {
2079 let datum = Datum::double(42.42);
2080 let arrow_datum = get_arrow_datum(&datum).unwrap();
2081 let (array, is_scalar) = arrow_datum.get();
2082 let array = array.as_any().downcast_ref::<Float64Array>().unwrap();
2083 assert!(is_scalar);
2084 assert_eq!(array.value(0), 42.42);
2085 }
2086 {
2087 let datum = Datum::string("abc");
2088 let arrow_datum = get_arrow_datum(&datum).unwrap();
2089 let (array, is_scalar) = arrow_datum.get();
2090 let array = array.as_any().downcast_ref::<StringArray>().unwrap();
2091 assert!(is_scalar);
2092 assert_eq!(array.value(0), "abc");
2093 }
2094 {
2095 let datum = Datum::binary(vec![1, 2, 3, 4]);
2096 let arrow_datum = get_arrow_datum(&datum).unwrap();
2097 let (array, is_scalar) = arrow_datum.get();
2098 let array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
2099 assert!(is_scalar);
2100 assert_eq!(array.value(0), &[1, 2, 3, 4]);
2101 }
2102 {
2103 let datum = Datum::date(42);
2104 let arrow_datum = get_arrow_datum(&datum).unwrap();
2105 let (array, is_scalar) = arrow_datum.get();
2106 let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
2107 assert!(is_scalar);
2108 assert_eq!(array.value(0), 42);
2109 }
2110 {
2111 let datum = Datum::timestamp_micros(42);
2112 let arrow_datum = get_arrow_datum(&datum).unwrap();
2113 let (array, is_scalar) = arrow_datum.get();
2114 let array = array
2115 .as_any()
2116 .downcast_ref::<TimestampMicrosecondArray>()
2117 .unwrap();
2118 assert!(is_scalar);
2119 assert_eq!(array.value(0), 42);
2120 }
2121 {
2122 let datum = Datum::timestamptz_micros(42);
2123 let arrow_datum = get_arrow_datum(&datum).unwrap();
2124 let (array, is_scalar) = arrow_datum.get();
2125 let array = array
2126 .as_any()
2127 .downcast_ref::<TimestampMicrosecondArray>()
2128 .unwrap();
2129 assert!(is_scalar);
2130 assert_eq!(array.timezone(), Some("+00:00"));
2131 assert_eq!(array.value(0), 42);
2132 }
2133 {
2134 let datum = Datum::decimal_with_precision(decimal_new(123, 2), 30).unwrap();
2135 let arrow_datum = get_arrow_datum(&datum).unwrap();
2136 let (array, is_scalar) = arrow_datum.get();
2137 let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
2138 assert!(is_scalar);
2139 assert_eq!(array.precision(), 30);
2140 assert_eq!(array.scale(), 2);
2141 assert_eq!(array.value(0), 123);
2142 }
2143 {
2144 let datum = Datum::uuid_from_str("42424242-4242-4242-4242-424242424242").unwrap();
2145 let arrow_datum = get_arrow_datum(&datum).unwrap();
2146 let (array, is_scalar) = arrow_datum.get();
2147 let array = array
2148 .as_any()
2149 .downcast_ref::<FixedSizeBinaryArray>()
2150 .unwrap();
2151 assert!(is_scalar);
2152 assert_eq!(array.value(0), [66u8; 16]);
2153 }
2154 }
2155
2156 #[test]
2157 fn test_arrow_schema_to_schema_with_field_id() {
2158 let arrow_schema = ArrowSchema::new(vec![
2161 Field::new("id", DataType::Int64, false),
2162 Field::new("name", DataType::Utf8, true),
2163 Field::new("price", DataType::Decimal128(10, 2), false),
2164 Field::new(
2165 "created_at",
2166 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
2167 true,
2168 ),
2169 Field::new(
2170 "tags",
2171 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2172 true,
2173 ),
2174 Field::new(
2175 "address",
2176 DataType::Struct(Fields::from(vec![
2177 Field::new("street", DataType::Utf8, true),
2178 Field::new("city", DataType::Utf8, false),
2179 Field::new("zip", DataType::Int32, true),
2180 ])),
2181 true,
2182 ),
2183 Field::new(
2184 "attributes",
2185 DataType::Map(
2186 Arc::new(Field::new(
2187 DEFAULT_MAP_FIELD_NAME,
2188 DataType::Struct(Fields::from(vec![
2189 Field::new("key", DataType::Utf8, false),
2190 Field::new("value", DataType::Utf8, true),
2191 ])),
2192 false,
2193 )),
2194 false,
2195 ),
2196 true,
2197 ),
2198 Field::new(
2199 "orders",
2200 DataType::List(Arc::new(Field::new(
2201 "element",
2202 DataType::Struct(Fields::from(vec![
2203 Field::new("order_id", DataType::Int64, false),
2204 Field::new("amount", DataType::Float64, false),
2205 ])),
2206 true,
2207 ))),
2208 true,
2209 ),
2210 ]);
2211
2212 let schema = arrow_schema_to_schema_auto_assign_ids(&arrow_schema).unwrap();
2213
2214 let expected = Schema::builder()
2219 .with_fields(vec![
2220 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
2221 NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
2222 NestedField::required(
2223 3,
2224 "price",
2225 Type::Primitive(PrimitiveType::Decimal {
2226 precision: 10,
2227 scale: 2,
2228 }),
2229 )
2230 .into(),
2231 NestedField::optional(4, "created_at", Type::Primitive(PrimitiveType::Timestamptz))
2232 .into(),
2233 NestedField::optional(
2234 5,
2235 "tags",
2236 Type::List(ListType {
2237 element_field: NestedField::list_element(
2238 9,
2239 Type::Primitive(PrimitiveType::String),
2240 false,
2241 )
2242 .into(),
2243 }),
2244 )
2245 .into(),
2246 NestedField::optional(
2247 6,
2248 "address",
2249 Type::Struct(StructType::new(vec![
2250 NestedField::optional(10, "street", Type::Primitive(PrimitiveType::String))
2251 .into(),
2252 NestedField::required(11, "city", Type::Primitive(PrimitiveType::String))
2253 .into(),
2254 NestedField::optional(12, "zip", Type::Primitive(PrimitiveType::Int))
2255 .into(),
2256 ])),
2257 )
2258 .into(),
2259 NestedField::optional(
2260 7,
2261 "attributes",
2262 Type::Map(MapType {
2263 key_field: NestedField::map_key_element(
2264 13,
2265 Type::Primitive(PrimitiveType::String),
2266 )
2267 .into(),
2268 value_field: NestedField::map_value_element(
2269 14,
2270 Type::Primitive(PrimitiveType::String),
2271 false,
2272 )
2273 .into(),
2274 }),
2275 )
2276 .into(),
2277 NestedField::optional(
2278 8,
2279 "orders",
2280 Type::List(ListType {
2281 element_field: NestedField::list_element(
2282 15,
2283 Type::Struct(StructType::new(vec![
2284 NestedField::required(
2285 16,
2286 "order_id",
2287 Type::Primitive(PrimitiveType::Long),
2288 )
2289 .into(),
2290 NestedField::required(
2291 17,
2292 "amount",
2293 Type::Primitive(PrimitiveType::Double),
2294 )
2295 .into(),
2296 ])),
2297 false,
2298 )
2299 .into(),
2300 }),
2301 )
2302 .into(),
2303 ])
2304 .build()
2305 .unwrap();
2306
2307 pretty_assertions::assert_eq!(schema, expected);
2308 assert_eq!(schema.highest_field_id(), 17);
2309 }
2310}