iceberg/writer/file_writer/parquet_writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains the file writer for the Parquet file format.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use arrow_schema::SchemaRef as ArrowSchemaRef;
24use bytes::Bytes;
25use futures::future::BoxFuture;
26use itertools::Itertools;
27use parquet::arrow::AsyncArrowWriter;
28use parquet::arrow::async_reader::AsyncFileReader;
29use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
30use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
31use parquet::file::properties::WriterProperties;
32use parquet::file::statistics::Statistics;
33use parquet::format::FileMetaData;
34use parquet::thrift::{TCompactOutputProtocol, TSerializable};
35use thrift::protocol::TOutputProtocol;
36
37use super::{FileWriter, FileWriterBuilder};
38use crate::arrow::{
39    ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
40    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
41};
42use crate::io::{FileIO, FileWrite, OutputFile};
43use crate::spec::{
44    DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
45    NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
46    StructType, TableMetadata, Type, visit_schema,
47};
48use crate::transform::create_transform_function;
49use crate::writer::{CurrentFileStatus, DataFile};
50use crate::{Error, ErrorKind, Result};
51
/// `ParquetWriterBuilder` is used to build a [`ParquetWriter`].
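///
/// A minimal usage sketch; `output_file` and `record_batch` are assumed to be created
/// elsewhere (e.g. via `FileIO` and Arrow), and the field shown is illustrative:
///
/// ```ignore
/// let schema: SchemaRef = Arc::new(
///     Schema::builder()
///         .with_fields(vec![
///             NestedField::required(0, "id", Type::Primitive(PrimitiveType::Long)).into(),
///         ])
///         .build()?,
/// );
/// let mut writer = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
///     .build(output_file)
///     .await?;
/// writer.write(&record_batch).await?;
/// let data_file_builders = writer.close().await?;
/// ```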
53#[derive(Clone, Debug)]
54pub struct ParquetWriterBuilder {
55    props: WriterProperties,
56    schema: SchemaRef,
57    match_mode: FieldMatchMode,
58}
59
60impl ParquetWriterBuilder {
    /// Create a new `ParquetWriterBuilder`.
    ///
    /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
63    pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
64        Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
65    }
66
67    /// Create a new `ParquetWriterBuilder` with custom match mode
68    pub fn new_with_match_mode(
69        props: WriterProperties,
70        schema: SchemaRef,
71        match_mode: FieldMatchMode,
72    ) -> Self {
73        Self {
74            props,
75            schema,
76            match_mode,
77        }
78    }
79}
80
81impl FileWriterBuilder for ParquetWriterBuilder {
82    type R = ParquetWriter;
83
84    async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
85        Ok(ParquetWriter {
86            schema: self.schema.clone(),
87            inner_writer: None,
88            writer_properties: self.props.clone(),
89            current_row_num: 0,
90            output_file,
91            nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
92        })
93    }
94}
95
/// A mapping from Parquet column path names to internal field IDs.
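///
/// Path names follow Parquet's materialized column names, so nested fields appear as dotted
/// paths. A usage sketch, mirroring the expectations in `test_index_by_parquet_path` below:
///
/// ```ignore
/// let mut visitor = IndexByParquetPathName::new();
/// visit_schema(&schema, &mut visitor)?;
/// // e.g. "col3.list.element" -> 7, "col5.key_value.key" -> 11
/// let field_id = visitor.get("col3.list.element");
/// ```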
97struct IndexByParquetPathName {
98    name_to_id: HashMap<String, i32>,
99
100    field_names: Vec<String>,
101
102    field_id: i32,
103}
104
105impl IndexByParquetPathName {
106    /// Creates a new, empty `IndexByParquetPathName`
107    pub fn new() -> Self {
108        Self {
109            name_to_id: HashMap::new(),
110            field_names: Vec::new(),
111            field_id: 0,
112        }
113    }
114
    /// Retrieves the internal field ID for the given Parquet column path name.
116    pub fn get(&self, name: &str) -> Option<&i32> {
117        self.name_to_id.get(name)
118    }
119}
120
121impl Default for IndexByParquetPathName {
122    fn default() -> Self {
123        Self::new()
124    }
125}
126
127impl SchemaVisitor for IndexByParquetPathName {
128    type T = ();
129
130    fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
131        self.field_names.push(field.name.to_string());
132        self.field_id = field.id;
133        Ok(())
134    }
135
136    fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
137        self.field_names.pop();
138        Ok(())
139    }
140
141    fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
142        self.field_names.push(format!("list.{}", field.name));
143        self.field_id = field.id;
144        Ok(())
145    }
146
147    fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
148        self.field_names.pop();
149        Ok(())
150    }
151
152    fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
153        self.field_names
154            .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
155        self.field_id = field.id;
156        Ok(())
157    }
158
159    fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
160        self.field_names.pop();
161        Ok(())
162    }
163
164    fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
165        self.field_names
166            .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
167        self.field_id = field.id;
168        Ok(())
169    }
170
171    fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
172        self.field_names.pop();
173        Ok(())
174    }
175
176    fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
177        Ok(())
178    }
179
180    fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
181        Ok(())
182    }
183
184    fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
185        Ok(())
186    }
187
188    fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
189        Ok(())
190    }
191
192    fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
193        Ok(())
194    }
195
196    fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
197        let full_name = self.field_names.iter().map(String::as_str).join(".");
198        let field_id = self.field_id;
199        if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) {
200            return Err(Error::new(
201                ErrorKind::DataInvalid,
202                format!(
203                    "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"
204                ),
205            ));
206        } else {
207            self.name_to_id.insert(full_name, field_id);
208        }
209
210        Ok(())
211    }
212}
213
/// `ParquetWriter` is used to write Arrow data into a Parquet file on storage.
215pub struct ParquetWriter {
216    schema: SchemaRef,
217    output_file: OutputFile,
218    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn FileWrite>>>>,
219    writer_properties: WriterProperties,
220    current_row_num: usize,
221    nan_value_count_visitor: NanValueCountVisitor,
222}
223
/// Used to aggregate the min and max values of each column.
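///
/// A usage sketch; the statistics would come from Parquet column chunk metadata, as in
/// `parquet_to_data_file_builder` below:
///
/// ```ignore
/// let mut min_max_agg = MinMaxColAggregator::new(schema);
/// min_max_agg.update(field_id, column_chunk_metadata.statistics().unwrap().clone())?;
/// let (lower_bounds, upper_bounds) = min_max_agg.produce();
/// ```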
225struct MinMaxColAggregator {
226    lower_bounds: HashMap<i32, Datum>,
227    upper_bounds: HashMap<i32, Datum>,
228    schema: SchemaRef,
229}
230
231impl MinMaxColAggregator {
    /// Creates a new, empty `MinMaxColAggregator`.
233    fn new(schema: SchemaRef) -> Self {
234        Self {
235            lower_bounds: HashMap::new(),
236            upper_bounds: HashMap::new(),
237            schema,
238        }
239    }
240
241    fn update_state_min(&mut self, field_id: i32, datum: Datum) {
242        self.lower_bounds
243            .entry(field_id)
244            .and_modify(|e| {
245                if *e > datum {
246                    *e = datum.clone()
247                }
248            })
249            .or_insert(datum);
250    }
251
252    fn update_state_max(&mut self, field_id: i32, datum: Datum) {
253        self.upper_bounds
254            .entry(field_id)
255            .and_modify(|e| {
256                if *e < datum {
257                    *e = datum.clone()
258                }
259            })
260            .or_insert(datum);
261    }
262
    /// Updates the aggregated bounds with the given column statistics.
264    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
265        let Some(ty) = self
266            .schema
267            .field_by_id(field_id)
268            .map(|f| f.field_type.as_ref())
269        else {
270            // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
271            // Ignore the field if it is not in schema.
272            return Ok(());
273        };
274        let Type::Primitive(ty) = ty.clone() else {
275            return Err(Error::new(
276                ErrorKind::Unexpected,
                format!("Composite type {ty} is not supported for min-max aggregation."),
278            ));
279        };
280
281        if value.min_is_exact() {
282            let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else {
283                return Err(Error::new(
284                    ErrorKind::Unexpected,
                    format!("Statistics {value} do not match field type {ty}."),
286                ));
287            };
288
289            self.update_state_min(field_id, min_datum);
290        }
291
292        if value.max_is_exact() {
293            let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else {
294                return Err(Error::new(
295                    ErrorKind::Unexpected,
                    format!("Statistics {value} do not match field type {ty}."),
297                ));
298            };
299
300            self.update_state_max(field_id, max_datum);
301        }
302
303        Ok(())
304    }
305
306    /// Returns lower and upper bounds
307    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
308        (self.lower_bounds, self.upper_bounds)
309    }
310}
311
312impl ParquetWriter {
    /// Converts Parquet files into data files.
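    ///
    /// Each file's Parquet footer is read via [`ArrowFileReader`] and converted into a
    /// [`DataFile`]; adding files to partitioned tables is not supported yet (see the TODO below).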
314    #[allow(dead_code)]
315    pub(crate) async fn parquet_files_to_data_files(
316        file_io: &FileIO,
317        file_paths: Vec<String>,
318        table_metadata: &TableMetadata,
319    ) -> Result<Vec<DataFile>> {
320        // TODO: support adding to partitioned table
321        let mut data_files: Vec<DataFile> = Vec::new();
322
323        for file_path in file_paths {
324            let input_file = file_io.new_input(&file_path)?;
325            let file_metadata = input_file.metadata().await?;
326            let file_size_in_bytes = file_metadata.size as usize;
327            let reader = input_file.reader().await?;
328
329            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
330            let parquet_metadata = parquet_reader.get_metadata(None).await.map_err(|err| {
331                Error::new(
332                    ErrorKind::DataInvalid,
333                    format!("Error reading Parquet metadata: {err}"),
334                )
335            })?;
336            let mut builder = ParquetWriter::parquet_to_data_file_builder(
337                table_metadata.current_schema().clone(),
338                parquet_metadata,
339                file_size_in_bytes,
340                file_path,
341                // TODO: Implement nan_value_counts here
342                HashMap::new(),
343            )?;
344            builder.partition_spec_id(table_metadata.default_partition_spec_id());
345            let data_file = builder.build().unwrap();
346            data_files.push(data_file);
347        }
348
349        Ok(data_files)
350    }
351
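    /// Round-trips the thrift [`FileMetaData`] produced by the writer: serializes it with the
    /// Thrift compact protocol and decodes the bytes back into [`ParquetMetaData`] so that
    /// column statistics can be read through the standard parquet metadata APIs.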
352    fn thrift_to_parquet_metadata(&self, file_metadata: FileMetaData) -> Result<ParquetMetaData> {
353        let mut buffer = Vec::new();
354        {
355            let mut protocol = TCompactOutputProtocol::new(&mut buffer);
356            file_metadata
357                .write_to_out_protocol(&mut protocol)
358                .map_err(|err| {
359                    Error::new(ErrorKind::Unexpected, "Failed to write parquet metadata")
360                        .with_source(err)
361                })?;
362
363            protocol.flush().map_err(|err| {
364                Error::new(ErrorKind::Unexpected, "Failed to flush protocol").with_source(err)
365            })?;
366        }
367
368        let parquet_metadata = ParquetMetaDataReader::decode_metadata(&buffer).map_err(|err| {
369            Error::new(ErrorKind::Unexpected, "Failed to decode parquet metadata").with_source(err)
370        })?;
371
372        Ok(parquet_metadata)
373    }
374
    /// Builds a [`DataFileBuilder`] from [`ParquetMetaData`], deriving column sizes, value
    /// counts, null value counts, and lower/upper bounds from the row group statistics.
376    pub(crate) fn parquet_to_data_file_builder(
377        schema: SchemaRef,
378        metadata: Arc<ParquetMetaData>,
379        written_size: usize,
380        file_path: String,
381        nan_value_counts: HashMap<i32, u64>,
382    ) -> Result<DataFileBuilder> {
383        let index_by_parquet_path = {
384            let mut visitor = IndexByParquetPathName::new();
385            visit_schema(&schema, &mut visitor)?;
386            visitor
387        };
388
389        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
390            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
391            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
392            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
393            let mut min_max_agg = MinMaxColAggregator::new(schema);
394
395            for row_group in metadata.row_groups() {
396                for column_chunk_metadata in row_group.columns() {
397                    let parquet_path = column_chunk_metadata.column_descr().path().string();
398
399                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
400                        continue;
401                    };
402
403                    *per_col_size.entry(field_id).or_insert(0) +=
404                        column_chunk_metadata.compressed_size() as u64;
405                    *per_col_val_num.entry(field_id).or_insert(0) +=
406                        column_chunk_metadata.num_values() as u64;
407
408                    if let Some(statistics) = column_chunk_metadata.statistics() {
409                        if let Some(null_count) = statistics.null_count_opt() {
410                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
411                        }
412
413                        min_max_agg.update(field_id, statistics.clone())?;
414                    }
415                }
416            }
417            (
418                per_col_size,
419                per_col_val_num,
420                per_col_null_val_num,
421                min_max_agg.produce(),
422            )
423        };
424
425        let mut builder = DataFileBuilder::default();
426        builder
427            .content(DataContentType::Data)
428            .file_path(file_path)
429            .file_format(DataFileFormat::Parquet)
430            .partition(Struct::empty())
431            .record_count(metadata.file_metadata().num_rows() as u64)
432            .file_size_in_bytes(written_size as u64)
433            .column_sizes(column_sizes)
434            .value_counts(value_counts)
435            .null_value_counts(null_value_counts)
436            .nan_value_counts(nan_value_counts)
437            // # NOTE:
438            // - We can ignore implementing distinct_counts due to this: https://lists.apache.org/thread/j52tsojv0x4bopxyzsp7m7bqt23n5fnd
439            .lower_bounds(lower_bounds)
440            .upper_bounds(upper_bounds)
441            .split_offsets(
442                metadata
443                    .row_groups()
444                    .iter()
445                    .filter_map(|group| group.file_offset())
446                    .collect(),
447            );
448
449        Ok(builder)
450    }
451
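    /// Infers a single partition value from the per-column lower/upper bounds.
    ///
    /// This requires every partition source column to have identical lower and upper bounds
    /// and an order-preserving transform; otherwise an error is returned. Columns without
    /// bounds yield a null partition value.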
452    #[allow(dead_code)]
453    fn partition_value_from_bounds(
454        table_spec: Arc<PartitionSpec>,
455        lower_bounds: &HashMap<i32, Datum>,
456        upper_bounds: &HashMap<i32, Datum>,
457    ) -> Result<Struct> {
458        let mut partition_literals: Vec<Option<Literal>> = Vec::new();
459
460        for field in table_spec.fields() {
461            if let (Some(lower), Some(upper)) = (
462                lower_bounds.get(&field.source_id),
463                upper_bounds.get(&field.source_id),
464            ) {
465                if !field.transform.preserves_order() {
466                    return Err(Error::new(
467                        ErrorKind::DataInvalid,
468                        format!(
469                            "cannot infer partition value for non linear partition field (needs to preserve order): {} with transform {}",
470                            field.name, field.transform
471                        ),
472                    ));
473                }
474
475                if lower != upper {
476                    return Err(Error::new(
477                        ErrorKind::DataInvalid,
478                        format!(
479                            "multiple partition values for field {}: lower: {:?}, upper: {:?}",
480                            field.name, lower, upper
481                        ),
482                    ));
483                }
484
485                let transform_fn = create_transform_function(&field.transform)?;
486                let transform_literal =
487                    Literal::from(transform_fn.transform_literal_result(lower)?);
488
489                partition_literals.push(Some(transform_literal));
490            } else {
491                partition_literals.push(None);
492            }
493        }
494
495        let partition_struct = Struct::from_iter(partition_literals);
496
497        Ok(partition_struct)
498    }
499}
500
501impl FileWriter for ParquetWriter {
502    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
503        // Skip empty batch
504        if batch.num_rows() == 0 {
505            return Ok(());
506        }
507
508        self.current_row_num += batch.num_rows();
509
510        let batch_c = batch.clone();
511        self.nan_value_count_visitor
512            .compute(self.schema.clone(), batch_c)?;
513
        // Lazily initialize the writer on the first non-empty batch.
515        let writer = if let Some(writer) = &mut self.inner_writer {
516            writer
517        } else {
518            let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
519            let inner_writer = self.output_file.writer().await?;
520            let async_writer = AsyncFileWriter::new(inner_writer);
521            let writer = AsyncArrowWriter::try_new(
522                async_writer,
523                arrow_schema.clone(),
524                Some(self.writer_properties.clone()),
525            )
526            .map_err(|err| {
527                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
528                    .with_source(err)
529            })?;
530            self.inner_writer = Some(writer);
531            self.inner_writer.as_mut().unwrap()
532        };
533
534        writer.write(batch).await.map_err(|err| {
535            Error::new(
536                ErrorKind::Unexpected,
537                "Failed to write using parquet writer.",
538            )
539            .with_source(err)
540        })?;
541
542        Ok(())
543    }
544
545    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
546        let mut writer = match self.inner_writer.take() {
547            Some(writer) => writer,
548            None => return Ok(vec![]),
549        };
550
551        let metadata = writer.finish().await.map_err(|err| {
552            Error::new(ErrorKind::Unexpected, "Failed to finish parquet writer.").with_source(err)
553        })?;
554
555        let written_size = writer.bytes_written();
556
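        // If no rows were written, delete the empty file instead of producing a data file for it.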
557        if self.current_row_num == 0 {
558            self.output_file.delete().await.map_err(|err| {
559                Error::new(
560                    ErrorKind::Unexpected,
561                    "Failed to delete empty parquet file.",
562                )
563                .with_source(err)
564            })?;
565            Ok(vec![])
566        } else {
567            let parquet_metadata =
568                Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
569                    Error::new(
570                        ErrorKind::Unexpected,
571                        "Failed to convert metadata from thrift to parquet.",
572                    )
573                    .with_source(err)
574                })?);
575
576            Ok(vec![Self::parquet_to_data_file_builder(
577                self.schema,
578                parquet_metadata,
579                written_size,
580                self.output_file.location().to_string(),
581                self.nan_value_count_visitor.nan_value_counts,
582            )?])
583        }
584    }
585}
586
587impl CurrentFileStatus for ParquetWriter {
588    fn current_file_path(&self) -> String {
589        self.output_file.location().to_string()
590    }
591
592    fn current_row_num(&self) -> usize {
593        self.current_row_num
594    }
595
596    fn current_written_size(&self) -> usize {
597        if let Some(inner) = self.inner_writer.as_ref() {
            // The inner AsyncArrowWriter holds both a sync buffer and an async writer:
            // written size = bytes already flushed to the async writer + bytes still buffered in the sync writer.
600            inner.bytes_written() + inner.in_progress_size()
601        } else {
602            // inner writer is not initialized yet
603            0
604        }
605    }
606}
607
/// `AsyncFileWriter` is a wrapper around [`FileWrite`] that implements parquet's
/// [`ArrowAsyncFileWriter`] trait so it can be used with [`AsyncArrowWriter`].
///
/// # NOTES
///
/// This wrapper is intended for internal use only.
613struct AsyncFileWriter<W: FileWrite>(W);
614
615impl<W: FileWrite> AsyncFileWriter<W> {
616    /// Create a new `AsyncFileWriter` with the given writer.
617    pub fn new(writer: W) -> Self {
618        Self(writer)
619    }
620}
621
622impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
623    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
624        Box::pin(async {
625            self.0
626                .write(bs)
627                .await
628                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
629        })
630    }
631
632    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
633        Box::pin(async {
634            self.0
635                .close()
636                .await
637                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
638        })
639    }
640}
641
642#[cfg(test)]
643mod tests {
644    use std::collections::HashMap;
645    use std::sync::Arc;
646
647    use anyhow::Result;
648    use arrow_array::builder::{Float32Builder, Int32Builder, MapBuilder};
649    use arrow_array::types::{Float32Type, Int64Type};
650    use arrow_array::{
651        Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array,
652        Int64Array, ListArray, MapArray, RecordBatch, StructArray,
653    };
654    use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
655    use arrow_select::concat::concat_batches;
656    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
657    use parquet::file::statistics::ValueStatistics;
658    use rust_decimal::Decimal;
659    use tempfile::TempDir;
660    use uuid::Uuid;
661
662    use super::*;
663    use crate::arrow::schema_to_arrow_schema;
664    use crate::io::FileIOBuilder;
665    use crate::spec::{PrimitiveLiteral, Struct, *};
666    use crate::writer::file_writer::location_generator::{
667        DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
668    };
669    use crate::writer::tests::check_parquet_data_file;
670
671    fn schema_for_all_type() -> Schema {
672        Schema::builder()
673            .with_schema_id(1)
674            .with_fields(vec![
675                NestedField::optional(0, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
676                NestedField::optional(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
677                NestedField::optional(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
678                NestedField::optional(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
679                NestedField::optional(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
680                NestedField::optional(5, "string", Type::Primitive(PrimitiveType::String)).into(),
681                NestedField::optional(6, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
682                NestedField::optional(7, "date", Type::Primitive(PrimitiveType::Date)).into(),
683                NestedField::optional(8, "time", Type::Primitive(PrimitiveType::Time)).into(),
684                NestedField::optional(9, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
685                    .into(),
686                NestedField::optional(
687                    10,
688                    "timestamptz",
689                    Type::Primitive(PrimitiveType::Timestamptz),
690                )
691                .into(),
692                NestedField::optional(
693                    11,
694                    "timestamp_ns",
695                    Type::Primitive(PrimitiveType::TimestampNs),
696                )
697                .into(),
698                NestedField::optional(
699                    12,
700                    "timestamptz_ns",
701                    Type::Primitive(PrimitiveType::TimestamptzNs),
702                )
703                .into(),
704                NestedField::optional(
705                    13,
706                    "decimal",
707                    Type::Primitive(PrimitiveType::Decimal {
708                        precision: 10,
709                        scale: 5,
710                    }),
711                )
712                .into(),
713                NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
714                NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10)))
715                    .into(),
                // Parquet statistics use a different representation for Decimal with precision 38 and scale 5,
                // so we add a dedicated field for it.
718                NestedField::optional(
719                    16,
720                    "decimal_38",
721                    Type::Primitive(PrimitiveType::Decimal {
722                        precision: 38,
723                        scale: 5,
724                    }),
725                )
726                .into(),
727            ])
728            .build()
729            .unwrap()
730    }
731
732    fn nested_schema_for_test() -> Schema {
733        // Int, Struct(Int,Int), String, List(Int), Struct(Struct(Int)), Map(String, List(Int))
734        Schema::builder()
735            .with_schema_id(1)
736            .with_fields(vec![
737                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Long)).into(),
738                NestedField::required(
739                    1,
740                    "col1",
741                    Type::Struct(StructType::new(vec![
742                        NestedField::required(5, "col_1_5", Type::Primitive(PrimitiveType::Long))
743                            .into(),
744                        NestedField::required(6, "col_1_6", Type::Primitive(PrimitiveType::Long))
745                            .into(),
746                    ])),
747                )
748                .into(),
749                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
750                NestedField::required(
751                    3,
752                    "col3",
753                    Type::List(ListType::new(
754                        NestedField::required(7, "element", Type::Primitive(PrimitiveType::Long))
755                            .into(),
756                    )),
757                )
758                .into(),
759                NestedField::required(
760                    4,
761                    "col4",
762                    Type::Struct(StructType::new(vec![
763                        NestedField::required(
764                            8,
765                            "col_4_8",
766                            Type::Struct(StructType::new(vec![
767                                NestedField::required(
768                                    9,
769                                    "col_4_8_9",
770                                    Type::Primitive(PrimitiveType::Long),
771                                )
772                                .into(),
773                            ])),
774                        )
775                        .into(),
776                    ])),
777                )
778                .into(),
779                NestedField::required(
780                    10,
781                    "col5",
782                    Type::Map(MapType::new(
783                        NestedField::required(11, "key", Type::Primitive(PrimitiveType::String))
784                            .into(),
785                        NestedField::required(
786                            12,
787                            "value",
788                            Type::List(ListType::new(
789                                NestedField::required(
790                                    13,
791                                    "item",
792                                    Type::Primitive(PrimitiveType::Long),
793                                )
794                                .into(),
795                            )),
796                        )
797                        .into(),
798                    )),
799                )
800                .into(),
801            ])
802            .build()
803            .unwrap()
804    }
805
806    #[tokio::test]
807    async fn test_index_by_parquet_path() {
808        let expect = HashMap::from([
809            ("col0".to_string(), 0),
810            ("col1.col_1_5".to_string(), 5),
811            ("col1.col_1_6".to_string(), 6),
812            ("col2".to_string(), 2),
813            ("col3.list.element".to_string(), 7),
814            ("col4.col_4_8.col_4_8_9".to_string(), 9),
815            ("col5.key_value.key".to_string(), 11),
816            ("col5.key_value.value.list.item".to_string(), 13),
817        ]);
818        let mut visitor = IndexByParquetPathName::new();
819        visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
820        assert_eq!(visitor.name_to_id, expect);
821    }
822
823    #[tokio::test]
824    async fn test_parquet_writer() -> Result<()> {
825        let temp_dir = TempDir::new().unwrap();
826        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
827        let location_gen = DefaultLocationGenerator::with_data_location(
828            temp_dir.path().to_str().unwrap().to_string(),
829        );
830        let file_name_gen =
831            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
832
833        // prepare data
834        let schema = {
835            let fields =
836                vec![
837                    Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([(
838                        PARQUET_FIELD_ID_META_KEY.to_string(),
839                        "0".to_string(),
840                    )])),
841                ];
842            Arc::new(arrow_schema::Schema::new(fields))
843        };
844        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
845        let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
846        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
847        let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
848
849        let output_file = file_io.new_output(
850            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
851        )?;
852
853        // write data
854        let mut pw = ParquetWriterBuilder::new(
855            WriterProperties::builder()
856                .set_max_row_group_size(128)
857                .build(),
858            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
859        )
860        .build(output_file)
861        .await?;
862        pw.write(&to_write).await?;
863        pw.write(&to_write_null).await?;
864        let res = pw.close().await?;
865        assert_eq!(res.len(), 1);
866        let data_file = res
867            .into_iter()
868            .next()
869            .unwrap()
            // Fill in dummy fields so the build succeeds.
871            .content(DataContentType::Data)
872            .partition(Struct::empty())
873            .partition_spec_id(0)
874            .build()
875            .unwrap();
876
877        // check data file
878        assert_eq!(data_file.record_count(), 2048);
879        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
880        assert_eq!(
881            *data_file.lower_bounds(),
882            HashMap::from([(0, Datum::long(0))])
883        );
884        assert_eq!(
885            *data_file.upper_bounds(),
886            HashMap::from([(0, Datum::long(1023))])
887        );
888        assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
889
890        // check the written file
891        let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap();
892        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
893
894        Ok(())
895    }
896
897    #[tokio::test]
898    async fn test_parquet_writer_with_complex_schema() -> Result<()> {
899        let temp_dir = TempDir::new().unwrap();
900        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
901        let location_gen = DefaultLocationGenerator::with_data_location(
902            temp_dir.path().to_str().unwrap().to_string(),
903        );
904        let file_name_gen =
905            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
906
907        // prepare data
908        let schema = nested_schema_for_test();
909        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
910        let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
911        let col1 = Arc::new(StructArray::new(
912            {
913                if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
914                    fields.clone()
915                } else {
916                    unreachable!()
917                }
918            },
919            vec![
920                Arc::new(Int64Array::from_iter_values(0..1024)),
921                Arc::new(Int64Array::from_iter_values(0..1024)),
922            ],
923            None,
924        ));
925        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
926            (0..1024).map(|n| n.to_string()),
927        )) as ArrayRef;
928        let col3 = Arc::new({
929            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
930                (0..1024).map(|n| Some(vec![Some(n)])),
931            )
932            .into_parts();
933            arrow_array::ListArray::new(
934                {
935                    if let DataType::List(field) = arrow_schema.field(3).data_type() {
936                        field.clone()
937                    } else {
938                        unreachable!()
939                    }
940                },
941                list_parts.1,
942                list_parts.2,
943                list_parts.3,
944            )
945        }) as ArrayRef;
946        let col4 = Arc::new(StructArray::new(
947            {
948                if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
949                    fields.clone()
950                } else {
951                    unreachable!()
952                }
953            },
954            vec![Arc::new(StructArray::new(
955                {
956                    if let DataType::Struct(fields) = arrow_schema.field(4).data_type() {
957                        if let DataType::Struct(fields) = fields[0].data_type() {
958                            fields.clone()
959                        } else {
960                            unreachable!()
961                        }
962                    } else {
963                        unreachable!()
964                    }
965                },
966                vec![Arc::new(Int64Array::from_iter_values(0..1024))],
967                None,
968            ))],
969            None,
970        ));
971        let col5 = Arc::new({
972            let mut map_array_builder = MapBuilder::new(
973                None,
974                arrow_array::builder::StringBuilder::new(),
975                arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
976                    Int64Type,
977                >::new()),
978            );
979            for i in 0..1024 {
980                map_array_builder.keys().append_value(i.to_string());
981                map_array_builder
982                    .values()
983                    .append_value(vec![Some(i as i64); i + 1]);
984                map_array_builder.append(true)?;
985            }
986            let (_, offset_buffer, struct_array, null_buffer, ordered) =
987                map_array_builder.finish().into_parts();
988            let struct_array = {
989                let (_, mut arrays, nulls) = struct_array.into_parts();
990                let list_array = {
991                    let list_array = arrays[1]
992                        .as_any()
993                        .downcast_ref::<ListArray>()
994                        .unwrap()
995                        .clone();
996                    let (_, offsets, array, nulls) = list_array.into_parts();
997                    let list_field = {
998                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
999                            if let DataType::Struct(fields) = map_field.data_type() {
1000                                if let DataType::List(list_field) = fields[1].data_type() {
1001                                    list_field.clone()
1002                                } else {
1003                                    unreachable!()
1004                                }
1005                            } else {
1006                                unreachable!()
1007                            }
1008                        } else {
1009                            unreachable!()
1010                        }
1011                    };
1012                    ListArray::new(list_field, offsets, array, nulls)
1013                };
1014                arrays[1] = Arc::new(list_array) as ArrayRef;
1015                StructArray::new(
1016                    {
1017                        if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1018                            if let DataType::Struct(fields) = map_field.data_type() {
1019                                fields.clone()
1020                            } else {
1021                                unreachable!()
1022                            }
1023                        } else {
1024                            unreachable!()
1025                        }
1026                    },
1027                    arrays,
1028                    nulls,
1029                )
1030            };
1031            arrow_array::MapArray::new(
1032                {
1033                    if let DataType::Map(map_field, _) = arrow_schema.field(5).data_type() {
1034                        map_field.clone()
1035                    } else {
1036                        unreachable!()
1037                    }
1038                },
1039                offset_buffer,
1040                struct_array,
1041                null_buffer,
1042                ordered,
1043            )
1044        }) as ArrayRef;
1045        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1046            col0, col1, col2, col3, col4, col5,
1047        ])
1048        .unwrap();
1049        let output_file = file_io.new_output(
1050            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1051        )?;
1052
1053        // write data
1054        let mut pw =
1055            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1056                .build(output_file)
1057                .await?;
1058        pw.write(&to_write).await?;
1059        let res = pw.close().await?;
1060        assert_eq!(res.len(), 1);
1061        let data_file = res
1062            .into_iter()
1063            .next()
1064            .unwrap()
            // Fill in dummy fields so the build succeeds.
1066            .content(crate::spec::DataContentType::Data)
1067            .partition(Struct::empty())
1068            .partition_spec_id(0)
1069            .build()
1070            .unwrap();
1071
1072        // check data file
1073        assert_eq!(data_file.record_count(), 1024);
1074        assert_eq!(
1075            *data_file.value_counts(),
1076            HashMap::from([
1077                (0, 1024),
1078                (5, 1024),
1079                (6, 1024),
1080                (2, 1024),
1081                (7, 1024),
1082                (9, 1024),
1083                (11, 1024),
1084                (13, (1..1025).sum()),
1085            ])
1086        );
1087        assert_eq!(
1088            *data_file.lower_bounds(),
1089            HashMap::from([
1090                (0, Datum::long(0)),
1091                (5, Datum::long(0)),
1092                (6, Datum::long(0)),
1093                (2, Datum::string("0")),
1094                (7, Datum::long(0)),
1095                (9, Datum::long(0)),
1096                (11, Datum::string("0")),
1097                (13, Datum::long(0))
1098            ])
1099        );
1100        assert_eq!(
1101            *data_file.upper_bounds(),
1102            HashMap::from([
1103                (0, Datum::long(1023)),
1104                (5, Datum::long(1023)),
1105                (6, Datum::long(1023)),
1106                (2, Datum::string("999")),
1107                (7, Datum::long(1023)),
1108                (9, Datum::long(1023)),
1109                (11, Datum::string("999")),
1110                (13, Datum::long(1023))
1111            ])
1112        );
1113
1114        // check the written file
1115        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1116
1117        Ok(())
1118    }
1119
1120    #[tokio::test]
1121    async fn test_all_type_for_write() -> Result<()> {
1122        let temp_dir = TempDir::new().unwrap();
1123        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1124        let location_gen = DefaultLocationGenerator::with_data_location(
1125            temp_dir.path().to_str().unwrap().to_string(),
1126        );
1127        let file_name_gen =
1128            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1129
1130        // prepare data
1131        // generate iceberg schema for all type
1132        let schema = schema_for_all_type();
1133        let arrow_schema: ArrowSchemaRef = Arc::new((&schema).try_into().unwrap());
1134        let col0 = Arc::new(BooleanArray::from(vec![
1135            Some(true),
1136            Some(false),
1137            None,
1138            Some(true),
1139        ])) as ArrayRef;
1140        let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1141        let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrayRef;
1142        let col3 = Arc::new(arrow_array::Float32Array::from(vec![
1143            Some(0.5),
1144            Some(2.0),
1145            None,
1146            Some(3.5),
1147        ])) as ArrayRef;
1148        let col4 = Arc::new(arrow_array::Float64Array::from(vec![
1149            Some(0.5),
1150            Some(2.0),
1151            None,
1152            Some(3.5),
1153        ])) as ArrayRef;
1154        let col5 = Arc::new(arrow_array::StringArray::from(vec![
1155            Some("a"),
1156            Some("b"),
1157            None,
1158            Some("d"),
1159        ])) as ArrayRef;
1160        let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
1161            Some(b"one"),
1162            None,
1163            Some(b""),
1164            Some(b"zzzz"),
1165        ])) as ArrayRef;
1166        let col7 = Arc::new(arrow_array::Date32Array::from(vec![
1167            Some(0),
1168            Some(1),
1169            None,
1170            Some(3),
1171        ])) as ArrayRef;
1172        let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
1173            Some(0),
1174            Some(1),
1175            None,
1176            Some(3),
1177        ])) as ArrayRef;
1178        let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
1179            Some(0),
1180            Some(1),
1181            None,
1182            Some(3),
1183        ])) as ArrayRef;
1184        let col10 = Arc::new(
1185            arrow_array::TimestampMicrosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1186                .with_timezone_utc(),
1187        ) as ArrayRef;
1188        let col11 = Arc::new(arrow_array::TimestampNanosecondArray::from(vec![
1189            Some(0),
1190            Some(1),
1191            None,
1192            Some(3),
1193        ])) as ArrayRef;
1194        let col12 = Arc::new(
1195            arrow_array::TimestampNanosecondArray::from(vec![Some(0), Some(1), None, Some(3)])
1196                .with_timezone_utc(),
1197        ) as ArrayRef;
1198        let col13 = Arc::new(
1199            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1200                .with_precision_and_scale(10, 5)
1201                .unwrap(),
1202        ) as ArrayRef;
1203        let col14 = Arc::new(
1204            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1205                vec![
1206                    Some(Uuid::from_u128(0).as_bytes().to_vec()),
1207                    Some(Uuid::from_u128(1).as_bytes().to_vec()),
1208                    None,
1209                    Some(Uuid::from_u128(3).as_bytes().to_vec()),
1210                ]
1211                .into_iter(),
1212                16,
1213            )
1214            .unwrap(),
1215        ) as ArrayRef;
1216        let col15 = Arc::new(
1217            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
1218                vec![
1219                    Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
1220                    Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
1221                    None,
1222                    Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
1223                ]
1224                .into_iter(),
1225                10,
1226            )
1227            .unwrap(),
1228        ) as ArrayRef;
1229        let col16 = Arc::new(
1230            arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)])
1231                .with_precision_and_scale(38, 5)
1232                .unwrap(),
1233        ) as ArrayRef;
1234        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1235            col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
1236            col14, col15, col16,
1237        ])
1238        .unwrap();
1239        let output_file = file_io.new_output(
1240            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1241        )?;
1242
1243        // write data
1244        let mut pw =
1245            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1246                .build(output_file)
1247                .await?;
1248        pw.write(&to_write).await?;
1249        let res = pw.close().await?;
1250        assert_eq!(res.len(), 1);
1251        let data_file = res
1252            .into_iter()
1253            .next()
1254            .unwrap()
            // Fill in dummy fields so the build succeeds.
1256            .content(crate::spec::DataContentType::Data)
1257            .partition(Struct::empty())
1258            .partition_spec_id(0)
1259            .build()
1260            .unwrap();
1261
1262        // check data file
1263        assert_eq!(data_file.record_count(), 4);
1264        assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
1265        assert!(
1266            data_file
1267                .null_value_counts()
1268                .iter()
1269                .all(|(_, &v)| { v == 1 })
1270        );
1271        assert_eq!(
1272            *data_file.lower_bounds(),
1273            HashMap::from([
1274                (0, Datum::bool(false)),
1275                (1, Datum::int(1)),
1276                (2, Datum::long(1)),
1277                (3, Datum::float(0.5)),
1278                (4, Datum::double(0.5)),
1279                (5, Datum::string("a")),
1280                (6, Datum::binary(vec![])),
1281                (7, Datum::date(0)),
1282                (8, Datum::time_micros(0).unwrap()),
1283                (9, Datum::timestamp_micros(0)),
1284                (10, Datum::timestamptz_micros(0)),
1285                (11, Datum::timestamp_nanos(0)),
1286                (12, Datum::timestamptz_nanos(0)),
1287                (
1288                    13,
1289                    Datum::new(
1290                        PrimitiveType::Decimal {
1291                            precision: 10,
1292                            scale: 5
1293                        },
1294                        PrimitiveLiteral::Int128(1)
1295                    )
1296                ),
1297                (14, Datum::uuid(Uuid::from_u128(0))),
1298                (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1299                (
1300                    16,
1301                    Datum::new(
1302                        PrimitiveType::Decimal {
1303                            precision: 38,
1304                            scale: 5
1305                        },
1306                        PrimitiveLiteral::Int128(1)
1307                    )
1308                ),
1309            ])
1310        );
1311        assert_eq!(
1312            *data_file.upper_bounds(),
1313            HashMap::from([
1314                (0, Datum::bool(true)),
1315                (1, Datum::int(4)),
1316                (2, Datum::long(4)),
1317                (3, Datum::float(3.5)),
1318                (4, Datum::double(3.5)),
1319                (5, Datum::string("d")),
1320                (6, Datum::binary(vec![122, 122, 122, 122])),
1321                (7, Datum::date(3)),
1322                (8, Datum::time_micros(3).unwrap()),
1323                (9, Datum::timestamp_micros(3)),
1324                (10, Datum::timestamptz_micros(3)),
1325                (11, Datum::timestamp_nanos(3)),
1326                (12, Datum::timestamptz_nanos(3)),
1327                (
1328                    13,
1329                    Datum::new(
1330                        PrimitiveType::Decimal {
1331                            precision: 10,
1332                            scale: 5
1333                        },
1334                        PrimitiveLiteral::Int128(100)
1335                    )
1336                ),
1337                (14, Datum::uuid(Uuid::from_u128(3))),
1338                (
1339                    15,
1340                    Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
1341                ),
1342                (
1343                    16,
1344                    Datum::new(
1345                        PrimitiveType::Decimal {
1346                            precision: 38,
1347                            scale: 5
1348                        },
1349                        PrimitiveLiteral::Int128(100)
1350                    )
1351                ),
1352            ])
1353        );
1354
1355        // check the written file
1356        check_parquet_data_file(&file_io, &data_file, &to_write).await;
1357
1358        Ok(())
1359    }
1360
1361    #[tokio::test]
1362    async fn test_decimal_bound() -> Result<()> {
1363        let temp_dir = TempDir::new().unwrap();
1364        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1365        let location_gen = DefaultLocationGenerator::with_data_location(
1366            temp_dir.path().to_str().unwrap().to_string(),
1367        );
1368        let file_name_gen =
1369            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1370
1371        // test 1.1 and 2.2
1372        let schema = Arc::new(
1373            Schema::builder()
1374                .with_fields(vec![
1375                    NestedField::optional(
1376                        0,
1377                        "decimal",
1378                        Type::Primitive(PrimitiveType::Decimal {
1379                            precision: 28,
1380                            scale: 10,
1381                        }),
1382                    )
1383                    .into(),
1384                ])
1385                .build()
1386                .unwrap(),
1387        );
1388        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1389        let output_file = file_io.new_output(
1390            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1391        )?;
1392        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1393            .build(output_file)
1394            .await?;
1395        let col0 = Arc::new(
1396            Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
1397                .with_data_type(DataType::Decimal128(28, 10)),
1398        ) as ArrayRef;
1399        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1400        pw.write(&to_write).await?;
1401        let res = pw.close().await?;
1402        assert_eq!(res.len(), 1);
1403        let data_file = res
1404            .into_iter()
1405            .next()
1406            .unwrap()
1407            .content(crate::spec::DataContentType::Data)
1408            .partition(Struct::empty())
1409            .partition_spec_id(0)
1410            .build()
1411            .unwrap();
1412        assert_eq!(
1413            data_file.upper_bounds().get(&0),
1414            Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap())
1415                .as_ref()
1416        );
1417        assert_eq!(
1418            data_file.lower_bounds().get(&0),
1419            Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap())
1420                .as_ref()
1421        );
1422
1423        // test -1.1 and -2.2
1424        let schema = Arc::new(
1425            Schema::builder()
1426                .with_fields(vec![
1427                    NestedField::optional(
1428                        0,
1429                        "decimal",
1430                        Type::Primitive(PrimitiveType::Decimal {
1431                            precision: 28,
1432                            scale: 10,
1433                        }),
1434                    )
1435                    .into(),
1436                ])
1437                .build()
1438                .unwrap(),
1439        );
1440        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1441        let output_file = file_io.new_output(
1442            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1443        )?;
1444        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1445            .build(output_file)
1446            .await?;
1447        let col0 = Arc::new(
1448            Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
1449                .with_data_type(DataType::Decimal128(28, 10)),
1450        ) as ArrayRef;
1451        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1452        pw.write(&to_write).await?;
1453        let res = pw.close().await?;
1454        assert_eq!(res.len(), 1);
1455        let data_file = res
1456            .into_iter()
1457            .next()
1458            .unwrap()
1459            .content(crate::spec::DataContentType::Data)
1460            .partition(Struct::empty())
1461            .partition_spec_id(0)
1462            .build()
1463            .unwrap();
1464        assert_eq!(
1465            data_file.upper_bounds().get(&0),
1466            Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap())
1467                .as_ref()
1468        );
1469        assert_eq!(
1470            data_file.lower_bounds().get(&0),
1471            Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap())
1472                .as_ref()
1473        );
1474
1475        // test max and min of rust_decimal
1476        let decimal_max = Decimal::MAX;
1477        let decimal_min = Decimal::MIN;
1478        assert_eq!(decimal_max.scale(), decimal_min.scale());
1479        let schema = Arc::new(
1480            Schema::builder()
1481                .with_fields(vec![
1482                    NestedField::optional(
1483                        0,
1484                        "decimal",
1485                        Type::Primitive(PrimitiveType::Decimal {
1486                            precision: 38,
1487                            scale: decimal_max.scale(),
1488                        }),
1489                    )
1490                    .into(),
1491                ])
1492                .build()
1493                .unwrap(),
1494        );
1495        let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1496        let output_file = file_io.new_output(
1497            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1498        )?;
1499        let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1500            .build(output_file)
1501            .await?;
1502        let col0 = Arc::new(
1503            Decimal128Array::from(vec![
1504                Some(decimal_max.mantissa()),
1505                Some(decimal_min.mantissa()),
1506            ])
1507            .with_data_type(DataType::Decimal128(38, 0)),
1508        ) as ArrayRef;
1509        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1510        pw.write(&to_write).await?;
1511        let res = pw.close().await?;
1512        assert_eq!(res.len(), 1);
1513        let data_file = res
1514            .into_iter()
1515            .next()
1516            .unwrap()
1517            .content(crate::spec::DataContentType::Data)
1518            .partition(Struct::empty())
1519            .partition_spec_id(0)
1520            .build()
1521            .unwrap();
1522        assert_eq!(
1523            data_file.upper_bounds().get(&0),
1524            Some(Datum::decimal(decimal_max).unwrap()).as_ref()
1525        );
1526        assert_eq!(
1527            data_file.lower_bounds().get(&0),
1528            Some(Datum::decimal(decimal_min).unwrap()).as_ref()
1529        );
1530
1531        // test max and min for precision 38
1532        // TODO:
1533        // Re-add this case after resolving https://github.com/apache/iceberg-rust/issues/669
1534        // let schema = Arc::new(
1535        //     Schema::builder()
1536        //         .with_fields(vec![NestedField::optional(
1537        //             0,
1538        //             "decimal",
1539        //             Type::Primitive(PrimitiveType::Decimal {
1540        //                 precision: 38,
1541        //                 scale: 0,
1542        //             }),
1543        //         )
1544        //         .into()])
1545        //         .build()
1546        //         .unwrap(),
1547        // );
1548        // let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1549        // let output_file = file_io.new_output(
1550        //     location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1551        // )?;
1552        // let mut pw = ParquetWriterBuilder::new(
1553        //     WriterProperties::builder().build(),
1554        //     schema,
1555        // )
1556        // .build(output_file)
1557        // .await?;
1558        // let col0 = Arc::new(
1559        //     Decimal128Array::from(vec![
1560        //         Some(99999999999999999999999999999999999999_i128),
1561        //         Some(-99999999999999999999999999999999999999_i128),
1562        //     ])
1563        //     .with_data_type(DataType::Decimal128(38, 0)),
1564        // ) as ArrayRef;
1565        // let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap();
1566        // pw.write(&to_write).await?;
1567        // let res = pw.close().await?;
1568        // assert_eq!(res.len(), 1);
1569        // let data_file = res
1570        //     .into_iter()
1571        //     .next()
1572        //     .unwrap()
1573        //     .content(crate::spec::DataContentType::Data)
1574        //     .partition(Struct::empty())
1575        //     .build()
1576        //     .unwrap();
1577        // assert_eq!(
1578        //     data_file.upper_bounds().get(&0),
1579        //     Some(Datum::new(
1580        //         PrimitiveType::Decimal {
1581        //             precision: 38,
1582        //             scale: 0
1583        //         },
1584        //         PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128)
1585        //     ))
1586        //     .as_ref()
1587        // );
1588        // assert_eq!(
1589        //     data_file.lower_bounds().get(&0),
1590        //     Some(Datum::new(
1591        //         PrimitiveType::Decimal {
1592        //             precision: 38,
1593        //             scale: 0
1594        //         },
1595        //         PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128)
1596        //     ))
1597        //     .as_ref()
1598        // );
1599
1600        Ok(())
1601    }
1602
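    // test_empty_write verifies the writer's lazy file creation: a file is only
    // materialized on disk once at least one record batch has been written, and
    // closing a writer that never received data leaves no file behind.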
1603    #[tokio::test]
1604    async fn test_empty_write() -> Result<()> {
1605        let temp_dir = TempDir::new().unwrap();
1606        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1607        let location_gen = DefaultLocationGenerator::with_data_location(
1608            temp_dir.path().to_str().unwrap().to_string(),
1609        );
1610        let file_name_gen =
1611            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1612
1613        // Test that a file is created when there is data to write
1614        let schema = {
1615            let fields = vec![
1616                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
1617                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1618                ),
1619            ];
1620            Arc::new(arrow_schema::Schema::new(fields))
1621        };
1622        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
1623        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
1624        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1625        let output_file = file_io.new_output(&file_path)?;
1626        let mut pw = ParquetWriterBuilder::new(
1627            WriterProperties::builder().build(),
1628            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1629        )
1630        .build(output_file)
1631        .await?;
1632        pw.write(&to_write).await?;
1633        pw.close().await.unwrap();
1634        assert!(file_io.exists(&file_path).await.unwrap());
1635
1636        // Test that no file is created when there is no data to write
1637        let file_name_gen =
1638            DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
1639        let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
1640        let output_file = file_io.new_output(&file_path)?;
1641        let pw = ParquetWriterBuilder::new(
1642            WriterProperties::builder().build(),
1643            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1644        )
1645        .build(output_file)
1646        .await?;
1647        pw.close().await.unwrap();
1648        assert!(!file_io.exists(&file_path).await.unwrap());
1649
1650        Ok(())
1651    }
1652
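    // test_nan_val_cnts_primitive_type checks that NaN value counts are collected per
    // field id for primitive float32/float64 columns, alongside the usual value counts,
    // null counts, and min/max bounds (NaN must not leak into the bounds).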
1653    #[tokio::test]
1654    async fn test_nan_val_cnts_primitive_type() -> Result<()> {
1655        let temp_dir = TempDir::new().unwrap();
1656        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1657        let location_gen = DefaultLocationGenerator::with_data_location(
1658            temp_dir.path().to_str().unwrap().to_string(),
1659        );
1660        let file_name_gen =
1661            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1662
1663        // prepare data
1664        let arrow_schema = {
1665            let fields = vec![
1666                Field::new("col", arrow_schema::DataType::Float32, false).with_metadata(
1667                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1668                ),
1669                Field::new("col2", arrow_schema::DataType::Float64, false).with_metadata(
1670                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1671                ),
1672            ];
1673            Arc::new(arrow_schema::Schema::new(fields))
1674        };
1675
1676        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1677            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1678            None,
1679        )) as ArrayRef;
1680
1681        let float_64_col = Arc::new(Float64Array::from_iter_values_with_nulls(
1682            [1.0_f64, f64::NAN, 2.0, 2.0].into_iter(),
1683            None,
1684        )) as ArrayRef;
1685
1686        let to_write =
1687            RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, float_64_col]).unwrap();
1688        let output_file = file_io.new_output(
1689            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1690        )?;
1691
1692        // write data
1693        let mut pw = ParquetWriterBuilder::new(
1694            WriterProperties::builder().build(),
1695            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1696        )
1697        .build(output_file)
1698        .await?;
1699
1700        pw.write(&to_write).await?;
1701        let res = pw.close().await?;
1702        assert_eq!(res.len(), 1);
1703        let data_file = res
1704            .into_iter()
1705            .next()
1706            .unwrap()
1707            // Fill in dummy fields so the builder succeeds.
1708            .content(crate::spec::DataContentType::Data)
1709            .partition(Struct::empty())
1710            .partition_spec_id(0)
1711            .build()
1712            .unwrap();
1713
1714        // check data file
1715        assert_eq!(data_file.record_count(), 4);
1716        assert_eq!(*data_file.value_counts(), HashMap::from([(0, 4), (1, 4)]));
1717        assert_eq!(
1718            *data_file.lower_bounds(),
1719            HashMap::from([(0, Datum::float(1.0)), (1, Datum::double(1.0)),])
1720        );
1721        assert_eq!(
1722            *data_file.upper_bounds(),
1723            HashMap::from([(0, Datum::float(2.0)), (1, Datum::double(2.0)),])
1724        );
1725        assert_eq!(
1726            *data_file.null_value_counts(),
1727            HashMap::from([(0, 0), (1, 0)])
1728        );
1729        assert_eq!(
1730            *data_file.nan_value_counts(),
1731            HashMap::from([(0, 1), (1, 1)])
1732        );
1733
1734        // check the written file
1735        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1736        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1737
1738        Ok(())
1739    }
1740
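    // test_nan_val_cnts_struct_type checks the same NaN accounting for float columns
    // nested inside struct fields (one and two levels deep), keyed by the leaf field
    // ids (4 and 7).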
1741    #[tokio::test]
1742    async fn test_nan_val_cnts_struct_type() -> Result<()> {
1743        let temp_dir = TempDir::new().unwrap();
1744        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1745        let location_gen = DefaultLocationGenerator::with_data_location(
1746            temp_dir.path().to_str().unwrap().to_string(),
1747        );
1748        let file_name_gen =
1749            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1750
1751        let schema_struct_float_fields = Fields::from(vec![
1752            Field::new("col4", DataType::Float32, false).with_metadata(HashMap::from([(
1753                PARQUET_FIELD_ID_META_KEY.to_string(),
1754                "4".to_string(),
1755            )])),
1756        ]);
1757
1758        let schema_struct_nested_float_fields = Fields::from(vec![
1759            Field::new("col7", DataType::Float32, false).with_metadata(HashMap::from([(
1760                PARQUET_FIELD_ID_META_KEY.to_string(),
1761                "7".to_string(),
1762            )])),
1763        ]);
1764
1765        let schema_struct_nested_fields = Fields::from(vec![
1766            Field::new(
1767                "col6",
1768                arrow_schema::DataType::Struct(schema_struct_nested_float_fields.clone()),
1769                false,
1770            )
1771            .with_metadata(HashMap::from([(
1772                PARQUET_FIELD_ID_META_KEY.to_string(),
1773                "6".to_string(),
1774            )])),
1775        ]);
1776
1777        // prepare data
1778        let arrow_schema = {
1779            let fields = vec![
1780                Field::new(
1781                    "col3",
1782                    arrow_schema::DataType::Struct(schema_struct_float_fields.clone()),
1783                    false,
1784                )
1785                .with_metadata(HashMap::from([(
1786                    PARQUET_FIELD_ID_META_KEY.to_string(),
1787                    "3".to_string(),
1788                )])),
1789                Field::new(
1790                    "col5",
1791                    arrow_schema::DataType::Struct(schema_struct_nested_fields.clone()),
1792                    false,
1793                )
1794                .with_metadata(HashMap::from([(
1795                    PARQUET_FIELD_ID_META_KEY.to_string(),
1796                    "5".to_string(),
1797                )])),
1798            ];
1799            Arc::new(arrow_schema::Schema::new(fields))
1800        };
1801
1802        let float_32_col = Arc::new(Float32Array::from_iter_values_with_nulls(
1803            [1.0_f32, f32::NAN, 2.0, 2.0].into_iter(),
1804            None,
1805        )) as ArrayRef;
1806
1807        let struct_float_field_col = Arc::new(StructArray::new(
1808            schema_struct_float_fields,
1809            vec![float_32_col.clone()],
1810            None,
1811        )) as ArrayRef;
1812
1813        let struct_nested_float_field_col = Arc::new(StructArray::new(
1814            schema_struct_nested_fields,
1815            vec![Arc::new(StructArray::new(
1816                schema_struct_nested_float_fields,
1817                vec![float_32_col.clone()],
1818                None,
1819            )) as ArrayRef],
1820            None,
1821        )) as ArrayRef;
1822
1823        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1824            struct_float_field_col,
1825            struct_nested_float_field_col,
1826        ])
1827        .unwrap();
1828        let output_file = file_io.new_output(
1829            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1830        )?;
1831
1832        // write data
1833        let mut pw = ParquetWriterBuilder::new(
1834            WriterProperties::builder().build(),
1835            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
1836        )
1837        .build(output_file)
1838        .await?;
1839
1840        pw.write(&to_write).await?;
1841        let res = pw.close().await?;
1842        assert_eq!(res.len(), 1);
1843        let data_file = res
1844            .into_iter()
1845            .next()
1846            .unwrap()
1847            // Fill in dummy fields so the builder succeeds.
1848            .content(crate::spec::DataContentType::Data)
1849            .partition(Struct::empty())
1850            .partition_spec_id(0)
1851            .build()
1852            .unwrap();
1853
1854        // check data file
1855        assert_eq!(data_file.record_count(), 4);
1856        assert_eq!(*data_file.value_counts(), HashMap::from([(4, 4), (7, 4)]));
1857        assert_eq!(
1858            *data_file.lower_bounds(),
1859            HashMap::from([(4, Datum::float(1.0)), (7, Datum::float(1.0)),])
1860        );
1861        assert_eq!(
1862            *data_file.upper_bounds(),
1863            HashMap::from([(4, Datum::float(2.0)), (7, Datum::float(2.0)),])
1864        );
1865        assert_eq!(
1866            *data_file.null_value_counts(),
1867            HashMap::from([(4, 0), (7, 0)])
1868        );
1869        assert_eq!(
1870            *data_file.nan_value_counts(),
1871            HashMap::from([(4, 1), (7, 1)])
1872        );
1873
1874        // check the written file
1875        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
1876        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
1877
1878        Ok(())
1879    }
1880
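    // test_nan_val_cnts_list_type checks NaN accounting for float columns nested inside
    // list fields, both a top-level list (element id 1) and a list inside a struct
    // (element id 4). The large-list case is left commented out below.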
1881    #[tokio::test]
1882    async fn test_nan_val_cnts_list_type() -> Result<()> {
1883        let temp_dir = TempDir::new().unwrap();
1884        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1885        let location_gen = DefaultLocationGenerator::with_data_location(
1886            temp_dir.path().to_str().unwrap().to_string(),
1887        );
1888        let file_name_gen =
1889            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
1890
1891        let schema_list_float_field = Field::new("element", DataType::Float32, true).with_metadata(
1892            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
1893        );
1894
1895        let schema_struct_list_float_field = Field::new("element", DataType::Float32, true)
1896            .with_metadata(HashMap::from([(
1897                PARQUET_FIELD_ID_META_KEY.to_string(),
1898                "4".to_string(),
1899            )]));
1900
1901        let schema_struct_list_field = Fields::from(vec![
1902            Field::new_list("col2", schema_struct_list_float_field.clone(), true).with_metadata(
1903                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
1904            ),
1905        ]);
1906
1907        let arrow_schema = {
1908            let fields = vec![
1909                Field::new_list("col0", schema_list_float_field.clone(), true).with_metadata(
1910                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
1911                ),
1912                Field::new_struct("col1", schema_struct_list_field.clone(), true)
1913                    .with_metadata(HashMap::from([(
1914                        PARQUET_FIELD_ID_META_KEY.to_string(),
1915                        "2".to_string(),
1916                    )]))
1917                    .clone(),
1918                // Field::new_large_list("col3", schema_large_list_float_field.clone(), true).with_metadata(
1919                //     HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]),
1920                // ).clone(),
1921            ];
1922            Arc::new(arrow_schema::Schema::new(fields))
1923        };
1924
1925        let list_parts = ListArray::from_iter_primitive::<Float32Type, _, _>(vec![Some(vec![
1926            Some(1.0_f32),
1927            Some(f32::NAN),
1928            Some(2.0),
1929            Some(2.0),
1930        ])])
1931        .into_parts();
1932
1933        let list_float_field_col = Arc::new({
1934            let list_parts = list_parts.clone();
1935            ListArray::new(
1936                {
1937                    if let DataType::List(field) = arrow_schema.field(0).data_type() {
1938                        field.clone()
1939                    } else {
1940                        unreachable!()
1941                    }
1942                },
1943                list_parts.1,
1944                list_parts.2,
1945                list_parts.3,
1946            )
1947        }) as ArrayRef;
1948
1949        let struct_list_fields_schema =
1950            if let DataType::Struct(fields) = arrow_schema.field(1).data_type() {
1951                fields.clone()
1952            } else {
1953                unreachable!()
1954            };
1955
1956        let struct_list_float_field_col = Arc::new({
1957            ListArray::new(
1958                {
1959                    if let DataType::List(field) = struct_list_fields_schema
1960                        .first()
1961                        .expect("could not find first list field")
1962                        .data_type()
1963                    {
1964                        field.clone()
1965                    } else {
1966                        unreachable!()
1967                    }
1968                },
1969                list_parts.1,
1970                list_parts.2,
1971                list_parts.3,
1972            )
1973        }) as ArrayRef;
1974
1975        let struct_list_float_field_col = Arc::new(StructArray::new(
1976            struct_list_fields_schema,
1977            vec![struct_list_float_field_col.clone()],
1978            None,
1979        )) as ArrayRef;
1980
1981        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
1982            list_float_field_col,
1983            struct_list_float_field_col,
1984            // large_list_float_field_col,
1985        ])
1986        .expect("Could not form record batch");
1987        let output_file = file_io.new_output(
1988            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
1989        )?;
1990
1991        // write data
1992        let mut pw = ParquetWriterBuilder::new(
1993            WriterProperties::builder().build(),
1994            Arc::new(
1995                to_write
1996                    .schema()
1997                    .as_ref()
1998                    .try_into()
1999                    .expect("Could not convert iceberg schema"),
2000            ),
2001        )
2002        .build(output_file)
2003        .await?;
2004
2005        pw.write(&to_write).await?;
2006        let res = pw.close().await?;
2007        assert_eq!(res.len(), 1);
2008        let data_file = res
2009            .into_iter()
2010            .next()
2011            .unwrap()
2012            .content(crate::spec::DataContentType::Data)
2013            .partition(Struct::empty())
2014            .partition_spec_id(0)
2015            .build()
2016            .unwrap();
2017
2018        // check data file
2019        assert_eq!(data_file.record_count(), 1);
2020        assert_eq!(*data_file.value_counts(), HashMap::from([(1, 4), (4, 4)]));
2021        assert_eq!(
2022            *data_file.lower_bounds(),
2023            HashMap::from([(1, Datum::float(1.0)), (4, Datum::float(1.0))])
2024        );
2025        assert_eq!(
2026            *data_file.upper_bounds(),
2027            HashMap::from([(1, Datum::float(2.0)), (4, Datum::float(2.0))])
2028        );
2029        assert_eq!(
2030            *data_file.null_value_counts(),
2031            HashMap::from([(1, 0), (4, 0)])
2032        );
2033        assert_eq!(
2034            *data_file.nan_value_counts(),
2035            HashMap::from([(1, 1), (4, 1)])
2036        );
2037
2038        // check the written file
2039        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2040        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2041
2042        Ok(())
2043    }
2044
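    // construct_map_arr builds a four-entry MapArray (int32 key -> float32 value, one
    // value being NaN) and then rebuilds its entries struct with the supplied key/value
    // Fields, so the fields carrying PARQUET_FIELD_ID_META_KEY metadata replace the
    // builder's default entry fields.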
2045    macro_rules! construct_map_arr {
2046        ($map_key_field_schema:ident, $map_value_field_schema:ident) => {{
2047            let int_builder = Int32Builder::new();
2048            let float_builder = Float32Builder::with_capacity(4);
2049            let mut builder = MapBuilder::new(None, int_builder, float_builder);
2050            builder.keys().append_value(1);
2051            builder.values().append_value(1.0_f32);
2052            builder.append(true).unwrap();
2053            builder.keys().append_value(2);
2054            builder.values().append_value(f32::NAN);
2055            builder.append(true).unwrap();
2056            builder.keys().append_value(3);
2057            builder.values().append_value(2.0);
2058            builder.append(true).unwrap();
2059            builder.keys().append_value(4);
2060            builder.values().append_value(2.0);
2061            builder.append(true).unwrap();
2062            let array = builder.finish();
2063
2064            let (_field, offsets, entries, nulls, ordered) = array.into_parts();
2065            let new_struct_fields_schema =
2066                Fields::from(vec![$map_key_field_schema, $map_value_field_schema]);
2067
2068            let entries = {
2069                let (_, arrays, nulls) = entries.into_parts();
2070                StructArray::new(new_struct_fields_schema.clone(), arrays, nulls)
2071            };
2072
2073            let field = Arc::new(Field::new(
2074                DEFAULT_MAP_FIELD_NAME,
2075                DataType::Struct(new_struct_fields_schema),
2076                false,
2077            ));
2078
2079            Arc::new(MapArray::new(field, offsets, entries, nulls, ordered))
2080        }};
2081    }
2082
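    // test_nan_val_cnts_map_type checks NaN accounting for map values, both for a
    // top-level map column and for a map nested inside a struct; only the float value
    // fields (ids 2 and 7) should report a NaN count.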
2083    #[tokio::test]
2084    async fn test_nan_val_cnts_map_type() -> Result<()> {
2085        let temp_dir = TempDir::new().unwrap();
2086        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2087        let location_gen = DefaultLocationGenerator::with_data_location(
2088            temp_dir.path().to_str().unwrap().to_string(),
2089        );
2090        let file_name_gen =
2091            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2092
2093        let map_key_field_schema =
2094            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2095                (PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()),
2096            ]));
2097
2098        let map_value_field_schema =
2099            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2100                [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
2101            ));
2102
2103        let struct_map_key_field_schema =
2104            Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
2105                (PARQUET_FIELD_ID_META_KEY.to_string(), "6".to_string()),
2106            ]));
2107
2108        let struct_map_value_field_schema =
2109            Field::new(MAP_VALUE_FIELD_NAME, DataType::Float32, true).with_metadata(HashMap::from(
2110                [(PARQUET_FIELD_ID_META_KEY.to_string(), "7".to_string())],
2111            ));
2112
2113        let schema_struct_map_field = Fields::from(vec![
2114            Field::new_map(
2115                "col3",
2116                DEFAULT_MAP_FIELD_NAME,
2117                struct_map_key_field_schema.clone(),
2118                struct_map_value_field_schema.clone(),
2119                false,
2120                false,
2121            )
2122            .with_metadata(HashMap::from([(
2123                PARQUET_FIELD_ID_META_KEY.to_string(),
2124                "5".to_string(),
2125            )])),
2126        ]);
2127
2128        let arrow_schema = {
2129            let fields = vec![
2130                Field::new_map(
2131                    "col0",
2132                    DEFAULT_MAP_FIELD_NAME,
2133                    map_key_field_schema.clone(),
2134                    map_value_field_schema.clone(),
2135                    false,
2136                    false,
2137                )
2138                .with_metadata(HashMap::from([(
2139                    PARQUET_FIELD_ID_META_KEY.to_string(),
2140                    "0".to_string(),
2141                )])),
2142                Field::new_struct("col1", schema_struct_map_field.clone(), true)
2143                    .with_metadata(HashMap::from([(
2144                        PARQUET_FIELD_ID_META_KEY.to_string(),
2145                        "3".to_string(),
2146                    )]))
2147                    .clone(),
2148            ];
2149            Arc::new(arrow_schema::Schema::new(fields))
2150        };
2151
2152        let map_array = construct_map_arr!(map_key_field_schema, map_value_field_schema);
2153
2154        let struct_map_arr =
2155            construct_map_arr!(struct_map_key_field_schema, struct_map_value_field_schema);
2156
2157        let struct_list_float_field_col = Arc::new(StructArray::new(
2158            schema_struct_map_field,
2159            vec![struct_map_arr],
2160            None,
2161        )) as ArrayRef;
2162
2163        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
2164            map_array,
2165            struct_list_float_field_col,
2166        ])
2167        .expect("Could not form record batch");
2168        let output_file = file_io.new_output(
2169            location_gen.generate_location(None, &file_name_gen.generate_file_name()),
2170        )?;
2171
2172        // write data
2173        let mut pw = ParquetWriterBuilder::new(
2174            WriterProperties::builder().build(),
2175            Arc::new(
2176                to_write
2177                    .schema()
2178                    .as_ref()
2179                    .try_into()
2180                    .expect("Could not convert iceberg schema"),
2181            ),
2182        )
2183        .build(output_file)
2184        .await?;
2185
2186        pw.write(&to_write).await?;
2187        let res = pw.close().await?;
2188        assert_eq!(res.len(), 1);
2189        let data_file = res
2190            .into_iter()
2191            .next()
2192            .unwrap()
2193            .content(crate::spec::DataContentType::Data)
2194            .partition(Struct::empty())
2195            .partition_spec_id(0)
2196            .build()
2197            .unwrap();
2198
2199        // check data file
2200        assert_eq!(data_file.record_count(), 4);
2201        assert_eq!(
2202            *data_file.value_counts(),
2203            HashMap::from([(1, 4), (2, 4), (6, 4), (7, 4)])
2204        );
2205        assert_eq!(
2206            *data_file.lower_bounds(),
2207            HashMap::from([
2208                (1, Datum::int(1)),
2209                (2, Datum::float(1.0)),
2210                (6, Datum::int(1)),
2211                (7, Datum::float(1.0))
2212            ])
2213        );
2214        assert_eq!(
2215            *data_file.upper_bounds(),
2216            HashMap::from([
2217                (1, Datum::int(4)),
2218                (2, Datum::float(2.0)),
2219                (6, Datum::int(4)),
2220                (7, Datum::float(2.0))
2221            ])
2222        );
2223        assert_eq!(
2224            *data_file.null_value_counts(),
2225            HashMap::from([(1, 0), (2, 0), (6, 0), (7, 0)])
2226        );
2227        assert_eq!(
2228            *data_file.nan_value_counts(),
2229            HashMap::from([(2, 1), (7, 1)])
2230        );
2231
2232        // check the written file
2233        let expect_batch = concat_batches(&arrow_schema, vec![&to_write]).unwrap();
2234        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
2235
2236        Ok(())
2237    }
2238
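    // test_write_empty_parquet_file verifies that closing a writer without writing any
    // rows yields no data files and leaves the output directory empty.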
2239    #[tokio::test]
2240    async fn test_write_empty_parquet_file() {
2241        let temp_dir = TempDir::new().unwrap();
2242        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2243        let location_gen = DefaultLocationGenerator::with_data_location(
2244            temp_dir.path().to_str().unwrap().to_string(),
2245        );
2246        let file_name_gen =
2247            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2248        let output_file = file_io
2249            .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
2250            .unwrap();
2251
2252        // write data
2253        let pw = ParquetWriterBuilder::new(
2254            WriterProperties::builder().build(),
2255            Arc::new(
2256                Schema::builder()
2257                    .with_schema_id(1)
2258                    .with_fields(vec![
2259                        NestedField::required(0, "col", Type::Primitive(PrimitiveType::Long))
2260                            .with_id(0)
2261                            .into(),
2262                    ])
2263                    .build()
2264                    .expect("Failed to create schema"),
2265            ),
2266        )
2267        .build(output_file)
2268        .await
2269        .unwrap();
2270
2271        let res = pw.close().await.unwrap();
2272        assert_eq!(res.len(), 0);
2273
2274        // Check that the file has been deleted.
2275        assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2276    }
2277
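    // test_min_max_aggregator exercises MinMaxColAggregator directly with Int32
    // row-group statistics where min and/or max may be absent, and checks that the
    // produced per-field bounds are the overall i32::MIN / i32::MAX.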
2278    #[test]
2279    fn test_min_max_aggregator() {
2280        let schema = Arc::new(
2281            Schema::builder()
2282                .with_schema_id(1)
2283                .with_fields(vec![
2284                    NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
2285                        .with_id(0)
2286                        .into(),
2287                ])
2288                .build()
2289                .expect("Failed to create schema"),
2290        );
2291        let mut min_max_agg = MinMaxColAggregator::new(schema);
2292        let create_statistics =
2293            |min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
2294        min_max_agg
2295            .update(0, create_statistics(None, Some(42)))
2296            .unwrap();
2297        min_max_agg
2298            .update(0, create_statistics(Some(0), Some(i32::MAX)))
2299            .unwrap();
2300        min_max_agg
2301            .update(0, create_statistics(Some(i32::MIN), None))
2302            .unwrap();
2303        min_max_agg
2304            .update(0, create_statistics(None, None))
2305            .unwrap();
2306
2307        let (lower_bounds, upper_bounds) = min_max_agg.produce();
2308
2309        assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
2310        assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
2311    }
2312}