1use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_schema::SchemaRef as ArrowSchemaRef;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use itertools::Itertools;
27use parquet::arrow::AsyncArrowWriter;
28use parquet::arrow::async_reader::AsyncFileReader;
29use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
30use parquet::file::metadata::ParquetMetaData;
31use parquet::file::properties::WriterProperties;
32use parquet::file::statistics::Statistics;
33
34use super::{FileWriter, FileWriterBuilder};
35use crate::arrow::{
36 ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
37 get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
38};
39use crate::io::{FileIO, FileWrite, OutputFile};
40use crate::spec::{
41 DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
42 NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
43 StructType, TableMetadata, Type, visit_schema,
44};
45use crate::transform::create_transform_function;
46use crate::writer::{CurrentFileStatus, DataFile};
47use crate::{Error, ErrorKind, Result};
48
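/// A builder for [`ParquetWriter`], parameterized by Parquet [`WriterProperties`] and the
/// Iceberg schema of the data being written.
///
/// A minimal usage sketch (the `output_file`, `schema`, and `record_batch` values are assumed
/// to exist):
///
/// ```ignore
/// let mut writer = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
///     .build(output_file)
///     .await?;
/// writer.write(&record_batch).await?;
/// let data_file_builders = writer.close().await?;
/// ```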
49#[derive(Clone, Debug)]
51pub struct ParquetWriterBuilder {
52 props: WriterProperties,
53 schema: SchemaRef,
54 match_mode: FieldMatchMode,
55}
56
57impl ParquetWriterBuilder {
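    /// Creates a writer builder that matches Arrow fields to Iceberg fields by field id.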
58 pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
61 Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
62 }
63
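    /// Creates a writer builder with an explicit [`FieldMatchMode`]; the mode is forwarded to
    /// the NaN value counting visitor when the writer is built.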
64 pub fn new_with_match_mode(
66 props: WriterProperties,
67 schema: SchemaRef,
68 match_mode: FieldMatchMode,
69 ) -> Self {
70 Self {
71 props,
72 schema,
73 match_mode,
74 }
75 }
76}
77
78impl FileWriterBuilder for ParquetWriterBuilder {
79 type R = ParquetWriter;
80
81 async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
82 Ok(ParquetWriter {
83 schema: self.schema.clone(),
84 inner_writer: None,
85 writer_properties: self.props.clone(),
86 current_row_num: 0,
87 output_file,
88 nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
89 })
90 }
91}
92
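/// An index from a column's full Parquet path name (e.g. `col1.col_1_5`, `col3.list.element`,
/// or `col5.key_value.key`) to its Iceberg field id, built by visiting an Iceberg [`Schema`].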
93struct IndexByParquetPathName {
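    /// Mapping from full Parquet path name to Iceberg field id.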
95 name_to_id: HashMap<String, i32>,
96
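    /// Name components of the field currently being visited.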
97 field_names: Vec<String>,
98
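    /// Field id of the most recently entered field.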
99 field_id: i32,
100}
101
102impl IndexByParquetPathName {
103 pub fn new() -> Self {
105 Self {
106 name_to_id: HashMap::new(),
107 field_names: Vec::new(),
108 field_id: 0,
109 }
110 }
111
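    /// Returns the Iceberg field id for the given full Parquet path name, if it was indexed.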
112 pub fn get(&self, name: &str) -> Option<&i32> {
114 self.name_to_id.get(name)
115 }
116}
117
118impl Default for IndexByParquetPathName {
119 fn default() -> Self {
120 Self::new()
121 }
122}
123
124impl SchemaVisitor for IndexByParquetPathName {
125 type T = ();
126
127 fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
128 self.field_names.push(field.name.to_string());
129 self.field_id = field.id;
130 Ok(())
131 }
132
133 fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
134 self.field_names.pop();
135 Ok(())
136 }
137
138 fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
139 self.field_names.push(format!("list.{}", field.name));
140 self.field_id = field.id;
141 Ok(())
142 }
143
144 fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
145 self.field_names.pop();
146 Ok(())
147 }
148
149 fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
150 self.field_names
151 .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
152 self.field_id = field.id;
153 Ok(())
154 }
155
156 fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
157 self.field_names.pop();
158 Ok(())
159 }
160
161 fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
162 self.field_names
163 .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
164 self.field_id = field.id;
165 Ok(())
166 }
167
168 fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
169 self.field_names.pop();
170 Ok(())
171 }
172
173 fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
174 Ok(())
175 }
176
177 fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
178 Ok(())
179 }
180
181 fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
182 Ok(())
183 }
184
185 fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
186 Ok(())
187 }
188
189 fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
190 Ok(())
191 }
192
193 fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
194 let full_name = self.field_names.iter().map(String::as_str).join(".");
195 let field_id = self.field_id;
196 if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
197 return Err(Error::new(
198 ErrorKind::DataInvalid,
199 format!(
200 "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
201 ),
202 ));
203 } else {
204 self.name_to_id.insert(full_name, field_id);
205 }
206
207 Ok(())
208 }
209}
210
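/// A [`FileWriter`] that writes Arrow record batches to a single Parquet file and, on close,
/// produces a [`DataFileBuilder`] populated with the collected column statistics.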
211pub struct ParquetWriter {
213 schema: SchemaRef,
214 output_file: OutputFile,
215 inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
216 writer_properties: WriterProperties,
217 current_row_num: usize,
218 nan_value_count_visitor: NanValueCountVisitor,
219}
220
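/// Accumulates per-field lower and upper bounds as [`Datum`]s from Parquet column statistics.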
221struct MinMaxColAggregator {
223 lower_bounds: HashMap<i32, Datum>,
224 upper_bounds: HashMap<i32, Datum>,
225 schema: SchemaRef,
226}
227
228impl MinMaxColAggregator {
229 fn new(schema: SchemaRef) -> Self {
231 Self {
232 lower_bounds: HashMap::new(),
233 upper_bounds: HashMap::new(),
234 schema,
235 }
236 }
237
238 fn update_state_min(&mut self, field_id: i32, datum: Datum) {
239 self.lower_bounds
240 .entry(field_id)
241 .and_modify(|e| {
242 if *e > datum {
243 *e = datum.clone()
244 }
245 })
246 .or_insert(datum);
247 }
248
249 fn update_state_max(&mut self, field_id: i32, datum: Datum) {
250 self.upper_bounds
251 .entry(field_id)
252 .and_modify(|e| {
253 if *e < datum {
254 *e = datum.clone()
255 }
256 })
257 .or_insert(datum);
258 }
259
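    /// Merges one column chunk's Parquet [`Statistics`] into the running bounds. Fields that
    /// are not present in the schema are skipped, and only exact min/max values are used.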
260 fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
262 let Some(ty) = self
263 .schema
264 .field_by_id(field_id)
265 .map(|f| f.field_type.as_ref())
266 else {
267 return Ok(());
270 };
271 let Type::Primitive(ty) = ty.clone() else {
272 return Err(Error::new(
273 ErrorKind::Unexpected,
                format!("Composite type {ty} is not supported for min/max aggregation."),
275 ));
276 };
277
278 if value.min_is_exact() {
279 let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
280 return Err(Error::new(
281 ErrorKind::Unexpected,
                format!("Statistics {value} does not match field type {ty}."),
283 ));
284 };
285
286 self.update_state_min(field_id, min_datum);
287 }
288
289 if value.max_is_exact() {
290 let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
291 return Err(Error::new(
292 ErrorKind::Unexpected,
                format!("Statistics {value} does not match field type {ty}."),
294 ));
295 };
296
297 self.update_state_max(field_id, max_datum);
298 }
299
300 Ok(())
301 }
302
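    /// Consumes the aggregator and returns `(lower_bounds, upper_bounds)` keyed by field id.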
303 fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
305 (self.lower_bounds, self.upper_bounds)
306 }
307}
308
309impl ParquetWriter {
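    /// Reads the footers of existing Parquet files and converts them into Iceberg
    /// [`DataFile`]s using the table's current schema and default partition spec id.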
310 #[allow(dead_code)]
312 pub(crate) async fn parquet_files_to_data_files(
313 file_io: &FileIO,
314 file_paths: Vec<String>,
315 table_metadata: &TableMetadata,
316 ) -> Result<Vec<DataFile>> {
317 let mut data_files: Vec<DataFile> = Vec::new();
319
320 for file_path in file_paths {
321 let input_file = file_io.new_input(&file_path)?;
322 let file_metadata = input_file.metadata().await?;
323 let file_size_in_bytes = file_metadata.size as usize;
324 let reader = input_file.reader().await?;
325
326 let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
327 let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
328 Error::new(
329 ErrorKind::DataInvalid,
330 format!("Error reading Parquet metadata: {err}"),
331 )
332 })?;
333 let mut builder = ParquetWriter::parquet_to_data_file_builder(
334 table_metadata.current_schema().clone(),
335 parquet_metadata,
336 file_size_in_bytes,
337 file_path,
338 HashMap::new(),
340 )?;
341 builder.partition_spec_id(table_metadata.default_partition_spec_id());
342 let data_file = builder.build().unwrap();
343 data_files.push(data_file);
344 }
345
346 Ok(data_files)
347 }
348
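    /// Builds a [`DataFileBuilder`] from Parquet footer metadata, filling in column sizes,
    /// value/null/NaN counts, lower/upper bounds, and row-group split offsets.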
349 pub(crate) fn parquet_to_data_file_builder(
351 schema: SchemaRef,
352 metadata: Arc<ParquetMetaData>,
353 written_size: usize,
354 file_path: String,
355 nan_value_counts: HashMap<i32, u64>,
356 ) -> Result<DataFileBuilder> {
357 let index_by_parquet_path = {
358 let mut visitor = IndexByParquetPathName::new();
359 visit_schema(&schema, &mut visitor)?;
360 visitor
361 };
362
363 let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
364 let mut per_col_size: HashMap<i32, u64> = HashMap::new();
365 let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
366 let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
367 let mut min_max_agg = MinMaxColAggregator::new(schema);
368
369 for row_group in metadata.row_groups() {
370 for column_chunk_metadata in row_group.columns() {
371 let parquet_path = column_chunk_metadata.column_descr().path().string();
372
373 let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
374 continue;
375 };
376
377 *per_col_size.entry(field_id).or_insert(0) +=
378 column_chunk_metadata.compressed_size() as u64;
379 *per_col_val_num.entry(field_id).or_insert(0) +=
380 column_chunk_metadata.num_values() as u64;
381
382 if let Some(statistics) = column_chunk_metadata.statistics() {
383 if let Some(null_count) = statistics.null_count_opt() {
384 *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
385 }
386
387 min_max_agg.update(field_id, statistics.clone())?;
388 }
389 }
390 }
391 (
392 per_col_size,
393 per_col_val_num,
394 per_col_null_val_num,
395 min_max_agg.produce(),
396 )
397 };
398
399 let mut builder = DataFileBuilder::default();
400 builder
401 .content(DataContentType::Data)
402 .file_path(file_path)
403 .file_format(DataFileFormat::Parquet)
404 .partition(Struct::empty())
405 .record_count(metadata.file_metadata().num_rows() as u64)
406 .file_size_in_bytes(written_size as u64)
407 .column_sizes(column_sizes)
408 .value_counts(value_counts)
409 .null_value_counts(null_value_counts)
410 .nan_value_counts(nan_value_counts)
411 .lower_bounds(lower_bounds)
414 .upper_bounds(upper_bounds)
415 .split_offsets(Some(
416 metadata
417 .row_groups()
418 .iter()
419 .filter_map(|group| group.file_offset())
420 .collect(),
421 ));
422
423 Ok(builder)
424 }
425
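    /// Infers a partition value from column bounds: for each partition field the lower and
    /// upper bounds must be equal and the transform must preserve order, otherwise an error
    /// is returned.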
426 #[allow(dead_code)]
427 fn partition_value_from_bounds(
428 table_spec: Arc<PartitionSpec>,
429 lower_bounds: &HashMap<i32, Datum>,
430 upper_bounds: &HashMap<i32, Datum>,
431 ) -> Result<Struct> {
432 let mut partition_literals: Vec<Option<Literal>> = Vec::new();
433
434 for field in table_spec.fields() {
435 if let (Some(lower), Some(upper)) = (
436 lower_bounds.get(&field.source_id),
437 upper_bounds.get(&field.source_id),
438 ) {
439 if !field.transform.preserves_order() {
440 return Err(Error::new(
441 ErrorKind::DataInvalid,
442 format!(
                            "Cannot infer partition value for non-linear partition field (transform must preserve order): {} with transform {}",
444 field.name, field.transform
445 ),
446 ));
447 }
448
449 if lower != upper {
450 return Err(Error::new(
451 ErrorKind::DataInvalid,
452 format!(
453 "multiple partition values for field {}: lower: {:?}, upper: {:?}",
454 field.name, lower, upper
455 ),
456 ));
457 }
458
459 let transform_fn = create_transform_function(&field.transform)?;
460 let transform_literal =
461 Literal::from(transform_fn.transform_literal_result(lower)?);
462
463 partition_literals.push(Some(transform_literal));
464 } else {
465 partition_literals.push(None);
466 }
467 }
468
469 let partition_struct = Struct::from_iter(partition_literals);
470
471 Ok(partition_struct)
472 }
473}
474
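// Note: the inner Arrow writer is created lazily on the first non-empty batch, and `close`
// deletes the output file if no rows were written.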
475impl FileWriter for ParquetWriter {
476 async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
477 if batch.num_rows() == 0 {
479 return Ok(());
480 }
481
482 self.current_row_num += batch.num_rows();
483
484 let batch_c = batch.clone();
485 self.nan_value_count_visitor
486 .compute(self.schema.clone(), batch_c)?;
487
488 let writer = if let Some(writer) = &mut self.inner_writer {
490 writer
491 } else {
492 let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
493 let inner_writer = self.output_file.writer().await?;
494 let async_writer = AsyncFileWriter::new(inner_writer);
495 let writer = AsyncArrowWriter::try_new(
496 async_writer,
497 arrow_schema.clone(),
498 Some(self.writer_properties.clone()),
499 )
500 .map_err(|err| {
501 Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
502 .with_source(err)
503 })?;
504 self.inner_writer = Some(writer);
505 self.inner_writer.as_mut().unwrap()
506 };
507
508 writer.write(batch).await.map_err(|err| {
509 Error::new(
510 ErrorKind::Unexpected,
511 "Failed to write using parquet writer.",
512 )
513 .with_source(err)
514 })?;
515
516 Ok(())
517 }
518
519 async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
520 let mut writer = match self.inner_writer.take() {
521 Some(writer) => writer,
522 None => return Ok(vec![]),
523 };
524
525 let metadata = writer.finish().await.map_err(|err| {
526 Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
527 })?;
528
529 let written_size = writer.bytes_written();
530
531 if self.current_row_num == 0 {
532 self.output_file.delete().await.map_err(|err| {
533 Error::new(
534 ErrorKind::Unexpected,
535 "Failed to delete empty parquet file.",
536 )
537 .with_source(err)
538 })?;
539 Ok(vec![])
540 } else {
541 let parquet_metadata = Arc::new(metadata);
542
543 Ok(vec![Self::parquet_to_data_file_builder(
544 self.schema,
545 parquet_metadata,
546 written_size,
547 self.output_file.location().to_string(),
548 self.nan_value_count_visitor.nan_value_counts,
549 )?])
550 }
551 }
552}
553
554impl CurrentFileStatus for ParquetWriter {
555 fn current_file_path(&self) -> String {
556 self.output_file.location().to_string()
557 }
558
559 fn current_row_num(&self) -> usize {
560 self.current_row_num
561 }
562
563 fn current_written_size(&self) -> usize {
564 if let Some(inner) = self.inner_writer.as_ref() {
565 inner.bytes_written() + inner.in_progress_size()
568 } else {
569 0
571 }
572 }
573}
574
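/// Adapter that exposes a [`FileWrite`] as parquet's [`ArrowAsyncFileWriter`], converting I/O
/// errors into `ParquetError::External`.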
575struct AsyncFileWriter<W: FileWrite>(W);
581
582impl<W: FileWrite> AsyncFileWriter<W> {
583 pub fn new(writer: W) -> Self {
585 Self(writer)
586 }
587}
588
589impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
590 fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
591 Box::pin(async {
592 self.0
593 .write(bs)
594 .await
595 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
596 })
597 }
598
599 fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
600 Box::pin(async {
601 self.0
602 .close()
603 .await
604 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
605 })
606 }
607}
608
609#[cfg(test)]
610mod tests {
611 use std::collections::HashMap;
612 use std::sync::Arc;
613
614 use anyhow::Result;
615 use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
616 use arrow_array::types::{Float32Type, Int64Type};
617 use arrow_array::{
618 Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
619 Int64Array, ListArray, MapArray, RecordBatch, StructArray,
620 };
621 use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
622 use arrow_select::concat::concat_batches;
623 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
624 use parquet::file::statistics::ValueStatistics;
625 use rust_decimal::Decimal;
626 use tempfile::TempDir;
627 use uuid::Uuid;
628
629 use super::*;
630 use crate::arrow::schema_to_arrow_schema;
631 use crate::io::FileIOBuilder;
632 use crate::spec::{PrimitiveLiteral, Struct, *};
633 use crate::writer::file_writer::location_generator::{
634 DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
635 };
636 use crate::writer::tests::check_parquet_data_file;
637
638 fn schema_for_all_type() -> Schema {
639 Schema::builder()
640 .with_schema_id(1)
641 .with_fields(vec![
642 NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
643 NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
644 NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
645 NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
646 NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
647 NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
648 NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
649 NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
650 NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
651 NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
652 .into(),
653 NestedField::optional(
654 10,
655 "timestamptz",
656 Type::Primitive(PrimitiveType::Timestamptz),
657 )
658 .into(),
659 NestedField::optional(
660 11,
661 "timestamp_ns",
662 Type::Primitive(PrimitiveType::TimestampNs),
663 )
664 .into(),
665 NestedField::optional(
666 12,
667 "timestamptz_ns",
668 Type::Primitive(PrimitiveType::TimestamptzNs),
669 )
670 .into(),
671 NestedField::optional(
672 13,
673 "decimal",
674 Type::Primitive(PrimitiveType::Decimal {
675 precision: 10,
676 scale: 5,
677 }),
678 )
679 .into(),
680 NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
681 NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
682 .into(),
683 NestedField::optional(
686 16,
687 "decimal_38",
688 Type::Primitive(PrimitiveType::Decimal {
689 precision: 38,
690 scale: 5,
691 }),
692 )
693 .into(),
694 ])
695 .build()
696 .unwrap()
697 }
698
699 fn nested_schema_for_test() -> Schema {
700 Schema::builder()
702 .with_schema_id(1)
703 .with_fields(vec![
704 NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
705 NestedField::required(
706 1,
707 "col1",
708 Type::Struct(StructType::new(vec![
709 NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
710 .into(),
711 NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
712 .into(),
713 ])),
714 )
715 .into(),
716 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
717 NestedField::required(
718 3,
719 "col3",
720 Type::List(ListType::new(
721 NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
722 .into(),
723 )),
724 )
725 .into(),
726 NestedField::required(
727 4,
728 "col4",
729 Type::Struct(StructType::new(vec![
730 NestedField::required(
731 8,
732 "col_4_8",
733 Type::Struct(StructType::new(vec![
734 NestedField::required(
735 9,
736 "col_4_8_9",
737 Type::Primitive(PrimitiveType::Long),
738 )
739 .into(),
740 ])),
741 )
742 .into(),
743 ])),
744 )
745 .into(),
746 NestedField::required(
747 10,
748 "col5",
749 Type::Map(MapType::new(
750 NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
751 .into(),
752 NestedField::required(
753 12,
754 "value",
755 Type::List(ListType::new(
756 NestedField::required(
757 13,
758 "item",
759 Type::Primitive(PrimitiveType::Long),
760 )
761 .into(),
762 )),
763 )
764 .into(),
765 )),
766 )
767 .into(),
768 ])
769 .build()
770 .unwrap()
771 }
772
773 #[tokio::test]
774 async fn test_index_by_parquet_path() {
775 let expect = HashMap::from([
776 ("col0".to_string(), 0),
777 ("col1.col_1_5".to_string(), 5),
778 ("col1.col_1_6".to_string(), 6),
779 ("col2".to_string(), 2),
780 ("col3.list.element".to_string(), 7),
781 ("col4.col_4_8.col_4_8_9".to_string(), 9),
782 ("col5.key_value.key".to_string(), 11),
783 ("col5.key_value.value.list.item".to_string(), 13),
784 ]);
785 let mut visitor = IndexByParquetPathName::new();
786 visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
787 assert_eq!(visitor.name_to_id, expect);
788 }
789
790 #[tokio::test]
791 async fn test_parquet_writer() -> Result<()> {
792 let temp_dir = TempDir::new().unwrap();
793 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
794 let location_gen = DefaultLocationGenerator::with_data_location(
795 temp_dir.path().to_str().unwrap().to_string(),
796 );
797 let file_name_gen =
798 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
799
800 let schema = {
802 let fields =
803 vec![
804 Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([(
805 PARQUET_FIELD_ID_META_KEY.to_string(),
806 "0".to_string(),
807 )])),
808 ];
809 Arc::new(arrow_schema::Schema::new(fields))
810 };
811 let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
812 let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
813 let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
814 let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
815
816 let output_file = file_io.new_output(
817 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
818 )?;
819
820 let mut pw = ParquetWriterBuilder::new(
822 WriterProperties::builder()
823 .set_max_row_group_size(128)
824 .build(),
825 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
826 )
827 .build(output_file)
828 .await?;
829 pw.write(&to_write).await?;
830 pw.write(&to_write_null).await?;
831 let res = pw.close().await?;
832 assert_eq!(res.len(), 1);
833 let data_file = res
834 .into_iter()
835 .next()
836 .unwrap()
837 .content(DataContentType::Data)
839 .partition(Struct::empty())
840 .partition_spec_id(0)
841 .build()
842 .unwrap();
843
844 assert_eq!(data_file.record_count(), 2048);
846 assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
847 assert_eq!(
848 *data_file.lower_bounds(),
849 HashMap::from([(0, Datum::long(0))])
850 );
851 assert_eq!(
852 *data_file.upper_bounds(),
853 HashMap::from([(0, Datum::long(1023))])
854 );
855 assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
856
857 let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
859 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
860
861 Ok(())
862 }
863
864 #[tokio::test]
865 async fn test_parquet_writer_with_complex_schema() -> Result<()> {
866 let temp_dir = TempDir::new().unwrap();
867 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
868 let location_gen = DefaultLocationGenerator::with_data_location(
869 temp_dir.path().to_str().unwrap().to_string(),
870 );
871 let file_name_gen =
872 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
873
874 let schema = nested_schema_for_test();
876 let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
877 let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
878 let col1 = Arc::new(StructArray::new(
879 {
880 if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
881 fields.clone()
882 } else {
883 unreachable!()
884 }
885 },
886 vec![
887 Arc::new(Int64Array::from_iter_values(0..1024)),
888 Arc::new(Int64Array::from_iter_values(0..1024)),
889 ],
890 None,
891 ));
892 let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
893 (0..1024).map(|n| n.to_string()),
894 )) as ArrayRef;
895 let col3 = Arc::new({
896 let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
897 (0..1024).map(|n| Some(vec![Some(n)])),
898 )
899 .into_parts();
900 arrow_array::ListArray::new(
901 {
902 if let DataType::List(field) = arrow_schema.field(3).data_type() {
903 field.clone()
904 } else {
905 unreachable!()
906 }
907 },
908 list_parts.1,
909 list_parts.2,
910 list_parts.3,
911 )
912 }) as ArrayRef;
913 let col4 = Arc::new(StructArray::new(
914 {
915 if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
916 fields.clone()
917 } else {
918 unreachable!()
919 }
920 },
921 vec![Arc::new(StructArray::new(
922 {
923 if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
924 if let DataType::Struct(fields) = fields[0].data_type() {
925 fields.clone()
926 } else {
927 unreachable!()
928 }
929 } else {
930 unreachable!()
931 }
932 },
933 vec![Arc::new(Int64Array::from_iter_values(0..1024))],
934 None,
935 ))],
936 None,
937 ));
938 let col5 = Arc::new({
939 let mut map_array_builder = MapBuilder::new(
940 None,
941 arrow_array::builder::StringBuilder::new(),
942 arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
943 Int64Type,
944 >::new()),
945 );
946 for i in 0..1024 {
947 map_array_builder.keys().append_value(i.to_string());
948 map_array_builder
949 .values()
950 .append_value(vec![Some(i as i64); i + 1]);
951 map_array_builder.append(true)?;
952 }
953 let (_, offset_buffer, struct_array, null_buffer, ordered) =
954 map_array_builder.finish().into_parts();
955 let struct_array = {
956 let (_, mut arrays, nulls) = struct_array.into_parts();
957 let list_array = {
958 let list_array = arrays[1]
959 .as_any()
960 .downcast_ref::<ListArray>()
961 .unwrap()
962 .clone();
963 let (_, offsets, array, nulls) = list_array.into_parts();
964 let list_field = {
965 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
966 if let DataType::Struct(fields) = map_field.data_type() {
967 if let DataType::List(list_field) = fields[1].data_type() {
968 list_field.clone()
969 } else {
970 unreachable!()
971 }
972 } else {
973 unreachable!()
974 }
975 } else {
976 unreachable!()
977 }
978 };
979 ListArray::new(list_field, offsets, array, nulls)
980 };
981 arrays[1] = Arc::new(list_array) as ArrayRef;
982 StructArray::new(
983 {
984 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
985 if let DataType::Struct(fields) = map_field.data_type() {
986 fields.clone()
987 } else {
988 unreachable!()
989 }
990 } else {
991 unreachable!()
992 }
993 },
994 arrays,
995 nulls,
996 )
997 };
998 arrow_array::MapArray::new(
999 {
1000 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1001 map_field.clone()
1002 } else {
1003 unreachable!()
1004 }
1005 },
1006 offset_buffer,
1007 struct_array,
1008 null_buffer,
1009 ordered,
1010 )
1011 }) as ArrayRef;
1012 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1013 col0, col1, col2, col3, col4, col5,
1014 ])
1015 .unwrap();
1016 let output_file = file_io.new_output(
1017 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1018 )?;
1019
1020 let mut pw =
1022 ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1023 .build(output_file)
1024 .await?;
1025 pw.write(&to_write).await?;
1026 let res = pw.close().await?;
1027 assert_eq!(res.len(), 1);
1028 let data_file = res
1029 .into_iter()
1030 .next()
1031 .unwrap()
1032 .content(crate::spec::DataContentType::Data)
1034 .partition(Struct::empty())
1035 .partition_spec_id(0)
1036 .build()
1037 .unwrap();
1038
1039 assert_eq!(data_file.record_count(), 1024);
1041 assert_eq!(
1042 *data_file.value_counts(),
1043 HashMap::from([
1044 (0, 1024),
1045 (5, 1024),
1046 (6, 1024),
1047 (2, 1024),
1048 (7, 1024),
1049 (9, 1024),
1050 (11, 1024),
1051 (13, (1..1025).sum()),
1052 ])
1053 );
1054 assert_eq!(
1055 *data_file.lower_bounds(),
1056 HashMap::from([
1057 (0, Datum::long(0)),
1058 (5, Datum::long(0)),
1059 (6, Datum::long(0)),
1060 (2, Datum::string("0")),
1061 (7, Datum::long(0)),
1062 (9, Datum::long(0)),
1063 (11, Datum::string("0")),
1064 (13, Datum::long(0))
1065 ])
1066 );
1067 assert_eq!(
1068 *data_file.upper_bounds(),
1069 HashMap::from([
1070 (0, Datum::long(1023)),
1071 (5, Datum::long(1023)),
1072 (6, Datum::long(1023)),
1073 (2, Datum::string("999")),
1074 (7, Datum::long(1023)),
1075 (9, Datum::long(1023)),
1076 (11, Datum::string("999")),
1077 (13, Datum::long(1023))
1078 ])
1079 );
1080
1081 check_parquet_data_file(&file_io, &data_file, &to_write).await;
1083
1084 Ok(())
1085 }
1086
1087 #[tokio::test]
1088 async fn test_all_type_for_write() -> Result<()> {
1089 let temp_dir = TempDir::new().unwrap();
1090 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1091 let location_gen = DefaultLocationGenerator::with_data_location(
1092 temp_dir.path().to_str().unwrap().to_string(),
1093 );
1094 let file_name_gen =
1095 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1096
1097 let schema = schema_for_all_type();
1100 let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1101 let col0 = Arc::new(BooleanArray::from(vec![
1102 Some(true),
1103 Some(false),
1104 None,
1105 Some(true),
1106 ])) as ArrayRef;
1107 let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1108 let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1109 let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1110 Some(0.5),
1111 Some(2.0),
1112 None,
1113 Some(3.5),
1114 ])) as ArrayRef;
1115 let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1116 Some(0.5),
1117 Some(2.0),
1118 None,
1119 Some(3.5),
1120 ])) as ArrayRef;
1121 let col5 = Arc::new(arrow_array::StringArray::from(vec![
1122 Some("a"),
1123 Some("b"),
1124 None,
1125 Some("d"),
1126 ])) as ArrayRef;
1127 let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1128 Some(b"one"),
1129 None,
1130 Some(b""),
1131 Some(b"zzzz"),
1132 ])) as ArrayRef;
1133 let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1134 Some(0),
1135 Some(1),
1136 None,
1137 Some(3),
1138 ])) as ArrayRef;
1139 let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1140 Some(0),
1141 Some(1),
1142 None,
1143 Some(3),
1144 ])) as ArrayRef;
1145 let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1146 Some(0),
1147 Some(1),
1148 None,
1149 Some(3),
1150 ])) as ArrayRef;
1151 let col10 = Arc::new(
1152 arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1153 .with_timezone_utc(),
1154 ) as ArrayRef;
1155 let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1156 Some(0),
1157 Some(1),
1158 None,
1159 Some(3),
1160 ])) as ArrayRef;
1161 let col12 = Arc::new(
1162 arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1163 .with_timezone_utc(),
1164 ) as ArrayRef;
1165 let col13 = Arc::new(
1166 arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1167 .with_precision_and_scale(10, 5)
1168 .unwrap(),
1169 ) as ArrayRef;
1170 let col14 = Arc::new(
1171 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1172 vec![
1173 Some(Uuid::from_u128(0).as_bytes().to_vec()),
1174 Some(Uuid::from_u128(1).as_bytes().to_vec()),
1175 None,
1176 Some(Uuid::from_u128(3).as_bytes().to_vec()),
1177 ]
1178 .into_iter(),
1179 16,
1180 )
1181 .unwrap(),
1182 ) as ArrayRef;
1183 let col15 = Arc::new(
1184 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1185 vec![
1186 Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1187 Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1188 None,
1189 Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1190 ]
1191 .into_iter(),
1192 10,
1193 )
1194 .unwrap(),
1195 ) as ArrayRef;
1196 let col16 = Arc::new(
1197 arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1198 .with_precision_and_scale(38, 5)
1199 .unwrap(),
1200 ) as ArrayRef;
1201 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1202 col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1203 col14, col15, col16,
1204 ])
1205 .unwrap();
1206 let output_file = file_io.new_output(
1207 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1208 )?;
1209
1210 let mut pw =
1212 ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1213 .build(output_file)
1214 .await?;
1215 pw.write(&to_write).await?;
1216 let res = pw.close().await?;
1217 assert_eq!(res.len(), 1);
1218 let data_file = res
1219 .into_iter()
1220 .next()
1221 .unwrap()
1222 .content(crate::spec::DataContentType::Data)
1224 .partition(Struct::empty())
1225 .partition_spec_id(0)
1226 .build()
1227 .unwrap();
1228
1229 assert_eq!(data_file.record_count(), 4);
1231 assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1232 assert!(
1233 data_file
1234 .null_value_counts()
1235 .iter()
1236 .all(|(_, &v)| { v == 1 })
1237 );
1238 assert_eq!(
1239 *data_file.lower_bounds(),
1240 HashMap::from([
1241 (0, Datum::bool(false)),
1242 (1, Datum::int(1)),
1243 (2, Datum::long(1)),
1244 (3, Datum::float(0.5)),
1245 (4, Datum::double(0.5)),
1246 (5, Datum::string("a")),
1247 (6, Datum::binary(vec![])),
1248 (7, Datum::date(0)),
1249 (8, Datum::time_micros(0).unwrap()),
1250 (9, Datum::timestamp_micros(0)),
1251 (10, Datum::timestamptz_micros(0)),
1252 (11, Datum::timestamp_nanos(0)),
1253 (12, Datum::timestamptz_nanos(0)),
1254 (
1255 13,
1256 Datum::new(
1257 PrimitiveType::Decimal {
1258 precision: 10,
1259 scale: 5
1260 },
1261 PrimitiveLiteral::Int128(1)
1262 )
1263 ),
1264 (14, Datum::uuid(Uuid::from_u128(0))),
1265 (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1266 (
1267 16,
1268 Datum::new(
1269 PrimitiveType::Decimal {
1270 precision: 38,
1271 scale: 5
1272 },
1273 PrimitiveLiteral::Int128(1)
1274 )
1275 ),
1276 ])
1277 );
1278 assert_eq!(
1279 *data_file.upper_bounds(),
1280 HashMap::from([
1281 (0, Datum::bool(true)),
1282 (1, Datum::int(4)),
1283 (2, Datum::long(4)),
1284 (3, Datum::float(3.5)),
1285 (4, Datum::double(3.5)),
1286 (5, Datum::string("d")),
1287 (6, Datum::binary(vec![122, 122, 122, 122])),
1288 (7, Datum::date(3)),
1289 (8, Datum::time_micros(3).unwrap()),
1290 (9, Datum::timestamp_micros(3)),
1291 (10, Datum::timestamptz_micros(3)),
1292 (11, Datum::timestamp_nanos(3)),
1293 (12, Datum::timestamptz_nanos(3)),
1294 (
1295 13,
1296 Datum::new(
1297 PrimitiveType::Decimal {
1298 precision: 10,
1299 scale: 5
1300 },
1301 PrimitiveLiteral::Int128(100)
1302 )
1303 ),
1304 (14, Datum::uuid(Uuid::from_u128(3))),
1305 (
1306 15,
1307 Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1308 ),
1309 (
1310 16,
1311 Datum::new(
1312 PrimitiveType::Decimal {
1313 precision: 38,
1314 scale: 5
1315 },
1316 PrimitiveLiteral::Int128(100)
1317 )
1318 ),
1319 ])
1320 );
1321
1322 check_parquet_data_file(&file_io, &data_file, &to_write).await;
1324
1325 Ok(())
1326 }
1327
1328 #[tokio::test]
1329 async fn test_decimal_bound() -> Result<()> {
1330 let temp_dir = TempDir::new().unwrap();
1331 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1332 let location_gen = DefaultLocationGenerator::with_data_location(
1333 temp_dir.path().to_str().unwrap().to_string(),
1334 );
1335 let file_name_gen =
1336 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1337
1338 let schema = Arc::new(
1340 Schema::builder()
1341 .with_fields(vec![
1342 NestedField::optional(
1343 0,
1344 "decimal",
1345 Type::Primitive(PrimitiveType::Decimal {
1346 precision: 28,
1347 scale: 10,
1348 }),
1349 )
1350 .into(),
1351 ])
1352 .build()
1353 .unwrap(),
1354 );
1355 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1356 let output_file = file_io.new_output(
1357 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1358 )?;
1359 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1360 .build(output_file)
1361 .await?;
1362 let col0 = Arc::new(
1363 Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1364 .with_data_type(DataType::Decimal128(28, 10)),
1365 ) as ArrayRef;
1366 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1367 pw.write(&to_write).await?;
1368 let res = pw.close().await?;
1369 assert_eq!(res.len(), 1);
1370 let data_file = res
1371 .into_iter()
1372 .next()
1373 .unwrap()
1374 .content(crate::spec::DataContentType::Data)
1375 .partition(Struct::empty())
1376 .partition_spec_id(0)
1377 .build()
1378 .unwrap();
1379 assert_eq!(
1380 data_file.upper_bounds().get(&0),
1381 Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1382 .as_ref()
1383 );
1384 assert_eq!(
1385 data_file.lower_bounds().get(&0),
1386 Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1387 .as_ref()
1388 );
1389
1390 let schema = Arc::new(
1392 Schema::builder()
1393 .with_fields(vec![
1394 NestedField::optional(
1395 0,
1396 "decimal",
1397 Type::Primitive(PrimitiveType::Decimal {
1398 precision: 28,
1399 scale: 10,
1400 }),
1401 )
1402 .into(),
1403 ])
1404 .build()
1405 .unwrap(),
1406 );
1407 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1408 let output_file = file_io.new_output(
1409 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1410 )?;
1411 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1412 .build(output_file)
1413 .await?;
1414 let col0 = Arc::new(
1415 Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1416 .with_data_type(DataType::Decimal128(28, 10)),
1417 ) as ArrayRef;
1418 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1419 pw.write(&to_write).await?;
1420 let res = pw.close().await?;
1421 assert_eq!(res.len(), 1);
1422 let data_file = res
1423 .into_iter()
1424 .next()
1425 .unwrap()
1426 .content(crate::spec::DataContentType::Data)
1427 .partition(Struct::empty())
1428 .partition_spec_id(0)
1429 .build()
1430 .unwrap();
1431 assert_eq!(
1432 data_file.upper_bounds().get(&0),
1433 Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1434 .as_ref()
1435 );
1436 assert_eq!(
1437 data_file.lower_bounds().get(&0),
1438 Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1439 .as_ref()
1440 );
1441
1442 let decimal_max = Decimal::MAX;
1444 let decimal_min = Decimal::MIN;
1445 assert_eq!(decimal_max.scale(), decimal_min.scale());
1446 let schema = Arc::new(
1447 Schema::builder()
1448 .with_fields(vec![
1449 NestedField::optional(
1450 0,
1451 "decimal",
1452 Type::Primitive(PrimitiveType::Decimal {
1453 precision: 38,
1454 scale: decimal_max.scale(),
1455 }),
1456 )
1457 .into(),
1458 ])
1459 .build()
1460 .unwrap(),
1461 );
1462 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1463 let output_file = file_io.new_output(
1464 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1465 )?;
1466 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1467 .build(output_file)
1468 .await?;
1469 let col0 = Arc::new(
1470 Decimal128Array::from(vec![
1471 Some(decimal_max.mantissa()),
1472 Some(decimal_min.mantissa()),
1473 ])
1474 .with_data_type(DataType::Decimal128(38, 0)),
1475 ) as ArrayRef;
1476 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1477 pw.write(&to_write).await?;
1478 let res = pw.close().await?;
1479 assert_eq!(res.len(), 1);
1480 let data_file = res
1481 .into_iter()
1482 .next()
1483 .unwrap()
1484 .content(crate::spec::DataContentType::Data)
1485 .partition(Struct::empty())
1486 .partition_spec_id(0)
1487 .build()
1488 .unwrap();
1489 assert_eq!(
1490 data_file.upper_bounds().get(&0),
1491 Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1492 );
1493 assert_eq!(
1494 data_file.lower_bounds().get(&0),
1495 Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1496 );
1497
1498 Ok(())
1568 }
1569
1570 #[tokio::test]
1571 async fn test_empty_write() -> Result<()> {
1572 let temp_dir = TempDir::new().unwrap();
1573 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1574 let location_gen = DefaultLocationGenerator::with_data_location(
1575 temp_dir.path().to_str().unwrap().to_string(),
1576 );
1577 let file_name_gen =
1578 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1579
1580 let schema = {
1582 let fields = vec![
1583 arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1584 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1585 ),
1586 ];
1587 Arc::new(arrow_schema::Schema::new(fields))
1588 };
1589 let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1590 let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1591 let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1592 let output_file = file_io.new_output(&file_path)?;
1593 let mut pw = ParquetWriterBuilder::new(
1594 WriterProperties::builder().build(),
1595 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1596 )
1597 .build(output_file)
1598 .await?;
1599 pw.write(&to_write).await?;
1600 pw.close().await.unwrap();
1601 assert!(file_io.exists(&file_path).await.unwrap());
1602
1603 let file_name_gen =
1605 DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1606 let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1607 let output_file = file_io.new_output(&file_path)?;
1608 let pw = ParquetWriterBuilder::new(
1609 WriterProperties::builder().build(),
1610 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1611 )
1612 .build(output_file)
1613 .await?;
1614 pw.close().await.unwrap();
1615 assert!(!file_io.exists(&file_path).await.unwrap());
1616
1617 Ok(())
1618 }
1619
1620 #[tokio::test]
1621 async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1622 let temp_dir = TempDir::new().unwrap();
1623 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1624 let location_gen = DefaultLocationGenerator::with_data_location(
1625 temp_dir.path().to_str().unwrap().to_string(),
1626 );
1627 let file_name_gen =
1628 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1629
1630 let arrow_schema = {
1632 let fields = vec![
1633 Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1634 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1635 ),
1636 Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1637 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1638 ),
1639 ];
1640 Arc::new(arrow_schema::Schema::new(fields))
1641 };
1642
1643 let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1644 [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1645 None,
1646 )) as ArrayRef;
1647
1648 let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1649 [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1650 None,
1651 )) as ArrayRef;
1652
1653 let to_write =
1654 RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1655 let output_file = file_io.new_output(
1656 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1657 )?;
1658
1659 let mut pw = ParquetWriterBuilder::new(
1661 WriterProperties::builder().build(),
1662 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1663 )
1664 .build(output_file)
1665 .await?;
1666
1667 pw.write(&to_write).await?;
1668 let res = pw.close().await?;
1669 assert_eq!(res.len(), 1);
1670 let data_file = res
1671 .into_iter()
1672 .next()
1673 .unwrap()
1674 .content(crate::spec::DataContentType::Data)
1676 .partition(Struct::empty())
1677 .partition_spec_id(0)
1678 .build()
1679 .unwrap();
1680
1681 assert_eq!(data_file.record_count(), 4);
1683 assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1684 assert_eq!(
1685 *data_file.lower_bounds(),
1686 HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1687 );
1688 assert_eq!(
1689 *data_file.upper_bounds(),
1690 HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1691 );
1692 assert_eq!(
1693 *data_file.null_value_counts(),
1694 HashMap::from([(0, 0), (1, 0)])
1695 );
1696 assert_eq!(
1697 *data_file.nan_value_counts(),
1698 HashMap::from([(0, 1), (1, 1)])
1699 );
1700
1701 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1703 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1704
1705 Ok(())
1706 }
1707
1708 #[tokio::test]
1709 async fn test_nan_val_cnts_struct_type() -> Result<()> {
1710 let temp_dir = TempDir::new().unwrap();
1711 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1712 let location_gen = DefaultLocationGenerator::with_data_location(
1713 temp_dir.path().to_str().unwrap().to_string(),
1714 );
1715 let file_name_gen =
1716 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1717
1718 let schema_struct_float_fields = Fields::from(vec![
1719 Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1720 PARQUET_FIELD_ID_META_KEY.to_string(),
1721 "4".to_string(),
1722 )])),
1723 ]);
1724
1725 let schema_struct_nested_float_fields = Fields::from(vec![
1726 Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1727 PARQUET_FIELD_ID_META_KEY.to_string(),
1728 "7".to_string(),
1729 )])),
1730 ]);
1731
1732 let schema_struct_nested_fields = Fields::from(vec![
1733 Field::new(
1734 "col6",
1735 arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1736 false,
1737 )
1738 .with_metadata(HashMap::from([(
1739 PARQUET_FIELD_ID_META_KEY.to_string(),
1740 "6".to_string(),
1741 )])),
1742 ]);
1743
1744 let arrow_schema = {
1746 let fields = vec![
1747 Field::new(
1748 "col3",
1749 arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1750 false,
1751 )
1752 .with_metadata(HashMap::from([(
1753 PARQUET_FIELD_ID_META_KEY.to_string(),
1754 "3".to_string(),
1755 )])),
1756 Field::new(
1757 "col5",
1758 arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1759 false,
1760 )
1761 .with_metadata(HashMap::from([(
1762 PARQUET_FIELD_ID_META_KEY.to_string(),
1763 "5".to_string(),
1764 )])),
1765 ];
1766 Arc::new(arrow_schema::Schema::new(fields))
1767 };
1768
1769 let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1770 [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1771 None,
1772 )) as ArrayRef;
1773
1774 let struct_float_field_col = Arc::new(StructArray::new(
1775 schema_struct_float_fields,
1776 vec![float_32_col.clone()],
1777 None,
1778 )) as ArrayRef;
1779
1780 let struct_nested_float_field_col = Arc::new(StructArray::new(
1781 schema_struct_nested_fields,
1782 vec![Arc::new(StructArray::new(
1783 schema_struct_nested_float_fields,
1784 vec![float_32_col.clone()],
1785 None,
1786 )) as ArrayRef],
1787 None,
1788 )) as ArrayRef;
1789
1790 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1791 struct_float_field_col,
1792 struct_nested_float_field_col,
1793 ])
1794 .unwrap();
1795 let output_file = file_io.new_output(
1796 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1797 )?;
1798
1799 let mut pw = ParquetWriterBuilder::new(
1801 WriterProperties::builder().build(),
1802 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1803 )
1804 .build(output_file)
1805 .await?;
1806
1807 pw.write(&to_write).await?;
1808 let res = pw.close().await?;
1809 assert_eq!(res.len(), 1);
1810 let data_file = res
1811 .into_iter()
1812 .next()
1813 .unwrap()
1814 .content(crate::spec::DataContentType::Data)
1816 .partition(Struct::empty())
1817 .partition_spec_id(0)
1818 .build()
1819 .unwrap();
1820
1821 assert_eq!(data_file.record_count(), 4);
1823 assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1824 assert_eq!(
1825 *data_file.lower_bounds(),
1826 HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1827 );
1828 assert_eq!(
1829 *data_file.upper_bounds(),
1830 HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1831 );
1832 assert_eq!(
1833 *data_file.null_value_counts(),
1834 HashMap::from([(4, 0), (7, 0)])
1835 );
1836 assert_eq!(
1837 *data_file.nan_value_counts(),
1838 HashMap::from([(4, 1), (7, 1)])
1839 );
1840
1841 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1843 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1844
1845 Ok(())
1846 }
1847
1848 #[tokio::test]
1849 async fn test_nan_val_cnts_list_type() -> Result<()> {
1850 let temp_dir = TempDir::new().unwrap();
1851 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1852 let location_gen = DefaultLocationGenerator::with_data_location(
1853 temp_dir.path().to_str().unwrap().to_string(),
1854 );
1855 let file_name_gen =
1856 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1857
1858 let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1859 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1860 );
1861
1862 let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1863 .with_metadata(HashMap::from([(
1864 PARQUET_FIELD_ID_META_KEY.to_string(),
1865 "4".to_string(),
1866 )]));
1867
1868 let schema_struct_list_field = Fields::from(vec![
1869 Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
1870 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
1871 ),
1872 ]);
1873
1874 let arrow_schema = {
1875 let fields = vec![
1876 Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
1877 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1878 ),
1879 Field::new_struct("col1", schema_struct_list_field.clone(), true)
1880 .with_metadata(HashMap::from([(
1881 PARQUET_FIELD_ID_META_KEY.to_string(),
1882 "2".to_string(),
1883 )]))
1884 .clone(),
1885 ];
1889 Arc::new(arrow_schema::Schema::new(fields))
1890 };
1891
1892 let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
1893 Some(1.0_f32),
1894 Some(f32::NAN),
1895 Some(2.0),
1896 Some(2.0),
1897 ])])
1898 .into_parts();
1899
1900 let list_float_field_col = Arc::new({
1901 let list_parts = list_parts.clone();
1902 ListArray::new(
1903 {
1904 if let DataType::List(field) = arrow_schema.field(0).data_type() {
1905 field.clone()
1906 } else {
1907 unreachable!()
1908 }
1909 },
1910 list_parts.1,
1911 list_parts.2,
1912 list_parts.3,
1913 )
1914 }) as ArrayRef;
1915
1916 let struct_list_fields_schema =
1917 if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
1918 fields.clone()
1919 } else {
1920 unreachable!()
1921 };
1922
1923 let struct_list_float_field_col = Arc::new({
1924 ListArray::new(
1925 {
1926 if let DataType::List(field) = struct_list_fields_schema
1927 .first()
1928 .expect("could not find first list field")
1929 .data_type()
1930 {
1931 field.clone()
1932 } else {
1933 unreachable!()
1934 }
1935 },
1936 list_parts.1,
1937 list_parts.2,
1938 list_parts.3,
1939 )
1940 }) as ArrayRef;
1941
1942 let struct_list_float_field_col = Arc::new(StructArray::new(
1943 struct_list_fields_schema,
1944 vec![struct_list_float_field_col.clone()],
1945 None,
1946 )) as ArrayRef;
1947
1948 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1949 list_float_field_col,
1950 struct_list_float_field_col,
1951 ])
1953 .expect("Could not form record batch");
1954 let output_file = file_io.new_output(
1955 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1956 )?;
1957
1958 let mut pw = ParquetWriterBuilder::new(
1960 WriterProperties::builder().build(),
1961 Arc::new(
1962 to_write
1963 .schema()
1964 .as_ref()
1965 .try_into()
1966 .expect("Could not convert iceberg schema"),
1967 ),
1968 )
1969 .build(output_file)
1970 .await?;
1971
1972 pw.write(&to_write).await?;
1973 let res = pw.close().await?;
1974 assert_eq!(res.len(), 1);
1975 let data_file = res
1976 .into_iter()
1977 .next()
1978 .unwrap()
1979 .content(crate::spec::DataContentType::Data)
1980 .partition(Struct::empty())
1981 .partition_spec_id(0)
1982 .build()
1983 .unwrap();
1984
1985 assert_eq!(data_file.record_count(), 1);
1987 assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
1988 assert_eq!(
1989 *data_file.lower_bounds(),
1990 HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
1991 );
1992 assert_eq!(
1993 *data_file.upper_bounds(),
1994 HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
1995 );
1996 assert_eq!(
1997 *data_file.null_value_counts(),
1998 HashMap::from([(1, 0), (4, 0)])
1999 );
2000 assert_eq!(
2001 *data_file.nan_value_counts(),
2002 HashMap::from([(1, 1), (4, 1)])
2003 );
2004
2005 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2007 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2008
2009 Ok(())
2010 }
2011
2012 macro_rules! construct_map_arr {
2013 ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
2014 let int_builder = Int32Builder::new();
2015 let float_builder = Float32Builder::with_capacity(4);
2016 let mut builder = MapBuilder::new(None, int_builder, float_builder);
2017 builder.keys().append_value(1);
2018 builder.values().append_value(1.0_f32);
2019 builder.append(true).unwrap();
2020 builder.keys().append_value(2);
2021 builder.values().append_value(f32::NAN);
2022 builder.append(true).unwrap();
2023 builder.keys().append_value(3);
2024 builder.values().append_value(2.0);
2025 builder.append(true).unwrap();
2026 builder.keys().append_value(4);
2027 builder.values().append_value(2.0);
2028 builder.append(true).unwrap();
2029 let array = builder.finish();
2030
2031 let (_field, offsets, entries, nulls, ordered) = array.into_parts();
2032 let new_struct_fields_schema =
2033 Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);
2034
2035 let entries = {
2036 let (_, arrays, nulls) = entries.into_parts();
2037 StructArray::new(new_struct_fields_schema.clone(), arrays, nulls)
2038 };
2039
2040 let field = Arc::new(Field::new(
2041 DEFAULT_MAP_FIELD_NAME,
2042 DataType::Struct(new_struct_fields_schema),
2043 false,
2044 ));
2045
2046 Arc::new(MapArray::new(field, offsets, entries, nulls, ordered))
2047 }};
2048 }
2049
2050 #[tokio::test]
2051 async fn test_nan_val_cnts_map_type() -> Result<()> {
2052 let temp_dir = TempDir::new().unwrap();
2053 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2054 let location_gen = DefaultLocationGenerator::with_data_location(
2055 temp_dir.path().to_str().unwrap().to_string(),
2056 );
2057 let file_name_gen =
2058 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2059
2060 let map_key_field_schema =
2061 Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2062 (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2063 ]));
2064
2065 let map_value_field_schema =
2066 Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2067 [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2068 ));
2069
2070 let struct_map_key_field_schema =
2071 Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2072 (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2073 ]));
2074
2075 let struct_map_value_field_schema =
2076 Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2077 [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2078 ));
2079
2080 let schema_struct_map_field = Fields::from(vec![
2081 Field::new_map(
2082 "col3",
2083 DEFAULT_MAP_FIELD_NAME,
2084 struct_map_key_field_schema.clone(),
2085 struct_map_value_field_schema.clone(),
2086 false,
2087 false,
2088 )
2089 .with_metadata(HashMap::from([(
2090 PARQUET_FIELD_ID_META_KEY.to_string(),
2091 "5".to_string(),
2092 )])),
2093 ]);
2094
2095 let arrow_schema = {
2096 let fields = vec![
2097 Field::new_map(
2098 "col0",
2099 DEFAULT_MAP_FIELD_NAME,
2100 map_key_field_schema.clone(),
2101 map_value_field_schema.clone(),
2102 false,
2103 false,
2104 )
2105 .with_metadata(HashMap::from([(
2106 PARQUET_FIELD_ID_META_KEY.to_string(),
2107 "0".to_string(),
2108 )])),
2109 Field::new_struct("col1", schema_struct_map_field.clone(), true)
2110 .with_metadata(HashMap::from([(
2111 PARQUET_FIELD_ID_META_KEY.to_string(),
2112 "3".to_string(),
2113 )]))
2114 .clone(),
2115 ];
2116 Arc::new(arrow_schema::Schema::new(fields))
2117 };
2118
2119 let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2120
2121 let struct_map_arr =
2122 construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2123
2124 let struct_list_float_field_col = Arc::new(StructArray::new(
2125 schema_struct_map_field,
2126 vec![struct_map_arr],
2127 None,
2128 )) as ArrayRef;
2129
2130 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2131 map_array,
2132 struct_list_float_field_col,
2133 ])
2134 .expect("Could not form record batch");
2135 let output_file = file_io.new_output(
2136 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2137 )?;
2138
2139 let mut pw = ParquetWriterBuilder::new(
2141 WriterProperties::builder().build(),
2142 Arc::new(
2143 to_write
2144 .schema()
2145 .as_ref()
2146 .try_into()
2147 .expect("Could not convert iceberg schema"),
2148 ),
2149 )
2150 .build(output_file)
2151 .await?;
2152
2153 pw.write(&to_write).await?;
2154 let res = pw.close().await?;
2155 assert_eq!(res.len(), 1);
2156 let data_file = res
2157 .into_iter()
2158 .next()
2159 .unwrap()
2160 .content(crate::spec::DataContentType::Data)
2161 .partition(Struct::empty())
2162 .partition_spec_id(0)
2163 .build()
2164 .unwrap();
2165
2166 assert_eq!(data_file.record_count(), 4);
2168 assert_eq!(
2169 *data_file.value_counts(),
2170 HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2171 );
2172 assert_eq!(
2173 *data_file.lower_bounds(),
2174 HashMap::from([
2175 (1, Datum::int(1)),
2176 (2, Datum::float(1.0)),
2177 (6, Datum::int(1)),
2178 (7, Datum::float(1.0))
2179 ])
2180 );
2181 assert_eq!(
2182 *data_file.upper_bounds(),
2183 HashMap::from([
2184 (1, Datum::int(4)),
2185 (2, Datum::float(2.0)),
2186 (6, Datum::int(4)),
2187 (7, Datum::float(2.0))
2188 ])
2189 );
2190 assert_eq!(
2191 *data_file.null_value_counts(),
2192 HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2193 );
2194 assert_eq!(
2195 *data_file.nan_value_counts(),
2196 HashMap::from([(2, 1), (7, 1)])
2197 );
2198
2199 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2201 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2202
2203 Ok(())
2204 }
2205
2206 #[tokio::test]
2207 async fn test_write_empty_parquet_file() {
2208 let temp_dir = TempDir::new().unwrap();
2209 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2210 let location_gen = DefaultLocationGenerator::with_data_location(
2211 temp_dir.path().to_str().unwrap().to_string(),
2212 );
2213 let file_name_gen =
2214 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2215 let output_file = file_io
2216 .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
2217 .unwrap();
2218
2219 let pw = ParquetWriterBuilder::new(
2221 WriterProperties::builder().build(),
2222 Arc::new(
2223 Schema::builder()
2224 .with_schema_id(1)
2225 .with_fields(vec![
2226 NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2227 .with_id(0)
2228 .into(),
2229 ])
2230 .build()
2231 .expect("Failed to create schema"),
2232 ),
2233 )
2234 .build(output_file)
2235 .await
2236 .unwrap();
2237
2238 let res = pw.close().await.unwrap();
2239 assert_eq!(res.len(), 0);
2240
2241 assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2243 }
2244
2245 #[test]
2246 fn test_min_max_aggregator() {
2247 let schema = Arc::new(
2248 Schema::builder()
2249 .with_schema_id(1)
2250 .with_fields(vec![
2251 NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
2252 .with_id(0)
2253 .into(),
2254 ])
2255 .build()
2256 .expect("Failed to create schema"),
2257 );
2258 let mut min_max_agg = MinMaxColAggregator::new(schema);
2259 let create_statistics =
2260 |min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
2261 min_max_agg
2262 .update(0, create_statistics(None, Some(42)))
2263 .unwrap();
2264 min_max_agg
2265 .update(0, create_statistics(Some(0), Some(i32::MAX)))
2266 .unwrap();
2267 min_max_agg
2268 .update(0, create_statistics(Some(i32::MIN), None))
2269 .unwrap();
2270 min_max_agg
2271 .update(0, create_statistics(None, None))
2272 .unwrap();
2273
2274 let (lower_bounds, upper_bounds) = min_max_agg.produce();
2275
2276 assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
2277 assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
2278 }
2279}