iceberg/arrow/reader/

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! The main `ArrowReader` pipeline: reading a stream of `FileScanTask`s,
//! opening Parquet files and resolving schemas, then wiring projection,
//! predicates, row-group / row selection, and delete handling into a stream
//! of transformed Arrow `RecordBatch`es.
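//!
//! A rough sketch of how the pipeline is typically driven (illustrative only;
//! the builder, `ScanResult::stream`, and `FileScanTaskStream` usages mirror the
//! tests at the bottom of this module, and a real scan would obtain its tasks
//! from a table scan rather than constructing them by hand):
//!
//! ```ignore
//! use futures::TryStreamExt;
//!
//! let reader = ArrowReaderBuilder::new(file_io).build();
//! let tasks: FileScanTaskStream = Box::pin(futures::stream::iter(vec![Ok(task)]));
//! let scan_result = reader.read(tasks)?;
//! let batches: Vec<arrow_array::RecordBatch> = scan_result.stream().try_collect().await?;
//! ```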

use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use futures::{StreamExt, TryStreamExt};
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder};

use super::{
    ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema,
    apply_name_mapping_to_arrow_schema,
};
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::int96::coerce_int96_timestamps;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
use crate::error::Result;
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::Datum;
use crate::{Error, ErrorKind};

impl ArrowReader {
    /// Takes a stream of `FileScanTask`s and reads all the files.
    /// Returns a [`ScanResult`] containing the record batch stream and scan metrics.
    pub fn read(self, tasks: FileScanTaskStream) -> Result<ScanResult> {
        let concurrency_limit_data_files = self.concurrency_limit_data_files;
        let scan_metrics = ScanMetrics::new();

        let task_reader = FileScanTaskReader {
            batch_size: self.batch_size,
            file_io: self.file_io,
            delete_file_loader: self
                .delete_file_loader
                .with_scan_metrics(scan_metrics.clone()),
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
            parquet_read_options: self.parquet_read_options,
            scan_metrics: scan_metrics.clone(),
        };

        // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
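        // With a single slot, `and_then` + `try_flatten` drains each task's batch stream to
        // completion before the next task starts, so output order matches input order (this is
        // what `test_read_with_concurrency_one` below relies on). The unordered variants in the
        // other branch may interleave batches from different files.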
        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
            Box::pin(
                tasks
                    .and_then(move |task| task_reader.clone().process(task))
                    .map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
                            .with_source(err)
                    })
                    .try_flatten(),
            )
        } else {
            Box::pin(
                tasks
                    .map_ok(move |task| task_reader.clone().process(task))
                    .map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
                            .with_source(err)
                    })
                    .try_buffer_unordered(concurrency_limit_data_files)
                    .try_flatten_unordered(concurrency_limit_data_files),
            )
        };

        Ok(ScanResult::new(stream, scan_metrics))
    }
}

/// Per-scan state for processing [`FileScanTask`]s. Created once per
/// [`ArrowReader::read`] call and cloned per task.
#[derive(Clone)]
struct FileScanTaskReader {
    batch_size: Option<usize>,
    file_io: FileIO,
    delete_file_loader: CachingDeleteFileLoader,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
    parquet_read_options: ParquetReadOptions,
    scan_metrics: ScanMetrics,
}

impl FileScanTaskReader {
    async fn process(self, task: FileScanTask) -> Result<ArrowRecordBatchStream> {
        let should_load_page_index =
            (self.row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
        let mut parquet_read_options = self.parquet_read_options;
        parquet_read_options.preload_page_index = should_load_page_index;

        let delete_filter_rx = self
            .delete_file_loader
            .load_deletes(&task.deletes, Arc::clone(&task.schema));

        // Open the Parquet file once, loading its metadata
        let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
            &task.data_file_path,
            &self.file_io,
            task.file_size_in_bytes,
            parquet_read_options,
            self.scan_metrics.bytes_read_counter(),
        )
        .await?;

        // Check if Parquet file has embedded field IDs
        // Corresponds to Java's ParquetSchemaUtil.hasIds()
        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
        let missing_field_ids = arrow_metadata
            .schema()
            .fields()
            .iter()
            .next()
            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());

        // Three-branch schema resolution strategy matching Java's ReadConf constructor
        //
        // Per Iceberg spec Column Projection rules:
        // "Columns in Iceberg data files are selected by field id. The table schema's column
        //  names and order may change after a data file is written, and projection must be done
        //  using field ids."
        // https://iceberg.apache.org/spec/#column-projection
        //
        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
        // we must assign field IDs BEFORE reading data to enable correct projection.
        //
        // Java's ReadConf determines field ID strategy:
        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
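        //
        // Illustrative example of Branch 2 (the JSON shown is the table's
        // `schema.name-mapping.default` property per the Iceberg spec; it is parsed upstream,
        // not here): for a file with columns `ts`, `id` written without field IDs, a mapping of
        //   [ {"field-id": 1, "names": ["ts"]}, {"field-id": 2, "names": ["id"]} ]
        // lets `apply_name_mapping_to_arrow_schema` tag the Arrow fields with
        // `PARQUET:field_id` = 1 and 2, after which projection proceeds by field ID just as in
        // Branch 1.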
        let arrow_metadata = if missing_field_ids {
            // Parquet file lacks field IDs - must assign them before reading
            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
                // Branch 2: Apply name mapping to assign correct Iceberg field IDs
                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
                // to columns without field id"
                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
                apply_name_mapping_to_arrow_schema(
                    Arc::clone(arrow_metadata.schema()),
                    name_mapping,
                )?
            } else {
                // Branch 3: No name mapping - use position-based fallback IDs
                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
                add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
            };

            let options = ArrowReaderOptions::new().with_schema(arrow_schema);
            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
                |e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        "Failed to create ArrowReaderMetadata with field ID schema",
                    )
                    .with_source(e)
                },
            )?
        } else {
            // Branch 1: File has embedded field IDs - trust them
            arrow_metadata
        };

        // Coerce INT96 timestamp columns to the resolution specified by the Iceberg schema.
        // This must happen before building the stream reader to avoid i64 overflow in arrow-rs.
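        //
        // Worked example (using the values from the INT96 tests below): Julian day 2_953_529
        // (3333-01-01) at noon lies (2_953_529 - 2_440_588) = 512_941 days past the Unix epoch,
        // i.e. 512_941 * 86_400_000_000 + 43_200_000_000 = 44_318_145_600_000_000 microseconds.
        // That fits in an i64 as microseconds, but the same instant in nanoseconds (~4.4e19)
        // exceeds i64::MAX (~9.2e18), so decoding INT96 at nanosecond resolution would overflow.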
        let arrow_metadata = if let Some(coerced_schema) =
            coerce_int96_timestamps(arrow_metadata.schema(), &task.schema)
        {
            let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema));
            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
                |e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!(
                            "Failed to create ArrowReaderMetadata with INT96-coerced schema: {coerced_schema}"
                        ),
                    )
                    .with_source(e)
                },
            )?
        } else {
            arrow_metadata
        };

        // Build the stream reader, reusing the already-opened file reader
        let mut record_batch_stream_builder =
            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);

        // Filter out metadata fields for Parquet projection (they don't exist in files)
        let project_field_ids_without_metadata: Vec<i32> = task
            .project_field_ids
            .iter()
            .filter(|&&id| !is_metadata_field(id))
            .copied()
            .collect();

        // Create projection mask based on field IDs
        // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
        // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
        // - If fallback IDs: position-based projection (missing_field_ids=true)
        let projection_mask = ArrowReader::get_arrow_projection_mask(
            &project_field_ids_without_metadata,
            &task.schema,
            record_batch_stream_builder.parquet_schema(),
            record_batch_stream_builder.schema(),
            missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
        )?;

        record_batch_stream_builder =
            record_batch_stream_builder.with_projection(projection_mask.clone());

        // RecordBatchTransformer performs any transformations required on the RecordBatches
        // that come back from the file, such as type promotion, default column insertion,
        // column re-ordering, partition constants, and virtual field addition (like _file)
        let mut record_batch_transformer_builder =
            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());

        // Add the _file metadata column if it's in the projected fields
        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
            let file_datum = Datum::string(task.data_file_path.clone());
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
        }

        if let (Some(partition_spec), Some(partition_data)) =
            (task.partition_spec.clone(), task.partition.clone())
        {
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
        }

        let mut record_batch_transformer = record_batch_transformer_builder.build();

        if let Some(batch_size) = self.batch_size {
            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
        }

        let delete_filter = delete_filter_rx.await.unwrap()?;
        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;

        // In addition to the optional predicate supplied in the `FileScanTask`,
        // we also have an optional predicate resulting from equality delete files.
        // If both are present, we logical-AND them together to form a single filter
        // predicate that we can pass to the `RecordBatchStreamBuilder`.
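        // For example, a task predicate such as `id >= 100` combined with a predicate derived
        // from equality delete files (excluding deleted keys) yields a single conjunctive
        // filter: a row is emitted only if it satisfies both.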
        let final_predicate = match (&task.predicate, delete_predicate) {
            (None, None) => None,
            (Some(predicate), None) => Some(predicate.clone()),
            (None, Some(ref predicate)) => Some(predicate.clone()),
            (Some(filter_predicate), Some(delete_predicate)) => {
                Some(filter_predicate.clone().and(delete_predicate))
            }
        };

        // Selected RowGroup index lists can come from three sources:
        //   * When task.start and task.length specify a byte range (file splitting);
        //   * When there are equality delete files that are applicable;
        //   * When there is a scan predicate and row_group_filtering_enabled = true.
        // `RowSelection`s can be created in either or both of the following cases:
        //   * When there are positional delete files that are applicable;
        //   * When there is a scan predicate and row_selection_enabled = true
        // Note that row group filtering from predicates only happens when
        // there is a scan predicate AND row_group_filtering_enabled = true,
        // but we perform row selection filtering if there are applicable
        // positional delete files OR (there is a scan predicate AND row_selection_enabled),
        // since the only implemented method of applying positional deletes is
        // by using a `RowSelection`.
        let mut selected_row_group_indices = None;
        let mut row_selection = None;

        // Filter row groups based on byte range from task.start and task.length.
        // If both start and length are 0, read the entire file (backwards compatibility).
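        // Rough illustration (the exact boundary rule lives in
        // `filter_row_groups_by_byte_range`): for a file whose two row groups occupy roughly
        // bytes [4, 1_000) and [1_000, 2_000), a task with start=1_000 and length=1_000 would
        // select only the second row group, while start=0 / length=0 skips this step entirely
        // and reads the whole file.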
        if task.start != 0 || task.length != 0 {
            let byte_range_filtered_row_groups = ArrowReader::filter_row_groups_by_byte_range(
                record_batch_stream_builder.metadata(),
                task.start,
                task.length,
            )?;
            selected_row_group_indices = Some(byte_range_filtered_row_groups);
        }

        if let Some(predicate) = final_predicate {
            let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
                record_batch_stream_builder.parquet_schema(),
                &predicate,
            )?;

            let row_filter = ArrowReader::get_row_filter(
                &predicate,
                record_batch_stream_builder.parquet_schema(),
                &iceberg_field_ids,
                &field_id_map,
            )?;
            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

            if self.row_group_filtering_enabled {
                let predicate_filtered_row_groups = ArrowReader::get_selected_row_group_indices(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &field_id_map,
                    &task.schema,
                )?;

                // Merge predicate-based filtering with byte range filtering (if present)
                // by taking the intersection of both filters
                selected_row_group_indices = match selected_row_group_indices {
                    Some(byte_range_filtered) => {
                        // Keep only row groups that are in both filters
                        let intersection: Vec<usize> = byte_range_filtered
                            .into_iter()
                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
                            .collect();
                        Some(intersection)
                    }
                    None => Some(predicate_filtered_row_groups),
                };
            }

            if self.row_selection_enabled {
                row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &selected_row_group_indices,
                    &field_id_map,
                    &task.schema,
                )?);
            }
        }

        let positional_delete_indexes = delete_filter.get_delete_vector(&task);

        if let Some(positional_delete_indexes) = positional_delete_indexes {
            let delete_row_selection = {
                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();

                ArrowReader::build_deletes_row_selection(
                    record_batch_stream_builder.metadata().row_groups(),
                    &selected_row_group_indices,
                    &positional_delete_indexes,
                )
            }?;

            // merge the row selection from the delete files with the row selection
            // from the filter predicate, if there is one from the filter predicate
            row_selection = match row_selection {
                None => Some(delete_row_selection),
                Some(filter_row_selection) => {
                    Some(filter_row_selection.intersection(&delete_row_selection))
                }
            };
        }

        if let Some(row_selection) = row_selection {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_selection(row_selection);
        }

        if let Some(selected_row_group_indices) = selected_row_group_indices {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
        }

        // Build the batch stream and send all the RecordBatches that it generates
        // to the requester.
        let record_batch_stream =
            record_batch_stream_builder
                .build()?
                .map(move |batch| match batch {
                    Ok(batch) => {
                        // Process the record batch (type promotion, column reordering, virtual fields, etc.)
                        record_batch_transformer.process_record_batch(batch)
                    }
                    Err(err) => Err(err.into()),
                });

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }
}

impl ArrowReader {
    /// Opens a Parquet file and loads its metadata, wrapping the reader with
    /// [`CountingFileRead`] so all I/O is accumulated into `bytes_read`.
    pub(crate) async fn open_parquet_file(
        data_file_path: &str,
        file_io: &FileIO,
        file_size_in_bytes: u64,
        parquet_read_options: ParquetReadOptions,
        bytes_read: &Arc<AtomicU64>,
    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
        let parquet_file = file_io.new_input(data_file_path)?;
        let counting_reader =
            CountingFileRead::new(parquet_file.reader().await?, Arc::clone(bytes_read));
        Self::build_parquet_reader(
            Box::new(counting_reader),
            file_size_in_bytes,
            parquet_read_options,
        )
        .await
    }

    async fn build_parquet_reader(
        parquet_reader: Box<dyn FileRead>,
        file_size_in_bytes: u64,
        parquet_read_options: ParquetReadOptions,
    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
        let mut reader = ArrowFileReader::new(
            FileMetadata {
                size: file_size_in_bytes,
            },
            parquet_reader,
        )
        .with_parquet_read_options(parquet_read_options);

        let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
            .await
            .map_err(|e| {
                Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
            })?;

        Ok((reader, arrow_metadata))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{Array, ArrayRef, RecordBatch};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use futures::TryStreamExt;
    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use crate::arrow::ArrowReaderBuilder;
    use crate::io::FileIO;
    use crate::scan::{FileScanTask, FileScanTaskStream};
    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, SchemaRef, Type};

    // INT96 encoding: [nanos_low_u32, nanos_high_u32, julian_day_u32]
    // Julian day 2_440_588 = Unix epoch (1970-01-01)
    const UNIX_EPOCH_JULIAN: i64 = 2_440_588;
    const MICROS_PER_DAY: i64 = 86_400_000_000;
    // Noon on 3333-01-01 (Julian day 2_953_529) — outside the i64 nanosecond range (~1677-2262).
    const INT96_TEST_NANOS_WITHIN_DAY: u64 = 43_200_000_000_000;
    const INT96_TEST_JULIAN_DAY: u32 = 2_953_529;

    fn make_int96_test_value() -> (parquet::data_type::Int96, i64) {
        let mut val = parquet::data_type::Int96::new();
        val.set_data(
            (INT96_TEST_NANOS_WITHIN_DAY & 0xFFFFFFFF) as u32,
            (INT96_TEST_NANOS_WITHIN_DAY >> 32) as u32,
            INT96_TEST_JULIAN_DAY,
        );
        let expected_micros = (INT96_TEST_JULIAN_DAY as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
            + (INT96_TEST_NANOS_WITHIN_DAY / 1_000) as i64;
        (val, expected_micros)
    }

    async fn read_int96_batches(
        file_path: &str,
        schema: SchemaRef,
        project_field_ids: Vec<i32>,
    ) -> Vec<RecordBatch> {
        let file_io = FileIO::new_with_fs();
        let reader = ArrowReaderBuilder::new(file_io).build();

        let file_size = std::fs::metadata(file_path).unwrap().len();
        let task = FileScanTask {
            file_size_in_bytes: file_size,
            start: 0,
            length: file_size,
            record_count: None,
            data_file_path: file_path.to_string(),
            data_file_format: DataFileFormat::Parquet,
            schema,
            project_field_ids,
            predicate: None,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };

        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
        reader
            .read(tasks)
            .unwrap()
            .stream()
            .try_collect()
            .await
            .unwrap()
    }

    // ArrowWriter cannot write INT96, so we use SerializedFileWriter directly.
    fn write_int96_parquet_file(
        table_location: &str,
        filename: &str,
        with_field_ids: bool,
    ) -> (String, Vec<i64>) {
        use parquet::basic::{Repetition, Type as PhysicalType};
        use parquet::data_type::{Int32Type, Int96, Int96Type};
        use parquet::file::writer::SerializedFileWriter;
        use parquet::schema::types::Type as SchemaType;

        let file_path = format!("{table_location}/{filename}");

        let mut ts_builder = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
            .with_repetition(Repetition::OPTIONAL);
        let mut id_builder = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
            .with_repetition(Repetition::REQUIRED);

        if with_field_ids {
            ts_builder = ts_builder.with_id(Some(1));
            id_builder = id_builder.with_id(Some(2));
        }

        let schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![
                Arc::new(ts_builder.build().unwrap()),
                Arc::new(id_builder.build().unwrap()),
            ])
            .build()
            .unwrap();

        // Dates outside the i64 nanosecond range (~1677-2262) overflow without coercion.
        const NOON_NANOS: u64 = INT96_TEST_NANOS_WITHIN_DAY;
        const JULIAN_3333: u32 = INT96_TEST_JULIAN_DAY;
        const JULIAN_2100: u32 = 2_488_070;

        let test_data: Vec<(u32, u32, u32, i64)> = vec![
            // 3333-01-01 00:00:00
            (
                0,
                0,
                JULIAN_3333,
                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
            ),
            // 3333-01-01 12:00:00
            (
                (NOON_NANOS & 0xFFFFFFFF) as u32,
                (NOON_NANOS >> 32) as u32,
                JULIAN_3333,
                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
                    + (NOON_NANOS / 1_000) as i64,
            ),
            // 2100-01-01 00:00:00
            (
                0,
                0,
                JULIAN_2100,
                (JULIAN_2100 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
            ),
        ];

        let int96_values: Vec<Int96> = test_data
            .iter()
            .map(|(lo, hi, day, _)| {
                let mut v = Int96::new();
                v.set_data(*lo, *hi, *day);
                v
            })
            .collect();

        let id_values: Vec<i32> = (0..test_data.len() as i32).collect();
        let expected_micros: Vec<i64> = test_data.iter().map(|(_, _, _, m)| *m).collect();

        let file = File::create(&file_path).unwrap();
        let mut writer =
            SerializedFileWriter::new(file, Arc::new(schema), Default::default()).unwrap();

        let mut row_group = writer.next_row_group().unwrap();
        {
            // def=1: ts is OPTIONAL and present. No repetition levels (top-level columns).
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<Int96Type>()
                .write_batch(&int96_values, Some(&vec![1; test_data.len()]), None)
                .unwrap();
            col.close().unwrap();
        }
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<Int32Type>()
                .write_batch(&id_values, None, None)
                .unwrap();
            col.close().unwrap();
        }
        row_group.close().unwrap();
        writer.close().unwrap();

        (file_path, expected_micros)
    }

    async fn assert_int96_read_matches(
        file_path: &str,
        schema: SchemaRef,
        project_field_ids: Vec<i32>,
        expected_micros: &[i64],
    ) {
        use arrow_array::TimestampMicrosecondArray;

        let batches = read_int96_batches(file_path, schema, project_field_ids).await;

        assert_eq!(batches.len(), 1);
        let ts_array = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .expect("Expected TimestampMicrosecondArray");

        for (i, expected) in expected_micros.iter().enumerate() {
            assert_eq!(
                ts_array.value(i),
                *expected,
                "Row {i}: got {}, expected {expected}",
                ts_array.value(i)
            );
        }
    }

    /// Test that concurrency=1 reads all files correctly and in deterministic order.
    /// This verifies the fast-path optimization for single concurrency.
    #[tokio::test]
    async fn test_read_with_concurrency_one() {
        use arrow_array::Int32Array;

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
                        .into(),
                ])
                .build()
                .unwrap(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "2".to_string(),
            )])),
        ]));

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_io = FileIO::new_with_fs();

        // Create 3 parquet files with different data
        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        for file_num in 0..3 {
            let id_data = Arc::new(Int32Array::from_iter_values(
                file_num * 10..(file_num + 1) * 10,
            )) as ArrayRef;
            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;

            let to_write =
                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();

            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
            let mut writer =
                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
            writer.write(&to_write).expect("Writing batch");
            writer.close().unwrap();
        }

        // Read with concurrency=1 (fast-path)
        let reader = ArrowReaderBuilder::new(file_io)
            .with_data_file_concurrency_limit(1)
            .build();

        // Create tasks in a specific order: file_0, file_1, file_2
        let tasks = vec![
            Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_0.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
            Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_1.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
            Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_2.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
        ];

        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;

        let result = reader
            .read(tasks_stream)
            .unwrap()
            .stream()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Verify we got all 30 rows (10 from each file)
        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 30, "Should have 30 total rows");

        // Collect all ids and file_nums to verify data
        let mut all_ids = Vec::new();
        let mut all_file_nums = Vec::new();

        for batch in &result {
            let id_col = batch
                .column(0)
                .as_primitive::<arrow_array::types::Int32Type>();
            let file_num_col = batch
                .column(1)
                .as_primitive::<arrow_array::types::Int32Type>();

            for i in 0..batch.num_rows() {
                all_ids.push(id_col.value(i));
                all_file_nums.push(file_num_col.value(i));
            }
        }

        assert_eq!(all_ids.len(), 30);
        assert_eq!(all_file_nums.len(), 30);

        // With concurrency=1 and sequential processing, files should be processed in order
        // file_0: ids 0-9, file_num=0
        // file_1: ids 10-19, file_num=1
        // file_2: ids 20-29, file_num=2
        for i in 0..10 {
            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
        }
        for i in 10..20 {
            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
        }
        for i in 20..30 {
            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
        }
    }

    #[tokio::test]
    async fn test_read_int96_timestamps_with_field_ids() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
                        .into(),
                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let (file_path, expected_micros) =
            write_int96_parquet_file(&table_location, "with_ids.parquet", true);

        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
    }

    #[tokio::test]
    async fn test_read_int96_timestamps_without_field_ids() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
                        .into(),
                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let (file_path, expected_micros) =
            write_int96_parquet_file(&table_location, "no_ids.parquet", false);

        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
    }

    #[tokio::test]
    async fn test_read_int96_timestamps_in_struct() {
        use arrow_array::{StructArray, TimestampMicrosecondArray};
        use parquet::basic::{Repetition, Type as PhysicalType};
        use parquet::data_type::Int96Type;
        use parquet::file::writer::SerializedFileWriter;
        use parquet::schema::types::Type as SchemaType;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_path = format!("{table_location}/struct_int96.parquet");

        let ts_type = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
            .with_repetition(Repetition::OPTIONAL)
            .with_id(Some(2))
            .build()
            .unwrap();

        let struct_type = SchemaType::group_type_builder("data")
            .with_repetition(Repetition::REQUIRED)
            .with_id(Some(1))
            .with_fields(vec![Arc::new(ts_type)])
            .build()
            .unwrap();

        let parquet_schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![Arc::new(struct_type)])
            .build()
            .unwrap();

        let (int96_val, expected_micros) = make_int96_test_value();

        let file = File::create(&file_path).unwrap();
        let mut writer =
            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();

        // def=1: struct is REQUIRED so no level, ts is OPTIONAL and present (1).
        // No repetition levels needed (no repeated groups).
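        // For contrast: a row where `ts` is null would be written with definition level 0,
        // since the REQUIRED enclosing struct contributes no definition level of its own.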
        let mut row_group = writer.next_row_group().unwrap();
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<Int96Type>()
                .write_batch(&[int96_val], Some(&[1]), None)
                .unwrap();
            col.close().unwrap();
        }
        row_group.close().unwrap();
        writer.close().unwrap();

        let iceberg_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(
                        1,
                        "data",
                        Type::Struct(crate::spec::StructType::new(vec![
                            NestedField::optional(
                                2,
                                "ts",
                                Type::Primitive(PrimitiveType::Timestamp),
                            )
                            .into(),
                        ])),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );

        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;

        assert_eq!(batches.len(), 1);
        let struct_array = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected StructArray");
        let ts_array = struct_array
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .expect("Expected TimestampMicrosecondArray inside struct");

        assert_eq!(
            ts_array.value(0),
            expected_micros,
            "INT96 in struct: got {}, expected {expected_micros}",
            ts_array.value(0)
        );
    }

    #[tokio::test]
    async fn test_read_int96_timestamps_in_list() {
        use arrow_array::{ListArray, TimestampMicrosecondArray};
        use parquet::basic::{Repetition, Type as PhysicalType};
        use parquet::data_type::Int96Type;
        use parquet::file::writer::SerializedFileWriter;
        use parquet::schema::types::Type as SchemaType;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_path = format!("{table_location}/list_int96.parquet");

        // 3-level LIST encoding:
        //   optional group timestamps (LIST) {
        //     repeated group list {
        //       optional int96 element;
        //     }
        //   }
        let element_type = SchemaType::primitive_type_builder("element", PhysicalType::INT96)
            .with_repetition(Repetition::OPTIONAL)
            .with_id(Some(2))
            .build()
            .unwrap();

        let list_group = SchemaType::group_type_builder("list")
            .with_repetition(Repetition::REPEATED)
            .with_fields(vec![Arc::new(element_type)])
            .build()
            .unwrap();

        let list_type = SchemaType::group_type_builder("timestamps")
            .with_repetition(Repetition::OPTIONAL)
            .with_id(Some(1))
            .with_logical_type(Some(parquet::basic::LogicalType::List))
            .with_fields(vec![Arc::new(list_group)])
            .build()
            .unwrap();

        let parquet_schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![Arc::new(list_type)])
            .build()
            .unwrap();

        let (int96_val, expected_micros) = make_int96_test_value();

        let file = File::create(&file_path).unwrap();
        let mut writer =
            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();

        // Write a single row with a list containing one INT96 element.
        // def=3: list present (1) + repeated group (2) + element present (3)
        // rep=0: start of a new list
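        // For contrast, a two-element list would use def levels [3, 3] and rep levels [0, 1]
        // (0 starts a new row, 1 continues the same list), and an empty-but-present list would
        // be a single entry at def level 1.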
        let mut row_group = writer.next_row_group().unwrap();
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<Int96Type>()
                .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
                .unwrap();
            col.close().unwrap();
        }
        row_group.close().unwrap();
        writer.close().unwrap();

        let iceberg_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::optional(
                        1,
                        "timestamps",
                        Type::List(crate::spec::ListType {
                            element_field: NestedField::optional(
                                2,
                                "element",
                                Type::Primitive(PrimitiveType::Timestamp),
                            )
                            .into(),
                        }),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );

        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;

        assert_eq!(batches.len(), 1);
        let list_array = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .expect("Expected ListArray");
        let ts_array = list_array
            .values()
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .expect("Expected TimestampMicrosecondArray inside list");

        assert_eq!(
            ts_array.value(0),
            expected_micros,
            "INT96 in list: got {}, expected {expected_micros}",
            ts_array.value(0)
        );
    }

    #[tokio::test]
    async fn test_read_int96_timestamps_in_map() {
        use arrow_array::{MapArray, TimestampMicrosecondArray};
        use parquet::basic::{Repetition, Type as PhysicalType};
        use parquet::data_type::{ByteArrayType, Int96Type};
        use parquet::file::writer::SerializedFileWriter;
        use parquet::schema::types::Type as SchemaType;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_path = format!("{table_location}/map_int96.parquet");

        // MAP encoding:
        //   optional group ts_map (MAP) {
        //     repeated group key_value {
        //       required binary key (UTF8);
        //       optional int96 value;
        //     }
        //   }
        let key_type = SchemaType::primitive_type_builder("key", PhysicalType::BYTE_ARRAY)
            .with_repetition(Repetition::REQUIRED)
            .with_logical_type(Some(parquet::basic::LogicalType::String))
            .with_id(Some(2))
            .build()
            .unwrap();

        let value_type = SchemaType::primitive_type_builder("value", PhysicalType::INT96)
            .with_repetition(Repetition::OPTIONAL)
            .with_id(Some(3))
            .build()
            .unwrap();

        let key_value_group = SchemaType::group_type_builder("key_value")
            .with_repetition(Repetition::REPEATED)
            .with_fields(vec![Arc::new(key_type), Arc::new(value_type)])
            .build()
            .unwrap();

        let map_type = SchemaType::group_type_builder("ts_map")
            .with_repetition(Repetition::OPTIONAL)
            .with_id(Some(1))
            .with_logical_type(Some(parquet::basic::LogicalType::Map))
            .with_fields(vec![Arc::new(key_value_group)])
            .build()
            .unwrap();

        let parquet_schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![Arc::new(map_type)])
            .build()
            .unwrap();

        let (int96_val, expected_micros) = make_int96_test_value();

        let file = File::create(&file_path).unwrap();
        let mut writer =
            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();

        // Write a single row with a map containing one key-value pair.
        // rep=0 for both columns: start of a new map.
        // key def=2: map present (1) + key_value entry present (2), key is REQUIRED.
        // value def=3: map present (1) + key_value entry present (2) + value present (3).
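        // For contrast, a two-entry map would repeat the pattern with rep levels [0, 1] on both
        // columns (0 starts the row, 1 continues the same map): keys at def level 2, present
        // values at def level 3.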
        let mut row_group = writer.next_row_group().unwrap();
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<ByteArrayType>()
                .write_batch(
                    &[parquet::data_type::ByteArray::from("event_time")],
                    Some(&[2]),
                    Some(&[0]),
                )
                .unwrap();
            col.close().unwrap();
        }
        {
            let mut col = row_group.next_column().unwrap().unwrap();
            col.typed::<Int96Type>()
                .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
                .unwrap();
            col.close().unwrap();
        }
        row_group.close().unwrap();
        writer.close().unwrap();

        let iceberg_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::optional(
                        1,
                        "ts_map",
                        Type::Map(crate::spec::MapType {
                            key_field: NestedField::required(
                                2,
                                "key",
                                Type::Primitive(PrimitiveType::String),
                            )
                            .into(),
                            value_field: NestedField::optional(
                                3,
                                "value",
                                Type::Primitive(PrimitiveType::Timestamp),
                            )
                            .into(),
                        }),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );

        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;

        assert_eq!(batches.len(), 1);
        let map_array = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<MapArray>()
            .expect("Expected MapArray");
        let ts_array = map_array
            .values()
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .expect("Expected TimestampMicrosecondArray as map values");

        assert_eq!(
            ts_array.value(0),
            expected_micros,
            "INT96 in map: got {}, expected {expected_micros}",
            ts_array.value(0)
        );
    }
}