iceberg/writer/file_writer/parquet_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains the file writer for the Parquet file format.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_schema::SchemaRef as ArrowSchemaRef;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use itertools::Itertools;
27use parquet::arrow::AsyncArrowWriter;
28use parquet::arrow::async_reader::AsyncFileReader;
29use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
30use parquet::file::metadata::ParquetMetaData;
31use parquet::file::properties::WriterProperties;
32use parquet::file::statistics::Statistics;
33
34use super::{FileWriter, FileWriterBuilder};
35use crate::arrow::{
36    ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
37    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
38};
39use crate::io::{FileIO, FileWrite, OutputFile};
40use crate::spec::{
41    DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
42    NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
43    StructType, TableMetadata, Type, visit_schema,
44};
45use crate::transform::create_transform_function;
46use crate::writer::{CurrentFileStatus, DataFile};
47use crate::{Error, ErrorKind, Result};
48
49/// `ParquetWriterBuilder` is used to build a [`ParquetWriter`].
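///
/// A minimal usage sketch mirroring the tests at the bottom of this file; the output
/// path and the `schema` / `batch` bindings are placeholders:
///
/// ```ignore
/// let file_io = FileIOBuilder::new_fs_io().build()?;
/// let output_file = file_io.new_output("/tmp/iceberg-data/test.parquet")?;
/// let mut writer = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
///     .build(output_file)
///     .await?;
/// writer.write(&batch).await?;
/// let data_file_builders = writer.close().await?;
/// ```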
50#[derive(Clone, Debug)]
51pub struct ParquetWriterBuilder {
52    props: WriterProperties,
53    schema: SchemaRef,
54    match_mode: FieldMatchMode,
55}
56
57impl ParquetWriterBuilder {
58    /// Create a new `ParquetWriterBuilder`
59    /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
60    pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
61        Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
62    }
63
64    /// Create a new `ParquetWriterBuilder` with custom match mode
65    pub fn new_with_match_mode(
66        props: WriterProperties,
67        schema: SchemaRef,
68        match_mode: FieldMatchMode,
69    ) -> Self {
70        Self {
71            props,
72            schema,
73            match_mode,
74        }
75    }
76}
77
78impl FileWriterBuilder for ParquetWriterBuilder {
79    type R = ParquetWriter;
80
81    async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
82        Ok(ParquetWriter {
83            schema: self.schema.clone(),
84            inner_writer: None,
85            writer_properties: self.props.clone(),
86            current_row_num: 0,
87            output_file,
88            nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
89        })
90    }
91}
92
93/// A mapping from Parquet column path names to internal field ids.
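///
/// Path names follow Parquet's representation of nested types; for the nested schema
/// used in `test_index_by_parquet_path` below, the visitor produces mappings such as:
///
/// ```text
/// "col0"                           -> 0
/// "col1.col_1_5"                   -> 5
/// "col3.list.element"              -> 7
/// "col5.key_value.key"             -> 11
/// "col5.key_value.value.list.item" -> 13
/// ```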
94struct IndexByParquetPathName {
95    name_to_id: HashMap<String, i32>,
96
97    field_names: Vec<String>,
98
99    field_id: i32,
100}
101
102impl IndexByParquetPathName {
103    /// Creates a new, empty `IndexByParquetPathName`
104    pub fn new() -> Self {
105        Self {
106            name_to_id: HashMap::new(),
107            field_names: Vec::new(),
108            field_id: 0,
109        }
110    }
111
112    /// Retrieves the internal field ID for the given Parquet column path name.
113    pub fn get(&self, name: &str) -> Option<&i32> {
114        self.name_to_id.get(name)
115    }
116}
117
118impl Default for IndexByParquetPathName {
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
124impl SchemaVisitor for IndexByParquetPathName {
125    type T = ();
126
127    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
128        self.field_names.push(field.name.to_string());
129        self.field_id = field.id;
130        Ok(())
131    }
132
133    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
134        self.field_names.pop();
135        Ok(())
136    }
137
138    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
139        self.field_names.push(format!("list.{}", field.name));
140        self.field_id = field.id;
141        Ok(())
142    }
143
144    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
145        self.field_names.pop();
146        Ok(())
147    }
148
149    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
150        self.field_names
151            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
152        self.field_id = field.id;
153        Ok(())
154    }
155
156    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
157        self.field_names.pop();
158        Ok(())
159    }
160
161    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
162        self.field_names
163            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
164        self.field_id = field.id;
165        Ok(())
166    }
167
168    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
169        self.field_names.pop();
170        Ok(())
171    }
172
173    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
174        Ok(())
175    }
176
177    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
178        Ok(())
179    }
180
181    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
182        Ok(())
183    }
184
185    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
186        Ok(())
187    }
188
189    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
190        Ok(())
191    }
192
193    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
194        let full_name = self.field_names.iter().map(String::as_str).join(".");
195        let field_id = self.field_id;
196        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
197            return Err(Error::new(
198                ErrorKind::DataInvalid,
199                format!(
200                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
201                ),
202            ));
203        } else {
204            self.name_to_id.insert(full_name, field_id);
205        }
206
207        Ok(())
208    }
209}
210
211/// `ParquetWriter` is used to write Arrow data into a Parquet file on storage.
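///
/// The underlying [`AsyncArrowWriter`] is created lazily on the first non-empty
/// `write` call, so a writer that never receives any rows produces no data file
/// on `close`.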
212pub struct ParquetWriter {
213    schema: SchemaRef,
214    output_file: OutputFile,
215    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
216    writer_properties: WriterProperties,
217    current_row_num: usize,
218    nan_value_count_visitor: NanValueCountVisitor,
219}
220
221/// Used to aggregate the min and max values of each column.
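///
/// Column chunk statistics are folded in via `update`, and `produce` yields the
/// per-field-id lower/upper bound maps recorded as the data file's
/// `lower_bounds`/`upper_bounds`.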
222struct MinMaxColAggregator {
223    lower_bounds: HashMap<i32, Datum>,
224    upper_bounds: HashMap<i32, Datum>,
225    schema: SchemaRef,
226}
227
228impl MinMaxColAggregator {
229    /// Creates a new, empty `MinMaxColAggregator`.
230    fn new(schema: SchemaRef) -> Self {
231        Self {
232            lower_bounds: HashMap::new(),
233            upper_bounds: HashMap::new(),
234            schema,
235        }
236    }
237
238    fn update_state_min(&mut self, field_id: i32, datum: Datum) {
239        self.lower_bounds
240            .entry(field_id)
241            .and_modify(|e| {
242                if *e > datum {
243                    *e = datum.clone()
244                }
245            })
246            .or_insert(datum);
247    }
248
249    fn update_state_max(&mut self, field_id: i32, datum: Datum) {
250        self.upper_bounds
251            .entry(field_id)
252            .and_modify(|e| {
253                if *e < datum {
254                    *e = datum.clone()
255                }
256            })
257            .or_insert(datum);
258    }
259
260    /// Updates the min/max state from a column chunk's `Statistics`.
261    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
262        let Some(ty) = self
263            .schema
264            .field_by_id(field_id)
265            .map(|f| f.field_type.as_ref())
266        else {
267            // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
268            // Ignore the field if it is not in schema.
269            return Ok(());
270        };
271        let Type::Primitive(ty) = ty.clone() else {
272            return Err(Error::new(
273                ErrorKind::Unexpected,
274                format!("Composed type {ty} is not supported for min max aggregation."),
275            ));
276        };
277
278        if value.min_is_exact() {
279            let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
280                return Err(Error::new(
281                    ErrorKind::Unexpected,
282                    format!("Statistics {value} does not match field type {ty}."),
283                ));
284            };
285
286            self.update_state_min(field_id, min_datum);
287        }
288
289        if value.max_is_exact() {
290            let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
291                return Err(Error::new(
292                    ErrorKind::Unexpected,
293                    format!("Statistics {value} does not match field type {ty}."),
294                ));
295            };
296
297            self.update_state_max(field_id, max_datum);
298        }
299
300        Ok(())
301    }
302
303    /// Returns lower and upper bounds
304    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
305        (self.lower_bounds, self.upper_bounds)
306    }
307}
308
309impl ParquetWriter {
310    /// Converts existing Parquet files into Iceberg data files.
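    ///
    /// A minimal sketch of the call shape; the `file_io`/`table_metadata` bindings and
    /// the file path are placeholders:
    ///
    /// ```ignore
    /// let data_files = ParquetWriter::parquet_files_to_data_files(
    ///     &file_io,
    ///     vec!["s3://bucket/warehouse/data/file.parquet".to_string()],
    ///     &table_metadata,
    /// )
    /// .await?;
    /// ```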
311    #[allow(dead_code)]
312    pub(crate) async fn parquet_files_to_data_files(
313        file_io: &FileIO,
314        file_paths: Vec<String>,
315        table_metadata: &TableMetadata,
316    ) -> Result<Vec<DataFile>> {
317        // TODO: support adding to partitioned table
318        let mut data_files: Vec<DataFile> = Vec::new();
319
320        for file_path in file_paths {
321            let input_file = file_io.new_input(&file_path)?;
322            let file_metadata = input_file.metadata().await?;
323            let file_size_in_bytes = file_metadata.size as usize;
324            let reader = input_file.reader().await?;
325
326            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
327            let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
328                Error::new(
329                    ErrorKind::DataInvalid,
330                    format!("Error reading Parquet metadata: {err}"),
331                )
332            })?;
333            let mut builder = ParquetWriter::parquet_to_data_file_builder(
334                table_metadata.current_schema().clone(),
335                parquet_metadata,
336                file_size_in_bytes,
337                file_path,
338                // TODO: Implement nan_value_counts here
339                HashMap::new(),
340            )?;
341            builder.partition_spec_id(table_metadata.default_partition_spec_id());
342            let data_file = builder.build().unwrap();
343            data_files.push(data_file);
344        }
345
346        Ok(data_files)
347    }
348
349    /// Builds a [`DataFileBuilder`] from [`ParquetMetaData`], collecting per-column sizes, value/null counts, min/max bounds, and split offsets.
350    pub(crate) fn parquet_to_data_file_builder(
351        schema: SchemaRef,
352        metadata: Arc<ParquetMetaData>,
353        written_size: usize,
354        file_path: String,
355        nan_value_counts: HashMap<i32, u64>,
356    ) -> Result<DataFileBuilder> {
357        let index_by_parquet_path = {
358            let mut visitor = IndexByParquetPathName::new();
359            visit_schema(&schema, &mut visitor)?;
360            visitor
361        };
362
363        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
364            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
365            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
366            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
367            let mut min_max_agg = MinMaxColAggregator::new(schema);
368
369            for row_group in metadata.row_groups() {
370                for column_chunk_metadata in row_group.columns() {
371                    let parquet_path = column_chunk_metadata.column_descr().path().string();
372
373                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
374                        continue;
375                    };
376
377                    *per_col_size.entry(field_id).or_insert(0) +=
378                        column_chunk_metadata.compressed_size() as u64;
379                    *per_col_val_num.entry(field_id).or_insert(0) +=
380                        column_chunk_metadata.num_values() as u64;
381
382                    if let Some(statistics) = column_chunk_metadata.statistics() {
383                        if let Some(null_count) = statistics.null_count_opt() {
384                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
385                        }
386
387                        min_max_agg.update(field_id, statistics.clone())?;
388                    }
389                }
390            }
391            (
392                per_col_size,
393                per_col_val_num,
394                per_col_null_val_num,
395                min_max_agg.produce(),
396            )
397        };
398
399        let mut builder = DataFileBuilder::default();
400        builder
401            .content(DataContentType::Data)
402            .file_path(file_path)
403            .file_format(DataFileFormat::Parquet)
404            .partition(Struct::empty())
405            .record_count(metadata.file_metadata().num_rows() as u64)
406            .file_size_in_bytes(written_size as u64)
407            .column_sizes(column_sizes)
408            .value_counts(value_counts)
409            .null_value_counts(null_value_counts)
410            .nan_value_counts(nan_value_counts)
411            // # NOTE:
412            // - We can ignore implementing distinct_counts due to this: https://lists.apache.org/thread/j52tsojv0x4bopxyzsp7m7bqt23n5fnd
413            .lower_bounds(lower_bounds)
414            .upper_bounds(upper_bounds)
415            .split_offsets(Some(
416                metadata
417                    .row_groups()
418                    .iter()
419                    .filter_map(|group| group.file_offset())
420                    .collect(),
421            ));
422
423        Ok(builder)
424    }
425
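    /// Infers a single partition value from per-column lower/upper bounds.
    ///
    /// For each partition field the transform must preserve order and the source
    /// column's lower and upper bounds must be equal (i.e. the file contains exactly
    /// one value for that column); the partition literal is the transform applied to
    /// that value. Source columns without bounds yield a null partition literal.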
426    #[allow(dead_code)]
427    fn partition_value_from_bounds(
428        table_spec: Arc<PartitionSpec>,
429        lower_bounds: &HashMap<i32, Datum>,
430        upper_bounds: &HashMap<i32, Datum>,
431    ) -> Result<Struct> {
432        let mut partition_literals: Vec<Option<Literal>> = Vec::new();
433
434        for field in table_spec.fields() {
435            if let (Some(lower), Some(upper)) = (
436                lower_bounds.get(&field.source_id),
437                upper_bounds.get(&field.source_id),
438            ) {
439                if !field.transform.preserves_order() {
440                    return Err(Error::new(
441                        ErrorKind::DataInvalid,
442                        format!(
443                            "cannot infer partition value for non linear partition field (needs to preserve order): {} with transform {}",
444                            field.name, field.transform
445                        ),
446                    ));
447                }
448
449                if lower != upper {
450                    return Err(Error::new(
451                        ErrorKind::DataInvalid,
452                        format!(
453                            "multiple partition values for field {}: lower: {:?}, upper: {:?}",
454                            field.name, lower, upper
455                        ),
456                    ));
457                }
458
459                let transform_fn = create_transform_function(&field.transform)?;
460                let transform_literal =
461                    Literal::from(transform_fn.transform_literal_result(lower)?);
462
463                partition_literals.push(Some(transform_literal));
464            } else {
465                partition_literals.push(None);
466            }
467        }
468
469        let partition_struct = Struct::from_iter(partition_literals);
470
471        Ok(partition_struct)
472    }
473}
474
475impl FileWriter for ParquetWriter {
476    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
477        // Skip empty batch
478        if batch.num_rows() == 0 {
479            return Ok(());
480        }
481
482        self.current_row_num += batch.num_rows();
483
484        let batch_c = batch.clone();
485        self.nan_value_count_visitor
486            .compute(self.schema.clone(), batch_c)?;
487
488        // Lazy initialize the writer
489        let writer = if let Some(writer) = &mut self.inner_writer {
490            writer
491        } else {
492            let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
493            let inner_writer = self.output_file.writer().await?;
494            let async_writer = AsyncFileWriter::new(inner_writer);
495            let writer = AsyncArrowWriter::try_new(
496                async_writer,
497                arrow_schema.clone(),
498                Some(self.writer_properties.clone()),
499            )
500            .map_err(|err| {
501                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
502                    .with_source(err)
503            })?;
504            self.inner_writer = Some(writer);
505            self.inner_writer.as_mut().unwrap()
506        };
507
508        writer.write(batch).await.map_err(|err| {
509            Error::new(
510                ErrorKind::Unexpected,
511                "Failed to write using parquet writer.",
512            )
513            .with_source(err)
514        })?;
515
516        Ok(())
517    }
518
519    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
520        let mut writer = match self.inner_writer.take() {
521            Some(writer) => writer,
522            None => return Ok(vec![]),
523        };
524
525        let metadata = writer.finish().await.map_err(|err| {
526            Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
527        })?;
528
529        let written_size = writer.bytes_written();
530
531        if self.current_row_num == 0 {
532            self.output_file.delete().await.map_err(|err| {
533                Error::new(
534                    ErrorKind::Unexpected,
535                    "Failed to delete empty parquet file.",
536                )
537                .with_source(err)
538            })?;
539            Ok(vec![])
540        } else {
541            let parquet_metadata = Arc::new(metadata);
542
543            Ok(vec![Self::parquet_to_data_file_builder(
544                self.schema,
545                parquet_metadata,
546                written_size,
547                self.output_file.location().to_string(),
548                self.nan_value_count_visitor.nan_value_counts,
549            )?])
550        }
551    }
552}
553
554impl CurrentFileStatus for ParquetWriter {
555    fn current_file_path(&self) -> String {
556        self.output_file.location().to_string()
557    }
558
559    fn current_row_num(&self) -> usize {
560        self.current_row_num
561    }
562
563    fn current_written_size(&self) -> usize {
564        if let Some(inner) = self.inner_writer.as_ref() {
565            // The inner AsyncArrowWriter wraps both a sync (in-memory) writer and an async writer;
566            // written size = bytes already flushed to the async writer + bytes still buffered in the sync writer.
567            inner.bytes_written() + inner.in_progress_size()
568        } else {
569            // inner writer is not initialized yet
570            0
571        }
572    }
573}
574
575/// `AsyncFileWriter` wraps a [`FileWrite`] so it can be used as the parquet crate's async file writer ([`ArrowAsyncFileWriter`]).
576///
577/// # NOTES
578///
579/// This wrapper is intended for internal use only.
580struct AsyncFileWriter<W: FileWrite>(W);
581
582impl<W: FileWrite> AsyncFileWriter<W> {
583    /// Create a new `AsyncFileWriter` with the given writer.
584    pub fn new(writer: W) -> Self {
585        Self(writer)
586    }
587}
588
589impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
590    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
591        Box::pin(async {
592            self.0
593                .write(bs)
594                .await
595                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
596        })
597    }
598
599    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
600        Box::pin(async {
601            self.0
602                .close()
603                .await
604                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
605        })
606    }
607}
608
609#[cfg(test)]
610mod tests {
611    use std::collections::HashMap;
612    use std::sync::Arc;
613
614    use anyhow::Result;
615    use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
616    use arrow_array::types::{Float32Type, Int64Type};
617    use arrow_array::{
618        Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
619        Int64Array, ListArray, MapArray, RecordBatch, StructArray,
620    };
621    use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
622    use arrow_select::concat::concat_batches;
623    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
624    use parquet::file::statistics::ValueStatistics;
625    use rust_decimal::Decimal;
626    use tempfile::TempDir;
627    use uuid::Uuid;
628
629    use super::*;
630    use crate::arrow::schema_to_arrow_schema;
631    use crate::io::FileIOBuilder;
632    use crate::spec::{PrimitiveLiteral, Struct, *};
633    use crate::writer::file_writer::location_generator::{
634        DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
635    };
636    use crate::writer::tests::check_parquet_data_file;
637
638    fn schema_for_all_type() -> Schema {
639        Schema::builder()
640            .with_schema_id(1)
641            .with_fields(vec![
642                NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
643                NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
644                NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
645                NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
646                NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
647                NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
648                NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
649                NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
650                NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
651                NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
652                    .into(),
653                NestedField::optional(
654                    10,
655                    "timestamptz",
656                    Type::Primitive(PrimitiveType::Timestamptz),
657                )
658                .into(),
659                NestedField::optional(
660                    11,
661                    "timestamp_ns",
662                    Type::Primitive(PrimitiveType::TimestampNs),
663                )
664                .into(),
665                NestedField::optional(
666                    12,
667                    "timestamptz_ns",
668                    Type::Primitive(PrimitiveType::TimestamptzNs),
669                )
670                .into(),
671                NestedField::optional(
672                    13,
673                    "decimal",
674                    Type::Primitive(PrimitiveType::Decimal {
675                        precision: 10,
676                        scale: 5,
677                    }),
678                )
679                .into(),
680                NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
681                NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
682                    .into(),
683                // Parquet statistics use a different representation for Decimal with precision 38 and scale 5,
684                // so we add a separate field for it.
685                NestedField::optional(
686                    16,
687                    "decimal_38",
688                    Type::Primitive(PrimitiveType::Decimal {
689                        precision: 38,
690                        scale: 5,
691                    }),
692                )
693                .into(),
694            ])
695            .build()
696            .unwrap()
697    }
698
699    fn nested_schema_for_test() -> Schema {
700        // Int, Struct(Int,Int), String, List(Int), Struct(Struct(Int)), Map(String, List(Int))
701        Schema::builder()
702            .with_schema_id(1)
703            .with_fields(vec![
704                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
705                NestedField::required(
706                    1,
707                    "col1",
708                    Type::Struct(StructType::new(vec![
709                        NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
710                            .into(),
711                        NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
712                            .into(),
713                    ])),
714                )
715                .into(),
716                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
717                NestedField::required(
718                    3,
719                    "col3",
720                    Type::List(ListType::new(
721                        NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
722                            .into(),
723                    )),
724                )
725                .into(),
726                NestedField::required(
727                    4,
728                    "col4",
729                    Type::Struct(StructType::new(vec![
730                        NestedField::required(
731                            8,
732                            "col_4_8",
733                            Type::Struct(StructType::new(vec![
734                                NestedField::required(
735                                    9,
736                                    "col_4_8_9",
737                                    Type::Primitive(PrimitiveType::Long),
738                                )
739                                .into(),
740                            ])),
741                        )
742                        .into(),
743                    ])),
744                )
745                .into(),
746                NestedField::required(
747                    10,
748                    "col5",
749                    Type::Map(MapType::new(
750                        NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
751                            .into(),
752                        NestedField::required(
753                            12,
754                            "value",
755                            Type::List(ListType::new(
756                                NestedField::required(
757                                    13,
758                                    "item",
759                                    Type::Primitive(PrimitiveType::Long),
760                                )
761                                .into(),
762                            )),
763                        )
764                        .into(),
765                    )),
766                )
767                .into(),
768            ])
769            .build()
770            .unwrap()
771    }
772
773    #[tokio::test]
774    async fn test_index_by_parquet_path() {
775        let expect = HashMap::from([
776            ("col0".to_string(), 0),
777            ("col1.col_1_5".to_string(), 5),
778            ("col1.col_1_6".to_string(), 6),
779            ("col2".to_string(), 2),
780            ("col3.list.element".to_string(), 7),
781            ("col4.col_4_8.col_4_8_9".to_string(), 9),
782            ("col5.key_value.key".to_string(), 11),
783            ("col5.key_value.value.list.item".to_string(), 13),
784        ]);
785        let mut visitor = IndexByParquetPathName::new();
786        visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
787        assert_eq!(visitor.name_to_id, expect);
788    }
789
790    #[tokio::test]
791    async fn test_parquet_writer() -> Result<()> {
792        let temp_dir = TempDir::new().unwrap();
793        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
794        let location_gen = DefaultLocationGenerator::with_data_location(
795            temp_dir.path().to_str().unwrap().to_string(),
796        );
797        let file_name_gen =
798            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
799
800        // prepare data
801        let schema = {
802            let fields =
803                vec![
804                    Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([(
805                        PARQUET_FIELD_ID_META_KEY.to_string(),
806                        "0".to_string(),
807                    )])),
808                ];
809            Arc::new(arrow_schema::Schema::new(fields))
810        };
811        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
812        let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
813        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
814        let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
815
816        let output_file = file_io.new_output(
817            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
818        )?;
819
820        // write data
821        let mut pw = ParquetWriterBuilder::new(
822            WriterProperties::builder()
823                .set_max_row_group_size(128)
824                .build(),
825            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
826        )
827        .build(output_file)
828        .await?;
829        pw.write(&to_write).await?;
830        pw.write(&to_write_null).await?;
831        let res = pw.close().await?;
832        assert_eq!(res.len(), 1);
833        let data_file = res
834            .into_iter()
835            .next()
836            .unwrap()
837            // Put dummy fields so the build succeeds.
838            .content(DataContentType::Data)
839            .partition(Struct::empty())
840            .partition_spec_id(0)
841            .build()
842            .unwrap();
843
844        // check data file
845        assert_eq!(data_file.record_count(), 2048);
846        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
847        assert_eq!(
848            *data_file.lower_bounds(),
849            HashMap::from([(0, Datum::long(0))])
850        );
851        assert_eq!(
852            *data_file.upper_bounds(),
853            HashMap::from([(0, Datum::long(1023))])
854        );
855        assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
856
857        // check the written file
858        let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
859        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
860
861        Ok(())
862    }
863
864    #[tokio::test]
865    async fn test_parquet_writer_with_complex_schema() -> Result<()> {
866        let temp_dir = TempDir::new().unwrap();
867        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
868        let location_gen = DefaultLocationGenerator::with_data_location(
869            temp_dir.path().to_str().unwrap().to_string(),
870        );
871        let file_name_gen =
872            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
873
874        // prepare data
875        let schema = nested_schema_for_test();
876        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
877        let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
878        let col1 = Arc::new(StructArray::new(
879            {
880                if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
881                    fields.clone()
882                } else {
883                    unreachable!()
884                }
885            },
886            vec![
887                Arc::new(Int64Array::from_iter_values(0..1024)),
888                Arc::new(Int64Array::from_iter_values(0..1024)),
889            ],
890            None,
891        ));
892        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
893            (0..1024).map(|n| n.to_string()),
894        )) as ArrayRef;
895        let col3 = Arc::new({
896            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
897                (0..1024).map(|n| Some(vec![Some(n)])),
898            )
899            .into_parts();
900            arrow_array::ListArray::new(
901                {
902                    if let DataType::List(field) = arrow_schema.field(3).data_type() {
903                        field.clone()
904                    } else {
905                        unreachable!()
906                    }
907                },
908                list_parts.1,
909                list_parts.2,
910                list_parts.3,
911            )
912        }) as ArrayRef;
913        let col4 = Arc::new(StructArray::new(
914            {
915                if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
916                    fields.clone()
917                } else {
918                    unreachable!()
919                }
920            },
921            vec![Arc::new(StructArray::new(
922                {
923                    if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
924                        if let DataType::Struct(fields) = fields[0].data_type() {
925                            fields.clone()
926                        } else {
927                            unreachable!()
928                        }
929                    } else {
930                        unreachable!()
931                    }
932                },
933                vec![Arc::new(Int64Array::from_iter_values(0..1024))],
934                None,
935            ))],
936            None,
937        ));
938        let col5 = Arc::new({
939            let mut map_array_builder = MapBuilder::new(
940                None,
941                arrow_array::builder::StringBuilder::new(),
942                arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
943                    Int64Type,
944                >::new()),
945            );
946            for i in 0..1024 {
947                map_array_builder.keys().append_value(i.to_string());
948                map_array_builder
949                    .values()
950                    .append_value(vec![Some(i as i64); i + 1]);
951                map_array_builder.append(true)?;
952            }
953            let (_, offset_buffer, struct_array, null_buffer, ordered) =
954                map_array_builder.finish().into_parts();
955            let struct_array = {
956                let (_, mut arrays, nulls) = struct_array.into_parts();
957                let list_array = {
958                    let list_array = arrays[1]
959                        .as_any()
960                        .downcast_ref::<ListArray>()
961                        .unwrap()
962                        .clone();
963                    let (_, offsets, array, nulls) = list_array.into_parts();
964                    let list_field = {
965                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
966                            if let DataType::Struct(fields) = map_field.data_type() {
967                                if let DataType::List(list_field) = fields[1].data_type() {
968                                    list_field.clone()
969                                } else {
970                                    unreachable!()
971                                }
972                            } else {
973                                unreachable!()
974                            }
975                        } else {
976                            unreachable!()
977                        }
978                    };
979                    ListArray::new(list_field, offsets, array, nulls)
980                };
981                arrays[1] = Arc::new(list_array) as ArrayRef;
982                StructArray::new(
983                    {
984                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
985                            if let DataType::Struct(fields) = map_field.data_type() {
986                                fields.clone()
987                            } else {
988                                unreachable!()
989                            }
990                        } else {
991                            unreachable!()
992                        }
993                    },
994                    arrays,
995                    nulls,
996                )
997            };
998            arrow_array::MapArray::new(
999                {
1000                    if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1001                        map_field.clone()
1002                    } else {
1003                        unreachable!()
1004                    }
1005                },
1006                offset_buffer,
1007                struct_array,
1008                null_buffer,
1009                ordered,
1010            )
1011        }) as ArrayRef;
1012        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1013            col0, col1, col2, col3, col4, col5,
1014        ])
1015        .unwrap();
1016        let output_file = file_io.new_output(
1017            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1018        )?;
1019
1020        // write data
1021        let mut pw =
1022            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1023                .build(output_file)
1024                .await?;
1025        pw.write(&to_write).await?;
1026        let res = pw.close().await?;
1027        assert_eq!(res.len(), 1);
1028        let data_file = res
1029            .into_iter()
1030            .next()
1031            .unwrap()
1032            // Put dummy fields so the build succeeds.
1033            .content(crate::spec::DataContentType::Data)
1034            .partition(Struct::empty())
1035            .partition_spec_id(0)
1036            .build()
1037            .unwrap();
1038
1039        // check data file
1040        assert_eq!(data_file.record_count(), 1024);
1041        assert_eq!(
1042            *data_file.value_counts(),
1043            HashMap::from([
1044                (0, 1024),
1045                (5, 1024),
1046                (6, 1024),
1047                (2, 1024),
1048                (7, 1024),
1049                (9, 1024),
1050                (11, 1024),
1051                (13, (1..1025).sum()),
1052            ])
1053        );
1054        assert_eq!(
1055            *data_file.lower_bounds(),
1056            HashMap::from([
1057                (0, Datum::long(0)),
1058                (5, Datum::long(0)),
1059                (6, Datum::long(0)),
1060                (2, Datum::string("0")),
1061                (7, Datum::long(0)),
1062                (9, Datum::long(0)),
1063                (11, Datum::string("0")),
1064                (13, Datum::long(0))
1065            ])
1066        );
1067        assert_eq!(
1068            *data_file.upper_bounds(),
1069            HashMap::from([
1070                (0, Datum::long(1023)),
1071                (5, Datum::long(1023)),
1072                (6, Datum::long(1023)),
1073                (2, Datum::string("999")),
1074                (7, Datum::long(1023)),
1075                (9, Datum::long(1023)),
1076                (11, Datum::string("999")),
1077                (13, Datum::long(1023))
1078            ])
1079        );
1080
1081        // check the written file
1082        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1083
1084        Ok(())
1085    }
1086
1087    #[tokio::test]
1088    async fn test_all_type_for_write() -> Result<()> {
1089        let temp_dir = TempDir::new().unwrap();
1090        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1091        let location_gen = DefaultLocationGenerator::with_data_location(
1092            temp_dir.path().to_str().unwrap().to_string(),
1093        );
1094        let file_name_gen =
1095            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1096
1097        // prepare data
1098        // generate iceberg schema for all type
1099        let schema = schema_for_all_type();
1100        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1101        let col0 = Arc::new(BooleanArray::from(vec![
1102            Some(true),
1103            Some(false),
1104            None,
1105            Some(true),
1106        ])) as ArrayRef;
1107        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1108        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1109        let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1110            Some(0.5),
1111            Some(2.0),
1112            None,
1113            Some(3.5),
1114        ])) as ArrayRef;
1115        let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1116            Some(0.5),
1117            Some(2.0),
1118            None,
1119            Some(3.5),
1120        ])) as ArrayRef;
1121        let col5 = Arc::new(arrow_array::StringArray::from(vec![
1122            Some("a"),
1123            Some("b"),
1124            None,
1125            Some("d"),
1126        ])) as ArrayRef;
1127        let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1128            Some(b"one"),
1129            None,
1130            Some(b""),
1131            Some(b"zzzz"),
1132        ])) as ArrayRef;
1133        let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1134            Some(0),
1135            Some(1),
1136            None,
1137            Some(3),
1138        ])) as ArrayRef;
1139        let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1140            Some(0),
1141            Some(1),
1142            None,
1143            Some(3),
1144        ])) as ArrayRef;
1145        let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1146            Some(0),
1147            Some(1),
1148            None,
1149            Some(3),
1150        ])) as ArrayRef;
1151        let col10 = Arc::new(
1152            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1153                .with_timezone_utc(),
1154        ) as ArrayRef;
1155        let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1156            Some(0),
1157            Some(1),
1158            None,
1159            Some(3),
1160        ])) as ArrayRef;
1161        let col12 = Arc::new(
1162            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1163                .with_timezone_utc(),
1164        ) as ArrayRef;
1165        let col13 = Arc::new(
1166            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1167                .with_precision_and_scale(10, 5)
1168                .unwrap(),
1169        ) as ArrayRef;
1170        let col14 = Arc::new(
1171            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1172                vec![
1173                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
1174                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
1175                    None,
1176                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
1177                ]
1178                .into_iter(),
1179                16,
1180            )
1181            .unwrap(),
1182        ) as ArrayRef;
1183        let col15 = Arc::new(
1184            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1185                vec![
1186                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1187                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1188                    None,
1189                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1190                ]
1191                .into_iter(),
1192                10,
1193            )
1194            .unwrap(),
1195        ) as ArrayRef;
1196        let col16 = Arc::new(
1197            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1198                .with_precision_and_scale(38, 5)
1199                .unwrap(),
1200        ) as ArrayRef;
1201        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1202            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1203            col14, col15, col16,
1204        ])
1205        .unwrap();
1206        let output_file = file_io.new_output(
1207            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1208        )?;
1209
1210        // write data
1211        let mut pw =
1212            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1213                .build(output_file)
1214                .await?;
1215        pw.write(&to_write).await?;
1216        let res = pw.close().await?;
1217        assert_eq!(res.len(), 1);
1218        let data_file = res
1219            .into_iter()
1220            .next()
1221            .unwrap()
1222            // Put dummy fields so the build succeeds.
1223            .content(crate::spec::DataContentType::Data)
1224            .partition(Struct::empty())
1225            .partition_spec_id(0)
1226            .build()
1227            .unwrap();
1228
1229        // check data file
1230        assert_eq!(data_file.record_count(), 4);
1231        assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1232        assert!(
1233            data_file
1234                .null_value_counts()
1235                .iter()
1236                .all(|(_, &v)| { v == 1 })
1237        );
1238        assert_eq!(
1239            *data_file.lower_bounds(),
1240            HashMap::from([
1241                (0, Datum::bool(false)),
1242                (1, Datum::int(1)),
1243                (2, Datum::long(1)),
1244                (3, Datum::float(0.5)),
1245                (4, Datum::double(0.5)),
1246                (5, Datum::string("a")),
1247                (6, Datum::binary(vec![])),
1248                (7, Datum::date(0)),
1249                (8, Datum::time_micros(0).unwrap()),
1250                (9, Datum::timestamp_micros(0)),
1251                (10, Datum::timestamptz_micros(0)),
1252                (11, Datum::timestamp_nanos(0)),
1253                (12, Datum::timestamptz_nanos(0)),
1254                (
1255                    13,
1256                    Datum::new(
1257                        PrimitiveType::Decimal {
1258                            precision: 10,
1259                            scale: 5
1260                        },
1261                        PrimitiveLiteral::Int128(1)
1262                    )
1263                ),
1264                (14, Datum::uuid(Uuid::from_u128(0))),
1265                (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1266                (
1267                    16,
1268                    Datum::new(
1269                        PrimitiveType::Decimal {
1270                            precision: 38,
1271                            scale: 5
1272                        },
1273                        PrimitiveLiteral::Int128(1)
1274                    )
1275                ),
1276            ])
1277        );
1278        assert_eq!(
1279            *data_file.upper_bounds(),
1280            HashMap::from([
1281                (0, Datum::bool(true)),
1282                (1, Datum::int(4)),
1283                (2, Datum::long(4)),
1284                (3, Datum::float(3.5)),
1285                (4, Datum::double(3.5)),
1286                (5, Datum::string("d")),
1287                (6, Datum::binary(vec![122, 122, 122, 122])),
1288                (7, Datum::date(3)),
1289                (8, Datum::time_micros(3).unwrap()),
1290                (9, Datum::timestamp_micros(3)),
1291                (10, Datum::timestamptz_micros(3)),
1292                (11, Datum::timestamp_nanos(3)),
1293                (12, Datum::timestamptz_nanos(3)),
1294                (
1295                    13,
1296                    Datum::new(
1297                        PrimitiveType::Decimal {
1298                            precision: 10,
1299                            scale: 5
1300                        },
1301                        PrimitiveLiteral::Int128(100)
1302                    )
1303                ),
1304                (14, Datum::uuid(Uuid::from_u128(3))),
1305                (
1306                    15,
1307                    Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1308                ),
1309                (
1310                    16,
1311                    Datum::new(
1312                        PrimitiveType::Decimal {
1313                            precision: 38,
1314                            scale: 5
1315                        },
1316                        PrimitiveLiteral::Int128(100)
1317                    )
1318                ),
1319            ])
1320        );
1321
1322        // check the written file
1323        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1324
1325        Ok(())
1326    }
1327
1328    #[tokio::test]
1329    async fn test_decimal_bound() -> Result<()> {
1330        let temp_dir = TempDir::new().unwrap();
1331        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1332        let location_gen = DefaultLocationGenerator::with_data_location(
1333            temp_dir.path().to_str().unwrap().to_string(),
1334        );
1335        let file_name_gen =
1336            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1337
1338        // test 1.1 and 2.2
1339        let schema = Arc::new(
1340            Schema::builder()
1341                .with_fields(vec![
1342                    NestedField::optional(
1343                        0,
1344                        "decimal",
1345                        Type::Primitive(PrimitiveType::Decimal {
1346                            precision: 28,
1347                            scale: 10,
1348                        }),
1349                    )
1350                    .into(),
1351                ])
1352                .build()
1353                .unwrap(),
1354        );
1355        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1356        let output_file = file_io.new_output(
1357            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1358        )?;
1359        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1360            .build(output_file)
1361            .await?;
1362        let col0 = Arc::new(
1363            Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1364                .with_data_type(DataType::Decimal128(28, 10)),
1365        ) as ArrayRef;
1366        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1367        pw.write(&to_write).await?;
1368        let res = pw.close().await?;
1369        assert_eq!(res.len(), 1);
1370        let data_file = res
1371            .into_iter()
1372            .next()
1373            .unwrap()
1374            .content(crate::spec::DataContentType::Data)
1375            .partition(Struct::empty())
1376            .partition_spec_id(0)
1377            .build()
1378            .unwrap();
1379        assert_eq!(
1380            data_file.upper_bounds().get(&0),
1381            Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1382                .as_ref()
1383        );
1384        assert_eq!(
1385            data_file.lower_bounds().get(&0),
1386            Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1387                .as_ref()
1388        );
1389
1390        // test -1.1 and -2.2
1391        let schema = Arc::new(
1392            Schema::builder()
1393                .with_fields(vec![
1394                    NestedField::optional(
1395                        0,
1396                        "decimal",
1397                        Type::Primitive(PrimitiveType::Decimal {
1398                            precision: 28,
1399                            scale: 10,
1400                        }),
1401                    )
1402                    .into(),
1403                ])
1404                .build()
1405                .unwrap(),
1406        );
1407        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1408        let output_file = file_io.new_output(
1409            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1410        )?;
1411        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1412            .build(output_file)
1413            .await?;
1414        let col0 = Arc::new(
1415            Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1416                .with_data_type(DataType::Decimal128(28, 10)),
1417        ) as ArrayRef;
1418        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1419        pw.write(&to_write).await?;
1420        let res = pw.close().await?;
1421        assert_eq!(res.len(), 1);
1422        let data_file = res
1423            .into_iter()
1424            .next()
1425            .unwrap()
1426            .content(crate::spec::DataContentType::Data)
1427            .partition(Struct::empty())
1428            .partition_spec_id(0)
1429            .build()
1430            .unwrap();
1431        assert_eq!(
1432            data_file.upper_bounds().get(&0),
1433            Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1434                .as_ref()
1435        );
1436        assert_eq!(
1437            data_file.lower_bounds().get(&0),
1438            Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1439                .as_ref()
1440        );
1441
1442        // test max and min of rust_decimal
1443        let decimal_max = Decimal::MAX;
1444        let decimal_min = Decimal::MIN;
1445        assert_eq!(decimal_max.scale(), decimal_min.scale());
1446        let schema = Arc::new(
1447            Schema::builder()
1448                .with_fields(vec![
1449                    NestedField::optional(
1450                        0,
1451                        "decimal",
1452                        Type::Primitive(PrimitiveType::Decimal {
1453                            precision: 38,
1454                            scale: decimal_max.scale(),
1455                        }),
1456                    )
1457                    .into(),
1458                ])
1459                .build()
1460                .unwrap(),
1461        );
1462        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1463        let output_file = file_io.new_output(
1464            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1465        )?;
1466        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1467            .build(output_file)
1468            .await?;
1469        let col0 = Arc::new(
1470            Decimal128Array::from(vec![
1471                Some(decimal_max.mantissa()),
1472                Some(decimal_min.mantissa()),
1473            ])
1474            .with_data_type(DataType::Decimal128(38, 0)),
1475        ) as ArrayRef;
1476        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1477        pw.write(&to_write).await?;
1478        let res = pw.close().await?;
1479        assert_eq!(res.len(), 1);
1480        let data_file = res
1481            .into_iter()
1482            .next()
1483            .unwrap()
1484            .content(crate::spec::DataContentType::Data)
1485            .partition(Struct::empty())
1486            .partition_spec_id(0)
1487            .build()
1488            .unwrap();
1489        assert_eq!(
1490            data_file.upper_bounds().get(&0),
1491            Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1492        );
1493        assert_eq!(
1494            data_file.lower_bounds().get(&0),
1495            Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1496        );
1497
1498        // test max and min for precision 38
1499        // TODO
1500        // Re-add this case after resolving https://github.com/apache/iceberg-rust/issues/669
1501        // let schema = Arc::new(
1502        //     Schema::builder()
1503        //         .with_fields(vec![NestedField::optional(
1504        //             0,
1505        //             "decimal",
1506        //             Type::Primitive(PrimitiveType::Decimal {
1507        //                 precision: 38,
1508        //                 scale: 0,
1509        //             }),
1510        //         )
1511        //         .into()])
1512        //         .build()
1513        //         .unwrap(),
1514        // );
1515        // let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1516        // let output_file = file_io.new_output(
1517        //     location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1518        // )?;
1519        // let mut pw = ParquetWriterBuilder::new(
1520        //     WriterProperties::builder().build(),
1521        //     schema,
1522        // )
1523        // .build(output_file)
1524        // .await?;
1525        // let col0 = Arc::new(
1526        //     Decimal128Array::from(vec![
1527        //         Some(99999999999999999999999999999999999999_i128),
1528        //         Some(-99999999999999999999999999999999999999_i128),
1529        //     ])
1530        //     .with_data_type(DataType::Decimal128(38, 0)),
1531        // ) as ArrayRef;
1532        // let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1533        // pw.write(&to_write).await?;
1534        // let res = pw.close().await?;
1535        // assert_eq!(res.len(), 1);
1536        // let data_file = res
1537        //     .into_iter()
1538        //     .next()
1539        //     .unwrap()
1540        //     .content(crate::spec::DataContentType::Data)
1541        //     .partition(Struct::empty())
1542        //     .build()
1543        //     .unwrap();
1544        // assert_eq!(
1545        //     data_file.upper_bounds().get(&0),
1546        //     Some(Datum::new(
1547        //         PrimitiveType::Decimal {
1548        //             precision: 38,
1549        //             scale: 0
1550        //         },
1551        //         PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128)
1552        //     ))
1553        //     .as_ref()
1554        // );
1555        // assert_eq!(
1556        //     data_file.lower_bounds().get(&0),
1557        //     Some(Datum::new(
1558        //         PrimitiveType::Decimal {
1559        //             precision: 38,
1560        //             scale: 0
1561        //         },
1562        //         PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128)
1563        //     ))
1564        //     .as_ref()
1565        // );
1566
1567        Ok(())
1568    }
1569
1570    #[tokio::test]
1571    async fn test_empty_write() -> Result<()> {
1572        let temp_dir = TempDir::new().unwrap();
1573        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1574        let location_gen = DefaultLocationGenerator::with_data_location(
1575            temp_dir.path().to_str().unwrap().to_string(),
1576        );
1577        let file_name_gen =
1578            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1579
1580        // Test that a file is created when there is data to write
1581        let schema = {
1582            let fields = vec![
1583                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1584                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1585                ),
1586            ];
1587            Arc::new(arrow_schema::Schema::new(fields))
1588        };
1589        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1590        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1591        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1592        let output_file = file_io.new_output(&file_path)?;
1593        let mut pw = ParquetWriterBuilder::new(
1594            WriterProperties::builder().build(),
1595            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1596        )
1597        .build(output_file)
1598        .await?;
1599        pw.write(&to_write).await?;
1600        pw.close().await.unwrap();
1601        assert!(file_io.exists(&file_path).await.unwrap());
1602
1603        // Test that no file is created when there is no data to write
1604        let file_name_gen =
1605            DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1606        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1607        let output_file = file_io.new_output(&file_path)?;
1608        let pw = ParquetWriterBuilder::new(
1609            WriterProperties::builder().build(),
1610            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1611        )
1612        .build(output_file)
1613        .await?;
1614        pw.close().await.unwrap();
1615        assert!(!file_io.exists(&file_path).await.unwrap());
1616
1617        Ok(())
1618    }
1619
1620    #[tokio::test]
1621    async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1622        let temp_dir = TempDir::new().unwrap();
1623        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1624        let location_gen = DefaultLocationGenerator::with_data_location(
1625            temp_dir.path().to_str().unwrap().to_string(),
1626        );
1627        let file_name_gen =
1628            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1629
1630        // prepare data
1631        let arrow_schema = {
1632            let fields = vec![
1633                Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1634                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1635                ),
1636                Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1637                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1638                ),
1639            ];
1640            Arc::new(arrow_schema::Schema::new(fields))
1641        };
1642
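        // Each float column contains a single NaN, so the writer should report a
        // nan_value_count of 1 for field ids 0 and 1.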
1643        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1644            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1645            None,
1646        )) as ArrayRef;
1647
1648        let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1649            [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1650            None,
1651        )) as ArrayRef;
1652
1653        let to_write =
1654            RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1655        let output_file = file_io.new_output(
1656            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1657        )?;
1658
1659        // write data
1660        let mut pw = ParquetWriterBuilder::new(
1661            WriterProperties::builder().build(),
1662            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1663        )
1664        .build(output_file)
1665        .await?;
1666
1667        pw.write(&to_write).await?;
1668        let res = pw.close().await?;
1669        assert_eq!(res.len(), 1);
1670        let data_file = res
1671            .into_iter()
1672            .next()
1673            .unwrap()
1674            // Fill in dummy fields so the builder succeeds.
1675            .content(crate::spec::DataContentType::Data)
1676            .partition(Struct::empty())
1677            .partition_spec_id(0)
1678            .build()
1679            .unwrap();
1680
1681        // check data file
1682        assert_eq!(data_file.record_count(), 4);
1683        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1684        assert_eq!(
1685            *data_file.lower_bounds(),
1686            HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1687        );
1688        assert_eq!(
1689            *data_file.upper_bounds(),
1690            HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1691        );
1692        assert_eq!(
1693            *data_file.null_value_counts(),
1694            HashMap::from([(0, 0), (1, 0)])
1695        );
1696        assert_eq!(
1697            *data_file.nan_value_counts(),
1698            HashMap::from([(0, 1), (1, 1)])
1699        );
1700
1701        // check the written file
1702        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1703        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1704
1705        Ok(())
1706    }
1707
1708    #[tokio::test]
1709    async fn test_nan_val_cnts_struct_type() -> Result<()> {
1710        let temp_dir = TempDir::new().unwrap();
1711        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1712        let location_gen = DefaultLocationGenerator::with_data_location(
1713            temp_dir.path().to_str().unwrap().to_string(),
1714        );
1715        let file_name_gen =
1716            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1717
1718        let schema_struct_float_fields = Fields::from(vec![
1719            Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1720                PARQUET_FIELD_ID_META_KEY.to_string(),
1721                "4".to_string(),
1722            )])),
1723        ]);
1724
1725        let schema_struct_nested_float_fields = Fields::from(vec![
1726            Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1727                PARQUET_FIELD_ID_META_KEY.to_string(),
1728                "7".to_string(),
1729            )])),
1730        ]);
1731
1732        let schema_struct_nested_fields = Fields::from(vec![
1733            Field::new(
1734                "col6",
1735                arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1736                false,
1737            )
1738            .with_metadata(HashMap::from([(
1739                PARQUET_FIELD_ID_META_KEY.to_string(),
1740                "6".to_string(),
1741            )])),
1742        ]);
1743
1744        // prepare data
1745        let arrow_schema = {
1746            let fields = vec![
1747                Field::new(
1748                    "col3",
1749                    arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1750                    false,
1751                )
1752                .with_metadata(HashMap::from([(
1753                    PARQUET_FIELD_ID_META_KEY.to_string(),
1754                    "3".to_string(),
1755                )])),
1756                Field::new(
1757                    "col5",
1758                    arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1759                    false,
1760                )
1761                .with_metadata(HashMap::from([(
1762                    PARQUET_FIELD_ID_META_KEY.to_string(),
1763                    "5".to_string(),
1764                )])),
1765            ];
1766            Arc::new(arrow_schema::Schema::new(fields))
1767        };
1768
1769        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1770            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1771            None,
1772        )) as ArrayRef;
1773
1774        let struct_float_field_col = Arc::new(StructArray::new(
1775            schema_struct_float_fields,
1776            vec![float_32_col.clone()],
1777            None,
1778        )) as ArrayRef;
1779
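        // Nest the same float column two levels deep (col5 -> col6 -> col7); the NaN count
        // should be attributed to the leaf field id 7.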
1780        let struct_nested_float_field_col = Arc::new(StructArray::new(
1781            schema_struct_nested_fields,
1782            vec![Arc::new(StructArray::new(
1783                schema_struct_nested_float_fields,
1784                vec![float_32_col.clone()],
1785                None,
1786            )) as ArrayRef],
1787            None,
1788        )) as ArrayRef;
1789
1790        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1791            struct_float_field_col,
1792            struct_nested_float_field_col,
1793        ])
1794        .unwrap();
1795        let output_file = file_io.new_output(
1796            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1797        )?;
1798
1799        // write data
1800        let mut pw = ParquetWriterBuilder::new(
1801            WriterProperties::builder().build(),
1802            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1803        )
1804        .build(output_file)
1805        .await?;
1806
1807        pw.write(&to_write).await?;
1808        let res = pw.close().await?;
1809        assert_eq!(res.len(), 1);
1810        let data_file = res
1811            .into_iter()
1812            .next()
1813            .unwrap()
1814            // Fill in dummy fields so the builder succeeds.
1815            .content(crate::spec::DataContentType::Data)
1816            .partition(Struct::empty())
1817            .partition_spec_id(0)
1818            .build()
1819            .unwrap();
1820
1821        // check data file
1822        assert_eq!(data_file.record_count(), 4);
1823        assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1824        assert_eq!(
1825            *data_file.lower_bounds(),
1826            HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1827        );
1828        assert_eq!(
1829            *data_file.upper_bounds(),
1830            HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1831        );
1832        assert_eq!(
1833            *data_file.null_value_counts(),
1834            HashMap::from([(4, 0), (7, 0)])
1835        );
1836        assert_eq!(
1837            *data_file.nan_value_counts(),
1838            HashMap::from([(4, 1), (7, 1)])
1839        );
1840
1841        // check the written file
1842        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1843        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1844
1845        Ok(())
1846    }
1847
1848    #[tokio::test]
1849    async fn test_nan_val_cnts_list_type() -> Result<()> {
1850        let temp_dir = TempDir::new().unwrap();
1851        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1852        let location_gen = DefaultLocationGenerator::with_data_location(
1853            temp_dir.path().to_str().unwrap().to_string(),
1854        );
1855        let file_name_gen =
1856            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1857
1858        let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1859            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1860        );
1861
1862        let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1863            .with_metadata(HashMap::from([(
1864                PARQUET_FIELD_ID_META_KEY.to_string(),
1865                "4".to_string(),
1866            )]));
1867
1868        let schema_struct_list_field = Fields::from(vec![
1869            Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
1870                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
1871            ),
1872        ]);
1873
1874        let arrow_schema = {
1875            let fields = vec![
1876                Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
1877                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1878                ),
1879                Field::new_struct("col1", schema_struct_list_field.clone(), true)
1880                    .with_metadata(HashMap::from([(
1881                        PARQUET_FIELD_ID_META_KEY.to_string(),
1882                        "2".to_string(),
1883                    )]))
1884                    .clone(),
1885                // Field::new_large_list("col3", schema_large_list_float_field.clone(), true).with_metadata(
1886                //     HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
1887                // ).clone(),
1888            ];
1889            Arc::new(arrow_schema::Schema::new(fields))
1890        };
1891
1892        let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
1893            Some(1.0_f32),
1894            Some(f32::NAN),
1895            Some(2.0),
1896            Some(2.0),
1897        ])])
1898        .into_parts();
1899
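        // Rebuild the list array from the shared parts, taking the element field from the Arrow
        // schema so its PARQUET_FIELD_ID_META_KEY metadata is preserved.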
1900        let list_float_field_col = Arc::new({
1901            let list_parts = list_parts.clone();
1902            ListArray::new(
1903                {
1904                    if let DataType::List(field) = arrow_schema.field(0).data_type() {
1905                        field.clone()
1906                    } else {
1907                        unreachable!()
1908                    }
1909                },
1910                list_parts.1,
1911                list_parts.2,
1912                list_parts.3,
1913            )
1914        }) as ArrayRef;
1915
1916        let struct_list_fields_schema =
1917            if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
1918                fields.clone()
1919            } else {
1920                unreachable!()
1921            };
1922
1923        let struct_list_float_field_col = Arc::new({
1924            ListArray::new(
1925                {
1926                    if let DataType::List(field) = struct_list_fields_schema
1927                        .first()
1928                        .expect("could not find first list field")
1929                        .data_type()
1930                    {
1931                        field.clone()
1932                    } else {
1933                        unreachable!()
1934                    }
1935                },
1936                list_parts.1,
1937                list_parts.2,
1938                list_parts.3,
1939            )
1940        }) as ArrayRef;
1941
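        // Wrap the second list column in a struct (col1 -> col2) to exercise NaN counting
        // through nested list types.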
1942        let struct_list_float_field_col = Arc::new(StructArray::new(
1943            struct_list_fields_schema,
1944            vec![struct_list_float_field_col.clone()],
1945            None,
1946        )) as ArrayRef;
1947
1948        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1949            list_float_field_col,
1950            struct_list_float_field_col,
1951            // large_list_float_field_col,
1952        ])
1953        .expect("Could not form record batch");
1954        let output_file = file_io.new_output(
1955            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1956        )?;
1957
1958        // write data
1959        let mut pw = ParquetWriterBuilder::new(
1960            WriterProperties::builder().build(),
1961            Arc::new(
1962                to_write
1963                    .schema()
1964                    .as_ref()
1965                    .try_into()
1966                    .expect("Could not convert iceberg schema"),
1967            ),
1968        )
1969        .build(output_file)
1970        .await?;
1971
1972        pw.write(&to_write).await?;
1973        let res = pw.close().await?;
1974        assert_eq!(res.len(), 1);
1975        let data_file = res
1976            .into_iter()
1977            .next()
1978            .unwrap()
1979            .content(crate::spec::DataContentType::Data)
1980            .partition(Struct::empty())
1981            .partition_spec_id(0)
1982            .build()
1983            .unwrap();
1984
1985        // check data file
1986        assert_eq!(data_file.record_count(), 1);
1987        assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
1988        assert_eq!(
1989            *data_file.lower_bounds(),
1990            HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
1991        );
1992        assert_eq!(
1993            *data_file.upper_bounds(),
1994            HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
1995        );
1996        assert_eq!(
1997            *data_file.null_value_counts(),
1998            HashMap::from([(1, 0), (4, 0)])
1999        );
2000        assert_eq!(
2001            *data_file.nan_value_counts(),
2002            HashMap::from([(1, 1), (4, 1)])
2003        );
2004
2005        // check the written file
2006        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2007        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2008
2009        Ok(())
2010    }
2011
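    // Builds a four-entry `MapArray` (int keys 1..=4, float values containing one NaN), then
    // rebuilds its entries `StructArray` so the supplied key/value fields, including their
    // field-id metadata, are used.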
2012    macro_rules! construct_map_arr {
2013        ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
2014            let int_builder = Int32Builder::new();
2015            let float_builder = Float32Builder::with_capacity(4);
2016            let mut builder = MapBuilder::new(None, int_builder, float_builder);
2017            builder.keys().append_value(1);
2018            builder.values().append_value(1.0_f32);
2019            builder.append(true).unwrap();
2020            builder.keys().append_value(2);
2021            builder.values().append_value(f32::NAN);
2022            builder.append(true).unwrap();
2023            builder.keys().append_value(3);
2024            builder.values().append_value(2.0);
2025            builder.append(true).unwrap();
2026            builder.keys().append_value(4);
2027            builder.values().append_value(2.0);
2028            builder.append(true).unwrap();
2029            let array = builder.finish();
2030
2031            let (_field, offsets, entries, nulls, ordered) = array.into_parts();
2032            let new_struct_fields_schema =
2033                Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);
2034
2035            let entries = {
2036                let (_, arrays, nulls) = entries.into_parts();
2037                StructArray::new(new_struct_fields_schema.clone(), arrays, nulls)
2038            };
2039
2040            let field = Arc::new(Field::new(
2041                DEFAULT_MAP_FIELD_NAME,
2042                DataType::Struct(new_struct_fields_schema),
2043                false,
2044            ));
2045
2046            Arc::new(MapArray::new(field, offsets, entries, nulls, ordered))
2047        }};
2048    }
2049
2050    #[tokio::test]
2051    async fn test_nan_val_cnts_map_type() -> Result<()> {
2052        let temp_dir = TempDir::new().unwrap();
2053        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2054        let location_gen = DefaultLocationGenerator::with_data_location(
2055            temp_dir.path().to_str().unwrap().to_string(),
2056        );
2057        let file_name_gen =
2058            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2059
2060        let map_key_field_schema =
2061            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2062                (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2063            ]));
2064
2065        let map_value_field_schema =
2066            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2067                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2068            ));
2069
2070        let struct_map_key_field_schema =
2071            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2072                (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2073            ]));
2074
2075        let struct_map_value_field_schema =
2076            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2077                [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2078            ));
2079
2080        let schema_struct_map_field = Fields::from(vec![
2081            Field::new_map(
2082                "col3",
2083                DEFAULT_MAP_FIELD_NAME,
2084                struct_map_key_field_schema.clone(),
2085                struct_map_value_field_schema.clone(),
2086                false,
2087                false,
2088            )
2089            .with_metadata(HashMap::from([(
2090                PARQUET_FIELD_ID_META_KEY.to_string(),
2091                "5".to_string(),
2092            )])),
2093        ]);
2094
2095        let arrow_schema = {
2096            let fields = vec![
2097                Field::new_map(
2098                    "col0",
2099                    DEFAULT_MAP_FIELD_NAME,
2100                    map_key_field_schema.clone(),
2101                    map_value_field_schema.clone(),
2102                    false,
2103                    false,
2104                )
2105                .with_metadata(HashMap::from([(
2106                    PARQUET_FIELD_ID_META_KEY.to_string(),
2107                    "0".to_string(),
2108                )])),
2109                Field::new_struct("col1", schema_struct_map_field.clone(), true)
2110                    .with_metadata(HashMap::from([(
2111                        PARQUET_FIELD_ID_META_KEY.to_string(),
2112                        "3".to_string(),
2113                    )]))
2114                    .clone(),
2115            ];
2116            Arc::new(arrow_schema::Schema::new(fields))
2117        };
2118
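        // Build the top-level map column (key/value field ids 1 and 2) and the struct-wrapped
        // map column (key/value field ids 6 and 7).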
2119        let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2120
2121        let struct_map_arr =
2122            construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2123
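        // Wrap the struct-level map in col1 so NaN counts are tracked for the nested map's
        // value field (id 7).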
2124        let struct_map_field_col = Arc::new(StructArray::new(
2125            schema_struct_map_field,
2126            vec![struct_map_arr],
2127            None,
2128        )) as ArrayRef;
2129
2130        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2131            map_array,
2132            struct_map_field_col,
2133        ])
2134        .expect("Could not form record batch");
2135        let output_file = file_io.new_output(
2136            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2137        )?;
2138
2139        // write data
2140        let mut pw = ParquetWriterBuilder::new(
2141            WriterProperties::builder().build(),
2142            Arc::new(
2143                to_write
2144                    .schema()
2145                    .as_ref()
2146                    .try_into()
2147                    .expect("Could not convert iceberg schema"),
2148            ),
2149        )
2150        .build(output_file)
2151        .await?;
2152
2153        pw.write(&to_write).await?;
2154        let res = pw.close().await?;
2155        assert_eq!(res.len(), 1);
2156        let data_file = res
2157            .into_iter()
2158            .next()
2159            .unwrap()
2160            .content(crate::spec::DataContentType::Data)
2161            .partition(Struct::empty())
2162            .partition_spec_id(0)
2163            .build()
2164            .unwrap();
2165
2166        // check data file
2167        assert_eq!(data_file.record_count(), 4);
2168        assert_eq!(
2169            *data_file.value_counts(),
2170            HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2171        );
2172        assert_eq!(
2173            *data_file.lower_bounds(),
2174            HashMap::from([
2175                (1, Datum::int(1)),
2176                (2, Datum::float(1.0)),
2177                (6, Datum::int(1)),
2178                (7, Datum::float(1.0))
2179            ])
2180        );
2181        assert_eq!(
2182            *data_file.upper_bounds(),
2183            HashMap::from([
2184                (1, Datum::int(4)),
2185                (2, Datum::float(2.0)),
2186                (6, Datum::int(4)),
2187                (7, Datum::float(2.0))
2188            ])
2189        );
2190        assert_eq!(
2191            *data_file.null_value_counts(),
2192            HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2193        );
2194        assert_eq!(
2195            *data_file.nan_value_counts(),
2196            HashMap::from([(2, 1), (7, 1)])
2197        );
2198
2199        // check the written file
2200        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2201        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2202
2203        Ok(())
2204    }
2205
2206    #[tokio::test]
2207    async fn test_write_empty_parquet_file() {
2208        let temp_dir = TempDir::new().unwrap();
2209        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2210        let location_gen = DefaultLocationGenerator::with_data_location(
2211            temp_dir.path().to_str().unwrap().to_string(),
2212        );
2213        let file_name_gen =
2214            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2215        let output_file = file_io
2216            .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
2217            .unwrap();
2218
2219        // write data
2220        let pw = ParquetWriterBuilder::new(
2221            WriterProperties::builder().build(),
2222            Arc::new(
2223                Schema::builder()
2224                    .with_schema_id(1)
2225                    .with_fields(vec![
2226                        NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2227                            .with_id(0)
2228                            .into(),
2229                    ])
2230                    .build()
2231                    .expect("Failed to create schema"),
2232            ),
2233        )
2234        .build(output_file)
2235        .await
2236        .unwrap();
2237
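        // Closing without writing any batches should produce no data files.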
2238        let res = pw.close().await.unwrap();
2239        assert_eq!(res.len(), 0);
2240
2241        // Check that the file has been deleted.
2242        assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2243    }
2244
2245    #[test]
2246    fn test_min_max_aggregator() {
2247        let schema = Arc::new(
2248            Schema::builder()
2249                .with_schema_id(1)
2250                .with_fields(vec![
2251                    NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
2252                        .with_id(0)
2253                        .into(),
2254                ])
2255                .build()
2256                .expect("Failed to create schema"),
2257        );
2258        let mut min_max_agg = MinMaxColAggregator::new(schema);
2259        let create_statistics =
2260            |min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
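        // Statistics with missing min or max values must not affect the aggregated bounds.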
2261        min_max_agg
2262            .update(0, create_statistics(None, Some(42)))
2263            .unwrap();
2264        min_max_agg
2265            .update(0, create_statistics(Some(0), Some(i32::MAX)))
2266            .unwrap();
2267        min_max_agg
2268            .update(0, create_statistics(Some(i32::MIN), None))
2269            .unwrap();
2270        min_max_agg
2271            .update(0, create_statistics(None, None))
2272            .unwrap();
2273
2274        let (lower_bounds, upper_bounds) = min_max_agg.produce();
2275
2276        assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
2277        assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
2278    }
2279}