iceberg/writer/file_writer/parquet_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains the file writer for the Parquet file format.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_schema::SchemaRef as ArrowSchemaRef;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use itertools::Itertools;
27use parquet::arrow::AsyncArrowWriter;
28use parquet::arrow::async_reader::AsyncFileReader;
29use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
30use parquet::file::metadata::ParquetMetaData;
31use parquet::file::properties::WriterProperties;
32use parquet::file::statistics::Statistics;
33
34use super::{FileWriter, FileWriterBuilder};
35use crate::arrow::{
36    ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
37    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
38};
39use crate::io::{FileIO, FileWrite, OutputFile};
40use crate::spec::{
41    DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
42    NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
43    StructType, TableMetadata, Type, visit_schema,
44};
45use crate::transform::create_transform_function;
46use crate::writer::{CurrentFileStatus, DataFile};
47use crate::{Error, ErrorKind, Result};
48
/// `ParquetWriterBuilder` is used to build a [`ParquetWriter`].
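///
/// A minimal usage sketch (assuming an [`OutputFile`] named `output_file`, an Iceberg
/// [`SchemaRef`] named `schema`, and an Arrow `RecordBatch` named `batch` are already in
/// scope; see the tests at the bottom of this file for complete examples):
///
/// ```ignore
/// let builder = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema);
/// let mut writer = builder.build(output_file).await?;
/// writer.write(&batch).await?;
/// // Returns one `DataFileBuilder` per written file (empty if no rows were written).
/// let data_file_builders = writer.close().await?;
/// ```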
50#[derive(Clone, Debug)]
51pub struct ParquetWriterBuilder {
52    props: WriterProperties,
53    schema: SchemaRef,
54    match_mode: FieldMatchMode,
55}
56
57impl ParquetWriterBuilder {
58    /// Create a new `ParquetWriterBuilder`
59    /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
60    pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
61        Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
62    }
63
64    /// Create a new `ParquetWriterBuilder` with custom match mode
65    pub fn new_with_match_mode(
66        props: WriterProperties,
67        schema: SchemaRef,
68        match_mode: FieldMatchMode,
69    ) -> Self {
70        Self {
71            props,
72            schema,
73            match_mode,
74        }
75    }
76}
77
78impl FileWriterBuilder for ParquetWriterBuilder {
79    type R = ParquetWriter;
80
81    async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
82        Ok(ParquetWriter {
83            schema: self.schema.clone(),
84            inner_writer: None,
85            writer_properties: self.props.clone(),
86            current_row_num: 0,
87            output_file,
88            nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
89        })
90    }
91}
92
/// A mapping from Parquet column path names to internal field ids
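///
/// A minimal sketch of how it is used in this module (assuming an Iceberg [`Schema`] named
/// `schema` is in scope):
///
/// ```ignore
/// let mut visitor = IndexByParquetPathName::new();
/// visit_schema(&schema, &mut visitor)?;
/// // For the nested test schema below, e.g. "col1.col_1_5" -> Some(&5).
/// let field_id = visitor.get("col1.col_1_5");
/// ```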
94struct IndexByParquetPathName {
95    name_to_id: HashMap<String, i32>,
96
97    field_names: Vec<String>,
98
99    field_id: i32,
100}
101
102impl IndexByParquetPathName {
103    /// Creates a new, empty `IndexByParquetPathName`
104    pub fn new() -> Self {
105        Self {
106            name_to_id: HashMap::new(),
107            field_names: Vec::new(),
108            field_id: 0,
109        }
110    }
111
112    /// Retrieves the internal field ID
113    pub fn get(&self, name: &str) -> Option<&i32> {
114        self.name_to_id.get(name)
115    }
116}
117
118impl Default for IndexByParquetPathName {
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
124impl SchemaVisitor for IndexByParquetPathName {
125    type T = ();
126
127    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
128        self.field_names.push(field.name.to_string());
129        self.field_id = field.id;
130        Ok(())
131    }
132
133    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
134        self.field_names.pop();
135        Ok(())
136    }
137
138    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
139        self.field_names.push(format!("list.{}", field.name));
140        self.field_id = field.id;
141        Ok(())
142    }
143
144    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
145        self.field_names.pop();
146        Ok(())
147    }
148
149    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
150        self.field_names
151            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
152        self.field_id = field.id;
153        Ok(())
154    }
155
156    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
157        self.field_names.pop();
158        Ok(())
159    }
160
161    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
162        self.field_names
163            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
164        self.field_id = field.id;
165        Ok(())
166    }
167
168    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
169        self.field_names.pop();
170        Ok(())
171    }
172
173    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
174        Ok(())
175    }
176
177    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
178        Ok(())
179    }
180
181    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
182        Ok(())
183    }
184
185    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
186        Ok(())
187    }
188
189    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
190        Ok(())
191    }
192
193    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
194        let full_name = self.field_names.iter().map(String::as_str).join(".");
195        let field_id = self.field_id;
196        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
197            return Err(Error::new(
198                ErrorKind::DataInvalid,
199                format!(
200                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
201                ),
202            ));
203        } else {
204            self.name_to_id.insert(full_name, field_id);
205        }
206
207        Ok(())
208    }
209}
210
/// `ParquetWriter` is used to write Arrow data into a Parquet file on storage.
212pub struct ParquetWriter {
213    schema: SchemaRef,
214    output_file: OutputFile,
215    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter>>,
216    writer_properties: WriterProperties,
217    current_row_num: usize,
218    nan_value_count_visitor: NanValueCountVisitor,
219}
220
/// Used to aggregate the min and max values of each column.
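///
/// A minimal sketch of the intended call pattern (hypothetical `field_id` and `statistics`
/// values assumed; see `parquet_to_data_file_builder` below for the real call site):
///
/// ```ignore
/// let mut min_max_agg = MinMaxColAggregator::new(schema);
/// min_max_agg.update(field_id, statistics)?;
/// let (lower_bounds, upper_bounds) = min_max_agg.produce();
/// ```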
222struct MinMaxColAggregator {
223    lower_bounds: HashMap<i32, Datum>,
224    upper_bounds: HashMap<i32, Datum>,
225    schema: SchemaRef,
226}
227
228impl MinMaxColAggregator {
    /// Creates a new, empty `MinMaxColAggregator`
230    fn new(schema: SchemaRef) -> Self {
231        Self {
232            lower_bounds: HashMap::new(),
233            upper_bounds: HashMap::new(),
234            schema,
235        }
236    }
237
238    fn update_state_min(&mut self, field_id: i32, datum: Datum) {
239        self.lower_bounds
240            .entry(field_id)
241            .and_modify(|e| {
242                if *e > datum {
243                    *e = datum.clone()
244                }
245            })
246            .or_insert(datum);
247    }
248
249    fn update_state_max(&mut self, field_id: i32, datum: Datum) {
250        self.upper_bounds
251            .entry(field_id)
252            .and_modify(|e| {
253                if *e < datum {
254                    *e = datum.clone()
255                }
256            })
257            .or_insert(datum);
258    }
259
    /// Updates the min/max state for `field_id` from the given column chunk statistics
261    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
262        let Some(ty) = self
263            .schema
264            .field_by_id(field_id)
265            .map(|f| f.field_type.as_ref())
266        else {
267            // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
268            // Ignore the field if it is not in schema.
269            return Ok(());
270        };
271        let Type::Primitive(ty) = ty.clone() else {
272            return Err(Error::new(
273                ErrorKind::Unexpected,
274                format!("Composed type {ty} is not supported for min max aggregation."),
275            ));
276        };
277
278        if value.min_is_exact() {
279            let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
280                return Err(Error::new(
281                    ErrorKind::Unexpected,
282                    format!("Statistics {value} is not match with field type {ty}."),
283                ));
284            };
285
286            self.update_state_min(field_id, min_datum);
287        }
288
289        if value.max_is_exact() {
290            let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
291                return Err(Error::new(
292                    ErrorKind::Unexpected,
293                    format!("Statistics {value} is not match with field type {ty}."),
294                ));
295            };
296
297            self.update_state_max(field_id, max_datum);
298        }
299
300        Ok(())
301    }
302
303    /// Returns lower and upper bounds
304    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
305        (self.lower_bounds, self.upper_bounds)
306    }
307}
308
309impl ParquetWriter {
    /// Converts existing Parquet files into Iceberg data files
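    ///
    /// A minimal sketch of the intended call pattern (hypothetical file path; assuming a
    /// [`FileIO`] named `file_io` and a [`TableMetadata`] named `table_metadata` are in scope):
    ///
    /// ```ignore
    /// let data_files = ParquetWriter::parquet_files_to_data_files(
    ///     &file_io,
    ///     vec!["/path/to/existing.parquet".to_string()],
    ///     &table_metadata,
    /// )
    /// .await?;
    /// ```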
311    #[allow(dead_code)]
312    pub(crate) async fn parquet_files_to_data_files(
313        file_io: &FileIO,
314        file_paths: Vec<String>,
315        table_metadata: &TableMetadata,
316    ) -> Result<Vec<DataFile>> {
317        // TODO: support adding to partitioned table
318        let mut data_files: Vec<DataFile> = Vec::new();
319
320        for file_path in file_paths {
321            let input_file = file_io.new_input(&file_path)?;
322            let file_metadata = input_file.metadata().await?;
323            let file_size_in_bytes = file_metadata.size as usize;
324            let reader = input_file.reader().await?;
325
326            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
327            let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
328                Error::new(
329                    ErrorKind::DataInvalid,
330                    format!("Error reading Parquet metadata: {err}"),
331                )
332            })?;
333            let mut builder = ParquetWriter::parquet_to_data_file_builder(
334                table_metadata.current_schema().clone(),
335                parquet_metadata,
336                file_size_in_bytes,
337                file_path,
338                // TODO: Implement nan_value_counts here
339                HashMap::new(),
340            )?;
341            builder.partition_spec_id(table_metadata.default_partition_spec_id());
342            let data_file = builder.build().unwrap();
343            data_files.push(data_file);
344        }
345
346        Ok(data_files)
347    }
348
    /// Builds a [`DataFileBuilder`] from [`ParquetMetaData`]
350    pub(crate) fn parquet_to_data_file_builder(
351        schema: SchemaRef,
352        metadata: Arc<ParquetMetaData>,
353        written_size: usize,
354        file_path: String,
355        nan_value_counts: HashMap<i32, u64>,
356    ) -> Result<DataFileBuilder> {
357        let index_by_parquet_path = {
358            let mut visitor = IndexByParquetPathName::new();
359            visit_schema(&schema, &mut visitor)?;
360            visitor
361        };
362
363        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
364            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
365            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
366            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
367            let mut min_max_agg = MinMaxColAggregator::new(schema);
368
369            for row_group in metadata.row_groups() {
370                for column_chunk_metadata in row_group.columns() {
371                    let parquet_path = column_chunk_metadata.column_descr().path().string();
372
373                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
374                        continue;
375                    };
376
377                    *per_col_size.entry(field_id).or_insert(0) +=
378                        column_chunk_metadata.compressed_size() as u64;
379                    *per_col_val_num.entry(field_id).or_insert(0) +=
380                        column_chunk_metadata.num_values() as u64;
381
382                    if let Some(statistics) = column_chunk_metadata.statistics() {
383                        if let Some(null_count) = statistics.null_count_opt() {
384                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
385                        }
386
387                        min_max_agg.update(field_id, statistics.clone())?;
388                    }
389                }
390            }
391            (
392                per_col_size,
393                per_col_val_num,
394                per_col_null_val_num,
395                min_max_agg.produce(),
396            )
397        };
398
399        let mut builder = DataFileBuilder::default();
400        builder
401            .content(DataContentType::Data)
402            .file_path(file_path)
403            .file_format(DataFileFormat::Parquet)
404            .partition(Struct::empty())
405            .record_count(metadata.file_metadata().num_rows() as u64)
406            .file_size_in_bytes(written_size as u64)
407            .column_sizes(column_sizes)
408            .value_counts(value_counts)
409            .null_value_counts(null_value_counts)
410            .nan_value_counts(nan_value_counts)
411            // # NOTE:
412            // - We can ignore implementing distinct_counts due to this: https://lists.apache.org/thread/j52tsojv0x4bopxyzsp7m7bqt23n5fnd
413            .lower_bounds(lower_bounds)
414            .upper_bounds(upper_bounds)
415            .split_offsets(Some(
416                metadata
417                    .row_groups()
418                    .iter()
419                    .filter_map(|group| group.file_offset())
420                    .collect(),
421            ));
422
423        Ok(builder)
424    }
425
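    /// Infers a partition value from the per-column lower/upper bounds.
    ///
    /// For each partition field whose source column has both bounds, the transform must
    /// preserve order and the lower and upper bounds must be equal; otherwise an error is
    /// returned. Fields without bounds produce a null partition value.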
426    #[allow(dead_code)]
427    fn partition_value_from_bounds(
428        table_spec: Arc<PartitionSpec>,
429        lower_bounds: &HashMap<i32, Datum>,
430        upper_bounds: &HashMap<i32, Datum>,
431    ) -> Result<Struct> {
432        let mut partition_literals: Vec<Option<Literal>> = Vec::new();
433
434        for field in table_spec.fields() {
435            if let (Some(lower), Some(upper)) = (
436                lower_bounds.get(&field.source_id),
437                upper_bounds.get(&field.source_id),
438            ) {
439                if !field.transform.preserves_order() {
440                    return Err(Error::new(
441                        ErrorKind::DataInvalid,
442                        format!(
443                            "cannot infer partition value for non linear partition field (needs to preserve order): {} with transform {}",
444                            field.name, field.transform
445                        ),
446                    ));
447                }
448
449                if lower != upper {
450                    return Err(Error::new(
451                        ErrorKind::DataInvalid,
452                        format!(
453                            "multiple partition values for field {}: lower: {:?}, upper: {:?}",
454                            field.name, lower, upper
455                        ),
456                    ));
457                }
458
459                let transform_fn = create_transform_function(&field.transform)?;
460                let transform_literal =
461                    Literal::from(transform_fn.transform_literal_result(lower)?);
462
463                partition_literals.push(Some(transform_literal));
464            } else {
465                partition_literals.push(None);
466            }
467        }
468
469        let partition_struct = Struct::from_iter(partition_literals);
470
471        Ok(partition_struct)
472    }
473}
474
475impl FileWriter for ParquetWriter {
476    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
477        // Skip empty batch
478        if batch.num_rows() == 0 {
479            return Ok(());
480        }
481
482        self.current_row_num += batch.num_rows();
483
484        let batch_c = batch.clone();
485        self.nan_value_count_visitor
486            .compute(self.schema.clone(), batch_c)?;
487
        // Lazily initialize the writer on the first non-empty write
489        let writer = if let Some(writer) = &mut self.inner_writer {
490            writer
491        } else {
492            let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
493            let inner_writer = self.output_file.writer().await?;
494            let async_writer = AsyncFileWriter::new(inner_writer);
495            let writer = AsyncArrowWriter::try_new(
496                async_writer,
497                arrow_schema.clone(),
498                Some(self.writer_properties.clone()),
499            )
500            .map_err(|err| {
501                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
502                    .with_source(err)
503            })?;
504            self.inner_writer = Some(writer);
505            self.inner_writer.as_mut().unwrap()
506        };
507
508        writer.write(batch).await.map_err(|err| {
509            Error::new(
510                ErrorKind::Unexpected,
511                "Failed to write using parquet writer.",
512            )
513            .with_source(err)
514        })?;
515
516        Ok(())
517    }
518
519    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
520        let mut writer = match self.inner_writer.take() {
521            Some(writer) => writer,
522            None => return Ok(vec![]),
523        };
524
525        let metadata = writer.finish().await.map_err(|err| {
526            Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
527        })?;
528
529        let written_size = writer.bytes_written();
530
531        if self.current_row_num == 0 {
532            self.output_file.delete().await.map_err(|err| {
533                Error::new(
534                    ErrorKind::Unexpected,
535                    "Failed to delete empty parquet file.",
536                )
537                .with_source(err)
538            })?;
539            Ok(vec![])
540        } else {
541            let parquet_metadata = Arc::new(metadata);
542
543            Ok(vec![Self::parquet_to_data_file_builder(
544                self.schema,
545                parquet_metadata,
546                written_size,
547                self.output_file.location().to_string(),
548                self.nan_value_count_visitor.nan_value_counts,
549            )?])
550        }
551    }
552}
553
554impl CurrentFileStatus for ParquetWriter {
555    fn current_file_path(&self) -> String {
556        self.output_file.location().to_string()
557    }
558
559    fn current_row_num(&self) -> usize {
560        self.current_row_num
561    }
562
563    fn current_written_size(&self) -> usize {
564        if let Some(inner) = self.inner_writer.as_ref() {
565            // inner/AsyncArrowWriter contains sync and async writers
566            // written size = bytes flushed to inner's async writer + bytes buffered in the inner's sync writer
567            inner.bytes_written() + inner.in_progress_size()
568        } else {
569            // inner writer is not initialized yet
570            0
571        }
572    }
573}
574
/// `AsyncFileWriter` is a wrapper around [`FileWrite`] that implements the parquet crate's
/// `AsyncFileWriter` trait.
///
/// # NOTES
///
/// This wrapper is intended for internal use only.
580struct AsyncFileWriter(Box<dyn FileWrite>);
581
582impl AsyncFileWriter {
583    /// Create a new `AsyncFileWriter` with the given writer.
584    pub fn new(writer: Box<dyn FileWrite>) -> Self {
585        Self(writer)
586    }
587}
588
589impl ArrowAsyncFileWriter for AsyncFileWriter {
590    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
591        Box::pin(async {
592            self.0
593                .write(bs)
594                .await
595                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
596        })
597    }
598
599    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
600        Box::pin(async {
601            self.0
602                .close()
603                .await
604                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
605        })
606    }
607}
608
609#[cfg(test)]
610mod tests {
611    use std::collections::HashMap;
612    use std::sync::Arc;
613
614    use anyhow::Result;
615    use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
616    use arrow_array::types::{Float32Type, Int64Type};
617    use arrow_array::{
618        Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
619        Int64Array, ListArray, MapArray, RecordBatch, StructArray,
620    };
621    use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
622    use arrow_select::concat::concat_batches;
623    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
624    use parquet::file::statistics::ValueStatistics;
625    use tempfile::TempDir;
626    use uuid::Uuid;
627
628    use super::*;
629    use crate::arrow::schema_to_arrow_schema;
630    use crate::io::FileIOBuilder;
631    use crate::spec::decimal_utils::{decimal_mantissa, decimal_new, decimal_scale};
632    use crate::spec::{PrimitiveLiteral, Struct, *};
633    use crate::writer::file_writer::location_generator::{
634        DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
635    };
636    use crate::writer::tests::check_parquet_data_file;
637
638    fn schema_for_all_type() -> Schema {
639        Schema::builder()
640            .with_schema_id(1)
641            .with_fields(vec![
642                NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
643                NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
644                NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
645                NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
646                NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
647                NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
648                NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
649                NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
650                NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
651                NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
652                    .into(),
653                NestedField::optional(
654                    10,
655                    "timestamptz",
656                    Type::Primitive(PrimitiveType::Timestamptz),
657                )
658                .into(),
659                NestedField::optional(
660                    11,
661                    "timestamp_ns",
662                    Type::Primitive(PrimitiveType::TimestampNs),
663                )
664                .into(),
665                NestedField::optional(
666                    12,
667                    "timestamptz_ns",
668                    Type::Primitive(PrimitiveType::TimestamptzNs),
669                )
670                .into(),
671                NestedField::optional(
672                    13,
673                    "decimal",
674                    Type::Primitive(PrimitiveType::Decimal {
675                        precision: 10,
676                        scale: 5,
677                    }),
678                )
679                .into(),
680                NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
681                NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
682                    .into(),
                // Parquet statistics use a different representation for Decimal with precision 38
                // and scale 5, so we add a separate field for it.
685                NestedField::optional(
686                    16,
687                    "decimal_38",
688                    Type::Primitive(PrimitiveType::Decimal {
689                        precision: 38,
690                        scale: 5,
691                    }),
692                )
693                .into(),
694            ])
695            .build()
696            .unwrap()
697    }
698
699    fn nested_schema_for_test() -> Schema {
700        // Int, Struct(Int,Int), String, List(Int), Struct(Struct(Int)), Map(String, List(Int))
701        Schema::builder()
702            .with_schema_id(1)
703            .with_fields(vec![
704                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
705                NestedField::required(
706                    1,
707                    "col1",
708                    Type::Struct(StructType::new(vec![
709                        NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
710                            .into(),
711                        NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
712                            .into(),
713                    ])),
714                )
715                .into(),
716                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
717                NestedField::required(
718                    3,
719                    "col3",
720                    Type::List(ListType::new(
721                        NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
722                            .into(),
723                    )),
724                )
725                .into(),
726                NestedField::required(
727                    4,
728                    "col4",
729                    Type::Struct(StructType::new(vec![
730                        NestedField::required(
731                            8,
732                            "col_4_8",
733                            Type::Struct(StructType::new(vec![
734                                NestedField::required(
735                                    9,
736                                    "col_4_8_9",
737                                    Type::Primitive(PrimitiveType::Long),
738                                )
739                                .into(),
740                            ])),
741                        )
742                        .into(),
743                    ])),
744                )
745                .into(),
746                NestedField::required(
747                    10,
748                    "col5",
749                    Type::Map(MapType::new(
750                        NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
751                            .into(),
752                        NestedField::required(
753                            12,
754                            "value",
755                            Type::List(ListType::new(
756                                NestedField::required(
757                                    13,
758                                    "item",
759                                    Type::Primitive(PrimitiveType::Long),
760                                )
761                                .into(),
762                            )),
763                        )
764                        .into(),
765                    )),
766                )
767                .into(),
768            ])
769            .build()
770            .unwrap()
771    }
772
773    #[tokio::test]
774    async fn test_index_by_parquet_path() {
775        let expect = HashMap::from([
776            ("col0".to_string(), 0),
777            ("col1.col_1_5".to_string(), 5),
778            ("col1.col_1_6".to_string(), 6),
779            ("col2".to_string(), 2),
780            ("col3.list.element".to_string(), 7),
781            ("col4.col_4_8.col_4_8_9".to_string(), 9),
782            ("col5.key_value.key".to_string(), 11),
783            ("col5.key_value.value.list.item".to_string(), 13),
784        ]);
785        let mut visitor = IndexByParquetPathName::new();
786        visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
787        assert_eq!(visitor.name_to_id, expect);
788    }
789
790    #[tokio::test]
791    async fn test_parquet_writer() -> Result<()> {
792        let temp_dir = TempDir::new().unwrap();
793        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
794        let location_gen = DefaultLocationGenerator::with_data_location(
795            temp_dir.path().to_str().unwrap().to_string(),
796        );
797        let file_name_gen =
798            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
799
800        // prepare data
801        let schema = {
802            let fields =
803                vec![
804                    Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([(
805                        PARQUET_FIELD_ID_META_KEY.to_string(),
806                        "0".to_string(),
807                    )])),
808                ];
809            Arc::new(arrow_schema::Schema::new(fields))
810        };
811        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
812        let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
813        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
814        let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
815
816        let output_file = file_io.new_output(
817            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
818        )?;
819
820        // write data
821        let mut pw = ParquetWriterBuilder::new(
822            WriterProperties::builder()
823                .set_max_row_group_size(128)
824                .build(),
825            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
826        )
827        .build(output_file)
828        .await?;
829        pw.write(&to_write).await?;
830        pw.write(&to_write_null).await?;
831        let res = pw.close().await?;
832        assert_eq!(res.len(), 1);
833        let data_file = res
834            .into_iter()
835            .next()
836            .unwrap()
            // Fill in dummy fields so the build succeeds.
838            .content(DataContentType::Data)
839            .partition(Struct::empty())
840            .partition_spec_id(0)
841            .build()
842            .unwrap();
843
844        // check data file
845        assert_eq!(data_file.record_count(), 2048);
846        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
847        assert_eq!(
848            *data_file.lower_bounds(),
849            HashMap::from([(0, Datum::long(0))])
850        );
851        assert_eq!(
852            *data_file.upper_bounds(),
853            HashMap::from([(0, Datum::long(1023))])
854        );
855        assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
856
857        // check the written file
858        let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
859        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
860
861        Ok(())
862    }
863
864    #[tokio::test]
865    async fn test_parquet_writer_with_complex_schema() -> Result<()> {
866        let temp_dir = TempDir::new().unwrap();
867        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
868        let location_gen = DefaultLocationGenerator::with_data_location(
869            temp_dir.path().to_str().unwrap().to_string(),
870        );
871        let file_name_gen =
872            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
873
874        // prepare data
875        let schema = nested_schema_for_test();
876        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
877        let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
878        let col1 = Arc::new(StructArray::new(
879            {
880                if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
881                    fields.clone()
882                } else {
883                    unreachable!()
884                }
885            },
886            vec![
887                Arc::new(Int64Array::from_iter_values(0..1024)),
888                Arc::new(Int64Array::from_iter_values(0..1024)),
889            ],
890            None,
891        ));
892        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
893            (0..1024).map(|n| n.to_string()),
894        )) as ArrayRef;
895        let col3 = Arc::new({
896            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
897                (0..1024).map(|n| Some(vec![Some(n)])),
898            )
899            .into_parts();
900            arrow_array::ListArray::new(
901                {
902                    if let DataType::List(field) = arrow_schema.field(3).data_type() {
903                        field.clone()
904                    } else {
905                        unreachable!()
906                    }
907                },
908                list_parts.1,
909                list_parts.2,
910                list_parts.3,
911            )
912        }) as ArrayRef;
913        let col4 = Arc::new(StructArray::new(
914            {
915                if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
916                    fields.clone()
917                } else {
918                    unreachable!()
919                }
920            },
921            vec![Arc::new(StructArray::new(
922                {
923                    if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
924                        if let DataType::Struct(fields) = fields[0].data_type() {
925                            fields.clone()
926                        } else {
927                            unreachable!()
928                        }
929                    } else {
930                        unreachable!()
931                    }
932                },
933                vec![Arc::new(Int64Array::from_iter_values(0..1024))],
934                None,
935            ))],
936            None,
937        ));
938        let col5 = Arc::new({
939            let mut map_array_builder = MapBuilder::new(
940                None,
941                arrow_array::builder::StringBuilder::new(),
942                arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
943                    Int64Type,
944                >::new()),
945            );
946            for i in 0..1024 {
947                map_array_builder.keys().append_value(i.to_string());
948                map_array_builder
949                    .values()
950                    .append_value(vec![Some(i as i64); i + 1]);
951                map_array_builder.append(true)?;
952            }
953            let (_, offset_buffer, struct_array, null_buffer, ordered) =
954                map_array_builder.finish().into_parts();
955            let struct_array = {
956                let (_, mut arrays, nulls) = struct_array.into_parts();
957                let list_array = {
958                    let list_array = arrays[1]
959                        .as_any()
960                        .downcast_ref::<ListArray>()
961                        .unwrap()
962                        .clone();
963                    let (_, offsets, array, nulls) = list_array.into_parts();
964                    let list_field = {
965                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
966                            if let DataType::Struct(fields) = map_field.data_type() {
967                                if let DataType::List(list_field) = fields[1].data_type() {
968                                    list_field.clone()
969                                } else {
970                                    unreachable!()
971                                }
972                            } else {
973                                unreachable!()
974                            }
975                        } else {
976                            unreachable!()
977                        }
978                    };
979                    ListArray::new(list_field, offsets, array, nulls)
980                };
981                arrays[1] = Arc::new(list_array) as ArrayRef;
982                StructArray::new(
983                    {
984                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
985                            if let DataType::Struct(fields) = map_field.data_type() {
986                                fields.clone()
987                            } else {
988                                unreachable!()
989                            }
990                        } else {
991                            unreachable!()
992                        }
993                    },
994                    arrays,
995                    nulls,
996                )
997            };
998            arrow_array::MapArray::new(
999                {
1000                    if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1001                        map_field.clone()
1002                    } else {
1003                        unreachable!()
1004                    }
1005                },
1006                offset_buffer,
1007                struct_array,
1008                null_buffer,
1009                ordered,
1010            )
1011        }) as ArrayRef;
1012        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1013            col0, col1, col2, col3, col4, col5,
1014        ])
1015        .unwrap();
1016        let output_file = file_io.new_output(
1017            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1018        )?;
1019
1020        // write data
1021        let mut pw =
1022            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1023                .build(output_file)
1024                .await?;
1025        pw.write(&to_write).await?;
1026        let res = pw.close().await?;
1027        assert_eq!(res.len(), 1);
1028        let data_file = res
1029            .into_iter()
1030            .next()
1031            .unwrap()
            // Fill in dummy fields so the build succeeds.
1033            .content(crate::spec::DataContentType::Data)
1034            .partition(Struct::empty())
1035            .partition_spec_id(0)
1036            .build()
1037            .unwrap();
1038
1039        // check data file
1040        assert_eq!(data_file.record_count(), 1024);
1041        assert_eq!(
1042            *data_file.value_counts(),
1043            HashMap::from([
1044                (0, 1024),
1045                (5, 1024),
1046                (6, 1024),
1047                (2, 1024),
1048                (7, 1024),
1049                (9, 1024),
1050                (11, 1024),
1051                (13, (1..1025).sum()),
1052            ])
1053        );
1054        assert_eq!(
1055            *data_file.lower_bounds(),
1056            HashMap::from([
1057                (0, Datum::long(0)),
1058                (5, Datum::long(0)),
1059                (6, Datum::long(0)),
1060                (2, Datum::string("0")),
1061                (7, Datum::long(0)),
1062                (9, Datum::long(0)),
1063                (11, Datum::string("0")),
1064                (13, Datum::long(0))
1065            ])
1066        );
1067        assert_eq!(
1068            *data_file.upper_bounds(),
1069            HashMap::from([
1070                (0, Datum::long(1023)),
1071                (5, Datum::long(1023)),
1072                (6, Datum::long(1023)),
1073                (2, Datum::string("999")),
1074                (7, Datum::long(1023)),
1075                (9, Datum::long(1023)),
1076                (11, Datum::string("999")),
1077                (13, Datum::long(1023))
1078            ])
1079        );
1080
1081        // check the written file
1082        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1083
1084        Ok(())
1085    }
1086
1087    #[tokio::test]
1088    async fn test_all_type_for_write() -> Result<()> {
1089        let temp_dir = TempDir::new().unwrap();
1090        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1091        let location_gen = DefaultLocationGenerator::with_data_location(
1092            temp_dir.path().to_str().unwrap().to_string(),
1093        );
1094        let file_name_gen =
1095            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1096
1097        // prepare data
        // generate an Iceberg schema covering all types
1099        let schema = schema_for_all_type();
1100        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1101        let col0 = Arc::new(BooleanArray::from(vec![
1102            Some(true),
1103            Some(false),
1104            None,
1105            Some(true),
1106        ])) as ArrayRef;
1107        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1108        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1109        let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1110            Some(0.5),
1111            Some(2.0),
1112            None,
1113            Some(3.5),
1114        ])) as ArrayRef;
1115        let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1116            Some(0.5),
1117            Some(2.0),
1118            None,
1119            Some(3.5),
1120        ])) as ArrayRef;
1121        let col5 = Arc::new(arrow_array::StringArray::from(vec![
1122            Some("a"),
1123            Some("b"),
1124            None,
1125            Some("d"),
1126        ])) as ArrayRef;
1127        let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1128            Some(b"one"),
1129            None,
1130            Some(b""),
1131            Some(b"zzzz"),
1132        ])) as ArrayRef;
1133        let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1134            Some(0),
1135            Some(1),
1136            None,
1137            Some(3),
1138        ])) as ArrayRef;
1139        let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1140            Some(0),
1141            Some(1),
1142            None,
1143            Some(3),
1144        ])) as ArrayRef;
1145        let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1146            Some(0),
1147            Some(1),
1148            None,
1149            Some(3),
1150        ])) as ArrayRef;
1151        let col10 = Arc::new(
1152            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1153                .with_timezone_utc(),
1154        ) as ArrayRef;
1155        let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1156            Some(0),
1157            Some(1),
1158            None,
1159            Some(3),
1160        ])) as ArrayRef;
1161        let col12 = Arc::new(
1162            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1163                .with_timezone_utc(),
1164        ) as ArrayRef;
1165        let col13 = Arc::new(
1166            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1167                .with_precision_and_scale(10, 5)
1168                .unwrap(),
1169        ) as ArrayRef;
1170        let col14 = Arc::new(
1171            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1172                vec![
1173                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
1174                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
1175                    None,
1176                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
1177                ]
1178                .into_iter(),
1179                16,
1180            )
1181            .unwrap(),
1182        ) as ArrayRef;
1183        let col15 = Arc::new(
1184            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1185                vec![
1186                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1187                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1188                    None,
1189                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1190                ]
1191                .into_iter(),
1192                10,
1193            )
1194            .unwrap(),
1195        ) as ArrayRef;
1196        let col16 = Arc::new(
1197            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1198                .with_precision_and_scale(38, 5)
1199                .unwrap(),
1200        ) as ArrayRef;
1201        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1202            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1203            col14, col15, col16,
1204        ])
1205        .unwrap();
1206        let output_file = file_io.new_output(
1207            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1208        )?;
1209
1210        // write data
1211        let mut pw =
1212            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1213                .build(output_file)
1214                .await?;
1215        pw.write(&to_write).await?;
1216        let res = pw.close().await?;
1217        assert_eq!(res.len(), 1);
1218        let data_file = res
1219            .into_iter()
1220            .next()
1221            .unwrap()
            // Fill in dummy fields so the build succeeds.
1223            .content(crate::spec::DataContentType::Data)
1224            .partition(Struct::empty())
1225            .partition_spec_id(0)
1226            .build()
1227            .unwrap();
1228
1229        // check data file
1230        assert_eq!(data_file.record_count(), 4);
1231        assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1232        assert!(
1233            data_file
1234                .null_value_counts()
1235                .iter()
1236                .all(|(_, &v)| { v == 1 })
1237        );
1238        assert_eq!(
1239            *data_file.lower_bounds(),
1240            HashMap::from([
1241                (0, Datum::bool(false)),
1242                (1, Datum::int(1)),
1243                (2, Datum::long(1)),
1244                (3, Datum::float(0.5)),
1245                (4, Datum::double(0.5)),
1246                (5, Datum::string("a")),
1247                (6, Datum::binary(vec![])),
1248                (7, Datum::date(0)),
1249                (8, Datum::time_micros(0).unwrap()),
1250                (9, Datum::timestamp_micros(0)),
1251                (10, Datum::timestamptz_micros(0)),
1252                (11, Datum::timestamp_nanos(0)),
1253                (12, Datum::timestamptz_nanos(0)),
1254                (
1255                    13,
1256                    Datum::new(
1257                        PrimitiveType::Decimal {
1258                            precision: 10,
1259                            scale: 5
1260                        },
1261                        PrimitiveLiteral::Int128(1)
1262                    )
1263                ),
1264                (14, Datum::uuid(Uuid::from_u128(0))),
1265                (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1266                (
1267                    16,
1268                    Datum::new(
1269                        PrimitiveType::Decimal {
1270                            precision: 38,
1271                            scale: 5
1272                        },
1273                        PrimitiveLiteral::Int128(1)
1274                    )
1275                ),
1276            ])
1277        );
1278        assert_eq!(
1279            *data_file.upper_bounds(),
1280            HashMap::from([
1281                (0, Datum::bool(true)),
1282                (1, Datum::int(4)),
1283                (2, Datum::long(4)),
1284                (3, Datum::float(3.5)),
1285                (4, Datum::double(3.5)),
1286                (5, Datum::string("d")),
1287                (6, Datum::binary(vec![122, 122, 122, 122])),
1288                (7, Datum::date(3)),
1289                (8, Datum::time_micros(3).unwrap()),
1290                (9, Datum::timestamp_micros(3)),
1291                (10, Datum::timestamptz_micros(3)),
1292                (11, Datum::timestamp_nanos(3)),
1293                (12, Datum::timestamptz_nanos(3)),
1294                (
1295                    13,
1296                    Datum::new(
1297                        PrimitiveType::Decimal {
1298                            precision: 10,
1299                            scale: 5
1300                        },
1301                        PrimitiveLiteral::Int128(100)
1302                    )
1303                ),
1304                (14, Datum::uuid(Uuid::from_u128(3))),
1305                (
1306                    15,
1307                    Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1308                ),
1309                (
1310                    16,
1311                    Datum::new(
1312                        PrimitiveType::Decimal {
1313                            precision: 38,
1314                            scale: 5
1315                        },
1316                        PrimitiveLiteral::Int128(100)
1317                    )
1318                ),
1319            ])
1320        );
1321
1322        // check the written file
1323        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1324
1325        Ok(())
1326    }
1327
1328    #[tokio::test]
1329    async fn test_decimal_bound() -> Result<()> {
1330        let temp_dir = TempDir::new().unwrap();
1331        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1332        let location_gen = DefaultLocationGenerator::with_data_location(
1333            temp_dir.path().to_str().unwrap().to_string(),
1334        );
1335        let file_name_gen =
1336            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1337
1338        // test 1.1 and 2.2
1339        let schema = Arc::new(
1340            Schema::builder()
1341                .with_fields(vec![
1342                    NestedField::optional(
1343                        0,
1344                        "decimal",
1345                        Type::Primitive(PrimitiveType::Decimal {
1346                            precision: 28,
1347                            scale: 10,
1348                        }),
1349                    )
1350                    .into(),
1351                ])
1352                .build()
1353                .unwrap(),
1354        );
1355        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1356        let output_file = file_io.new_output(
1357            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1358        )?;
1359        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1360            .build(output_file)
1361            .await?;
1362        let col0 = Arc::new(
1363            Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1364                .with_data_type(DataType::Decimal128(28, 10)),
1365        ) as ArrayRef;
1366        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1367        pw.write(&to_write).await?;
1368        let res = pw.close().await?;
1369        assert_eq!(res.len(), 1);
1370        let data_file = res
1371            .into_iter()
1372            .next()
1373            .unwrap()
1374            .content(crate::spec::DataContentType::Data)
1375            .partition(Struct::empty())
1376            .partition_spec_id(0)
1377            .build()
1378            .unwrap();
1379        assert_eq!(
1380            data_file.upper_bounds().get(&0),
1381            Some(Datum::decimal_with_precision(decimal_new(22000000000_i64, 10), 28).unwrap())
1382                .as_ref()
1383        );
1384        assert_eq!(
1385            data_file.lower_bounds().get(&0),
1386            Some(Datum::decimal_with_precision(decimal_new(11000000000_i64, 10), 28).unwrap())
1387                .as_ref()
1388        );
1389
1390        // test -1.1 and -2.2
1391        let schema = Arc::new(
1392            Schema::builder()
1393                .with_fields(vec![
1394                    NestedField::optional(
1395                        0,
1396                        "decimal",
1397                        Type::Primitive(PrimitiveType::Decimal {
1398                            precision: 28,
1399                            scale: 10,
1400                        }),
1401                    )
1402                    .into(),
1403                ])
1404                .build()
1405                .unwrap(),
1406        );
1407        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1408        let output_file = file_io.new_output(
1409            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1410        )?;
1411        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1412            .build(output_file)
1413            .await?;
1414        let col0 = Arc::new(
1415            Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1416                .with_data_type(DataType::Decimal128(28, 10)),
1417        ) as ArrayRef;
1418        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1419        pw.write(&to_write).await?;
1420        let res = pw.close().await?;
1421        assert_eq!(res.len(), 1);
1422        let data_file = res
1423            .into_iter()
1424            .next()
1425            .unwrap()
1426            .content(crate::spec::DataContentType::Data)
1427            .partition(Struct::empty())
1428            .partition_spec_id(0)
1429            .build()
1430            .unwrap();
1431        assert_eq!(
1432            data_file.upper_bounds().get(&0),
1433            Some(Datum::decimal_with_precision(decimal_new(-11000000000_i64, 10), 28).unwrap())
1434                .as_ref()
1435        );
1436        assert_eq!(
1437            data_file.lower_bounds().get(&0),
1438            Some(Datum::decimal_with_precision(decimal_new(-22000000000_i64, 10), 28).unwrap())
1439                .as_ref()
1440        );
1441
1442        // test 38-digit precision decimal values (Iceberg spec max)
1443        // Note: fastnum D128::MAX/MIN have impractical exponents, so we use representative 38-digit values instead
1444        use crate::spec::decimal_utils::decimal_from_str_exact;
1445        let decimal_max = decimal_from_str_exact("99999999999999999999999999999999999999").unwrap();
1446        let decimal_min =
1447            decimal_from_str_exact("-99999999999999999999999999999999999999").unwrap();
1448        assert_eq!(decimal_scale(&decimal_max), decimal_scale(&decimal_min));
1449        let schema = Arc::new(
1450            Schema::builder()
1451                .with_fields(vec![
1452                    NestedField::optional(
1453                        0,
1454                        "decimal",
1455                        Type::Primitive(PrimitiveType::Decimal {
1456                            precision: 38,
1457                            scale: decimal_scale(&decimal_max),
1458                        }),
1459                    )
1460                    .into(),
1461                ])
1462                .build()
1463                .unwrap(),
1464        );
1465        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1466        let output_file = file_io.new_output(
1467            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1468        )?;
1469        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1470            .build(output_file)
1471            .await?;
1472        let col0 = Arc::new(
1473            Decimal128Array::from(vec![
1474                Some(decimal_mantissa(&decimal_max)),
1475                Some(decimal_mantissa(&decimal_min)),
1476            ])
1477            .with_data_type(DataType::Decimal128(38, 0)),
1478        ) as ArrayRef;
1479        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1480        pw.write(&to_write).await?;
1481        let res = pw.close().await?;
1482        assert_eq!(res.len(), 1);
1483        let data_file = res
1484            .into_iter()
1485            .next()
1486            .unwrap()
1487            .content(crate::spec::DataContentType::Data)
1488            .partition(Struct::empty())
1489            .partition_spec_id(0)
1490            .build()
1491            .unwrap();
1492        assert_eq!(
1493            data_file.upper_bounds().get(&0),
1494            Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1495        );
1496        assert_eq!(
1497            data_file.lower_bounds().get(&0),
1498            Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1499        );
1500
1501        // test max and min for precision 38, scale 0
1502        // TODO
1503        // Re-add this case after resolving https://github.com/apache/iceberg-rust/issues/669
1504        // let schema = Arc::new(
1505        //     Schema::builder()
1506        //         .with_fields(vec![NestedField::optional(
1507        //             0,
1508        //             "decimal",
1509        //             Type::Primitive(PrimitiveType::Decimal {
1510        //                 precision: 38,
1511        //                 scale: 0,
1512        //             }),
1513        //         )
1514        //         .into()])
1515        //         .build()
1516        //         .unwrap(),
1517        // );
1518        // let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1519        // let output_file = file_io.new_output(
1520        //     location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1521        // )?;
1522        // let mut pw = ParquetWriterBuilder::new(
1523        //     WriterProperties::builder().build(),
1524        //     schema,
1525        // )
1526        // .build(output_file)
1527        // .await?;
1528        // let col0 = Arc::new(
1529        //     Decimal128Array::from(vec![
1530        //         Some(99999999999999999999999999999999999999_i128),
1531        //         Some(-99999999999999999999999999999999999999_i128),
1532        //     ])
1533        //     .with_data_type(DataType::Decimal128(38, 0)),
1534        // ) as ArrayRef;
1535        // let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1536        // pw.write(&to_write).await?;
1537        // let res = pw.close().await?;
1538        // assert_eq!(res.len(), 1);
1539        // let data_file = res
1540        //     .into_iter()
1541        //     .next()
1542        //     .unwrap()
1543        //     .content(crate::spec::DataContentType::Data)
1544        //     .partition(Struct::empty())
1545        //     .build()
1546        //     .unwrap();
1547        // assert_eq!(
1548        //     data_file.upper_bounds().get(&0),
1549        //     Some(Datum::new(
1550        //         PrimitiveType::Decimal {
1551        //             precision: 38,
1552        //             scale: 0
1553        //         },
1554        //         PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128)
1555        //     ))
1556        //     .as_ref()
1557        // );
1558        // assert_eq!(
1559        //     data_file.lower_bounds().get(&0),
1560        //     Some(Datum::new(
1561        //         PrimitiveType::Decimal {
1562        //             precision: 38,
1563        //             scale: 0
1564        //         },
1565        //         PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128)
1566        //     ))
1567        //     .as_ref()
1568        // );
1569
1570        Ok(())
1571    }
1572
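    // Covers both paths: a file is created when data is written, and no file is left
    // behind when the writer is closed without any writes.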
1573    #[tokio::test]
1574    async fn test_empty_write() -> Result<()> {
1575        let temp_dir = TempDir::new().unwrap();
1576        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1577        let location_gen = DefaultLocationGenerator::with_data_location(
1578            temp_dir.path().to_str().unwrap().to_string(),
1579        );
1580        let file_name_gen =
1581            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1582
1583        // Test that a file is created when there is data to write
1584        let schema = {
1585            let fields = vec![
1586                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1587                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1588                ),
1589            ];
1590            Arc::new(arrow_schema::Schema::new(fields))
1591        };
1592        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1593        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1594        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1595        let output_file = file_io.new_output(&file_path)?;
1596        let mut pw = ParquetWriterBuilder::new(
1597            WriterProperties::builder().build(),
1598            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1599        )
1600        .build(output_file)
1601        .await?;
1602        pw.write(&to_write).await?;
1603        pw.close().await.unwrap();
1604        assert!(file_io.exists(&file_path).await.unwrap());
1605
1606        // Test that no file is created when there is no data to write
1607        let file_name_gen =
1608            DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1609        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1610        let output_file = file_io.new_output(&file_path)?;
1611        let pw = ParquetWriterBuilder::new(
1612            WriterProperties::builder().build(),
1613            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1614        )
1615        .build(output_file)
1616        .await?;
1617        pw.close().await.unwrap();
1618        assert!(!file_io.exists(&file_path).await.unwrap());
1619
1620        Ok(())
1621    }
1622
1623    #[tokio::test]
1624    async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1625        let temp_dir = TempDir::new().unwrap();
1626        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1627        let location_gen = DefaultLocationGenerator::with_data_location(
1628            temp_dir.path().to_str().unwrap().to_string(),
1629        );
1630        let file_name_gen =
1631            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1632
1633        // prepare data
1634        let arrow_schema = {
1635            let fields = vec![
1636                Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1637                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1638                ),
1639                Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1640                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1641                ),
1642            ];
1643            Arc::new(arrow_schema::Schema::new(fields))
1644        };
1645
1646        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1647            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1648            None,
1649        )) as ArrayRef;
1650
1651        let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1652            [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1653            None,
1654        )) as ArrayRef;
1655
1656        let to_write =
1657            RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1658        let output_file = file_io.new_output(
1659            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1660        )?;
1661
1662        // write data
1663        let mut pw = ParquetWriterBuilder::new(
1664            WriterProperties::builder().build(),
1665            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1666        )
1667        .build(output_file)
1668        .await?;
1669
1670        pw.write(&to_write).await?;
1671        let res = pw.close().await?;
1672        assert_eq!(res.len(), 1);
1673        let data_file = res
1674            .into_iter()
1675            .next()
1676            .unwrap()
1677            // Fill in dummy fields so the build succeeds.
1678            .content(crate::spec::DataContentType::Data)
1679            .partition(Struct::empty())
1680            .partition_spec_id(0)
1681            .build()
1682            .unwrap();
1683
1684        // check data file
1685        assert_eq!(data_file.record_count(), 4);
1686        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1687        assert_eq!(
1688            *data_file.lower_bounds(),
1689            HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1690        );
1691        assert_eq!(
1692            *data_file.upper_bounds(),
1693            HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1694        );
1695        assert_eq!(
1696            *data_file.null_value_counts(),
1697            HashMap::from([(0, 0), (1, 0)])
1698        );
1699        assert_eq!(
1700            *data_file.nan_value_counts(),
1701            HashMap::from([(0, 1), (1, 1)])
1702        );
1703
1704        // check the written file
1705        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1706        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1707
1708        Ok(())
1709    }
1710
1711    #[tokio::test]
1712    async fn test_nan_val_cnts_struct_type() -> Result<()> {
1713        let temp_dir = TempDir::new().unwrap();
1714        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1715        let location_gen = DefaultLocationGenerator::with_data_location(
1716            temp_dir.path().to_str().unwrap().to_string(),
1717        );
1718        let file_name_gen =
1719            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1720
1721        let schema_struct_float_fields = Fields::from(vec![
1722            Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1723                PARQUET_FIELD_ID_META_KEY.to_string(),
1724                "4".to_string(),
1725            )])),
1726        ]);
1727
1728        let schema_struct_nested_float_fields = Fields::from(vec![
1729            Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1730                PARQUET_FIELD_ID_META_KEY.to_string(),
1731                "7".to_string(),
1732            )])),
1733        ]);
1734
1735        let schema_struct_nested_fields = Fields::from(vec![
1736            Field::new(
1737                "col6",
1738                arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1739                false,
1740            )
1741            .with_metadata(HashMap::from([(
1742                PARQUET_FIELD_ID_META_KEY.to_string(),
1743                "6".to_string(),
1744            )])),
1745        ]);
1746
1747        // prepare data
1748        let arrow_schema = {
1749            let fields = vec![
1750                Field::new(
1751                    "col3",
1752                    arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1753                    false,
1754                )
1755                .with_metadata(HashMap::from([(
1756                    PARQUET_FIELD_ID_META_KEY.to_string(),
1757                    "3".to_string(),
1758                )])),
1759                Field::new(
1760                    "col5",
1761                    arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1762                    false,
1763                )
1764                .with_metadata(HashMap::from([(
1765                    PARQUET_FIELD_ID_META_KEY.to_string(),
1766                    "5".to_string(),
1767                )])),
1768            ];
1769            Arc::new(arrow_schema::Schema::new(fields))
1770        };
1771
1772        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1773            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1774            None,
1775        )) as ArrayRef;
1776
1777        let struct_float_field_col = Arc::new(StructArray::new(
1778            schema_struct_float_fields,
1779            vec![float_32_col.clone()],
1780            None,
1781        )) as ArrayRef;
1782
1783        let struct_nested_float_field_col = Arc::new(StructArray::new(
1784            schema_struct_nested_fields,
1785            vec![Arc::new(StructArray::new(
1786                schema_struct_nested_float_fields,
1787                vec![float_32_col.clone()],
1788                None,
1789            )) as ArrayRef],
1790            None,
1791        )) as ArrayRef;
1792
1793        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1794            struct_float_field_col,
1795            struct_nested_float_field_col,
1796        ])
1797        .unwrap();
1798        let output_file = file_io.new_output(
1799            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1800        )?;
1801
1802        // write data
1803        let mut pw = ParquetWriterBuilder::new(
1804            WriterProperties::builder().build(),
1805            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1806        )
1807        .build(output_file)
1808        .await?;
1809
1810        pw.write(&to_write).await?;
1811        let res = pw.close().await?;
1812        assert_eq!(res.len(), 1);
1813        let data_file = res
1814            .into_iter()
1815            .next()
1816            .unwrap()
1817            // Fill in dummy fields so the build succeeds.
1818            .content(crate::spec::DataContentType::Data)
1819            .partition(Struct::empty())
1820            .partition_spec_id(0)
1821            .build()
1822            .unwrap();
1823
1824        // check data file
1825        assert_eq!(data_file.record_count(), 4);
1826        assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1827        assert_eq!(
1828            *data_file.lower_bounds(),
1829            HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1830        );
1831        assert_eq!(
1832            *data_file.upper_bounds(),
1833            HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1834        );
1835        assert_eq!(
1836            *data_file.null_value_counts(),
1837            HashMap::from([(4, 0), (7, 0)])
1838        );
1839        assert_eq!(
1840            *data_file.nan_value_counts(),
1841            HashMap::from([(4, 1), (7, 1)])
1842        );
1843
1844        // check the written file
1845        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1846        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1847
1848        Ok(())
1849    }
1850
1851    #[tokio::test]
1852    async fn test_nan_val_cnts_list_type() -> Result<()> {
1853        let temp_dir = TempDir::new().unwrap();
1854        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1855        let location_gen = DefaultLocationGenerator::with_data_location(
1856            temp_dir.path().to_str().unwrap().to_string(),
1857        );
1858        let file_name_gen =
1859            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1860
1861        let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1862            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1863        );
1864
1865        let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1866            .with_metadata(HashMap::from([(
1867                PARQUET_FIELD_ID_META_KEY.to_string(),
1868                "4".to_string(),
1869            )]));
1870
1871        let schema_struct_list_field = Fields::from(vec![
1872            Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
1873                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
1874            ),
1875        ]);
1876
1877        let arrow_schema = {
1878            let fields = vec![
1879                Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
1880                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1881                ),
1882                Field::new_struct("col1", schema_struct_list_field.clone(), true)
1883                    .with_metadata(HashMap::from([(
1884                        PARQUET_FIELD_ID_META_KEY.to_string(),
1885                        "2".to_string(),
1886                    )]))
1887                    .clone(),
1888                // Field::new_large_list("col3", schema_large_list_float_field.clone(), true).with_metadata(
1889                //     HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
1890                // ).clone(),
1891            ];
1892            Arc::new(arrow_schema::Schema::new(fields))
1893        };
1894
1895        let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
1896            Some(1.0_f32),
1897            Some(f32::NAN),
1898            Some(2.0),
1899            Some(2.0),
1900        ])])
1901        .into_parts();
1902
1903        let list_float_field_col = Arc::new({
1904            let list_parts = list_parts.clone();
1905            ListArray::new(
1906                {
1907                    if let DataType::List(field) = arrow_schema.field(0).data_type() {
1908                        field.clone()
1909                    } else {
1910                        unreachable!()
1911                    }
1912                },
1913                list_parts.1,
1914                list_parts.2,
1915                list_parts.3,
1916            )
1917        }) as ArrayRef;
1918
1919        let struct_list_fields_schema =
1920            if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
1921                fields.clone()
1922            } else {
1923                unreachable!()
1924            };
1925
1926        let struct_list_float_field_col = Arc::new({
1927            ListArray::new(
1928                {
1929                    if let DataType::List(field) = struct_list_fields_schema
1930                        .first()
1931                        .expect("could not find first list field")
1932                        .data_type()
1933                    {
1934                        field.clone()
1935                    } else {
1936                        unreachable!()
1937                    }
1938                },
1939                list_parts.1,
1940                list_parts.2,
1941                list_parts.3,
1942            )
1943        }) as ArrayRef;
1944
1945        let struct_list_float_field_col = Arc::new(StructArray::new(
1946            struct_list_fields_schema,
1947            vec![struct_list_float_field_col.clone()],
1948            None,
1949        )) as ArrayRef;
1950
1951        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1952            list_float_field_col,
1953            struct_list_float_field_col,
1954            // large_list_float_field_col,
1955        ])
1956        .expect("Could not form record batch");
1957        let output_file = file_io.new_output(
1958            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1959        )?;
1960
1961        // write data
1962        let mut pw = ParquetWriterBuilder::new(
1963            WriterProperties::builder().build(),
1964            Arc::new(
1965                to_write
1966                    .schema()
1967                    .as_ref()
1968                    .try_into()
1969                    .expect("Could not convert to Iceberg schema"),
1970            ),
1971        )
1972        .build(output_file)
1973        .await?;
1974
1975        pw.write(&to_write).await?;
1976        let res = pw.close().await?;
1977        assert_eq!(res.len(), 1);
1978        let data_file = res
1979            .into_iter()
1980            .next()
1981            .unwrap()
1982            .content(crate::spec::DataContentType::Data)
1983            .partition(Struct::empty())
1984            .partition_spec_id(0)
1985            .build()
1986            .unwrap();
1987
1988        // check data file
1989        assert_eq!(data_file.record_count(), 1);
1990        assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
1991        assert_eq!(
1992            *data_file.lower_bounds(),
1993            HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
1994        );
1995        assert_eq!(
1996            *data_file.upper_bounds(),
1997            HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
1998        );
1999        assert_eq!(
2000            *data_file.null_value_counts(),
2001            HashMap::from([(1, 0), (4, 0)])
2002        );
2003        assert_eq!(
2004            *data_file.nan_value_counts(),
2005            HashMap::from([(1, 1), (4, 1)])
2006        );
2007
2008        // check the written file
2009        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2010        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2011
2012        Ok(())
2013    }
2014
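    // Helper macro: builds a four-row MapArray of int keys to float values (one value
    // is NaN) and rebuilds the entries struct so the key/value fields carry the
    // supplied Parquet field-id metadata.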
2015    macro_rules! construct_map_arr {
2016        ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
2017            let int_builder = Int32Builder::new();
2018            let float_builder = Float32Builder::with_capacity(4);
2019            let mut builder = MapBuilder::new(None, int_builder, float_builder);
2020            builder.keys().append_value(1);
2021            builder.values().append_value(1.0_f32);
2022            builder.append(true).unwrap();
2023            builder.keys().append_value(2);
2024            builder.values().append_value(f32::NAN);
2025            builder.append(true).unwrap();
2026            builder.keys().append_value(3);
2027            builder.values().append_value(2.0);
2028            builder.append(true).unwrap();
2029            builder.keys().append_value(4);
2030            builder.values().append_value(2.0);
2031            builder.append(true).unwrap();
2032            let array = builder.finish();
2033
2034            let (_field, offsets, entries, nulls, ordered) = array.into_parts();
2035            let new_struct_fields_schema =
2036                Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);
2037
2038            let entries = {
2039                let (_, arrays, nulls) = entries.into_parts();
2040                StructArray::new(new_struct_fields_schema.clone(), arrays, nulls)
2041            };
2042
2043            let field = Arc::new(Field::new(
2044                DEFAULT_MAP_FIELD_NAME,
2045                DataType::Struct(new_struct_fields_schema),
2046                false,
2047            ));
2048
2049            Arc::new(MapArray::new(field, offsets, entries, nulls, ordered))
2050        }};
2051    }
2052
2053    #[tokio::test]
2054    async fn test_nan_val_cnts_map_type() -> Result<()> {
2055        let temp_dir = TempDir::new().unwrap();
2056        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2057        let location_gen = DefaultLocationGenerator::with_data_location(
2058            temp_dir.path().to_str().unwrap().to_string(),
2059        );
2060        let file_name_gen =
2061            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2062
2063        let map_key_field_schema =
2064            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2065                (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2066            ]));
2067
2068        let map_value_field_schema =
2069            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2070                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2071            ));
2072
2073        let struct_map_key_field_schema =
2074            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2075                (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2076            ]));
2077
2078        let struct_map_value_field_schema =
2079            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2080                [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2081            ));
2082
2083        let schema_struct_map_field = Fields::from(vec![
2084            Field::new_map(
2085                "col3",
2086                DEFAULT_MAP_FIELD_NAME,
2087                struct_map_key_field_schema.clone(),
2088                struct_map_value_field_schema.clone(),
2089                false,
2090                false,
2091            )
2092            .with_metadata(HashMap::from([(
2093                PARQUET_FIELD_ID_META_KEY.to_string(),
2094                "5".to_string(),
2095            )])),
2096        ]);
2097
2098        let arrow_schema = {
2099            let fields = vec![
2100                Field::new_map(
2101                    "col0",
2102                    DEFAULT_MAP_FIELD_NAME,
2103                    map_key_field_schema.clone(),
2104                    map_value_field_schema.clone(),
2105                    false,
2106                    false,
2107                )
2108                .with_metadata(HashMap::from([(
2109                    PARQUET_FIELD_ID_META_KEY.to_string(),
2110                    "0".to_string(),
2111                )])),
2112                Field::new_struct("col1", schema_struct_map_field.clone(), true)
2113                    .with_metadata(HashMap::from([(
2114                        PARQUET_FIELD_ID_META_KEY.to_string(),
2115                        "3".to_string(),
2116                    )]))
2117                    .clone(),
2118            ];
2119            Arc::new(arrow_schema::Schema::new(fields))
2120        };
2121
2122        let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2123
2124        let struct_map_arr =
2125            construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2126
2127        let struct_list_float_field_col = Arc::new(StructArray::new(
2128            schema_struct_map_field,
2129            vec![struct_map_arr],
2130            None,
2131        )) as ArrayRef;
2132
2133        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2134            map_array,
2135            struct_list_float_field_col,
2136        ])
2137        .expect("Could not form record batch");
2138        let output_file = file_io.new_output(
2139            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2140        )?;
2141
2142        // write data
2143        let mut pw = ParquetWriterBuilder::new(
2144            WriterProperties::builder().build(),
2145            Arc::new(
2146                to_write
2147                    .schema()
2148                    .as_ref()
2149                    .try_into()
2150                    .expect("Could not convert to Iceberg schema"),
2151            ),
2152        )
2153        .build(output_file)
2154        .await?;
2155
2156        pw.write(&to_write).await?;
2157        let res = pw.close().await?;
2158        assert_eq!(res.len(), 1);
2159        let data_file = res
2160            .into_iter()
2161            .next()
2162            .unwrap()
2163            .content(crate::spec::DataContentType::Data)
2164            .partition(Struct::empty())
2165            .partition_spec_id(0)
2166            .build()
2167            .unwrap();
2168
2169        // check data file
2170        assert_eq!(data_file.record_count(), 4);
2171        assert_eq!(
2172            *data_file.value_counts(),
2173            HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2174        );
2175        assert_eq!(
2176            *data_file.lower_bounds(),
2177            HashMap::from([
2178                (1, Datum::int(1)),
2179                (2, Datum::float(1.0)),
2180                (6, Datum::int(1)),
2181                (7, Datum::float(1.0))
2182            ])
2183        );
2184        assert_eq!(
2185            *data_file.upper_bounds(),
2186            HashMap::from([
2187                (1, Datum::int(4)),
2188                (2, Datum::float(2.0)),
2189                (6, Datum::int(4)),
2190                (7, Datum::float(2.0))
2191            ])
2192        );
2193        assert_eq!(
2194            *data_file.null_value_counts(),
2195            HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2196        );
2197        assert_eq!(
2198            *data_file.nan_value_counts(),
2199            HashMap::from([(2, 1), (7, 1)])
2200        );
2201
2202        // check the written file
2203        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2204        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2205
2206        Ok(())
2207    }
2208
2209    #[tokio::test]
2210    async fn test_write_empty_parquet_file() {
2211        let temp_dir = TempDir::new().unwrap();
2212        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2213        let location_gen = DefaultLocationGenerator::with_data_location(
2214            temp_dir.path().to_str().unwrap().to_string(),
2215        );
2216        let file_name_gen =
2217            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2218        let output_file = file_io
2219            .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
2220            .unwrap();
2221
2222        // write data
2223        let pw = ParquetWriterBuilder::new(
2224            WriterProperties::builder().build(),
2225            Arc::new(
2226                Schema::builder()
2227                    .with_schema_id(1)
2228                    .with_fields(vec![
2229                        NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2230                            .with_id(0)
2231                            .into(),
2232                    ])
2233                    .build()
2234                    .expect("Failed to create schema"),
2235            ),
2236        )
2237        .build(output_file)
2238        .await
2239        .unwrap();
2240
2241        let res = pw.close().await.unwrap();
2242        assert_eq!(res.len(), 0);
2243
2244        // Check that no file was left behind.
2245        assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2246    }
2247
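    // Verifies that the aggregator tolerates missing min/max statistics and still
    // produces the overall extremes across all updates.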
2248    #[test]
2249    fn test_min_max_aggregator() {
2250        let schema = Arc::new(
2251            Schema::builder()
2252                .with_schema_id(1)
2253                .with_fields(vec![
2254                    NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
2255                        .with_id(0)
2256                        .into(),
2257                ])
2258                .build()
2259                .expect("Failed to create schema"),
2260        );
2261        let mut min_max_agg = MinMaxColAggregator::new(schema);
2262        let create_statistics =
2263            |min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
2264        min_max_agg
2265            .update(0, create_statistics(None, Some(42)))
2266            .unwrap();
2267        min_max_agg
2268            .update(0, create_statistics(Some(0), Some(i32::MAX)))
2269            .unwrap();
2270        min_max_agg
2271            .update(0, create_statistics(Some(i32::MIN), None))
2272            .unwrap();
2273        min_max_agg
2274            .update(0, create_statistics(None, None))
2275            .unwrap();
2276
2277        let (lower_bounds, upper_bounds) = min_max_agg.produce();
2278
2279        assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
2280        assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
2281    }
2282}