// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! The main `ArrowReader` pipeline: it reads a stream of `FileScanTask`s,
//! opens each Parquet file and resolves its schema, then wires projection,
//! predicates, row-group / row selection, and delete handling into a stream
//! of transformed Arrow `RecordBatch`es.

23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use futures::{StreamExt, TryStreamExt};
27use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
28use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder};
29
30use super::{
31    ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema,
32    apply_name_mapping_to_arrow_schema,
33};
34use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
35use crate::arrow::int96::coerce_int96_timestamps;
36use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
37use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
38use crate::error::Result;
39use crate::io::{FileIO, FileMetadata, FileRead};
40use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
41use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
42use crate::spec::Datum;
43use crate::{Error, ErrorKind};
44
45impl ArrowReader {
46    /// Take a stream of FileScanTasks and reads all the files.
47    /// Returns a [`ScanResult`] containing the record batch stream and scan metrics.
48    pub fn read(self, tasks: FileScanTaskStream) -> Result<ScanResult> {
49        let concurrency_limit_data_files = self.concurrency_limit_data_files;
50        let scan_metrics = ScanMetrics::new();
51
52        let task_reader = FileScanTaskReader {
53            batch_size: self.batch_size,
54            file_io: self.file_io,
55            delete_file_loader: self
56                .delete_file_loader
57                .with_scan_metrics(scan_metrics.clone()),
58            row_group_filtering_enabled: self.row_group_filtering_enabled,
59            row_selection_enabled: self.row_selection_enabled,
60            parquet_read_options: self.parquet_read_options,
61            scan_metrics: scan_metrics.clone(),
62        };
63
64        // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
65        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
66            Box::pin(
67                tasks
68                    .and_then(move |task| task_reader.clone().process(task))
69                    .map_err(|err| {
70                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
71                            .with_source(err)
72                    })
73                    .try_flatten(),
74            )
75        } else {
76            Box::pin(
77                tasks
78                    .map_ok(move |task| task_reader.clone().process(task))
79                    .map_err(|err| {
80                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
81                            .with_source(err)
82                    })
83                    .try_buffer_unordered(concurrency_limit_data_files)
84                    .try_flatten_unordered(concurrency_limit_data_files),
85            )
86        };
87
88        Ok(ScanResult::new(stream, scan_metrics))
89    }
90}
91
92/// Per-scan state for processing [`FileScanTask`]s. Created once per
93/// [`ArrowReader::read`] call and cloned per task.
94#[derive(Clone)]
95struct FileScanTaskReader {
96    batch_size: Option<usize>,
97    file_io: FileIO,
98    delete_file_loader: CachingDeleteFileLoader,
99    row_group_filtering_enabled: bool,
100    row_selection_enabled: bool,
101    parquet_read_options: ParquetReadOptions,
102    scan_metrics: ScanMetrics,
103}
104
105impl FileScanTaskReader {
106    async fn process(self, task: FileScanTask) -> Result<ArrowRecordBatchStream> {
107        let should_load_page_index =
108            (self.row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
109        let mut parquet_read_options = self.parquet_read_options;
110        parquet_read_options.preload_page_index = should_load_page_index;
111
112        let delete_filter_rx = self
113            .delete_file_loader
114            .load_deletes(&task.deletes, Arc::clone(&task.schema));
115
116        // Open the Parquet file once, loading its metadata
117        let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
118            &task.data_file_path,
119            &self.file_io,
120            task.file_size_in_bytes,
121            parquet_read_options,
122            self.scan_metrics.bytes_read_counter(),
123        )
124        .await?;
125
126        // Check if Parquet file has embedded field IDs
127        // Corresponds to Java's ParquetSchemaUtil.hasIds()
128        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
129        let missing_field_ids = arrow_metadata
130            .schema()
131            .fields()
132            .iter()
133            .next()
134            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
135
136        // Three-branch schema resolution strategy matching Java's ReadConf constructor
137        //
138        // Per Iceberg spec Column Projection rules:
139        // "Columns in Iceberg data files are selected by field id. The table schema's column
140        //  names and order may change after a data file is written, and projection must be done
141        //  using field ids."
142        // https://iceberg.apache.org/spec/#column-projection
143        //
144        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
145        // we must assign field IDs BEFORE reading data to enable correct projection.
146        //
147        // Java's ReadConf determines field ID strategy:
148        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
149        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
150        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
151        let arrow_metadata = if missing_field_ids {
152            // Parquet file lacks field IDs - must assign them before reading
153            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
154                // Branch 2: Apply name mapping to assign correct Iceberg field IDs
155                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
156                // to columns without field id"
157                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
158                apply_name_mapping_to_arrow_schema(
159                    Arc::clone(arrow_metadata.schema()),
160                    name_mapping,
161                )?
162            } else {
163                // Branch 3: No name mapping - use position-based fallback IDs
164                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
165                add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
166            };
167
168            let options = ArrowReaderOptions::new().with_schema(arrow_schema);
169            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
170                |e| {
171                    Error::new(
172                        ErrorKind::Unexpected,
173                        "Failed to create ArrowReaderMetadata with field ID schema",
174                    )
175                    .with_source(e)
176                },
177            )?
178        } else {
179            // Branch 1: File has embedded field IDs - trust them
180            arrow_metadata
181        };
182
183        // Coerce INT96 timestamp columns to the resolution specified by the Iceberg schema.
184        // This must happen before building the stream reader to avoid i64 overflow in arrow-rs.
185        let arrow_metadata = if let Some(coerced_schema) =
186            coerce_int96_timestamps(arrow_metadata.schema(), &task.schema)
187        {
188            let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema));
189            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
190                |e| {
191                    Error::new(
192                        ErrorKind::Unexpected,
193                        format!(
194                            "Failed to create ArrowReaderMetadata with INT96-coerced schema: {coerced_schema}"
195                        ),
196                    )
197                    .with_source(e)
198                },
199            )?
200        } else {
201            arrow_metadata
202        };
203
204        // Build the stream reader, reusing the already-opened file reader
205        let mut record_batch_stream_builder =
206            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);
207
208        // Filter out metadata fields for Parquet projection (they don't exist in files)
209        let project_field_ids_without_metadata: Vec<i32> = task
210            .project_field_ids
211            .iter()
212            .filter(|&&id| !is_metadata_field(id))
213            .copied()
214            .collect();
215
216        // Create projection mask based on field IDs
217        // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
218        // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
219        // - If fallback IDs: position-based projection (missing_field_ids=true)
220        let projection_mask = ArrowReader::get_arrow_projection_mask(
221            &project_field_ids_without_metadata,
222            &task.schema,
223            record_batch_stream_builder.parquet_schema(),
224            record_batch_stream_builder.schema(),
225            missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
226        )?;
227
228        record_batch_stream_builder =
229            record_batch_stream_builder.with_projection(projection_mask.clone());
230
231        // RecordBatchTransformer performs any transformations required on the RecordBatches
232        // that come back from the file, such as type promotion, default column insertion,
233        // column re-ordering, partition constants, and virtual field addition (like _file)
234        let mut record_batch_transformer_builder =
235            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
236
237        // Add the _file metadata column if it's in the projected fields
238        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
239            let file_datum = Datum::string(task.data_file_path.clone());
240            record_batch_transformer_builder =
241                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
242        }
243
244        if let (Some(partition_spec), Some(partition_data)) =
245            (task.partition_spec.clone(), task.partition.clone())
246        {
247            record_batch_transformer_builder =
248                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
249        }
250
251        let mut record_batch_transformer = record_batch_transformer_builder.build();
252
253        if let Some(batch_size) = self.batch_size {
254            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
255        }
256
257        let delete_filter = delete_filter_rx.await.unwrap()?;
258        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
259
260        // In addition to the optional predicate supplied in the `FileScanTask`,
261        // we also have an optional predicate resulting from equality delete files.
262        // If both are present, we logical-AND them together to form a single filter
263        // predicate that we can pass to the `RecordBatchStreamBuilder`.
264        let final_predicate = match (&task.predicate, delete_predicate) {
265            (None, None) => None,
266            (Some(predicate), None) => Some(predicate.clone()),
267            (None, Some(ref predicate)) => Some(predicate.clone()),
268            (Some(filter_predicate), Some(delete_predicate)) => {
269                Some(filter_predicate.clone().and(delete_predicate))
270            }
271        };
272
273        // There are three possible sources for potential lists of selected RowGroup indices,
274        // and two for `RowSelection`s.
275        // Selected RowGroup index lists can come from three sources:
276        //   * When task.start and task.length specify a byte range (file splitting);
277        //   * When there are equality delete files that are applicable;
278        //   * When there is a scan predicate and row_group_filtering_enabled = true.
279        // `RowSelection`s can be created in either or both of the following cases:
280        //   * When there are positional delete files that are applicable;
281        //   * When there is a scan predicate and row_selection_enabled = true
282        // Note that row group filtering from predicates only happens when
283        // there is a scan predicate AND row_group_filtering_enabled = true,
284        // but we perform row selection filtering if there are applicable
285        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
286        // since the only implemented method of applying positional deletes is
287        // by using a `RowSelection`.
288        let mut selected_row_group_indices = None;
289        let mut row_selection = None;
290
291        // Filter row groups based on byte range from task.start and task.length.
292        // If both start and length are 0, read the entire file (backwards compatibility).
293        if task.start != 0 || task.length != 0 {
294            let byte_range_filtered_row_groups = ArrowReader::filter_row_groups_by_byte_range(
295                record_batch_stream_builder.metadata(),
296                task.start,
297                task.length,
298            )?;
299            selected_row_group_indices = Some(byte_range_filtered_row_groups);
300        }
301
302        if let Some(predicate) = final_predicate {
303            let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
304                record_batch_stream_builder.parquet_schema(),
305                &predicate,
306            )?;
307
308            let row_filter = ArrowReader::get_row_filter(
309                &predicate,
310                record_batch_stream_builder.parquet_schema(),
311                &iceberg_field_ids,
312                &field_id_map,
313            )?;
314            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
315
316            if self.row_group_filtering_enabled {
317                let predicate_filtered_row_groups = ArrowReader::get_selected_row_group_indices(
318                    &predicate,
319                    record_batch_stream_builder.metadata(),
320                    &field_id_map,
321                    &task.schema,
322                )?;
323
324                // Merge predicate-based filtering with byte range filtering (if present)
325                // by taking the intersection of both filters
326                selected_row_group_indices = match selected_row_group_indices {
327                    Some(byte_range_filtered) => {
328                        // Keep only row groups that are in both filters
329                        let intersection: Vec<usize> = byte_range_filtered
330                            .into_iter()
331                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
332                            .collect();
333                        Some(intersection)
334                    }
335                    None => Some(predicate_filtered_row_groups),
336                };
337            }
338
339            if self.row_selection_enabled {
340                row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate(
341                    &predicate,
342                    record_batch_stream_builder.metadata(),
343                    &selected_row_group_indices,
344                    &field_id_map,
345                    &task.schema,
346                )?);
347            }
348        }
349
350        let positional_delete_indexes = delete_filter.get_delete_vector(&task);
351
352        if let Some(positional_delete_indexes) = positional_delete_indexes {
353            let delete_row_selection = {
354                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
355
356                ArrowReader::build_deletes_row_selection(
357                    record_batch_stream_builder.metadata().row_groups(),
358                    &selected_row_group_indices,
359                    &positional_delete_indexes,
360                )
361            }?;
362
363            // merge the row selection from the delete files with the row selection
364            // from the filter predicate, if there is one from the filter predicate
365            row_selection = match row_selection {
366                None => Some(delete_row_selection),
367                Some(filter_row_selection) => {
368                    Some(filter_row_selection.intersection(&delete_row_selection))
369                }
370            };
371        }
372
373        if let Some(row_selection) = row_selection {
374            record_batch_stream_builder =
375                record_batch_stream_builder.with_row_selection(row_selection);
376        }
377
378        if let Some(selected_row_group_indices) = selected_row_group_indices {
379            record_batch_stream_builder =
380                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
381        }
382
383        // Build the batch stream and send all the RecordBatches that it generates
384        // to the requester.
385        let record_batch_stream =
386            record_batch_stream_builder
387                .build()?
388                .map(move |batch| match batch {
389                    Ok(batch) => {
390                        // Process the record batch (type promotion, column reordering, virtual fields, etc.)
391                        record_batch_transformer.process_record_batch(batch)
392                    }
393                    Err(err) => Err(err.into()),
394                });
395
396        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
397    }
398}
399
400impl ArrowReader {
401    /// Opens a Parquet file and loads its metadata, wrapping the reader with
402    /// [`CountingFileRead`] so all I/O is accumulated into `bytes_read`.
403    pub(crate) async fn open_parquet_file(
404        data_file_path: &str,
405        file_io: &FileIO,
406        file_size_in_bytes: u64,
407        parquet_read_options: ParquetReadOptions,
408        bytes_read: &Arc<AtomicU64>,
409    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
410        let parquet_file = file_io.new_input(data_file_path)?;
411        let counting_reader =
412            CountingFileRead::new(parquet_file.reader().await?, Arc::clone(bytes_read));
413        Self::build_parquet_reader(
414            Box::new(counting_reader),
415            file_size_in_bytes,
416            parquet_read_options,
417        )
418        .await
419    }
420
421    async fn build_parquet_reader(
422        parquet_reader: Box<dyn FileRead>,
423        file_size_in_bytes: u64,
424        parquet_read_options: ParquetReadOptions,
425    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
426        let mut reader = ArrowFileReader::new(
427            FileMetadata {
428                size: file_size_in_bytes,
429            },
430            parquet_reader,
431        )
432        .with_parquet_read_options(parquet_read_options);
433
434        let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
435            .await
436            .map_err(|e| {
437                Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
438            })?;
439
440        Ok((reader, arrow_metadata))
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use std::collections::HashMap;
447    use std::fs::File;
448    use std::sync::Arc;
449
450    use arrow_array::cast::AsArray;
451    use arrow_array::{Array, ArrayRef, RecordBatch};
452    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
453    use futures::TryStreamExt;
454    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
455    use parquet::basic::Compression;
456    use parquet::file::properties::WriterProperties;
457    use tempfile::TempDir;
458
459    use crate::Runtime;
460    use crate::arrow::ArrowReaderBuilder;
461    use crate::io::FileIO;
462    use crate::scan::{FileScanTask, FileScanTaskStream};
463    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, SchemaRef, Type};
464
465    // INT96 encoding: [nanos_low_u32, nanos_high_u32, julian_day_u32]
466    // Julian day 2_440_588 = Unix epoch (1970-01-01)
467    const UNIX_EPOCH_JULIAN: i64 = 2_440_588;
468    const MICROS_PER_DAY: i64 = 86_400_000_000;
469    // Noon on 3333-01-01 (Julian day 2_953_529) — outside the i64 nanosecond range (~1677-2262).
470    const INT96_TEST_NANOS_WITHIN_DAY: u64 = 43_200_000_000_000;
471    const INT96_TEST_JULIAN_DAY: u32 = 2_953_529;
472
473    fn make_int96_test_value() -> (parquet::data_type::Int96, i64) {
474        let mut val = parquet::data_type::Int96::new();
475        val.set_data(
476            (INT96_TEST_NANOS_WITHIN_DAY & 0xFFFFFFFF) as u32,
477            (INT96_TEST_NANOS_WITHIN_DAY >> 32) as u32,
478            INT96_TEST_JULIAN_DAY,
479        );
480        let expected_micros = (INT96_TEST_JULIAN_DAY as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
481            + (INT96_TEST_NANOS_WITHIN_DAY / 1_000) as i64;
482        (val, expected_micros)
483    }
484
485    async fn read_int96_batches(
486        file_path: &str,
487        schema: SchemaRef,
488        project_field_ids: Vec<i32>,
489    ) -> Vec<RecordBatch> {
490        let file_io = FileIO::new_with_fs();
491        let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build();
492
493        let file_size = std::fs::metadata(file_path).unwrap().len();
494        let task = FileScanTask::builder()
495            .with_file_size_in_bytes(file_size)
496            .with_start(0)
497            .with_length(file_size)
498            .with_data_file_path(file_path.to_string())
499            .with_data_file_format(DataFileFormat::Parquet)
500            .with_schema(schema)
501            .with_project_field_ids(project_field_ids)
502            .with_case_sensitive(false)
503            .build();
504
505        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
506        reader
507            .read(tasks)
508            .unwrap()
509            .stream()
510            .try_collect()
511            .await
512            .unwrap()
513    }
514
515    // ArrowWriter cannot write INT96, so we use SerializedFileWriter directly.
516    fn write_int96_parquet_file(
517        table_location: &str,
518        filename: &str,
519        with_field_ids: bool,
520    ) -> (String, Vec<i64>) {
521        use parquet::basic::{Repetition, Type as PhysicalType};
522        use parquet::data_type::{Int32Type, Int96, Int96Type};
523        use parquet::file::writer::SerializedFileWriter;
524        use parquet::schema::types::Type as SchemaType;
525
526        let file_path = format!("{table_location}/{filename}");
527
528        let mut ts_builder = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
529            .with_repetition(Repetition::OPTIONAL);
530        let mut id_builder = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
531            .with_repetition(Repetition::REQUIRED);
532
533        if with_field_ids {
534            ts_builder = ts_builder.with_id(Some(1));
535            id_builder = id_builder.with_id(Some(2));
536        }
537
538        let schema = SchemaType::group_type_builder("schema")
539            .with_fields(vec![
540                Arc::new(ts_builder.build().unwrap()),
541                Arc::new(id_builder.build().unwrap()),
542            ])
543            .build()
544            .unwrap();
545
546        // Dates outside the i64 nanosecond range (~1677-2262) overflow without coercion.
547        const NOON_NANOS: u64 = INT96_TEST_NANOS_WITHIN_DAY;
548        const JULIAN_3333: u32 = INT96_TEST_JULIAN_DAY;
549        const JULIAN_2100: u32 = 2_488_070;
550
551        let test_data: Vec<(u32, u32, u32, i64)> = vec![
552            // 3333-01-01 00:00:00
553            (
554                0,
555                0,
556                JULIAN_3333,
557                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
558            ),
559            // 3333-01-01 12:00:00
560            (
561                (NOON_NANOS & 0xFFFFFFFF) as u32,
562                (NOON_NANOS >> 32) as u32,
563                JULIAN_3333,
564                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
565                    + (NOON_NANOS / 1_000) as i64,
566            ),
567            // 2100-01-01 00:00:00
568            (
569                0,
570                0,
571                JULIAN_2100,
572                (JULIAN_2100 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
573            ),
574        ];
575
576        let int96_values: Vec<Int96> = test_data
577            .iter()
578            .map(|(lo, hi, day, _)| {
579                let mut v = Int96::new();
580                v.set_data(*lo, *hi, *day);
581                v
582            })
583            .collect();
584
585        let id_values: Vec<i32> = (0..test_data.len() as i32).collect();
586        let expected_micros: Vec<i64> = test_data.iter().map(|(_, _, _, m)| *m).collect();
587
588        let file = File::create(&file_path).unwrap();
589        let mut writer =
590            SerializedFileWriter::new(file, Arc::new(schema), Default::default()).unwrap();
591
592        let mut row_group = writer.next_row_group().unwrap();
593        {
594            // def=1: ts is OPTIONAL and present. No repetition levels (top-level columns).
595            let mut col = row_group.next_column().unwrap().unwrap();
596            col.typed::<Int96Type>()
597                .write_batch(&int96_values, Some(&vec![1; test_data.len()]), None)
598                .unwrap();
599            col.close().unwrap();
600        }
601        {
602            let mut col = row_group.next_column().unwrap().unwrap();
603            col.typed::<Int32Type>()
604                .write_batch(&id_values, None, None)
605                .unwrap();
606            col.close().unwrap();
607        }
608        row_group.close().unwrap();
609        writer.close().unwrap();
610
611        (file_path, expected_micros)
612    }
613
614    async fn assert_int96_read_matches(
615        file_path: &str,
616        schema: SchemaRef,
617        project_field_ids: Vec<i32>,
618        expected_micros: &[i64],
619    ) {
620        use arrow_array::TimestampMicrosecondArray;
621
622        let batches = read_int96_batches(file_path, schema, project_field_ids).await;
623
624        assert_eq!(batches.len(), 1);
625        let ts_array = batches[0]
626            .column(0)
627            .as_any()
628            .downcast_ref::<TimestampMicrosecondArray>()
629            .expect("Expected TimestampMicrosecondArray");
630
631        for (i, expected) in expected_micros.iter().enumerate() {
632            assert_eq!(
633                ts_array.value(i),
634                *expected,
635                "Row {i}: got {}, expected {expected}",
636                ts_array.value(i)
637            );
638        }
639    }
640
641    /// Test that concurrency=1 reads all files correctly and in deterministic order.
642    /// This verifies the fast-path optimization for single concurrency.
643    #[tokio::test]
644    async fn test_read_with_concurrency_one() {
645        use arrow_array::Int32Array;
646
647        let schema = Arc::new(
648            Schema::builder()
649                .with_schema_id(1)
650                .with_fields(vec![
651                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
652                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
653                        .into(),
654                ])
655                .build()
656                .unwrap(),
657        );
658
659        let arrow_schema = Arc::new(ArrowSchema::new(vec![
660            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
661                PARQUET_FIELD_ID_META_KEY.to_string(),
662                "1".to_string(),
663            )])),
664            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
665                PARQUET_FIELD_ID_META_KEY.to_string(),
666                "2".to_string(),
667            )])),
668        ]));
669
670        let tmp_dir = TempDir::new().unwrap();
671        let table_location = tmp_dir.path().to_str().unwrap().to_string();
672        let file_io = FileIO::new_with_fs();
673
674        // Create 3 parquet files with different data
675        let props = WriterProperties::builder()
676            .set_compression(Compression::SNAPPY)
677            .build();
678
679        for file_num in 0..3 {
680            let id_data = Arc::new(Int32Array::from_iter_values(
681                file_num * 10..(file_num + 1) * 10,
682            )) as ArrayRef;
683            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
684
685            let to_write =
686                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
687
688            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
689            let mut writer =
690                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
691            writer.write(&to_write).expect("Writing batch");
692            writer.close().unwrap();
693        }
694
695        // Read with concurrency=1 (fast-path)
696        let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
697            .with_data_file_concurrency_limit(1)
698            .build();
699
700        // Create tasks in a specific order: file_0, file_1, file_2
701        let tasks = vec![
702            Ok(FileScanTask::builder()
703                .with_file_size_in_bytes(
704                    std::fs::metadata(format!("{table_location}/file_0.parquet"))
705                        .unwrap()
706                        .len(),
707                )
708                .with_start(0)
709                .with_length(0)
710                .with_data_file_path(format!("{table_location}/file_0.parquet"))
711                .with_data_file_format(DataFileFormat::Parquet)
712                .with_schema(schema.clone())
713                .with_project_field_ids(vec![1, 2])
714                .with_case_sensitive(false)
715                .build()),
716            Ok(FileScanTask::builder()
717                .with_file_size_in_bytes(
718                    std::fs::metadata(format!("{table_location}/file_1.parquet"))
719                        .unwrap()
720                        .len(),
721                )
722                .with_start(0)
723                .with_length(0)
724                .with_data_file_path(format!("{table_location}/file_1.parquet"))
725                .with_data_file_format(DataFileFormat::Parquet)
726                .with_schema(schema.clone())
727                .with_project_field_ids(vec![1, 2])
728                .with_case_sensitive(false)
729                .build()),
730            Ok(FileScanTask::builder()
731                .with_file_size_in_bytes(
732                    std::fs::metadata(format!("{table_location}/file_2.parquet"))
733                        .unwrap()
734                        .len(),
735                )
736                .with_start(0)
737                .with_length(0)
738                .with_data_file_path(format!("{table_location}/file_2.parquet"))
739                .with_data_file_format(DataFileFormat::Parquet)
740                .with_schema(schema.clone())
741                .with_project_field_ids(vec![1, 2])
742                .with_case_sensitive(false)
743                .build()),
744        ];
745
746        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
747
748        let result = reader
749            .read(tasks_stream)
750            .unwrap()
751            .stream()
752            .try_collect::<Vec<RecordBatch>>()
753            .await
754            .unwrap();
755
756        // Verify we got all 30 rows (10 from each file)
757        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
758        assert_eq!(total_rows, 30, "Should have 30 total rows");
759
760        // Collect all ids and file_nums to verify data
761        let mut all_ids = Vec::new();
762        let mut all_file_nums = Vec::new();
763
764        for batch in &result {
765            let id_col = batch
766                .column(0)
767                .as_primitive::<arrow_array::types::Int32Type>();
768            let file_num_col = batch
769                .column(1)
770                .as_primitive::<arrow_array::types::Int32Type>();
771
772            for i in 0..batch.num_rows() {
773                all_ids.push(id_col.value(i));
774                all_file_nums.push(file_num_col.value(i));
775            }
776        }
777
778        assert_eq!(all_ids.len(), 30);
779        assert_eq!(all_file_nums.len(), 30);
780
781        // With concurrency=1 and sequential processing, files should be processed in order
782        // file_0: ids 0-9, file_num=0
783        // file_1: ids 10-19, file_num=1
784        // file_2: ids 20-29, file_num=2
785        for i in 0..10 {
786            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
787            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
788        }
789        for i in 10..20 {
790            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
791            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
792        }
793        for i in 20..30 {
794            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
795            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
796        }
797    }
798
799    #[tokio::test]
800    async fn test_read_int96_timestamps_with_field_ids() {
801        let schema = Arc::new(
802            Schema::builder()
803                .with_schema_id(1)
804                .with_fields(vec![
805                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
806                        .into(),
807                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
808                ])
809                .build()
810                .unwrap(),
811        );
812
813        let tmp_dir = TempDir::new().unwrap();
814        let table_location = tmp_dir.path().to_str().unwrap().to_string();
815        let (file_path, expected_micros) =
816            write_int96_parquet_file(&table_location, "with_ids.parquet", true);
817
818        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
819    }
820
821    #[tokio::test]
822    async fn test_read_int96_timestamps_without_field_ids() {
823        let schema = Arc::new(
824            Schema::builder()
825                .with_schema_id(1)
826                .with_fields(vec![
827                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
828                        .into(),
829                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
830                ])
831                .build()
832                .unwrap(),
833        );
834
835        let tmp_dir = TempDir::new().unwrap();
836        let table_location = tmp_dir.path().to_str().unwrap().to_string();
837        let (file_path, expected_micros) =
838            write_int96_parquet_file(&table_location, "no_ids.parquet", false);
839
840        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
841    }
842
843    #[tokio::test]
844    async fn test_read_int96_timestamps_in_struct() {
845        use arrow_array::{StructArray, TimestampMicrosecondArray};
846        use parquet::basic::{Repetition, Type as PhysicalType};
847        use parquet::data_type::Int96Type;
848        use parquet::file::writer::SerializedFileWriter;
849        use parquet::schema::types::Type as SchemaType;
850
851        let tmp_dir = TempDir::new().unwrap();
852        let table_location = tmp_dir.path().to_str().unwrap().to_string();
853        let file_path = format!("{table_location}/struct_int96.parquet");
854
855        let ts_type = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
856            .with_repetition(Repetition::OPTIONAL)
857            .with_id(Some(2))
858            .build()
859            .unwrap();
860
861        let struct_type = SchemaType::group_type_builder("data")
862            .with_repetition(Repetition::REQUIRED)
863            .with_id(Some(1))
864            .with_fields(vec![Arc::new(ts_type)])
865            .build()
866            .unwrap();
867
868        let parquet_schema = SchemaType::group_type_builder("schema")
869            .with_fields(vec![Arc::new(struct_type)])
870            .build()
871            .unwrap();
872
873        let (int96_val, expected_micros) = make_int96_test_value();
874
875        let file = File::create(&file_path).unwrap();
876        let mut writer =
877            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
878
879        // def=1: struct is REQUIRED so no level, ts is OPTIONAL and present (1).
880        // No repetition levels needed (no repeated groups).
881        let mut row_group = writer.next_row_group().unwrap();
882        {
883            let mut col = row_group.next_column().unwrap().unwrap();
884            col.typed::<Int96Type>()
885                .write_batch(&[int96_val], Some(&[1]), None)
886                .unwrap();
887            col.close().unwrap();
888        }
889        row_group.close().unwrap();
890        writer.close().unwrap();
891
892        let iceberg_schema = Arc::new(
893            Schema::builder()
894                .with_schema_id(1)
895                .with_fields(vec![
896                    NestedField::required(
897                        1,
898                        "data",
899                        Type::Struct(crate::spec::StructType::new(vec![
900                            NestedField::optional(
901                                2,
902                                "ts",
903                                Type::Primitive(PrimitiveType::Timestamp),
904                            )
905                            .into(),
906                        ])),
907                    )
908                    .into(),
909                ])
910                .build()
911                .unwrap(),
912        );
913
914        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
915
916        assert_eq!(batches.len(), 1);
917        let struct_array = batches[0]
918            .column(0)
919            .as_any()
920            .downcast_ref::<StructArray>()
921            .expect("Expected StructArray");
922        let ts_array = struct_array
923            .column(0)
924            .as_any()
925            .downcast_ref::<TimestampMicrosecondArray>()
926            .expect("Expected TimestampMicrosecondArray inside struct");
927
928        assert_eq!(
929            ts_array.value(0),
930            expected_micros,
931            "INT96 in struct: got {}, expected {expected_micros}",
932            ts_array.value(0)
933        );
934    }
935
936    #[tokio::test]
937    async fn test_read_int96_timestamps_in_list() {
938        use arrow_array::{ListArray, TimestampMicrosecondArray};
939        use parquet::basic::{Repetition, Type as PhysicalType};
940        use parquet::data_type::Int96Type;
941        use parquet::file::writer::SerializedFileWriter;
942        use parquet::schema::types::Type as SchemaType;
943
944        let tmp_dir = TempDir::new().unwrap();
945        let table_location = tmp_dir.path().to_str().unwrap().to_string();
946        let file_path = format!("{table_location}/list_int96.parquet");
947
948        // 3-level LIST encoding:
949        //   optional group timestamps (LIST) {
950        //     repeated group list {
951        //       optional int96 element;
952        //     }
953        //   }
954        let element_type = SchemaType::primitive_type_builder("element", PhysicalType::INT96)
955            .with_repetition(Repetition::OPTIONAL)
956            .with_id(Some(2))
957            .build()
958            .unwrap();
959
960        let list_group = SchemaType::group_type_builder("list")
961            .with_repetition(Repetition::REPEATED)
962            .with_fields(vec![Arc::new(element_type)])
963            .build()
964            .unwrap();
965
966        let list_type = SchemaType::group_type_builder("timestamps")
967            .with_repetition(Repetition::OPTIONAL)
968            .with_id(Some(1))
969            .with_logical_type(Some(parquet::basic::LogicalType::List))
970            .with_fields(vec![Arc::new(list_group)])
971            .build()
972            .unwrap();
973
974        let parquet_schema = SchemaType::group_type_builder("schema")
975            .with_fields(vec![Arc::new(list_type)])
976            .build()
977            .unwrap();
978
979        let (int96_val, expected_micros) = make_int96_test_value();
980
981        let file = File::create(&file_path).unwrap();
982        let mut writer =
983            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
984
985        // Write a single row with a list containing one INT96 element.
986        // def=3: list present (1) + repeated group (2) + element present (3)
987        // rep=0: start of a new list
988        let mut row_group = writer.next_row_group().unwrap();
989        {
990            let mut col = row_group.next_column().unwrap().unwrap();
991            col.typed::<Int96Type>()
992                .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
993                .unwrap();
994            col.close().unwrap();
995        }
996        row_group.close().unwrap();
997        writer.close().unwrap();
998
999        let iceberg_schema = Arc::new(
1000            Schema::builder()
1001                .with_schema_id(1)
1002                .with_fields(vec![
1003                    NestedField::optional(
1004                        1,
1005                        "timestamps",
1006                        Type::List(crate::spec::ListType {
1007                            element_field: NestedField::optional(
1008                                2,
1009                                "element",
1010                                Type::Primitive(PrimitiveType::Timestamp),
1011                            )
1012                            .into(),
1013                        }),
1014                    )
1015                    .into(),
1016                ])
1017                .build()
1018                .unwrap(),
1019        );
1020
1021        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
1022
1023        assert_eq!(batches.len(), 1);
1024        let list_array = batches[0]
1025            .column(0)
1026            .as_any()
1027            .downcast_ref::<ListArray>()
1028            .expect("Expected ListArray");
1029        let ts_array = list_array
1030            .values()
1031            .as_any()
1032            .downcast_ref::<TimestampMicrosecondArray>()
1033            .expect("Expected TimestampMicrosecondArray inside list");
1034
1035        assert_eq!(
1036            ts_array.value(0),
1037            expected_micros,
1038            "INT96 in list: got {}, expected {expected_micros}",
1039            ts_array.value(0)
1040        );
1041    }
1042
1043    #[tokio::test]
1044    async fn test_read_int96_timestamps_in_map() {
1045        use arrow_array::{MapArray, TimestampMicrosecondArray};
1046        use parquet::basic::{Repetition, Type as PhysicalType};
1047        use parquet::data_type::{ByteArrayType, Int96Type};
1048        use parquet::file::writer::SerializedFileWriter;
1049        use parquet::schema::types::Type as SchemaType;
1050
1051        let tmp_dir = TempDir::new().unwrap();
1052        let table_location = tmp_dir.path().to_str().unwrap().to_string();
1053        let file_path = format!("{table_location}/map_int96.parquet");
1054
1055        // MAP encoding:
1056        //   optional group ts_map (MAP) {
1057        //     repeated group key_value {
1058        //       required binary key (UTF8);
1059        //       optional int96 value;
1060        //     }
1061        //   }
1062        let key_type = SchemaType::primitive_type_builder("key", PhysicalType::BYTE_ARRAY)
1063            .with_repetition(Repetition::REQUIRED)
1064            .with_logical_type(Some(parquet::basic::LogicalType::String))
1065            .with_id(Some(2))
1066            .build()
1067            .unwrap();
1068
1069        let value_type = SchemaType::primitive_type_builder("value", PhysicalType::INT96)
1070            .with_repetition(Repetition::OPTIONAL)
1071            .with_id(Some(3))
1072            .build()
1073            .unwrap();
1074
1075        let key_value_group = SchemaType::group_type_builder("key_value")
1076            .with_repetition(Repetition::REPEATED)
1077            .with_fields(vec![Arc::new(key_type), Arc::new(value_type)])
1078            .build()
1079            .unwrap();
1080
1081        let map_type = SchemaType::group_type_builder("ts_map")
1082            .with_repetition(Repetition::OPTIONAL)
1083            .with_id(Some(1))
1084            .with_logical_type(Some(parquet::basic::LogicalType::Map))
1085            .with_fields(vec![Arc::new(key_value_group)])
1086            .build()
1087            .unwrap();
1088
1089        let parquet_schema = SchemaType::group_type_builder("schema")
1090            .with_fields(vec![Arc::new(map_type)])
1091            .build()
1092            .unwrap();
1093
1094        let (int96_val, expected_micros) = make_int96_test_value();
1095
1096        let file = File::create(&file_path).unwrap();
1097        let mut writer =
1098            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
1099
1100        // Write a single row with a map containing one key-value pair.
1101        // rep=0 for both columns: start of a new map.
1102        // key def=2: map present (1) + key_value entry present (2), key is REQUIRED.
1103        // value def=3: map present (1) + key_value entry present (2) + value present (3).
1104        let mut row_group = writer.next_row_group().unwrap();
1105        {
1106            let mut col = row_group.next_column().unwrap().unwrap();
1107            col.typed::<ByteArrayType>()
1108                .write_batch(
1109                    &[parquet::data_type::ByteArray::from("event_time")],
1110                    Some(&[2]),
1111                    Some(&[0]),
1112                )
1113                .unwrap();
1114            col.close().unwrap();
1115        }
1116        {
1117            let mut col = row_group.next_column().unwrap().unwrap();
1118            col.typed::<Int96Type>()
1119                .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
1120                .unwrap();
1121            col.close().unwrap();
1122        }
1123        row_group.close().unwrap();
1124        writer.close().unwrap();
1125
1126        let iceberg_schema = Arc::new(
1127            Schema::builder()
1128                .with_schema_id(1)
1129                .with_fields(vec![
1130                    NestedField::optional(
1131                        1,
1132                        "ts_map",
1133                        Type::Map(crate::spec::MapType {
1134                            key_field: NestedField::required(
1135                                2,
1136                                "key",
1137                                Type::Primitive(PrimitiveType::String),
1138                            )
1139                            .into(),
1140                            value_field: NestedField::optional(
1141                                3,
1142                                "value",
1143                                Type::Primitive(PrimitiveType::Timestamp),
1144                            )
1145                            .into(),
1146                        }),
1147                    )
1148                    .into(),
1149                ])
1150                .build()
1151                .unwrap(),
1152        );
1153
1154        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
1155
1156        assert_eq!(batches.len(), 1);
1157        let map_array = batches[0]
1158            .column(0)
1159            .as_any()
1160            .downcast_ref::<MapArray>()
1161            .expect("Expected MapArray");
1162        let ts_array = map_array
1163            .values()
1164            .as_any()
1165            .downcast_ref::<TimestampMicrosecondArray>()
1166            .expect("Expected TimestampMicrosecondArray as map values");
1167
1168        assert_eq!(
1169            ts_array.value(0),
1170            expected_micros,
1171            "INT96 in map: got {}, expected {expected_micros}",
1172            ts_array.value(0)
1173        );
1174    }
1175}