1use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_schema::SchemaRef as ArrowSchemaRef;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use itertools::Itertools;
27use parquet::arrow::AsyncArrowWriter;
28use parquet::arrow::async_reader::AsyncFileReader;
29use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
30use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
31use parquet::file::properties::WriterProperties;
32use parquet::file::statistics::Statistics;
33use parquet::format::FileMetaData;
34use parquet::thrift::{TCompactOutputProtocol, TSerializable};
35use thrift::protocol::TOutputProtocol;
36
37use super::{FileWriter, FileWriterBuilder};
38use crate::arrow::{
39 ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
40 get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
41};
42use crate::io::{FileIO, FileWrite, OutputFile};
43use crate::spec::{
44 DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
45 NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
46 StructType, TableMetadata, Type, visit_schema,
47};
48use crate::transform::create_transform_function;
49use crate::writer::{CurrentFileStatus, DataFile};
50use crate::{Error, ErrorKind, Result};
51
/// Builder that holds the configuration needed to create [`ParquetWriter`] instances.
#[derive(Clone, Debug)]
pub struct ParquetWriterBuilder {
    /// Parquet writer properties; cloned into every writer built from this builder.
    props: WriterProperties,
    /// Schema of the data to be written; shared with every writer built from this builder.
    schema: SchemaRef,
    /// Field match mode forwarded to the writer's NaN value count visitor.
    match_mode: FieldMatchMode,
}
59
60impl ParquetWriterBuilder {
61 pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
64 Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
65 }
66
67 pub fn new_with_match_mode(
69 props: WriterProperties,
70 schema: SchemaRef,
71 match_mode: FieldMatchMode,
72 ) -> Self {
73 Self {
74 props,
75 schema,
76 match_mode,
77 }
78 }
79}
80
81impl FileWriterBuilder for ParquetWriterBuilder {
82 type R = ParquetWriter;
83
84 async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
85 Ok(ParquetWriter {
86 schema: self.schema.clone(),
87 inner_writer: None,
88 writer_properties: self.props.clone(),
89 current_row_num: 0,
90 output_file,
91 nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
92 })
93 }
94}
95
/// Schema visitor that indexes primitive fields by their dot-joined path name.
struct IndexByParquetPathName {
    /// Maps the full dotted path of each visited primitive field to its field id.
    name_to_id: HashMap<String, i32>,

    /// Path segments of the field currently being visited, outermost first.
    field_names: Vec<String>,

    /// Id of the field most recently entered by one of the `before_*` callbacks.
    field_id: i32,
}
104
105impl IndexByParquetPathName {
106 pub fn new() -> Self {
108 Self {
109 name_to_id: HashMap::new(),
110 field_names: Vec::new(),
111 field_id: 0,
112 }
113 }
114
115 pub fn get(&self, name: &str) -> Option<&i32> {
117 self.name_to_id.get(name)
118 }
119}
120
121impl Default for IndexByParquetPathName {
122 fn default() -> Self {
123 Self::new()
124 }
125}
126
127impl SchemaVisitor for IndexByParquetPathName {
128 type T = ();
129
130 fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
131 self.field_names.push(field.name.to_string());
132 self.field_id = field.id;
133 Ok(())
134 }
135
136 fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
137 self.field_names.pop();
138 Ok(())
139 }
140
141 fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
142 self.field_names.push(format!("list.{}", field.name));
143 self.field_id = field.id;
144 Ok(())
145 }
146
147 fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
148 self.field_names.pop();
149 Ok(())
150 }
151
152 fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
153 self.field_names
154 .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
155 self.field_id = field.id;
156 Ok(())
157 }
158
159 fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
160 self.field_names.pop();
161 Ok(())
162 }
163
164 fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
165 self.field_names
166 .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
167 self.field_id = field.id;
168 Ok(())
169 }
170
171 fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
172 self.field_names.pop();
173 Ok(())
174 }
175
176 fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
177 Ok(())
178 }
179
180 fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
181 Ok(())
182 }
183
184 fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
185 Ok(())
186 }
187
188 fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
189 Ok(())
190 }
191
192 fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
193 Ok(())
194 }
195
196 fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
197 let full_name = self.field_names.iter().map(String::as_str).join(".");
198 let field_id = self.field_id;
199 if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
200 return Err(Error::new(
201 ErrorKind::DataInvalid,
202 format!(
203 "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
204 ),
205 ));
206 } else {
207 self.name_to_id.insert(full_name, field_id);
208 }
209
210 Ok(())
211 }
212}
213
/// File writer that writes Arrow record batches into a single Parquet output file.
pub struct ParquetWriter {
    /// Schema used for the Arrow schema conversion, NaN counting and statistics mapping.
    schema: SchemaRef,
    /// Destination file; also supplies the location reported for the written file.
    output_file: OutputFile,
    /// Underlying Arrow writer; `None` until the first non-empty batch is written.
    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
    /// Properties passed to the Arrow writer when it is created.
    writer_properties: WriterProperties,
    /// Total number of rows accepted by `write` so far.
    current_row_num: usize,
    /// Visitor fed every written batch; its `nan_value_counts` are reported on close.
    nan_value_count_visitor: NanValueCountVisitor,
}
223
/// Aggregates per-field minimum and maximum values from Parquet column statistics.
struct MinMaxColAggregator {
    /// Smallest value seen so far, keyed by field id.
    lower_bounds: HashMap<i32, Datum>,
    /// Largest value seen so far, keyed by field id.
    upper_bounds: HashMap<i32, Datum>,
    /// Schema used to look up each field's type by id.
    schema: SchemaRef,
}
230
231impl MinMaxColAggregator {
232 fn new(schema: SchemaRef) -> Self {
234 Self {
235 lower_bounds: HashMap::new(),
236 upper_bounds: HashMap::new(),
237 schema,
238 }
239 }
240
241 fn update_state_min(&mut self, field_id: i32, datum: Datum) {
242 self.lower_bounds
243 .entry(field_id)
244 .and_modify(|e| {
245 if *e > datum {
246 *e = datum.clone()
247 }
248 })
249 .or_insert(datum);
250 }
251
252 fn update_state_max(&mut self, field_id: i32, datum: Datum) {
253 self.upper_bounds
254 .entry(field_id)
255 .and_modify(|e| {
256 if *e < datum {
257 *e = datum.clone()
258 }
259 })
260 .or_insert(datum);
261 }
262
263 fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
265 let Some(ty) = self
266 .schema
267 .field_by_id(field_id)
268 .map(|f| f.field_type.as_ref())
269 else {
270 return Ok(());
273 };
274 let Type::Primitive(ty) = ty.clone() else {
275 return Err(Error::new(
276 ErrorKind::Unexpected,
277 format!("Composed type {ty} is not supported for min max aggregation."),
278 ));
279 };
280
281 if value.min_is_exact() {
282 let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
283 return Err(Error::new(
284 ErrorKind::Unexpected,
285 format!("Statistics {value} is not match with field type {ty}."),
286 ));
287 };
288
289 self.update_state_min(field_id, min_datum);
290 }
291
292 if value.max_is_exact() {
293 let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
294 return Err(Error::new(
295 ErrorKind::Unexpected,
296 format!("Statistics {value} is not match with field type {ty}."),
297 ));
298 };
299
300 self.update_state_max(field_id, max_datum);
301 }
302
303 Ok(())
304 }
305
306 fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
308 (self.lower_bounds, self.upper_bounds)
309 }
310}
311
312impl ParquetWriter {
313 #[allow(dead_code)]
315 pub(crate) async fn parquet_files_to_data_files(
316 file_io: &FileIO,
317 file_paths: Vec<String>,
318 table_metadata: &TableMetadata,
319 ) -> Result<Vec<DataFile>> {
320 let mut data_files: Vec<DataFile> = Vec::new();
322
323 for file_path in file_paths {
324 let input_file = file_io.new_input(&file_path)?;
325 let file_metadata = input_file.metadata().await?;
326 let file_size_in_bytes = file_metadata.size as usize;
327 let reader = input_file.reader().await?;
328
329 let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
330 let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
331 Error::new(
332 ErrorKind::DataInvalid,
333 format!("Error reading Parquet metadata: {err}"),
334 )
335 })?;
336 let mut builder = ParquetWriter::parquet_to_data_file_builder(
337 table_metadata.current_schema().clone(),
338 parquet_metadata,
339 file_size_in_bytes,
340 file_path,
341 HashMap::new(),
343 )?;
344 builder.partition_spec_id(table_metadata.default_partition_spec_id());
345 let data_file = builder.build().unwrap();
346 data_files.push(data_file);
347 }
348
349 Ok(data_files)
350 }
351
352 fn thrift_to_parquet_metadata(&self, file_metadata: FileMetaData) -> Result<ParquetMetaData> {
353 let mut buffer = Vec::new();
354 {
355 let mut protocol = TCompactOutputProtocol::new(&mut buffer);
356 file_metadata
357 .write_to_out_protocol(&mut protocol)
358 .map_err(|err| {
359 Error::new(ErrorKind::Unexpected, "Failed to write parquet metadata")
360 .with_source(err)
361 })?;
362
363 protocol.flush().map_err(|err| {
364 Error::new(ErrorKind::Unexpected, "Failed to flush protocol").with_source(err)
365 })?;
366 }
367
368 let parquet_metadata = ParquetMetaDataReader::decode_metadata(&buffer).map_err(|err| {
369 Error::new(ErrorKind::Unexpected, "Failed to decode parquet metadata").with_source(err)
370 })?;
371
372 Ok(parquet_metadata)
373 }
374
375 pub(crate) fn parquet_to_data_file_builder(
377 schema: SchemaRef,
378 metadata: Arc<ParquetMetaData>,
379 written_size: usize,
380 file_path: String,
381 nan_value_counts: HashMap<i32, u64>,
382 ) -> Result<DataFileBuilder> {
383 let index_by_parquet_path = {
384 let mut visitor = IndexByParquetPathName::new();
385 visit_schema(&schema, &mut visitor)?;
386 visitor
387 };
388
389 let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
390 let mut per_col_size: HashMap<i32, u64> = HashMap::new();
391 let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
392 let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
393 let mut min_max_agg = MinMaxColAggregator::new(schema);
394
395 for row_group in metadata.row_groups() {
396 for column_chunk_metadata in row_group.columns() {
397 let parquet_path = column_chunk_metadata.column_descr().path().string();
398
399 let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
400 continue;
401 };
402
403 *per_col_size.entry(field_id).or_insert(0) +=
404 column_chunk_metadata.compressed_size() as u64;
405 *per_col_val_num.entry(field_id).or_insert(0) +=
406 column_chunk_metadata.num_values() as u64;
407
408 if let Some(statistics) = column_chunk_metadata.statistics() {
409 if let Some(null_count) = statistics.null_count_opt() {
410 *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
411 }
412
413 min_max_agg.update(field_id, statistics.clone())?;
414 }
415 }
416 }
417 (
418 per_col_size,
419 per_col_val_num,
420 per_col_null_val_num,
421 min_max_agg.produce(),
422 )
423 };
424
425 let mut builder = DataFileBuilder::default();
426 builder
427 .content(DataContentType::Data)
428 .file_path(file_path)
429 .file_format(DataFileFormat::Parquet)
430 .partition(Struct::empty())
431 .record_count(metadata.file_metadata().num_rows() as u64)
432 .file_size_in_bytes(written_size as u64)
433 .column_sizes(column_sizes)
434 .value_counts(value_counts)
435 .null_value_counts(null_value_counts)
436 .nan_value_counts(nan_value_counts)
437 .lower_bounds(lower_bounds)
440 .upper_bounds(upper_bounds)
441 .split_offsets(
442 metadata
443 .row_groups()
444 .iter()
445 .filter_map(|group| group.file_offset())
446 .collect(),
447 );
448
449 Ok(builder)
450 }
451
452 #[allow(dead_code)]
453 fn partition_value_from_bounds(
454 table_spec: Arc<PartitionSpec>,
455 lower_bounds: &HashMap<i32, Datum>,
456 upper_bounds: &HashMap<i32, Datum>,
457 ) -> Result<Struct> {
458 let mut partition_literals: Vec<Option<Literal>> = Vec::new();
459
460 for field in table_spec.fields() {
461 if let (Some(lower), Some(upper)) = (
462 lower_bounds.get(&field.source_id),
463 upper_bounds.get(&field.source_id),
464 ) {
465 if !field.transform.preserves_order() {
466 return Err(Error::new(
467 ErrorKind::DataInvalid,
468 format!(
469 "cannot infer partition value for non linear partition field (needs to preserve order): {} with transform {}",
470 field.name, field.transform
471 ),
472 ));
473 }
474
475 if lower != upper {
476 return Err(Error::new(
477 ErrorKind::DataInvalid,
478 format!(
479 "multiple partition values for field {}: lower: {:?}, upper: {:?}",
480 field.name, lower, upper
481 ),
482 ));
483 }
484
485 let transform_fn = create_transform_function(&field.transform)?;
486 let transform_literal =
487 Literal::from(transform_fn.transform_literal_result(lower)?);
488
489 partition_literals.push(Some(transform_literal));
490 } else {
491 partition_literals.push(None);
492 }
493 }
494
495 let partition_struct = Struct::from_iter(partition_literals);
496
497 Ok(partition_struct)
498 }
499}
500
501impl FileWriter for ParquetWriter {
502 async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
503 if batch.num_rows() == 0 {
505 return Ok(());
506 }
507
508 self.current_row_num += batch.num_rows();
509
510 let batch_c = batch.clone();
511 self.nan_value_count_visitor
512 .compute(self.schema.clone(), batch_c)?;
513
514 let writer = if let Some(writer) = &mut self.inner_writer {
516 writer
517 } else {
518 let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
519 let inner_writer = self.output_file.writer().await?;
520 let async_writer = AsyncFileWriter::new(inner_writer);
521 let writer = AsyncArrowWriter::try_new(
522 async_writer,
523 arrow_schema.clone(),
524 Some(self.writer_properties.clone()),
525 )
526 .map_err(|err| {
527 Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
528 .with_source(err)
529 })?;
530 self.inner_writer = Some(writer);
531 self.inner_writer.as_mut().unwrap()
532 };
533
534 writer.write(batch).await.map_err(|err| {
535 Error::new(
536 ErrorKind::Unexpected,
537 "Failed to write using parquet writer.",
538 )
539 .with_source(err)
540 })?;
541
542 Ok(())
543 }
544
545 async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
546 let mut writer = match self.inner_writer.take() {
547 Some(writer) => writer,
548 None => return Ok(vec![]),
549 };
550
551 let metadata = writer.finish().await.map_err(|err| {
552 Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
553 })?;
554
555 let written_size = writer.bytes_written();
556
557 if self.current_row_num == 0 {
558 self.output_file.delete().await.map_err(|err| {
559 Error::new(
560 ErrorKind::Unexpected,
561 "Failed to delete empty parquet file.",
562 )
563 .with_source(err)
564 })?;
565 Ok(vec![])
566 } else {
567 let parquet_metadata =
568 Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
569 Error::new(
570 ErrorKind::Unexpected,
571 "Failed to convert metadata from thrift to parquet.",
572 )
573 .with_source(err)
574 })?);
575
576 Ok(vec![Self::parquet_to_data_file_builder(
577 self.schema,
578 parquet_metadata,
579 written_size,
580 self.output_file.location().to_string(),
581 self.nan_value_count_visitor.nan_value_counts,
582 )?])
583 }
584 }
585}
586
587impl CurrentFileStatus for ParquetWriter {
588 fn current_file_path(&self) -> String {
589 self.output_file.location().to_string()
590 }
591
592 fn current_row_num(&self) -> usize {
593 self.current_row_num
594 }
595
596 fn current_written_size(&self) -> usize {
597 if let Some(inner) = self.inner_writer.as_ref() {
598 inner.bytes_written() + inner.in_progress_size()
601 } else {
602 0
604 }
605 }
606}
607
/// Newtype around a [`FileWrite`] implementor so that it can implement parquet's
/// async file writer trait (imported here as `ArrowAsyncFileWriter`).
struct AsyncFileWriter<W: FileWrite>(W);
614
615impl<W: FileWrite> AsyncFileWriter<W> {
616 pub fn new(writer: W) -> Self {
618 Self(writer)
619 }
620}
621
622impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
623 fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
624 Box::pin(async {
625 self.0
626 .write(bs)
627 .await
628 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
629 })
630 }
631
632 fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
633 Box::pin(async {
634 self.0
635 .close()
636 .await
637 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
638 })
639 }
640}
641
642#[cfg(test)]
643mod tests {
644 use std::collections::HashMap;
645 use std::sync::Arc;
646
647 use anyhow::Result;
648 use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
649 use arrow_array::types::{Float32Type, Int64Type};
650 use arrow_array::{
651 Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
652 Int64Array, ListArray, MapArray, RecordBatch, StructArray,
653 };
654 use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
655 use arrow_select::concat::concat_batches;
656 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
657 use parquet::file::statistics::ValueStatistics;
658 use rust_decimal::Decimal;
659 use tempfile::TempDir;
660 use uuid::Uuid;
661
662 use super::*;
663 use crate::arrow::schema_to_arrow_schema;
664 use crate::io::FileIOBuilder;
665 use crate::spec::{PrimitiveLiteral, Struct, *};
666 use crate::writer::file_writer::location_generator::{
667 DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
668 };
669 use crate::writer::tests::check_parquet_data_file;
670
671 fn schema_for_all_type() -> Schema {
672 Schema::builder()
673 .with_schema_id(1)
674 .with_fields(vec![
675 NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
676 NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
677 NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
678 NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
679 NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
680 NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
681 NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
682 NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
683 NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
684 NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
685 .into(),
686 NestedField::optional(
687 10,
688 "timestamptz",
689 Type::Primitive(PrimitiveType::Timestamptz),
690 )
691 .into(),
692 NestedField::optional(
693 11,
694 "timestamp_ns",
695 Type::Primitive(PrimitiveType::TimestampNs),
696 )
697 .into(),
698 NestedField::optional(
699 12,
700 "timestamptz_ns",
701 Type::Primitive(PrimitiveType::TimestamptzNs),
702 )
703 .into(),
704 NestedField::optional(
705 13,
706 "decimal",
707 Type::Primitive(PrimitiveType::Decimal {
708 precision: 10,
709 scale: 5,
710 }),
711 )
712 .into(),
713 NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
714 NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
715 .into(),
716 NestedField::optional(
719 16,
720 "decimal_38",
721 Type::Primitive(PrimitiveType::Decimal {
722 precision: 38,
723 scale: 5,
724 }),
725 )
726 .into(),
727 ])
728 .build()
729 .unwrap()
730 }
731
732 fn nested_schema_for_test() -> Schema {
733 Schema::builder()
735 .with_schema_id(1)
736 .with_fields(vec![
737 NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
738 NestedField::required(
739 1,
740 "col1",
741 Type::Struct(StructType::new(vec![
742 NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
743 .into(),
744 NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
745 .into(),
746 ])),
747 )
748 .into(),
749 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
750 NestedField::required(
751 3,
752 "col3",
753 Type::List(ListType::new(
754 NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
755 .into(),
756 )),
757 )
758 .into(),
759 NestedField::required(
760 4,
761 "col4",
762 Type::Struct(StructType::new(vec![
763 NestedField::required(
764 8,
765 "col_4_8",
766 Type::Struct(StructType::new(vec![
767 NestedField::required(
768 9,
769 "col_4_8_9",
770 Type::Primitive(PrimitiveType::Long),
771 )
772 .into(),
773 ])),
774 )
775 .into(),
776 ])),
777 )
778 .into(),
779 NestedField::required(
780 10,
781 "col5",
782 Type::Map(MapType::new(
783 NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
784 .into(),
785 NestedField::required(
786 12,
787 "value",
788 Type::List(ListType::new(
789 NestedField::required(
790 13,
791 "item",
792 Type::Primitive(PrimitiveType::Long),
793 )
794 .into(),
795 )),
796 )
797 .into(),
798 )),
799 )
800 .into(),
801 ])
802 .build()
803 .unwrap()
804 }
805
806 #[tokio::test]
807 async fn test_index_by_parquet_path() {
808 let expect = HashMap::from([
809 ("col0".to_string(), 0),
810 ("col1.col_1_5".to_string(), 5),
811 ("col1.col_1_6".to_string(), 6),
812 ("col2".to_string(), 2),
813 ("col3.list.element".to_string(), 7),
814 ("col4.col_4_8.col_4_8_9".to_string(), 9),
815 ("col5.key_value.key".to_string(), 11),
816 ("col5.key_value.value.list.item".to_string(), 13),
817 ]);
818 let mut visitor = IndexByParquetPathName::new();
819 visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
820 assert_eq!(visitor.name_to_id, expect);
821 }
822
823 #[tokio::test]
824 async fn test_parquet_writer() -> Result<()> {
825 let temp_dir = TempDir::new().unwrap();
826 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
827 let location_gen = DefaultLocationGenerator::with_data_location(
828 temp_dir.path().to_str().unwrap().to_string(),
829 );
830 let file_name_gen =
831 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
832
833 let schema = {
835 let fields =
836 vec![
837 Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([(
838 PARQUET_FIELD_ID_META_KEY.to_string(),
839 "0".to_string(),
840 )])),
841 ];
842 Arc::new(arrow_schema::Schema::new(fields))
843 };
844 let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
845 let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
846 let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
847 let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
848
849 let output_file = file_io.new_output(
850 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
851 )?;
852
853 let mut pw = ParquetWriterBuilder::new(
855 WriterProperties::builder()
856 .set_max_row_group_size(128)
857 .build(),
858 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
859 )
860 .build(output_file)
861 .await?;
862 pw.write(&to_write).await?;
863 pw.write(&to_write_null).await?;
864 let res = pw.close().await?;
865 assert_eq!(res.len(), 1);
866 let data_file = res
867 .into_iter()
868 .next()
869 .unwrap()
870 .content(DataContentType::Data)
872 .partition(Struct::empty())
873 .partition_spec_id(0)
874 .build()
875 .unwrap();
876
877 assert_eq!(data_file.record_count(), 2048);
879 assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
880 assert_eq!(
881 *data_file.lower_bounds(),
882 HashMap::from([(0, Datum::long(0))])
883 );
884 assert_eq!(
885 *data_file.upper_bounds(),
886 HashMap::from([(0, Datum::long(1023))])
887 );
888 assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
889
890 let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
892 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
893
894 Ok(())
895 }
896
897 #[tokio::test]
898 async fn test_parquet_writer_with_complex_schema() -> Result<()> {
899 let temp_dir = TempDir::new().unwrap();
900 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
901 let location_gen = DefaultLocationGenerator::with_data_location(
902 temp_dir.path().to_str().unwrap().to_string(),
903 );
904 let file_name_gen =
905 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
906
907 let schema = nested_schema_for_test();
909 let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
910 let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
911 let col1 = Arc::new(StructArray::new(
912 {
913 if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
914 fields.clone()
915 } else {
916 unreachable!()
917 }
918 },
919 vec![
920 Arc::new(Int64Array::from_iter_values(0..1024)),
921 Arc::new(Int64Array::from_iter_values(0..1024)),
922 ],
923 None,
924 ));
925 let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
926 (0..1024).map(|n| n.to_string()),
927 )) as ArrayRef;
928 let col3 = Arc::new({
929 let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
930 (0..1024).map(|n| Some(vec![Some(n)])),
931 )
932 .into_parts();
933 arrow_array::ListArray::new(
934 {
935 if let DataType::List(field) = arrow_schema.field(3).data_type() {
936 field.clone()
937 } else {
938 unreachable!()
939 }
940 },
941 list_parts.1,
942 list_parts.2,
943 list_parts.3,
944 )
945 }) as ArrayRef;
946 let col4 = Arc::new(StructArray::new(
947 {
948 if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
949 fields.clone()
950 } else {
951 unreachable!()
952 }
953 },
954 vec![Arc::new(StructArray::new(
955 {
956 if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
957 if let DataType::Struct(fields) = fields[0].data_type() {
958 fields.clone()
959 } else {
960 unreachable!()
961 }
962 } else {
963 unreachable!()
964 }
965 },
966 vec![Arc::new(Int64Array::from_iter_values(0..1024))],
967 None,
968 ))],
969 None,
970 ));
971 let col5 = Arc::new({
972 let mut map_array_builder = MapBuilder::new(
973 None,
974 arrow_array::builder::StringBuilder::new(),
975 arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
976 Int64Type,
977 >::new()),
978 );
979 for i in 0..1024 {
980 map_array_builder.keys().append_value(i.to_string());
981 map_array_builder
982 .values()
983 .append_value(vec![Some(i as i64); i + 1]);
984 map_array_builder.append(true)?;
985 }
986 let (_, offset_buffer, struct_array, null_buffer, ordered) =
987 map_array_builder.finish().into_parts();
988 let struct_array = {
989 let (_, mut arrays, nulls) = struct_array.into_parts();
990 let list_array = {
991 let list_array = arrays[1]
992 .as_any()
993 .downcast_ref::<ListArray>()
994 .unwrap()
995 .clone();
996 let (_, offsets, array, nulls) = list_array.into_parts();
997 let list_field = {
998 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
999 if let DataType::Struct(fields) = map_field.data_type() {
1000 if let DataType::List(list_field) = fields[1].data_type() {
1001 list_field.clone()
1002 } else {
1003 unreachable!()
1004 }
1005 } else {
1006 unreachable!()
1007 }
1008 } else {
1009 unreachable!()
1010 }
1011 };
1012 ListArray::new(list_field, offsets, array, nulls)
1013 };
1014 arrays[1] = Arc::new(list_array) as ArrayRef;
1015 StructArray::new(
1016 {
1017 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1018 if let DataType::Struct(fields) = map_field.data_type() {
1019 fields.clone()
1020 } else {
1021 unreachable!()
1022 }
1023 } else {
1024 unreachable!()
1025 }
1026 },
1027 arrays,
1028 nulls,
1029 )
1030 };
1031 arrow_array::MapArray::new(
1032 {
1033 if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1034 map_field.clone()
1035 } else {
1036 unreachable!()
1037 }
1038 },
1039 offset_buffer,
1040 struct_array,
1041 null_buffer,
1042 ordered,
1043 )
1044 }) as ArrayRef;
1045 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1046 col0, col1, col2, col3, col4, col5,
1047 ])
1048 .unwrap();
1049 let output_file = file_io.new_output(
1050 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1051 )?;
1052
1053 let mut pw =
1055 ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1056 .build(output_file)
1057 .await?;
1058 pw.write(&to_write).await?;
1059 let res = pw.close().await?;
1060 assert_eq!(res.len(), 1);
1061 let data_file = res
1062 .into_iter()
1063 .next()
1064 .unwrap()
1065 .content(crate::spec::DataContentType::Data)
1067 .partition(Struct::empty())
1068 .partition_spec_id(0)
1069 .build()
1070 .unwrap();
1071
1072 assert_eq!(data_file.record_count(), 1024);
1074 assert_eq!(
1075 *data_file.value_counts(),
1076 HashMap::from([
1077 (0, 1024),
1078 (5, 1024),
1079 (6, 1024),
1080 (2, 1024),
1081 (7, 1024),
1082 (9, 1024),
1083 (11, 1024),
1084 (13, (1..1025).sum()),
1085 ])
1086 );
1087 assert_eq!(
1088 *data_file.lower_bounds(),
1089 HashMap::from([
1090 (0, Datum::long(0)),
1091 (5, Datum::long(0)),
1092 (6, Datum::long(0)),
1093 (2, Datum::string("0")),
1094 (7, Datum::long(0)),
1095 (9, Datum::long(0)),
1096 (11, Datum::string("0")),
1097 (13, Datum::long(0))
1098 ])
1099 );
1100 assert_eq!(
1101 *data_file.upper_bounds(),
1102 HashMap::from([
1103 (0, Datum::long(1023)),
1104 (5, Datum::long(1023)),
1105 (6, Datum::long(1023)),
1106 (2, Datum::string("999")),
1107 (7, Datum::long(1023)),
1108 (9, Datum::long(1023)),
1109 (11, Datum::string("999")),
1110 (13, Datum::long(1023))
1111 ])
1112 );
1113
1114 check_parquet_data_file(&file_io, &data_file, &to_write).await;
1116
1117 Ok(())
1118 }
1119
1120 #[tokio::test]
1121 async fn test_all_type_for_write() -> Result<()> {
1122 let temp_dir = TempDir::new().unwrap();
1123 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1124 let location_gen = DefaultLocationGenerator::with_data_location(
1125 temp_dir.path().to_str().unwrap().to_string(),
1126 );
1127 let file_name_gen =
1128 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1129
1130 let schema = schema_for_all_type();
1133 let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1134 let col0 = Arc::new(BooleanArray::from(vec![
1135 Some(true),
1136 Some(false),
1137 None,
1138 Some(true),
1139 ])) as ArrayRef;
1140 let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1141 let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1142 let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1143 Some(0.5),
1144 Some(2.0),
1145 None,
1146 Some(3.5),
1147 ])) as ArrayRef;
1148 let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1149 Some(0.5),
1150 Some(2.0),
1151 None,
1152 Some(3.5),
1153 ])) as ArrayRef;
1154 let col5 = Arc::new(arrow_array::StringArray::from(vec![
1155 Some("a"),
1156 Some("b"),
1157 None,
1158 Some("d"),
1159 ])) as ArrayRef;
1160 let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1161 Some(b"one"),
1162 None,
1163 Some(b""),
1164 Some(b"zzzz"),
1165 ])) as ArrayRef;
1166 let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1167 Some(0),
1168 Some(1),
1169 None,
1170 Some(3),
1171 ])) as ArrayRef;
1172 let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1173 Some(0),
1174 Some(1),
1175 None,
1176 Some(3),
1177 ])) as ArrayRef;
1178 let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1179 Some(0),
1180 Some(1),
1181 None,
1182 Some(3),
1183 ])) as ArrayRef;
1184 let col10 = Arc::new(
1185 arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1186 .with_timezone_utc(),
1187 ) as ArrayRef;
1188 let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1189 Some(0),
1190 Some(1),
1191 None,
1192 Some(3),
1193 ])) as ArrayRef;
1194 let col12 = Arc::new(
1195 arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1196 .with_timezone_utc(),
1197 ) as ArrayRef;
1198 let col13 = Arc::new(
1199 arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1200 .with_precision_and_scale(10, 5)
1201 .unwrap(),
1202 ) as ArrayRef;
1203 let col14 = Arc::new(
1204 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1205 vec![
1206 Some(Uuid::from_u128(0).as_bytes().to_vec()),
1207 Some(Uuid::from_u128(1).as_bytes().to_vec()),
1208 None,
1209 Some(Uuid::from_u128(3).as_bytes().to_vec()),
1210 ]
1211 .into_iter(),
1212 16,
1213 )
1214 .unwrap(),
1215 ) as ArrayRef;
1216 let col15 = Arc::new(
1217 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1218 vec![
1219 Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1220 Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1221 None,
1222 Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1223 ]
1224 .into_iter(),
1225 10,
1226 )
1227 .unwrap(),
1228 ) as ArrayRef;
1229 let col16 = Arc::new(
1230 arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1231 .with_precision_and_scale(38, 5)
1232 .unwrap(),
1233 ) as ArrayRef;
1234 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1235 col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1236 col14, col15, col16,
1237 ])
1238 .unwrap();
1239 let output_file = file_io.new_output(
1240 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1241 )?;
1242
1243 let mut pw =
1245 ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1246 .build(output_file)
1247 .await?;
1248 pw.write(&to_write).await?;
1249 let res = pw.close().await?;
1250 assert_eq!(res.len(), 1);
1251 let data_file = res
1252 .into_iter()
1253 .next()
1254 .unwrap()
1255 .content(crate::spec::DataContentType::Data)
1257 .partition(Struct::empty())
1258 .partition_spec_id(0)
1259 .build()
1260 .unwrap();
1261
1262 assert_eq!(data_file.record_count(), 4);
1264 assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1265 assert!(
1266 data_file
1267 .null_value_counts()
1268 .iter()
1269 .all(|(_, &v)| { v == 1 })
1270 );
1271 assert_eq!(
1272 *data_file.lower_bounds(),
1273 HashMap::from([
1274 (0, Datum::bool(false)),
1275 (1, Datum::int(1)),
1276 (2, Datum::long(1)),
1277 (3, Datum::float(0.5)),
1278 (4, Datum::double(0.5)),
1279 (5, Datum::string("a")),
1280 (6, Datum::binary(vec![])),
1281 (7, Datum::date(0)),
1282 (8, Datum::time_micros(0).unwrap()),
1283 (9, Datum::timestamp_micros(0)),
1284 (10, Datum::timestamptz_micros(0)),
1285 (11, Datum::timestamp_nanos(0)),
1286 (12, Datum::timestamptz_nanos(0)),
1287 (
1288 13,
1289 Datum::new(
1290 PrimitiveType::Decimal {
1291 precision: 10,
1292 scale: 5
1293 },
1294 PrimitiveLiteral::Int128(1)
1295 )
1296 ),
1297 (14, Datum::uuid(Uuid::from_u128(0))),
1298 (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1299 (
1300 16,
1301 Datum::new(
1302 PrimitiveType::Decimal {
1303 precision: 38,
1304 scale: 5
1305 },
1306 PrimitiveLiteral::Int128(1)
1307 )
1308 ),
1309 ])
1310 );
1311 assert_eq!(
1312 *data_file.upper_bounds(),
1313 HashMap::from([
1314 (0, Datum::bool(true)),
1315 (1, Datum::int(4)),
1316 (2, Datum::long(4)),
1317 (3, Datum::float(3.5)),
1318 (4, Datum::double(3.5)),
1319 (5, Datum::string("d")),
1320 (6, Datum::binary(vec![122, 122, 122, 122])),
1321 (7, Datum::date(3)),
1322 (8, Datum::time_micros(3).unwrap()),
1323 (9, Datum::timestamp_micros(3)),
1324 (10, Datum::timestamptz_micros(3)),
1325 (11, Datum::timestamp_nanos(3)),
1326 (12, Datum::timestamptz_nanos(3)),
1327 (
1328 13,
1329 Datum::new(
1330 PrimitiveType::Decimal {
1331 precision: 10,
1332 scale: 5
1333 },
1334 PrimitiveLiteral::Int128(100)
1335 )
1336 ),
1337 (14, Datum::uuid(Uuid::from_u128(3))),
1338 (
1339 15,
1340 Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1341 ),
1342 (
1343 16,
1344 Datum::new(
1345 PrimitiveType::Decimal {
1346 precision: 38,
1347 scale: 5
1348 },
1349 PrimitiveLiteral::Int128(100)
1350 )
1351 ),
1352 ])
1353 );
1354
1355 check_parquet_data_file(&file_io, &data_file, &to_write).await;
1357
1358 Ok(())
1359 }
1360
1361 #[tokio::test]
1362 async fn test_decimal_bound() -> Result<()> {
1363 let temp_dir = TempDir::new().unwrap();
1364 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1365 let location_gen = DefaultLocationGenerator::with_data_location(
1366 temp_dir.path().to_str().unwrap().to_string(),
1367 );
1368 let file_name_gen =
1369 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1370
1371 let schema = Arc::new(
1373 Schema::builder()
1374 .with_fields(vec![
1375 NestedField::optional(
1376 0,
1377 "decimal",
1378 Type::Primitive(PrimitiveType::Decimal {
1379 precision: 28,
1380 scale: 10,
1381 }),
1382 )
1383 .into(),
1384 ])
1385 .build()
1386 .unwrap(),
1387 );
1388 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1389 let output_file = file_io.new_output(
1390 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1391 )?;
1392 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1393 .build(output_file)
1394 .await?;
1395 let col0 = Arc::new(
1396 Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1397 .with_data_type(DataType::Decimal128(28, 10)),
1398 ) as ArrayRef;
1399 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1400 pw.write(&to_write).await?;
1401 let res = pw.close().await?;
1402 assert_eq!(res.len(), 1);
1403 let data_file = res
1404 .into_iter()
1405 .next()
1406 .unwrap()
1407 .content(crate::spec::DataContentType::Data)
1408 .partition(Struct::empty())
1409 .partition_spec_id(0)
1410 .build()
1411 .unwrap();
1412 assert_eq!(
1413 data_file.upper_bounds().get(&0),
1414 Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1415 .as_ref()
1416 );
1417 assert_eq!(
1418 data_file.lower_bounds().get(&0),
1419 Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1420 .as_ref()
1421 );
1422
1423 let schema = Arc::new(
1425 Schema::builder()
1426 .with_fields(vec![
1427 NestedField::optional(
1428 0,
1429 "decimal",
1430 Type::Primitive(PrimitiveType::Decimal {
1431 precision: 28,
1432 scale: 10,
1433 }),
1434 )
1435 .into(),
1436 ])
1437 .build()
1438 .unwrap(),
1439 );
1440 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1441 let output_file = file_io.new_output(
1442 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1443 )?;
1444 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1445 .build(output_file)
1446 .await?;
1447 let col0 = Arc::new(
1448 Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1449 .with_data_type(DataType::Decimal128(28, 10)),
1450 ) as ArrayRef;
1451 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1452 pw.write(&to_write).await?;
1453 let res = pw.close().await?;
1454 assert_eq!(res.len(), 1);
1455 let data_file = res
1456 .into_iter()
1457 .next()
1458 .unwrap()
1459 .content(crate::spec::DataContentType::Data)
1460 .partition(Struct::empty())
1461 .partition_spec_id(0)
1462 .build()
1463 .unwrap();
1464 assert_eq!(
1465 data_file.upper_bounds().get(&0),
1466 Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1467 .as_ref()
1468 );
1469 assert_eq!(
1470 data_file.lower_bounds().get(&0),
1471 Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1472 .as_ref()
1473 );
1474
1475 let decimal_max = Decimal::MAX;
1477 let decimal_min = Decimal::MIN;
1478 assert_eq!(decimal_max.scale(), decimal_min.scale());
1479 let schema = Arc::new(
1480 Schema::builder()
1481 .with_fields(vec![
1482 NestedField::optional(
1483 0,
1484 "decimal",
1485 Type::Primitive(PrimitiveType::Decimal {
1486 precision: 38,
1487 scale: decimal_max.scale(),
1488 }),
1489 )
1490 .into(),
1491 ])
1492 .build()
1493 .unwrap(),
1494 );
1495 let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1496 let output_file = file_io.new_output(
1497 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1498 )?;
1499 let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1500 .build(output_file)
1501 .await?;
1502 let col0 = Arc::new(
1503 Decimal128Array::from(vec![
1504 Some(decimal_max.mantissa()),
1505 Some(decimal_min.mantissa()),
1506 ])
1507 .with_data_type(DataType::Decimal128(38, 0)),
1508 ) as ArrayRef;
1509 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1510 pw.write(&to_write).await?;
1511 let res = pw.close().await?;
1512 assert_eq!(res.len(), 1);
1513 let data_file = res
1514 .into_iter()
1515 .next()
1516 .unwrap()
1517 .content(crate::spec::DataContentType::Data)
1518 .partition(Struct::empty())
1519 .partition_spec_id(0)
1520 .build()
1521 .unwrap();
1522 assert_eq!(
1523 data_file.upper_bounds().get(&0),
1524 Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1525 );
1526 assert_eq!(
1527 data_file.lower_bounds().get(&0),
1528 Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1529 );
1530
1531 Ok(())
1601 }
1602
1603 #[tokio::test]
1604 async fn test_empty_write() -> Result<()> {
1605 let temp_dir = TempDir::new().unwrap();
1606 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1607 let location_gen = DefaultLocationGenerator::with_data_location(
1608 temp_dir.path().to_str().unwrap().to_string(),
1609 );
1610 let file_name_gen =
1611 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1612
1613 let schema = {
1615 let fields = vec![
1616 arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1617 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1618 ),
1619 ];
1620 Arc::new(arrow_schema::Schema::new(fields))
1621 };
1622 let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1623 let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1624 let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1625 let output_file = file_io.new_output(&file_path)?;
1626 let mut pw = ParquetWriterBuilder::new(
1627 WriterProperties::builder().build(),
1628 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1629 )
1630 .build(output_file)
1631 .await?;
1632 pw.write(&to_write).await?;
1633 pw.close().await.unwrap();
1634 assert!(file_io.exists(&file_path).await.unwrap());
1635
1636 let file_name_gen =
1638 DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1639 let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1640 let output_file = file_io.new_output(&file_path)?;
1641 let pw = ParquetWriterBuilder::new(
1642 WriterProperties::builder().build(),
1643 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1644 )
1645 .build(output_file)
1646 .await?;
1647 pw.close().await.unwrap();
1648 assert!(!file_io.exists(&file_path).await.unwrap());
1649
1650 Ok(())
1651 }
1652
1653 #[tokio::test]
1654 async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1655 let temp_dir = TempDir::new().unwrap();
1656 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1657 let location_gen = DefaultLocationGenerator::with_data_location(
1658 temp_dir.path().to_str().unwrap().to_string(),
1659 );
1660 let file_name_gen =
1661 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1662
1663 let arrow_schema = {
1665 let fields = vec![
1666 Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1667 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1668 ),
1669 Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1670 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1671 ),
1672 ];
1673 Arc::new(arrow_schema::Schema::new(fields))
1674 };
1675
1676 let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1677 [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1678 None,
1679 )) as ArrayRef;
1680
1681 let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1682 [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1683 None,
1684 )) as ArrayRef;
1685
1686 let to_write =
1687 RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1688 let output_file = file_io.new_output(
1689 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1690 )?;
1691
1692 let mut pw = ParquetWriterBuilder::new(
1694 WriterProperties::builder().build(),
1695 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1696 )
1697 .build(output_file)
1698 .await?;
1699
1700 pw.write(&to_write).await?;
1701 let res = pw.close().await?;
1702 assert_eq!(res.len(), 1);
1703 let data_file = res
1704 .into_iter()
1705 .next()
1706 .unwrap()
1707 .content(crate::spec::DataContentType::Data)
1709 .partition(Struct::empty())
1710 .partition_spec_id(0)
1711 .build()
1712 .unwrap();
1713
1714 assert_eq!(data_file.record_count(), 4);
1716 assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1717 assert_eq!(
1718 *data_file.lower_bounds(),
1719 HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1720 );
1721 assert_eq!(
1722 *data_file.upper_bounds(),
1723 HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1724 );
1725 assert_eq!(
1726 *data_file.null_value_counts(),
1727 HashMap::from([(0, 0), (1, 0)])
1728 );
1729 assert_eq!(
1730 *data_file.nan_value_counts(),
1731 HashMap::from([(0, 1), (1, 1)])
1732 );
1733
1734 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1736 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1737
1738 Ok(())
1739 }
1740
1741 #[tokio::test]
1742 async fn test_nan_val_cnts_struct_type() -> Result<()> {
1743 let temp_dir = TempDir::new().unwrap();
1744 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1745 let location_gen = DefaultLocationGenerator::with_data_location(
1746 temp_dir.path().to_str().unwrap().to_string(),
1747 );
1748 let file_name_gen =
1749 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1750
1751 let schema_struct_float_fields = Fields::from(vec![
1752 Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1753 PARQUET_FIELD_ID_META_KEY.to_string(),
1754 "4".to_string(),
1755 )])),
1756 ]);
1757
1758 let schema_struct_nested_float_fields = Fields::from(vec![
1759 Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1760 PARQUET_FIELD_ID_META_KEY.to_string(),
1761 "7".to_string(),
1762 )])),
1763 ]);
1764
1765 let schema_struct_nested_fields = Fields::from(vec![
1766 Field::new(
1767 "col6",
1768 arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1769 false,
1770 )
1771 .with_metadata(HashMap::from([(
1772 PARQUET_FIELD_ID_META_KEY.to_string(),
1773 "6".to_string(),
1774 )])),
1775 ]);
1776
1777 let arrow_schema = {
1779 let fields = vec![
1780 Field::new(
1781 "col3",
1782 arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1783 false,
1784 )
1785 .with_metadata(HashMap::from([(
1786 PARQUET_FIELD_ID_META_KEY.to_string(),
1787 "3".to_string(),
1788 )])),
1789 Field::new(
1790 "col5",
1791 arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1792 false,
1793 )
1794 .with_metadata(HashMap::from([(
1795 PARQUET_FIELD_ID_META_KEY.to_string(),
1796 "5".to_string(),
1797 )])),
1798 ];
1799 Arc::new(arrow_schema::Schema::new(fields))
1800 };
1801
1802 let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1803 [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1804 None,
1805 )) as ArrayRef;
1806
1807 let struct_float_field_col = Arc::new(StructArray::new(
1808 schema_struct_float_fields,
1809 vec![float_32_col.clone()],
1810 None,
1811 )) as ArrayRef;
1812
1813 let struct_nested_float_field_col = Arc::new(StructArray::new(
1814 schema_struct_nested_fields,
1815 vec![Arc::new(StructArray::new(
1816 schema_struct_nested_float_fields,
1817 vec![float_32_col.clone()],
1818 None,
1819 )) as ArrayRef],
1820 None,
1821 )) as ArrayRef;
1822
1823 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1824 struct_float_field_col,
1825 struct_nested_float_field_col,
1826 ])
1827 .unwrap();
1828 let output_file = file_io.new_output(
1829 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1830 )?;
1831
1832 let mut pw = ParquetWriterBuilder::new(
1834 WriterProperties::builder().build(),
1835 Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1836 )
1837 .build(output_file)
1838 .await?;
1839
1840 pw.write(&to_write).await?;
1841 let res = pw.close().await?;
1842 assert_eq!(res.len(), 1);
1843 let data_file = res
1844 .into_iter()
1845 .next()
1846 .unwrap()
1847 .content(crate::spec::DataContentType::Data)
1849 .partition(Struct::empty())
1850 .partition_spec_id(0)
1851 .build()
1852 .unwrap();
1853
1854 assert_eq!(data_file.record_count(), 4);
1856 assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1857 assert_eq!(
1858 *data_file.lower_bounds(),
1859 HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1860 );
1861 assert_eq!(
1862 *data_file.upper_bounds(),
1863 HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1864 );
1865 assert_eq!(
1866 *data_file.null_value_counts(),
1867 HashMap::from([(4, 0), (7, 0)])
1868 );
1869 assert_eq!(
1870 *data_file.nan_value_counts(),
1871 HashMap::from([(4, 1), (7, 1)])
1872 );
1873
1874 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1876 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1877
1878 Ok(())
1879 }
1880
1881 #[tokio::test]
1882 async fn test_nan_val_cnts_list_type() -> Result<()> {
1883 let temp_dir = TempDir::new().unwrap();
1884 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1885 let location_gen = DefaultLocationGenerator::with_data_location(
1886 temp_dir.path().to_str().unwrap().to_string(),
1887 );
1888 let file_name_gen =
1889 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1890
1891 let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1892 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1893 );
1894
1895 let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1896 .with_metadata(HashMap::from([(
1897 PARQUET_FIELD_ID_META_KEY.to_string(),
1898 "4".to_string(),
1899 )]));
1900
1901 let schema_struct_list_field = Fields::from(vec![
1902 Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
1903 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
1904 ),
1905 ]);
1906
1907 let arrow_schema = {
1908 let fields = vec![
1909 Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
1910 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1911 ),
1912 Field::new_struct("col1", schema_struct_list_field.clone(), true)
1913 .with_metadata(HashMap::from([(
1914 PARQUET_FIELD_ID_META_KEY.to_string(),
1915 "2".to_string(),
1916 )]))
1917 .clone(),
1918 ];
1922 Arc::new(arrow_schema::Schema::new(fields))
1923 };
1924
1925 let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
1926 Some(1.0_f32),
1927 Some(f32::NAN),
1928 Some(2.0),
1929 Some(2.0),
1930 ])])
1931 .into_parts();
1932
1933 let list_float_field_col = Arc::new({
1934 let list_parts = list_parts.clone();
1935 ListArray::new(
1936 {
1937 if let DataType::List(field) = arrow_schema.field(0).data_type() {
1938 field.clone()
1939 } else {
1940 unreachable!()
1941 }
1942 },
1943 list_parts.1,
1944 list_parts.2,
1945 list_parts.3,
1946 )
1947 }) as ArrayRef;
1948
1949 let struct_list_fields_schema =
1950 if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
1951 fields.clone()
1952 } else {
1953 unreachable!()
1954 };
1955
1956 let struct_list_float_field_col = Arc::new({
1957 ListArray::new(
1958 {
1959 if let DataType::List(field) = struct_list_fields_schema
1960 .first()
1961 .expect("could not find first list field")
1962 .data_type()
1963 {
1964 field.clone()
1965 } else {
1966 unreachable!()
1967 }
1968 },
1969 list_parts.1,
1970 list_parts.2,
1971 list_parts.3,
1972 )
1973 }) as ArrayRef;
1974
1975 let struct_list_float_field_col = Arc::new(StructArray::new(
1976 struct_list_fields_schema,
1977 vec![struct_list_float_field_col.clone()],
1978 None,
1979 )) as ArrayRef;
1980
1981 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1982 list_float_field_col,
1983 struct_list_float_field_col,
1984 ])
1986 .expect("Could not form record batch");
1987 let output_file = file_io.new_output(
1988 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1989 )?;
1990
1991 let mut pw = ParquetWriterBuilder::new(
1993 WriterProperties::builder().build(),
1994 Arc::new(
1995 to_write
1996 .schema()
1997 .as_ref()
1998 .try_into()
1999 .expect("Could not convert iceberg schema"),
2000 ),
2001 )
2002 .build(output_file)
2003 .await?;
2004
2005 pw.write(&to_write).await?;
2006 let res = pw.close().await?;
2007 assert_eq!(res.len(), 1);
2008 let data_file = res
2009 .into_iter()
2010 .next()
2011 .unwrap()
2012 .content(crate::spec::DataContentType::Data)
2013 .partition(Struct::empty())
2014 .partition_spec_id(0)
2015 .build()
2016 .unwrap();
2017
2018 assert_eq!(data_file.record_count(), 1);
2020 assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
2021 assert_eq!(
2022 *data_file.lower_bounds(),
2023 HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
2024 );
2025 assert_eq!(
2026 *data_file.upper_bounds(),
2027 HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
2028 );
2029 assert_eq!(
2030 *data_file.null_value_counts(),
2031 HashMap::from([(1, 0), (4, 0)])
2032 );
2033 assert_eq!(
2034 *data_file.nan_value_counts(),
2035 HashMap::from([(1, 1), (4, 1)])
2036 );
2037
2038 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2040 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2041
2042 Ok(())
2043 }
2044
// Builds an `Arc<MapArray>` with four single-entry maps (keys 1..=4; values 1.0, NaN, 2.0,
// 2.0) whose key/value entry fields are replaced by the two `Field` identifiers passed in.
// Both identifiers are moved into the resulting entries schema.
macro_rules! construct_map_arr {
    ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
        let mut map_builder = MapBuilder::new(
            None,
            Int32Builder::new(),
            Float32Builder::with_capacity(4),
        );
        // Each pair is appended as its own map row.
        for (key, value) in [(1, 1.0_f32), (2, f32::NAN), (3, 2.0), (4, 2.0)] {
            map_builder.keys().append_value(key);
            map_builder.values().append_value(value);
            map_builder.append(true).unwrap();
        }

        // Take the built array apart so its entries can be re-typed with the caller's fields.
        let (_field, offsets, built_entries, nulls, ordered) =
            map_builder.finish().into_parts();
        let entry_fields = Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);

        let (_, entry_columns, entry_nulls) = built_entries.into_parts();
        let entries = StructArray::new(entry_fields.clone(), entry_columns, entry_nulls);

        let entries_field = Arc::new(Field::new(
            DEFAULT_MAP_FIELD_NAME,
            DataType::Struct(entry_fields),
            false,
        ));

        Arc::new(MapArray::new(entries_field, offsets, entries, nulls, ordered))
    }};
}
2082
2083 #[tokio::test]
2084 async fn test_nan_val_cnts_map_type() -> Result<()> {
2085 let temp_dir = TempDir::new().unwrap();
2086 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2087 let location_gen = DefaultLocationGenerator::with_data_location(
2088 temp_dir.path().to_str().unwrap().to_string(),
2089 );
2090 let file_name_gen =
2091 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2092
2093 let map_key_field_schema =
2094 Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2095 (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2096 ]));
2097
2098 let map_value_field_schema =
2099 Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2100 [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2101 ));
2102
2103 let struct_map_key_field_schema =
2104 Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2105 (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2106 ]));
2107
2108 let struct_map_value_field_schema =
2109 Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2110 [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2111 ));
2112
2113 let schema_struct_map_field = Fields::from(vec![
2114 Field::new_map(
2115 "col3",
2116 DEFAULT_MAP_FIELD_NAME,
2117 struct_map_key_field_schema.clone(),
2118 struct_map_value_field_schema.clone(),
2119 false,
2120 false,
2121 )
2122 .with_metadata(HashMap::from([(
2123 PARQUET_FIELD_ID_META_KEY.to_string(),
2124 "5".to_string(),
2125 )])),
2126 ]);
2127
2128 let arrow_schema = {
2129 let fields = vec![
2130 Field::new_map(
2131 "col0",
2132 DEFAULT_MAP_FIELD_NAME,
2133 map_key_field_schema.clone(),
2134 map_value_field_schema.clone(),
2135 false,
2136 false,
2137 )
2138 .with_metadata(HashMap::from([(
2139 PARQUET_FIELD_ID_META_KEY.to_string(),
2140 "0".to_string(),
2141 )])),
2142 Field::new_struct("col1", schema_struct_map_field.clone(), true)
2143 .with_metadata(HashMap::from([(
2144 PARQUET_FIELD_ID_META_KEY.to_string(),
2145 "3".to_string(),
2146 )]))
2147 .clone(),
2148 ];
2149 Arc::new(arrow_schema::Schema::new(fields))
2150 };
2151
2152 let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2153
2154 let struct_map_arr =
2155 construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2156
2157 let struct_list_float_field_col = Arc::new(StructArray::new(
2158 schema_struct_map_field,
2159 vec![struct_map_arr],
2160 None,
2161 )) as ArrayRef;
2162
2163 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2164 map_array,
2165 struct_list_float_field_col,
2166 ])
2167 .expect("Could not form record batch");
2168 let output_file = file_io.new_output(
2169 location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2170 )?;
2171
2172 let mut pw = ParquetWriterBuilder::new(
2174 WriterProperties::builder().build(),
2175 Arc::new(
2176 to_write
2177 .schema()
2178 .as_ref()
2179 .try_into()
2180 .expect("Could not convert iceberg schema"),
2181 ),
2182 )
2183 .build(output_file)
2184 .await?;
2185
2186 pw.write(&to_write).await?;
2187 let res = pw.close().await?;
2188 assert_eq!(res.len(), 1);
2189 let data_file = res
2190 .into_iter()
2191 .next()
2192 .unwrap()
2193 .content(crate::spec::DataContentType::Data)
2194 .partition(Struct::empty())
2195 .partition_spec_id(0)
2196 .build()
2197 .unwrap();
2198
2199 assert_eq!(data_file.record_count(), 4);
2201 assert_eq!(
2202 *data_file.value_counts(),
2203 HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2204 );
2205 assert_eq!(
2206 *data_file.lower_bounds(),
2207 HashMap::from([
2208 (1, Datum::int(1)),
2209 (2, Datum::float(1.0)),
2210 (6, Datum::int(1)),
2211 (7, Datum::float(1.0))
2212 ])
2213 );
2214 assert_eq!(
2215 *data_file.upper_bounds(),
2216 HashMap::from([
2217 (1, Datum::int(4)),
2218 (2, Datum::float(2.0)),
2219 (6, Datum::int(4)),
2220 (7, Datum::float(2.0))
2221 ])
2222 );
2223 assert_eq!(
2224 *data_file.null_value_counts(),
2225 HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2226 );
2227 assert_eq!(
2228 *data_file.nan_value_counts(),
2229 HashMap::from([(2, 1), (7, 1)])
2230 );
2231
2232 let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2234 check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2235
2236 Ok(())
2237 }
2238
2239 #[tokio::test]
2240 async fn test_write_empty_parquet_file() {
2241 let temp_dir = TempDir::new().unwrap();
2242 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2243 let location_gen = DefaultLocationGenerator::with_data_location(
2244 temp_dir.path().to_str().unwrap().to_string(),
2245 );
2246 let file_name_gen =
2247 DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2248 let output_file = file_io
2249 .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
2250 .unwrap();
2251
2252 let pw = ParquetWriterBuilder::new(
2254 WriterProperties::builder().build(),
2255 Arc::new(
2256 Schema::builder()
2257 .with_schema_id(1)
2258 .with_fields(vec![
2259 NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2260 .with_id(0)
2261 .into(),
2262 ])
2263 .build()
2264 .expect("Failed to create schema"),
2265 ),
2266 )
2267 .build(output_file)
2268 .await
2269 .unwrap();
2270
2271 let res = pw.close().await.unwrap();
2272 assert_eq!(res.len(), 0);
2273
2274 assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2276 }
2277
// Feeds the aggregator Int32 statistics in which the min and/or max may be absent and
// asserts that the produced bounds for field 0 are `i32::MIN` and `i32::MAX`.
#[test]
fn test_min_max_aggregator() {
    let int_field =
        NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int)).with_id(0);
    let schema = Arc::new(
        Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![int_field.into()])
            .build()
            .expect("Failed to create schema"),
    );
    let mut aggregator = MinMaxColAggregator::new(schema);

    // (min, max) pairs applied in order; `None` marks a statistic that is not present.
    let stat_updates = [
        (None, Some(42)),
        (Some(0), Some(i32::MAX)),
        (Some(i32::MIN), None),
        (None, None),
    ];
    for (min, max) in stat_updates {
        let stats = Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
        aggregator.update(0, stats).unwrap();
    }

    let (lower_bounds, upper_bounds) = aggregator.produce();

    let expected_lower = HashMap::from([(0, Datum::int(i32::MIN))]);
    let expected_upper = HashMap::from([(0, Datum::int(i32::MAX))]);
    assert_eq!(lower_bounds, expected_lower);
    assert_eq!(upper_bounds, expected_upper);
}
2312}