1use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
24use arrow_array::{
25 BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
26 FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
27 TimestampMicrosecondArray,
28};
29use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
30use num_bigint::BigInt;
31use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
32use parquet::file::statistics::Statistics;
33use rust_decimal::prelude::ToPrimitive;
34use uuid::Uuid;
35
36use crate::error::Result;
37use crate::spec::{
38 Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema,
39 SchemaVisitor, StructType, Type,
40};
41use crate::{Error, ErrorKind};
42
/// Name given to the entries (key/value struct) field of Arrow `Map` types produced when
/// converting an Iceberg map type to Arrow.
pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
/// Time zone string attached to Arrow timestamp types produced from Iceberg `timestamptz`
/// and `timestamptz_ns`.
pub const UTC_TIME_ZONE: &str = "+00:00";
47
/// Visitor over an Arrow schema / data type tree, driven by `visit_schema` and `visit_type`.
///
/// Container and leaf callbacks produce a `T` per visited data type; `schema` combines the
/// top-level results into a final `U`. The `before_*` / `after_*` hooks bracket the visit of
/// every field, list element, map key and map value; by default they do nothing.
pub trait ArrowSchemaVisitor {
    /// Result produced for each visited data type.
    type T;

    /// Result produced for a whole schema.
    type U;

    /// Called before visiting the type of a struct field or top-level schema field.
    fn before_field(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after visiting the type of a struct field or top-level schema field.
    fn after_field(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before visiting the element type of a list.
    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after visiting the element type of a list.
    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before visiting the key type of a map.
    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after visiting the key type of a map.
    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called before visiting the value type of a map.
    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Called after visiting the value type of a map.
    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
        Ok(())
    }

    /// Combines the results of the top-level schema fields (in field order) into `U`.
    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U>;

    /// Combines the results of a struct's child fields (in field order).
    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T>;

    /// Called with the list data type (`List`, `LargeList` or `FixedSizeList`) and the
    /// result of visiting its element type.
    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;

    /// Called with the `Map` data type; `key_value` is the result for the key type and
    /// `value` the result for the value type.
    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T>;

    /// Called for leaf types: anything `DataType::is_primitive` accepts plus the boolean,
    /// string and binary variants.
    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
}
113
114fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Result<V::T> {
116 match r#type {
117 p if p.is_primitive()
118 || matches!(
119 p,
120 DataType::Boolean
121 | DataType::Utf8
122 | DataType::LargeUtf8
123 | DataType::Utf8View
124 | DataType::Binary
125 | DataType::LargeBinary
126 | DataType::BinaryView
127 | DataType::FixedSizeBinary(_)
128 ) =>
129 {
130 visitor.primitive(p)
131 }
132 DataType::List(element_field) => visit_list(r#type, element_field, visitor),
133 DataType::LargeList(element_field) => visit_list(r#type, element_field, visitor),
134 DataType::FixedSizeList(element_field, _) => visit_list(r#type, element_field, visitor),
135 DataType::Map(field, _) => match field.data_type() {
136 DataType::Struct(fields) => {
137 if fields.len() != 2 {
138 return Err(Error::new(
139 ErrorKind::DataInvalid,
140 "Map field must have exactly 2 fields",
141 ));
142 }
143
144 let key_field = &fields[0];
145 let value_field = &fields[1];
146
147 let key_result = {
148 visitor.before_map_key(key_field)?;
149 let ret = visit_type(key_field.data_type(), visitor)?;
150 visitor.after_map_key(key_field)?;
151 ret
152 };
153
154 let value_result = {
155 visitor.before_map_value(value_field)?;
156 let ret = visit_type(value_field.data_type(), visitor)?;
157 visitor.after_map_value(value_field)?;
158 ret
159 };
160
161 visitor.map(r#type, key_result, value_result)
162 }
163 _ => Err(Error::new(
164 ErrorKind::DataInvalid,
165 "Map field must have struct type",
166 )),
167 },
168 DataType::Struct(fields) => visit_struct(fields, visitor),
169 DataType::Dictionary(_key_type, value_type) => visit_type(value_type, visitor),
170 other => Err(Error::new(
171 ErrorKind::DataInvalid,
172 format!("Cannot visit Arrow data type: {other}"),
173 )),
174 }
175}
176
177fn visit_list<V: ArrowSchemaVisitor>(
179 data_type: &DataType,
180 element_field: &Field,
181 visitor: &mut V,
182) -> Result<V::T> {
183 visitor.before_list_element(element_field)?;
184 let value = visit_type(element_field.data_type(), visitor)?;
185 visitor.after_list_element(element_field)?;
186 visitor.list(data_type, value)
187}
188
189fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> Result<V::T> {
191 let mut results = Vec::with_capacity(fields.len());
192 for field in fields {
193 visitor.before_field(field)?;
194 let result = visit_type(field.data_type(), visitor)?;
195 visitor.after_field(field)?;
196 results.push(result);
197 }
198
199 visitor.r#struct(fields, results)
200}
201
202fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) -> Result<V::U> {
204 let mut results = Vec::with_capacity(schema.fields().len());
205 for field in schema.fields() {
206 visitor.before_field(field)?;
207 let result = visit_type(field.data_type(), visitor)?;
208 visitor.after_field(field)?;
209 results.push(result);
210 }
211 visitor.schema(schema, results)
212}
213
214pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
220 let mut visitor = ArrowSchemaConverter::new();
221 visit_schema(schema, &mut visitor)
222}
223
224pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
226 let mut visitor = ArrowSchemaConverter::new();
227 visit_type(ty, &mut visitor)
228}
229
/// Arrow field-metadata key under which an Iceberg field's `doc` string is stored.
const ARROW_FIELD_DOC_KEY: &str = "doc";
231
232pub(super) fn get_field_id(field: &Field) -> Result<i32> {
233 if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
234 return value.parse::<i32>().map_err(|e| {
235 Error::new(
236 ErrorKind::DataInvalid,
237 "Failed to parse field id".to_string(),
238 )
239 .with_context("value", value)
240 .with_source(e)
241 });
242 }
243 Err(Error::new(
244 ErrorKind::DataInvalid,
245 "Field id not found in metadata",
246 ))
247}
248
249fn get_field_doc(field: &Field) -> Option<String> {
250 if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
251 return Some(value.clone());
252 }
253 None
254}
255
256struct ArrowSchemaConverter;
257
258impl ArrowSchemaConverter {
259 fn new() -> Self {
260 Self {}
261 }
262
263 fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result<Vec<NestedFieldRef>> {
264 let mut results = Vec::with_capacity(fields.len());
265 for i in 0..fields.len() {
266 let field = &fields[i];
267 let field_type = &field_results[i];
268 let id = get_field_id(field)?;
269 let doc = get_field_doc(field);
270 let nested_field = NestedField {
271 id,
272 doc,
273 name: field.name().clone(),
274 required: !field.is_nullable(),
275 field_type: Box::new(field_type.clone()),
276 initial_default: None,
277 write_default: None,
278 };
279 results.push(Arc::new(nested_field));
280 }
281 Ok(results)
282 }
283}
284
285impl ArrowSchemaVisitor for ArrowSchemaConverter {
286 type T = Type;
287 type U = Schema;
288
289 fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
290 let fields = Self::convert_fields(schema.fields(), &values)?;
291 let builder = Schema::builder().with_fields(fields);
292 builder.build()
293 }
294
295 fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
296 let fields = Self::convert_fields(fields, &results)?;
297 Ok(Type::Struct(StructType::new(fields)))
298 }
299
300 fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
301 let element_field = match list {
302 DataType::List(element_field) => element_field,
303 DataType::LargeList(element_field) => element_field,
304 DataType::FixedSizeList(element_field, _) => element_field,
305 _ => {
306 return Err(Error::new(
307 ErrorKind::DataInvalid,
308 "List type must have list data type",
309 ));
310 }
311 };
312
313 let id = get_field_id(element_field)?;
314 let doc = get_field_doc(element_field);
315 let mut element_field =
316 NestedField::list_element(id, value.clone(), !element_field.is_nullable());
317 if let Some(doc) = doc {
318 element_field = element_field.with_doc(doc);
319 }
320 let element_field = Arc::new(element_field);
321 Ok(Type::List(ListType { element_field }))
322 }
323
324 fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result<Self::T> {
325 match map {
326 DataType::Map(field, _) => match field.data_type() {
327 DataType::Struct(fields) => {
328 if fields.len() != 2 {
329 return Err(Error::new(
330 ErrorKind::DataInvalid,
331 "Map field must have exactly 2 fields",
332 ));
333 }
334
335 let key_field = &fields[0];
336 let value_field = &fields[1];
337
338 let key_id = get_field_id(key_field)?;
339 let key_doc = get_field_doc(key_field);
340 let mut key_field = NestedField::map_key_element(key_id, key_value.clone());
341 if let Some(doc) = key_doc {
342 key_field = key_field.with_doc(doc);
343 }
344 let key_field = Arc::new(key_field);
345
346 let value_id = get_field_id(value_field)?;
347 let value_doc = get_field_doc(value_field);
348 let mut value_field = NestedField::map_value_element(
349 value_id,
350 value.clone(),
351 !value_field.is_nullable(),
352 );
353 if let Some(doc) = value_doc {
354 value_field = value_field.with_doc(doc);
355 }
356 let value_field = Arc::new(value_field);
357
358 Ok(Type::Map(MapType {
359 key_field,
360 value_field,
361 }))
362 }
363 _ => Err(Error::new(
364 ErrorKind::DataInvalid,
365 "Map field must have struct type",
366 )),
367 },
368 _ => Err(Error::new(
369 ErrorKind::DataInvalid,
370 "Map type must have map data type",
371 )),
372 }
373 }
374
375 fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
376 match p {
377 DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
378 DataType::Int8 | DataType::Int16 | DataType::Int32 => {
379 Ok(Type::Primitive(PrimitiveType::Int))
380 }
381 DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)),
382 DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)),
383 DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
384 DataType::UInt64 => {
385 Err(Error::new(
387 ErrorKind::DataInvalid,
388 "UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for full uint64 range.",
389 ))
390 }
391 DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
392 DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
393 DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| {
394 Error::new(
395 ErrorKind::DataInvalid,
396 "Failed to create decimal type".to_string(),
397 )
398 .with_source(e)
399 }),
400 DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)),
401 DataType::Time64(unit) if unit == &TimeUnit::Microsecond => {
402 Ok(Type::Primitive(PrimitiveType::Time))
403 }
404 DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => {
405 Ok(Type::Primitive(PrimitiveType::Timestamp))
406 }
407 DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => {
408 Ok(Type::Primitive(PrimitiveType::TimestampNs))
409 }
410 DataType::Timestamp(unit, Some(zone))
411 if unit == &TimeUnit::Microsecond
412 && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
413 {
414 Ok(Type::Primitive(PrimitiveType::Timestamptz))
415 }
416 DataType::Timestamp(unit, Some(zone))
417 if unit == &TimeUnit::Nanosecond
418 && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
419 {
420 Ok(Type::Primitive(PrimitiveType::TimestamptzNs))
421 }
422 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
423 Ok(Type::Primitive(PrimitiveType::Binary))
424 }
425 DataType::FixedSizeBinary(width) => {
426 Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64)))
427 }
428 DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => {
429 Ok(Type::Primitive(PrimitiveType::String))
430 }
431 _ => Err(Error::new(
432 ErrorKind::DataInvalid,
433 format!("Unsupported Arrow data type: {p}"),
434 )),
435 }
436 }
437}
438
439struct ToArrowSchemaConverter;
440
441enum ArrowSchemaOrFieldOrType {
442 Schema(ArrowSchema),
443 Field(Field),
444 Type(DataType),
445}
446
447impl SchemaVisitor for ToArrowSchemaConverter {
448 type T = ArrowSchemaOrFieldOrType;
449
450 fn schema(
451 &mut self,
452 _schema: &crate::spec::Schema,
453 value: ArrowSchemaOrFieldOrType,
454 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
455 let struct_type = match value {
456 ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)) => fields,
457 _ => unreachable!(),
458 };
459 Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new(
460 struct_type,
461 )))
462 }
463
464 fn field(
465 &mut self,
466 field: &crate::spec::NestedFieldRef,
467 value: ArrowSchemaOrFieldOrType,
468 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
469 let ty = match value {
470 ArrowSchemaOrFieldOrType::Type(ty) => ty,
471 _ => unreachable!(),
472 };
473 let metadata = if let Some(doc) = &field.doc {
474 HashMap::from([
475 (PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()),
476 (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
477 ])
478 } else {
479 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string())])
480 };
481 Ok(ArrowSchemaOrFieldOrType::Field(
482 Field::new(field.name.clone(), ty, !field.required).with_metadata(metadata),
483 ))
484 }
485
486 fn r#struct(
487 &mut self,
488 _: &crate::spec::StructType,
489 results: Vec<ArrowSchemaOrFieldOrType>,
490 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
491 let fields = results
492 .into_iter()
493 .map(|result| match result {
494 ArrowSchemaOrFieldOrType::Field(field) => field,
495 _ => unreachable!(),
496 })
497 .collect();
498 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)))
499 }
500
501 fn list(
502 &mut self,
503 list: &crate::spec::ListType,
504 value: ArrowSchemaOrFieldOrType,
505 ) -> crate::Result<Self::T> {
506 let field = match self.field(&list.element_field, value)? {
507 ArrowSchemaOrFieldOrType::Field(field) => field,
508 _ => unreachable!(),
509 };
510 let meta = if let Some(doc) = &list.element_field.doc {
511 HashMap::from([
512 (
513 PARQUET_FIELD_ID_META_KEY.to_string(),
514 list.element_field.id.to_string(),
515 ),
516 (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
517 ])
518 } else {
519 HashMap::from([(
520 PARQUET_FIELD_ID_META_KEY.to_string(),
521 list.element_field.id.to_string(),
522 )])
523 };
524 let field = field.with_metadata(meta);
525 Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new(
526 field,
527 ))))
528 }
529
530 fn map(
531 &mut self,
532 map: &crate::spec::MapType,
533 key_value: ArrowSchemaOrFieldOrType,
534 value: ArrowSchemaOrFieldOrType,
535 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
536 let key_field = match self.field(&map.key_field, key_value)? {
537 ArrowSchemaOrFieldOrType::Field(field) => field,
538 _ => unreachable!(),
539 };
540 let value_field = match self.field(&map.value_field, value)? {
541 ArrowSchemaOrFieldOrType::Field(field) => field,
542 _ => unreachable!(),
543 };
544 let field = Field::new(
545 DEFAULT_MAP_FIELD_NAME,
546 DataType::Struct(vec![key_field, value_field].into()),
547 false,
549 );
550
551 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
552 field.into(),
553 false,
554 )))
555 }
556
557 fn primitive(
558 &mut self,
559 p: &crate::spec::PrimitiveType,
560 ) -> crate::Result<ArrowSchemaOrFieldOrType> {
561 match p {
562 crate::spec::PrimitiveType::Boolean => {
563 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
564 }
565 crate::spec::PrimitiveType::Int => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int32)),
566 crate::spec::PrimitiveType::Long => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int64)),
567 crate::spec::PrimitiveType::Float => {
568 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float32))
569 }
570 crate::spec::PrimitiveType::Double => {
571 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float64))
572 }
573 crate::spec::PrimitiveType::Decimal { precision, scale } => {
574 let (precision, scale) = {
575 let precision: u8 = precision.to_owned().try_into().map_err(|err| {
576 Error::new(
577 crate::ErrorKind::DataInvalid,
578 "incompatible precision for decimal type convert",
579 )
580 .with_source(err)
581 })?;
582 let scale = scale.to_owned().try_into().map_err(|err| {
583 Error::new(
584 crate::ErrorKind::DataInvalid,
585 "incompatible scale for decimal type convert",
586 )
587 .with_source(err)
588 })?;
589 (precision, scale)
590 };
591 validate_decimal_precision_and_scale::<Decimal128Type>(precision, scale).map_err(
592 |err| {
593 Error::new(
594 crate::ErrorKind::DataInvalid,
595 "incompatible precision and scale for decimal type convert",
596 )
597 .with_source(err)
598 },
599 )?;
600 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Decimal128(
601 precision, scale,
602 )))
603 }
604 crate::spec::PrimitiveType::Date => {
605 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
606 }
607 crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
608 DataType::Time64(TimeUnit::Microsecond),
609 )),
610 crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
611 DataType::Timestamp(TimeUnit::Microsecond, None),
612 )),
613 crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
614 DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
616 )),
617 crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
618 DataType::Timestamp(TimeUnit::Nanosecond, None),
619 )),
620 crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
621 DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
623 )),
624 crate::spec::PrimitiveType::String => {
625 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
626 }
627 crate::spec::PrimitiveType::Uuid => Ok(ArrowSchemaOrFieldOrType::Type(
628 DataType::FixedSizeBinary(16),
629 )),
630 crate::spec::PrimitiveType::Fixed(len) => Ok(ArrowSchemaOrFieldOrType::Type(
631 len.to_i32()
632 .map(DataType::FixedSizeBinary)
633 .unwrap_or(DataType::LargeBinary),
634 )),
635 crate::spec::PrimitiveType::Binary => {
636 Ok(ArrowSchemaOrFieldOrType::Type(DataType::LargeBinary))
637 }
638 }
639 }
640}
641
642pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
644 let mut converter = ToArrowSchemaConverter;
645 match crate::spec::visit_schema(schema, &mut converter)? {
646 ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema),
647 _ => unreachable!(),
648 }
649}
650
651pub fn type_to_arrow_type(ty: &crate::spec::Type) -> crate::Result<DataType> {
653 let mut converter = ToArrowSchemaConverter;
654 match crate::spec::visit_type(ty, &mut converter)? {
655 ArrowSchemaOrFieldOrType::Type(ty) => Ok(ty),
656 _ => unreachable!(),
657 }
658}
659
660pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send + Sync>> {
662 match (datum.data_type(), datum.literal()) {
663 (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(value)) => {
664 Ok(Arc::new(BooleanArray::new_scalar(*value)))
665 }
666 (PrimitiveType::Int, PrimitiveLiteral::Int(value)) => {
667 Ok(Arc::new(Int32Array::new_scalar(*value)))
668 }
669 (PrimitiveType::Long, PrimitiveLiteral::Long(value)) => {
670 Ok(Arc::new(Int64Array::new_scalar(*value)))
671 }
672 (PrimitiveType::Float, PrimitiveLiteral::Float(value)) => {
673 Ok(Arc::new(Float32Array::new_scalar(value.to_f32().unwrap())))
674 }
675 (PrimitiveType::Double, PrimitiveLiteral::Double(value)) => {
676 Ok(Arc::new(Float64Array::new_scalar(value.to_f64().unwrap())))
677 }
678 (PrimitiveType::String, PrimitiveLiteral::String(value)) => {
679 Ok(Arc::new(StringArray::new_scalar(value.as_str())))
680 }
681 (PrimitiveType::Binary, PrimitiveLiteral::Binary(value)) => {
682 Ok(Arc::new(BinaryArray::new_scalar(value.as_slice())))
683 }
684 (PrimitiveType::Date, PrimitiveLiteral::Int(value)) => {
685 Ok(Arc::new(Date32Array::new_scalar(*value)))
686 }
687 (PrimitiveType::Timestamp, PrimitiveLiteral::Long(value)) => {
688 Ok(Arc::new(TimestampMicrosecondArray::new_scalar(*value)))
689 }
690 (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
691 TimestampMicrosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
692 ))),
693 (PrimitiveType::Decimal { precision, scale }, PrimitiveLiteral::Int128(value)) => {
694 let array = Decimal128Array::from_value(*value, 1)
695 .with_precision_and_scale(*precision as _, *scale as _)
696 .unwrap();
697 Ok(Arc::new(Scalar::new(array)))
698 }
699 (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(value)) => {
700 let bytes = Uuid::from_u128(*value).into_bytes();
701 let array = FixedSizeBinaryArray::try_from_iter(vec![bytes].into_iter()).unwrap();
702 Ok(Arc::new(Scalar::new(array)))
703 }
704
705 (primitive_type, _) => Err(Error::new(
706 ErrorKind::FeatureUnsupported,
707 format!("Converting datum from type {primitive_type:?} to arrow not supported yet."),
708 )),
709 }
710}
711
712pub(crate) fn get_parquet_stat_min_as_datum(
713 primitive_type: &PrimitiveType,
714 stats: &Statistics,
715) -> Result<Option<Datum>> {
716 Ok(match (primitive_type, stats) {
717 (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
718 stats.min_opt().map(|val| Datum::bool(*val))
719 }
720 (PrimitiveType::Int, Statistics::Int32(stats)) => {
721 stats.min_opt().map(|val| Datum::int(*val))
722 }
723 (PrimitiveType::Date, Statistics::Int32(stats)) => {
724 stats.min_opt().map(|val| Datum::date(*val))
725 }
726 (PrimitiveType::Long, Statistics::Int64(stats)) => {
727 stats.min_opt().map(|val| Datum::long(*val))
728 }
729 (PrimitiveType::Time, Statistics::Int64(stats)) => {
730 let Some(val) = stats.min_opt() else {
731 return Ok(None);
732 };
733
734 Some(Datum::time_micros(*val)?)
735 }
736 (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
737 stats.min_opt().map(|val| Datum::timestamp_micros(*val))
738 }
739 (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
740 stats.min_opt().map(|val| Datum::timestamptz_micros(*val))
741 }
742 (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
743 stats.min_opt().map(|val| Datum::timestamp_nanos(*val))
744 }
745 (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
746 stats.min_opt().map(|val| Datum::timestamptz_nanos(*val))
747 }
748 (PrimitiveType::Float, Statistics::Float(stats)) => {
749 stats.min_opt().map(|val| Datum::float(*val))
750 }
751 (PrimitiveType::Double, Statistics::Double(stats)) => {
752 stats.min_opt().map(|val| Datum::double(*val))
753 }
754 (PrimitiveType::String, Statistics::ByteArray(stats)) => {
755 let Some(val) = stats.min_opt() else {
756 return Ok(None);
757 };
758
759 Some(Datum::string(val.as_utf8()?))
760 }
761 (
762 PrimitiveType::Decimal {
763 precision: _,
764 scale: _,
765 },
766 Statistics::ByteArray(stats),
767 ) => {
768 let Some(bytes) = stats.min_bytes_opt() else {
769 return Ok(None);
770 };
771 Some(Datum::new(
772 primitive_type.clone(),
773 PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
774 ))
775 }
776 (
777 PrimitiveType::Decimal {
778 precision: _,
779 scale: _,
780 },
781 Statistics::FixedLenByteArray(stats),
782 ) => {
783 let Some(bytes) = stats.min_bytes_opt() else {
784 return Ok(None);
785 };
786 let unscaled_value = BigInt::from_signed_bytes_be(bytes);
787 Some(Datum::new(
788 primitive_type.clone(),
789 PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
790 Error::new(
791 ErrorKind::DataInvalid,
792 format!("Can't convert bytes to i128: {bytes:?}"),
793 )
794 })?),
795 ))
796 }
797 (
798 PrimitiveType::Decimal {
799 precision: _,
800 scale: _,
801 },
802 Statistics::Int32(stats),
803 ) => stats.min_opt().map(|val| {
804 Datum::new(
805 primitive_type.clone(),
806 PrimitiveLiteral::Int128(i128::from(*val)),
807 )
808 }),
809
810 (
811 PrimitiveType::Decimal {
812 precision: _,
813 scale: _,
814 },
815 Statistics::Int64(stats),
816 ) => stats.min_opt().map(|val| {
817 Datum::new(
818 primitive_type.clone(),
819 PrimitiveLiteral::Int128(i128::from(*val)),
820 )
821 }),
822 (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
823 let Some(bytes) = stats.min_bytes_opt() else {
824 return Ok(None);
825 };
826 if bytes.len() != 16 {
827 return Err(Error::new(
828 ErrorKind::Unexpected,
829 "Invalid length of uuid bytes.",
830 ));
831 }
832 Some(Datum::uuid(Uuid::from_bytes(
833 bytes[..16].try_into().unwrap(),
834 )))
835 }
836 (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
837 let Some(bytes) = stat.min_bytes_opt() else {
838 return Ok(None);
839 };
840 if bytes.len() != *len as usize {
841 return Err(Error::new(
842 ErrorKind::Unexpected,
843 "Invalid length of fixed bytes.",
844 ));
845 }
846 Some(Datum::fixed(bytes.to_vec()))
847 }
848 (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
849 return Ok(stat
850 .min_bytes_opt()
851 .map(|bytes| Datum::binary(bytes.to_vec())));
852 }
853 _ => {
854 return Ok(None);
855 }
856 })
857}
858
859pub(crate) fn get_parquet_stat_max_as_datum(
860 primitive_type: &PrimitiveType,
861 stats: &Statistics,
862) -> Result<Option<Datum>> {
863 Ok(match (primitive_type, stats) {
864 (PrimitiveType::Boolean, Statistics::Boolean(stats)) => {
865 stats.max_opt().map(|val| Datum::bool(*val))
866 }
867 (PrimitiveType::Int, Statistics::Int32(stats)) => {
868 stats.max_opt().map(|val| Datum::int(*val))
869 }
870 (PrimitiveType::Date, Statistics::Int32(stats)) => {
871 stats.max_opt().map(|val| Datum::date(*val))
872 }
873 (PrimitiveType::Long, Statistics::Int64(stats)) => {
874 stats.max_opt().map(|val| Datum::long(*val))
875 }
876 (PrimitiveType::Time, Statistics::Int64(stats)) => {
877 let Some(val) = stats.max_opt() else {
878 return Ok(None);
879 };
880
881 Some(Datum::time_micros(*val)?)
882 }
883 (PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
884 stats.max_opt().map(|val| Datum::timestamp_micros(*val))
885 }
886 (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
887 stats.max_opt().map(|val| Datum::timestamptz_micros(*val))
888 }
889 (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
890 stats.max_opt().map(|val| Datum::timestamp_nanos(*val))
891 }
892 (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
893 stats.max_opt().map(|val| Datum::timestamptz_nanos(*val))
894 }
895 (PrimitiveType::Float, Statistics::Float(stats)) => {
896 stats.max_opt().map(|val| Datum::float(*val))
897 }
898 (PrimitiveType::Double, Statistics::Double(stats)) => {
899 stats.max_opt().map(|val| Datum::double(*val))
900 }
901 (PrimitiveType::String, Statistics::ByteArray(stats)) => {
902 let Some(val) = stats.max_opt() else {
903 return Ok(None);
904 };
905
906 Some(Datum::string(val.as_utf8()?))
907 }
908 (
909 PrimitiveType::Decimal {
910 precision: _,
911 scale: _,
912 },
913 Statistics::ByteArray(stats),
914 ) => {
915 let Some(bytes) = stats.max_bytes_opt() else {
916 return Ok(None);
917 };
918 Some(Datum::new(
919 primitive_type.clone(),
920 PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
921 ))
922 }
923 (
924 PrimitiveType::Decimal {
925 precision: _,
926 scale: _,
927 },
928 Statistics::FixedLenByteArray(stats),
929 ) => {
930 let Some(bytes) = stats.max_bytes_opt() else {
931 return Ok(None);
932 };
933 let unscaled_value = BigInt::from_signed_bytes_be(bytes);
934 Some(Datum::new(
935 primitive_type.clone(),
936 PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
937 Error::new(
938 ErrorKind::DataInvalid,
939 format!("Can't convert bytes to i128: {bytes:?}"),
940 )
941 })?),
942 ))
943 }
944 (
945 PrimitiveType::Decimal {
946 precision: _,
947 scale: _,
948 },
949 Statistics::Int32(stats),
950 ) => stats.max_opt().map(|val| {
951 Datum::new(
952 primitive_type.clone(),
953 PrimitiveLiteral::Int128(i128::from(*val)),
954 )
955 }),
956
957 (
958 PrimitiveType::Decimal {
959 precision: _,
960 scale: _,
961 },
962 Statistics::Int64(stats),
963 ) => stats.max_opt().map(|val| {
964 Datum::new(
965 primitive_type.clone(),
966 PrimitiveLiteral::Int128(i128::from(*val)),
967 )
968 }),
969 (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
970 let Some(bytes) = stats.max_bytes_opt() else {
971 return Ok(None);
972 };
973 if bytes.len() != 16 {
974 return Err(Error::new(
975 ErrorKind::Unexpected,
976 "Invalid length of uuid bytes.",
977 ));
978 }
979 Some(Datum::uuid(Uuid::from_bytes(
980 bytes[..16].try_into().unwrap(),
981 )))
982 }
983 (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
984 let Some(bytes) = stat.max_bytes_opt() else {
985 return Ok(None);
986 };
987 if bytes.len() != *len as usize {
988 return Err(Error::new(
989 ErrorKind::Unexpected,
990 "Invalid length of fixed bytes.",
991 ));
992 }
993 Some(Datum::fixed(bytes.to_vec()))
994 }
995 (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
996 return Ok(stat
997 .max_bytes_opt()
998 .map(|bytes| Datum::binary(bytes.to_vec())));
999 }
1000 _ => {
1001 return Ok(None);
1002 }
1003 })
1004}
1005
1006impl TryFrom<&ArrowSchema> for crate::spec::Schema {
1007 type Error = Error;
1008
1009 fn try_from(schema: &ArrowSchema) -> crate::Result<Self> {
1010 arrow_schema_to_schema(schema)
1011 }
1012}
1013
1014impl TryFrom<&crate::spec::Schema> for ArrowSchema {
1015 type Error = Error;
1016
1017 fn try_from(schema: &crate::spec::Schema) -> crate::Result<Self> {
1018 schema_to_arrow_schema(schema)
1019 }
1020}
1021
1022#[cfg(test)]
1023mod tests {
1024 use std::collections::HashMap;
1025 use std::sync::Arc;
1026
1027 use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1028 use rust_decimal::Decimal;
1029
1030 use super::*;
1031 use crate::spec::{Literal, Schema};
1032
1033 fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
1035 Field::new(name, ty, nullable).with_metadata(HashMap::from([(
1036 PARQUET_FIELD_ID_META_KEY.to_string(),
1037 value.to_string(),
1038 )]))
1039 }
1040
1041 fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema {
1042 let fields = Fields::from(vec![
1043 simple_field("key", DataType::Int32, false, "28"),
1044 simple_field("value", DataType::Utf8, true, "29"),
1045 ]);
1046
1047 let r#struct = DataType::Struct(fields);
1048 let map = DataType::Map(
1049 Arc::new(simple_field(DEFAULT_MAP_FIELD_NAME, r#struct, false, "17")),
1050 false,
1051 );
1052 let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1053
1054 let fields = Fields::from(vec![
1055 simple_field("aa", DataType::Int32, false, "18"),
1056 simple_field("bb", DataType::Utf8, true, "19"),
1057 simple_field(
1058 "cc",
1059 DataType::Timestamp(TimeUnit::Microsecond, None),
1060 false,
1061 "20",
1062 ),
1063 ]);
1064
1065 let r#struct = DataType::Struct(fields);
1066
1067 ArrowSchema::new(vec![
1068 simple_field("a", DataType::Int32, false, "2"),
1069 simple_field("b", DataType::Int64, false, "1"),
1070 simple_field("c", DataType::Utf8, false, "3"),
1071 simple_field("n", DataType::Utf8, false, "21"),
1072 simple_field(
1073 "d",
1074 DataType::Timestamp(TimeUnit::Microsecond, None),
1075 true,
1076 "4",
1077 ),
1078 simple_field("e", DataType::Boolean, true, "6"),
1079 simple_field("f", DataType::Float32, false, "5"),
1080 simple_field("g", DataType::Float64, false, "7"),
1081 simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1082 simple_field("h", DataType::Date32, false, "8"),
1083 simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1084 simple_field(
1085 "j",
1086 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1087 false,
1088 "10",
1089 ),
1090 simple_field(
1091 "k",
1092 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1093 false,
1094 "12",
1095 ),
1096 simple_field("l", DataType::Binary, false, "13"),
1097 simple_field("o", DataType::LargeBinary, false, "22"),
1098 simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1099 simple_field(
1100 "list",
1101 DataType::List(Arc::new(simple_field(
1102 "element",
1103 DataType::Int32,
1104 false,
1105 "15",
1106 ))),
1107 true,
1108 "14",
1109 ),
1110 simple_field(
1111 "large_list",
1112 DataType::LargeList(Arc::new(simple_field(
1113 "element",
1114 DataType::Utf8,
1115 false,
1116 "23",
1117 ))),
1118 true,
1119 "24",
1120 ),
1121 simple_field(
1122 "fixed_list",
1123 DataType::FixedSizeList(
1124 Arc::new(simple_field("element", DataType::Binary, false, "26")),
1125 10,
1126 ),
1127 true,
1128 "25",
1129 ),
1130 simple_field("map", map, false, "16"),
1131 simple_field("struct", r#struct, false, "17"),
1132 simple_field("dictionary", dictionary, false, "30"),
1133 ])
1134 }
1135
1136 fn iceberg_schema_for_arrow_schema_to_schema_test() -> Schema {
1137 let schema_json = r#"{
1138 "type":"struct",
1139 "schema-id":0,
1140 "fields":[
1141 {
1142 "id":2,
1143 "name":"a",
1144 "required":true,
1145 "type":"int"
1146 },
1147 {
1148 "id":1,
1149 "name":"b",
1150 "required":true,
1151 "type":"long"
1152 },
1153 {
1154 "id":3,
1155 "name":"c",
1156 "required":true,
1157 "type":"string"
1158 },
1159 {
1160 "id":21,
1161 "name":"n",
1162 "required":true,
1163 "type":"string"
1164 },
1165 {
1166 "id":4,
1167 "name":"d",
1168 "required":false,
1169 "type":"timestamp"
1170 },
1171 {
1172 "id":6,
1173 "name":"e",
1174 "required":false,
1175 "type":"boolean"
1176 },
1177 {
1178 "id":5,
1179 "name":"f",
1180 "required":true,
1181 "type":"float"
1182 },
1183 {
1184 "id":7,
1185 "name":"g",
1186 "required":true,
1187 "type":"double"
1188 },
1189 {
1190 "id":27,
1191 "name":"p",
1192 "required":true,
1193 "type":"decimal(10,2)"
1194 },
1195 {
1196 "id":8,
1197 "name":"h",
1198 "required":true,
1199 "type":"date"
1200 },
1201 {
1202 "id":9,
1203 "name":"i",
1204 "required":true,
1205 "type":"time"
1206 },
1207 {
1208 "id":10,
1209 "name":"j",
1210 "required":true,
1211 "type":"timestamptz"
1212 },
1213 {
1214 "id":12,
1215 "name":"k",
1216 "required":true,
1217 "type":"timestamptz"
1218 },
1219 {
1220 "id":13,
1221 "name":"l",
1222 "required":true,
1223 "type":"binary"
1224 },
1225 {
1226 "id":22,
1227 "name":"o",
1228 "required":true,
1229 "type":"binary"
1230 },
1231 {
1232 "id":11,
1233 "name":"m",
1234 "required":true,
1235 "type":"fixed[10]"
1236 },
1237 {
1238 "id":14,
1239 "name":"list",
1240 "required": false,
1241 "type": {
1242 "type": "list",
1243 "element-id": 15,
1244 "element-required": true,
1245 "element": "int"
1246 }
1247 },
1248 {
1249 "id":24,
1250 "name":"large_list",
1251 "required": false,
1252 "type": {
1253 "type": "list",
1254 "element-id": 23,
1255 "element-required": true,
1256 "element": "string"
1257 }
1258 },
1259 {
1260 "id":25,
1261 "name":"fixed_list",
1262 "required": false,
1263 "type": {
1264 "type": "list",
1265 "element-id": 26,
1266 "element-required": true,
1267 "element": "binary"
1268 }
1269 },
1270 {
1271 "id":16,
1272 "name":"map",
1273 "required": true,
1274 "type": {
1275 "type": "map",
1276 "key-id": 28,
1277 "key": "int",
1278 "value-id": 29,
1279 "value-required": false,
1280 "value": "string"
1281 }
1282 },
1283 {
1284 "id":17,
1285 "name":"struct",
1286 "required": true,
1287 "type": {
1288 "type": "struct",
1289 "fields": [
1290 {
1291 "id":18,
1292 "name":"aa",
1293 "required":true,
1294 "type":"int"
1295 },
1296 {
1297 "id":19,
1298 "name":"bb",
1299 "required":false,
1300 "type":"string"
1301 },
1302 {
1303 "id":20,
1304 "name":"cc",
1305 "required":true,
1306 "type":"timestamp"
1307 }
1308 ]
1309 }
1310 },
1311 {
1312 "id":30,
1313 "name":"dictionary",
1314 "required":true,
1315 "type":"string"
1316 }
1317 ],
1318 "identifier-field-ids":[]
1319 }"#;
1320
1321 let schema: Schema = serde_json::from_str(schema_json).unwrap();
1322 schema
1323 }
1324
1325 #[test]
1326 fn test_arrow_schema_to_schema() {
1327 let arrow_schema = arrow_schema_for_arrow_schema_to_schema_test();
1328 let schema = iceberg_schema_for_arrow_schema_to_schema_test();
1329 let converted_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
1330 pretty_assertions::assert_eq!(converted_schema, schema);
1331 }
1332
1333 fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema {
1334 let fields = Fields::from(vec![
1335 simple_field("key", DataType::Int32, false, "28"),
1336 simple_field("value", DataType::Utf8, true, "29"),
1337 ]);
1338
1339 let r#struct = DataType::Struct(fields);
1340 let map = DataType::Map(
1341 Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
1342 false,
1343 );
1344
1345 let fields = Fields::from(vec![
1346 simple_field("aa", DataType::Int32, false, "18"),
1347 simple_field("bb", DataType::Utf8, true, "19"),
1348 simple_field(
1349 "cc",
1350 DataType::Timestamp(TimeUnit::Microsecond, None),
1351 false,
1352 "20",
1353 ),
1354 ]);
1355
1356 let r#struct = DataType::Struct(fields);
1357
1358 ArrowSchema::new(vec![
1359 simple_field("a", DataType::Int32, false, "2"),
1360 simple_field("b", DataType::Int64, false, "1"),
1361 simple_field("c", DataType::Utf8, false, "3"),
1362 simple_field("n", DataType::Utf8, false, "21"),
1363 simple_field(
1364 "d",
1365 DataType::Timestamp(TimeUnit::Microsecond, None),
1366 true,
1367 "4",
1368 ),
1369 simple_field("e", DataType::Boolean, true, "6"),
1370 simple_field("f", DataType::Float32, false, "5"),
1371 simple_field("g", DataType::Float64, false, "7"),
1372 simple_field("p", DataType::Decimal128(10, 2), false, "27"),
1373 simple_field("h", DataType::Date32, false, "8"),
1374 simple_field("i", DataType::Time64(TimeUnit::Microsecond), false, "9"),
1375 simple_field(
1376 "j",
1377 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1378 false,
1379 "10",
1380 ),
1381 simple_field(
1382 "k",
1383 DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1384 false,
1385 "12",
1386 ),
1387 simple_field("l", DataType::LargeBinary, false, "13"),
1388 simple_field("o", DataType::LargeBinary, false, "22"),
1389 simple_field("m", DataType::FixedSizeBinary(10), false, "11"),
1390 simple_field(
1391 "list",
1392 DataType::List(Arc::new(simple_field(
1393 "element",
1394 DataType::Int32,
1395 false,
1396 "15",
1397 ))),
1398 true,
1399 "14",
1400 ),
1401 simple_field(
1402 "large_list",
1403 DataType::List(Arc::new(simple_field(
1404 "element",
1405 DataType::Utf8,
1406 false,
1407 "23",
1408 ))),
1409 true,
1410 "24",
1411 ),
1412 simple_field(
1413 "fixed_list",
1414 DataType::List(Arc::new(simple_field(
1415 "element",
1416 DataType::LargeBinary,
1417 false,
1418 "26",
1419 ))),
1420 true,
1421 "25",
1422 ),
1423 simple_field("map", map, false, "16"),
1424 simple_field("struct", r#struct, false, "17"),
1425 simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"),
1426 ])
1427 }
1428
1429 fn iceberg_schema_for_schema_to_arrow_schema() -> Schema {
1430 let schema_json = r#"{
1431 "type":"struct",
1432 "schema-id":0,
1433 "fields":[
1434 {
1435 "id":2,
1436 "name":"a",
1437 "required":true,
1438 "type":"int"
1439 },
1440 {
1441 "id":1,
1442 "name":"b",
1443 "required":true,
1444 "type":"long"
1445 },
1446 {
1447 "id":3,
1448 "name":"c",
1449 "required":true,
1450 "type":"string"
1451 },
1452 {
1453 "id":21,
1454 "name":"n",
1455 "required":true,
1456 "type":"string"
1457 },
1458 {
1459 "id":4,
1460 "name":"d",
1461 "required":false,
1462 "type":"timestamp"
1463 },
1464 {
1465 "id":6,
1466 "name":"e",
1467 "required":false,
1468 "type":"boolean"
1469 },
1470 {
1471 "id":5,
1472 "name":"f",
1473 "required":true,
1474 "type":"float"
1475 },
1476 {
1477 "id":7,
1478 "name":"g",
1479 "required":true,
1480 "type":"double"
1481 },
1482 {
1483 "id":27,
1484 "name":"p",
1485 "required":true,
1486 "type":"decimal(10,2)"
1487 },
1488 {
1489 "id":8,
1490 "name":"h",
1491 "required":true,
1492 "type":"date"
1493 },
1494 {
1495 "id":9,
1496 "name":"i",
1497 "required":true,
1498 "type":"time"
1499 },
1500 {
1501 "id":10,
1502 "name":"j",
1503 "required":true,
1504 "type":"timestamptz"
1505 },
1506 {
1507 "id":12,
1508 "name":"k",
1509 "required":true,
1510 "type":"timestamptz"
1511 },
1512 {
1513 "id":13,
1514 "name":"l",
1515 "required":true,
1516 "type":"binary"
1517 },
1518 {
1519 "id":22,
1520 "name":"o",
1521 "required":true,
1522 "type":"binary"
1523 },
1524 {
1525 "id":11,
1526 "name":"m",
1527 "required":true,
1528 "type":"fixed[10]"
1529 },
1530 {
1531 "id":14,
1532 "name":"list",
1533 "required": false,
1534 "type": {
1535 "type": "list",
1536 "element-id": 15,
1537 "element-required": true,
1538 "element": "int"
1539 }
1540 },
1541 {
1542 "id":24,
1543 "name":"large_list",
1544 "required": false,
1545 "type": {
1546 "type": "list",
1547 "element-id": 23,
1548 "element-required": true,
1549 "element": "string"
1550 }
1551 },
1552 {
1553 "id":25,
1554 "name":"fixed_list",
1555 "required": false,
1556 "type": {
1557 "type": "list",
1558 "element-id": 26,
1559 "element-required": true,
1560 "element": "binary"
1561 }
1562 },
1563 {
1564 "id":16,
1565 "name":"map",
1566 "required": true,
1567 "type": {
1568 "type": "map",
1569 "key-id": 28,
1570 "key": "int",
1571 "value-id": 29,
1572 "value-required": false,
1573 "value": "string"
1574 }
1575 },
1576 {
1577 "id":17,
1578 "name":"struct",
1579 "required": true,
1580 "type": {
1581 "type": "struct",
1582 "fields": [
1583 {
1584 "id":18,
1585 "name":"aa",
1586 "required":true,
1587 "type":"int"
1588 },
1589 {
1590 "id":19,
1591 "name":"bb",
1592 "required":false,
1593 "type":"string"
1594 },
1595 {
1596 "id":20,
1597 "name":"cc",
1598 "required":true,
1599 "type":"timestamp"
1600 }
1601 ]
1602 }
1603 },
1604 {
1605 "id":30,
1606 "name":"uuid",
1607 "required":true,
1608 "type":"uuid"
1609 }
1610 ],
1611 "identifier-field-ids":[]
1612 }"#;
1613
1614 let schema: Schema = serde_json::from_str(schema_json).unwrap();
1615 schema
1616 }
1617
1618 #[test]
1619 fn test_schema_to_arrow_schema() {
1620 let arrow_schema = arrow_schema_for_schema_to_arrow_schema_test();
1621 let schema = iceberg_schema_for_schema_to_arrow_schema();
1622 let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap();
1623 assert_eq!(converted_arrow_schema, arrow_schema);
1624 }
1625
1626 #[test]
1627 fn test_type_conversion() {
1628 {
1630 let arrow_type = DataType::Int32;
1631 let iceberg_type = Type::Primitive(PrimitiveType::Int);
1632 assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1633 assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1634 }
1635
1636 {
1638 let arrow_type = DataType::Struct(Fields::from(vec![
1640 Field::new("a", DataType::Int64, false),
1641 Field::new("b", DataType::Utf8, true),
1642 ]));
1643 assert_eq!(
1644 &arrow_type_to_type(&arrow_type).unwrap_err().to_string(),
1645 "DataInvalid => Field id not found in metadata"
1646 );
1647
1648 let arrow_type = DataType::Struct(Fields::from(vec![
1649 Field::new("a", DataType::Int64, false).with_metadata(HashMap::from_iter([(
1650 PARQUET_FIELD_ID_META_KEY.to_string(),
1651 1.to_string(),
1652 )])),
1653 Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from_iter([(
1654 PARQUET_FIELD_ID_META_KEY.to_string(),
1655 2.to_string(),
1656 )])),
1657 ]));
1658 let iceberg_type = Type::Struct(StructType::new(vec![
1659 NestedField {
1660 id: 1,
1661 doc: None,
1662 name: "a".to_string(),
1663 required: true,
1664 field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1665 initial_default: None,
1666 write_default: None,
1667 }
1668 .into(),
1669 NestedField {
1670 id: 2,
1671 doc: None,
1672 name: "b".to_string(),
1673 required: false,
1674 field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1675 initial_default: None,
1676 write_default: None,
1677 }
1678 .into(),
1679 ]));
1680 assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1681 assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1682
1683 let iceberg_type = Type::Struct(StructType::new(vec![
1685 NestedField {
1686 id: 1,
1687 doc: None,
1688 name: "a".to_string(),
1689 required: true,
1690 field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
1691 initial_default: Some(Literal::Primitive(PrimitiveLiteral::Int(114514))),
1692 write_default: None,
1693 }
1694 .into(),
1695 NestedField {
1696 id: 2,
1697 doc: None,
1698 name: "b".to_string(),
1699 required: false,
1700 field_type: Box::new(Type::Primitive(PrimitiveType::String)),
1701 initial_default: None,
1702 write_default: Some(Literal::Primitive(PrimitiveLiteral::String(
1703 "514".to_string(),
1704 ))),
1705 }
1706 .into(),
1707 ]));
1708 assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1709 }
1710
1711 {
1713 let arrow_type =
1714 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int8));
1715 let iceberg_type = Type::Primitive(PrimitiveType::Int);
1716 assert_eq!(
1717 iceberg_type,
1718 arrow_type_to_type(&arrow_type).unwrap(),
1719 "Expected dictionary conversion to use the contained value"
1720 );
1721
1722 let arrow_type =
1723 DataType::Dictionary(Box::new(DataType::Utf8), Box::new(DataType::Boolean));
1724 let iceberg_type = Type::Primitive(PrimitiveType::Boolean);
1725 assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1726 }
1727 }
1728
1729 #[test]
1730 fn test_unsigned_integer_type_conversion() {
1731 let test_cases = vec![
1732 (DataType::UInt8, PrimitiveType::Int),
1733 (DataType::UInt16, PrimitiveType::Int),
1734 (DataType::UInt32, PrimitiveType::Long),
1735 ];
1736
1737 for (arrow_type, expected_iceberg_type) in test_cases {
1738 let arrow_field = Field::new("test", arrow_type.clone(), false).with_metadata(
1739 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1740 );
1741 let arrow_schema = ArrowSchema::new(vec![arrow_field]);
1742
1743 let iceberg_schema = arrow_schema_to_schema(&arrow_schema).unwrap();
1744 let iceberg_field = iceberg_schema.as_struct().fields().first().unwrap();
1745
1746 assert!(
1747 matches!(iceberg_field.field_type.as_ref(), Type::Primitive(t) if *t == expected_iceberg_type),
1748 "Expected {arrow_type:?} to map to {expected_iceberg_type:?}"
1749 );
1750 }
1751
1752 {
1754 let arrow_field = Field::new("test", DataType::UInt64, false).with_metadata(
1755 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1756 );
1757 let arrow_schema = ArrowSchema::new(vec![arrow_field]);
1758
1759 let result = arrow_schema_to_schema(&arrow_schema);
1760 assert!(result.is_err());
1761 assert!(
1762 result
1763 .unwrap_err()
1764 .to_string()
1765 .contains("UInt64 is not supported")
1766 );
1767 }
1768 }
1769
1770 #[test]
1771 fn test_datum_conversion() {
1772 {
1773 let datum = Datum::bool(true);
1774 let arrow_datum = get_arrow_datum(&datum).unwrap();
1775 let (array, is_scalar) = arrow_datum.get();
1776 let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
1777 assert!(is_scalar);
1778 assert!(array.value(0));
1779 }
1780 {
1781 let datum = Datum::int(42);
1782 let arrow_datum = get_arrow_datum(&datum).unwrap();
1783 let (array, is_scalar) = arrow_datum.get();
1784 let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
1785 assert!(is_scalar);
1786 assert_eq!(array.value(0), 42);
1787 }
1788 {
1789 let datum = Datum::long(42);
1790 let arrow_datum = get_arrow_datum(&datum).unwrap();
1791 let (array, is_scalar) = arrow_datum.get();
1792 let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
1793 assert!(is_scalar);
1794 assert_eq!(array.value(0), 42);
1795 }
1796 {
1797 let datum = Datum::float(42.42);
1798 let arrow_datum = get_arrow_datum(&datum).unwrap();
1799 let (array, is_scalar) = arrow_datum.get();
1800 let array = array.as_any().downcast_ref::<Float32Array>().unwrap();
1801 assert!(is_scalar);
1802 assert_eq!(array.value(0), 42.42);
1803 }
1804 {
1805 let datum = Datum::double(42.42);
1806 let arrow_datum = get_arrow_datum(&datum).unwrap();
1807 let (array, is_scalar) = arrow_datum.get();
1808 let array = array.as_any().downcast_ref::<Float64Array>().unwrap();
1809 assert!(is_scalar);
1810 assert_eq!(array.value(0), 42.42);
1811 }
1812 {
1813 let datum = Datum::string("abc");
1814 let arrow_datum = get_arrow_datum(&datum).unwrap();
1815 let (array, is_scalar) = arrow_datum.get();
1816 let array = array.as_any().downcast_ref::<StringArray>().unwrap();
1817 assert!(is_scalar);
1818 assert_eq!(array.value(0), "abc");
1819 }
1820 {
1821 let datum = Datum::binary(vec![1, 2, 3, 4]);
1822 let arrow_datum = get_arrow_datum(&datum).unwrap();
1823 let (array, is_scalar) = arrow_datum.get();
1824 let array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
1825 assert!(is_scalar);
1826 assert_eq!(array.value(0), &[1, 2, 3, 4]);
1827 }
1828 {
1829 let datum = Datum::date(42);
1830 let arrow_datum = get_arrow_datum(&datum).unwrap();
1831 let (array, is_scalar) = arrow_datum.get();
1832 let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
1833 assert!(is_scalar);
1834 assert_eq!(array.value(0), 42);
1835 }
1836 {
1837 let datum = Datum::timestamp_micros(42);
1838 let arrow_datum = get_arrow_datum(&datum).unwrap();
1839 let (array, is_scalar) = arrow_datum.get();
1840 let array = array
1841 .as_any()
1842 .downcast_ref::<TimestampMicrosecondArray>()
1843 .unwrap();
1844 assert!(is_scalar);
1845 assert_eq!(array.value(0), 42);
1846 }
1847 {
1848 let datum = Datum::timestamptz_micros(42);
1849 let arrow_datum = get_arrow_datum(&datum).unwrap();
1850 let (array, is_scalar) = arrow_datum.get();
1851 let array = array
1852 .as_any()
1853 .downcast_ref::<TimestampMicrosecondArray>()
1854 .unwrap();
1855 assert!(is_scalar);
1856 assert_eq!(array.timezone(), Some("+00:00"));
1857 assert_eq!(array.value(0), 42);
1858 }
1859 {
1860 let datum = Datum::decimal_with_precision(Decimal::new(123, 2), 30).unwrap();
1861 let arrow_datum = get_arrow_datum(&datum).unwrap();
1862 let (array, is_scalar) = arrow_datum.get();
1863 let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
1864 assert!(is_scalar);
1865 assert_eq!(array.precision(), 30);
1866 assert_eq!(array.scale(), 2);
1867 assert_eq!(array.value(0), 123);
1868 }
1869 {
1870 let datum = Datum::uuid_from_str("42424242-4242-4242-4242-424242424242").unwrap();
1871 let arrow_datum = get_arrow_datum(&datum).unwrap();
1872 let (array, is_scalar) = arrow_datum.get();
1873 let array = array
1874 .as_any()
1875 .downcast_ref::<FixedSizeBinaryArray>()
1876 .unwrap();
1877 assert!(is_scalar);
1878 assert_eq!(array.value(0), [66u8; 16]);
1879 }
1880 }
1881}