Skip to main content

iceberg/arrow/reader/
pipeline.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! The main `ArrowReader` pipeline: reading a stream of `FileScanTask`s,
19//! opening Parquet files and resolving schemas, then wiring projection,
20//! predicates, row-group / row selection, and delete handling into a stream
21//! of transformed Arrow `RecordBatch`es.
22
23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use futures::{StreamExt, TryStreamExt};
27use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
28use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder};
29
30use super::{
31    ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema,
32    apply_name_mapping_to_arrow_schema,
33};
34use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
35use crate::arrow::int96::coerce_int96_timestamps;
36use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
37use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
38use crate::error::Result;
39use crate::io::{FileIO, FileMetadata, FileRead};
40use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
41use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
42use crate::spec::Datum;
43use crate::{Error, ErrorKind};
44
45impl ArrowReader {
46    /// Take a stream of FileScanTasks and reads all the files.
47    /// Returns a [`ScanResult`] containing the record batch stream and scan metrics.
48    pub fn read(self, tasks: FileScanTaskStream) -> Result<ScanResult> {
49        let concurrency_limit_data_files = self.concurrency_limit_data_files;
50        let scan_metrics = ScanMetrics::new();
51
52        let task_reader = FileScanTaskReader {
53            batch_size: self.batch_size,
54            file_io: self.file_io,
55            delete_file_loader: self
56                .delete_file_loader
57                .with_scan_metrics(scan_metrics.clone()),
58            row_group_filtering_enabled: self.row_group_filtering_enabled,
59            row_selection_enabled: self.row_selection_enabled,
60            parquet_read_options: self.parquet_read_options,
61            scan_metrics: scan_metrics.clone(),
62        };
63
64        // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
65        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
66            Box::pin(
67                tasks
68                    .and_then(move |task| task_reader.clone().process(task))
69                    .map_err(|err| {
70                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
71                            .with_source(err)
72                    })
73                    .try_flatten(),
74            )
75        } else {
76            Box::pin(
77                tasks
78                    .map_ok(move |task| task_reader.clone().process(task))
79                    .map_err(|err| {
80                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
81                            .with_source(err)
82                    })
83                    .try_buffer_unordered(concurrency_limit_data_files)
84                    .try_flatten_unordered(concurrency_limit_data_files),
85            )
86        };
87
88        Ok(ScanResult::new(stream, scan_metrics))
89    }
90}
91
/// Per-scan state for processing [`FileScanTask`]s. Created once per
/// [`ArrowReader::read`] call and cloned per task.
#[derive(Clone)]
struct FileScanTaskReader {
    /// When `Some`, forwarded to the Parquet stream builder via `with_batch_size`;
    /// when `None`, the builder's batch size is left untouched.
    batch_size: Option<usize>,
    /// Used to open each task's data file.
    file_io: FileIO,
    /// Loads the delete files referenced by each task.
    delete_file_loader: CachingDeleteFileLoader,
    /// When `true` and a filter predicate applies to the task, row groups are
    /// pruned using that predicate.
    row_group_filtering_enabled: bool,
    /// When `true` and a filter predicate applies to the task, a row selection
    /// is derived from that predicate. Together with the task's scan predicate
    /// it also decides whether the page index is preloaded.
    row_selection_enabled: bool,
    /// Base Parquet read options; `preload_page_index` is overridden per task.
    parquet_read_options: ParquetReadOptions,
    /// Metrics handle whose bytes-read counter is passed to the file reader.
    scan_metrics: ScanMetrics,
}
104
105impl FileScanTaskReader {
106    async fn process(self, task: FileScanTask) -> Result<ArrowRecordBatchStream> {
107        let should_load_page_index =
108            (self.row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
109        let mut parquet_read_options = self.parquet_read_options;
110        parquet_read_options.preload_page_index = should_load_page_index;
111
112        let delete_filter_rx = self
113            .delete_file_loader
114            .load_deletes(&task.deletes, Arc::clone(&task.schema));
115
116        // Open the Parquet file once, loading its metadata
117        let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
118            &task.data_file_path,
119            &self.file_io,
120            task.file_size_in_bytes,
121            parquet_read_options,
122            self.scan_metrics.bytes_read_counter(),
123        )
124        .await?;
125
126        // Check if Parquet file has embedded field IDs
127        // Corresponds to Java's ParquetSchemaUtil.hasIds()
128        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
129        let missing_field_ids = arrow_metadata
130            .schema()
131            .fields()
132            .iter()
133            .next()
134            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
135
136        // Position-based fallback applies only when the file has no embedded field IDs
137        // AND no name mapping is available. With a name mapping, field IDs are assigned
138        // to the Arrow schema below, and projection/predicate planning must use them
139        // (see #2403).
140        let use_position_fallback = missing_field_ids && task.name_mapping.is_none();
141
142        // Three-branch schema resolution strategy matching Java's ReadConf constructor
143        //
144        // Per Iceberg spec Column Projection rules:
145        // "Columns in Iceberg data files are selected by field id. The table schema's column
146        //  names and order may change after a data file is written, and projection must be done
147        //  using field ids."
148        // https://iceberg.apache.org/spec/#column-projection
149        //
150        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
151        // we must assign field IDs BEFORE reading data to enable correct projection.
152        //
153        // Java's ReadConf determines field ID strategy:
154        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
155        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
156        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
157        let arrow_metadata = if missing_field_ids {
158            // Parquet file lacks field IDs - must assign them before reading
159            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
160                // Branch 2: Apply name mapping to assign correct Iceberg field IDs
161                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
162                // to columns without field id"
163                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
164                apply_name_mapping_to_arrow_schema(
165                    Arc::clone(arrow_metadata.schema()),
166                    name_mapping,
167                )?
168            } else {
169                // Branch 3: No name mapping - use position-based fallback IDs
170                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
171                add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
172            };
173
174            let options = ArrowReaderOptions::new().with_schema(arrow_schema);
175            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
176                |e| {
177                    Error::new(
178                        ErrorKind::Unexpected,
179                        "Failed to create ArrowReaderMetadata with field ID schema",
180                    )
181                    .with_source(e)
182                },
183            )?
184        } else {
185            // Branch 1: File has embedded field IDs - trust them
186            arrow_metadata
187        };
188
189        // Coerce INT96 timestamp columns to the resolution specified by the Iceberg schema.
190        // This must happen before building the stream reader to avoid i64 overflow in arrow-rs.
191        let arrow_metadata = if let Some(coerced_schema) =
192            coerce_int96_timestamps(arrow_metadata.schema(), &task.schema)
193        {
194            let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema));
195            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
196                |e| {
197                    Error::new(
198                        ErrorKind::Unexpected,
199                        format!(
200                            "Failed to create ArrowReaderMetadata with INT96-coerced schema: {coerced_schema}"
201                        ),
202                    )
203                    .with_source(e)
204                },
205            )?
206        } else {
207            arrow_metadata
208        };
209
210        // Build the stream reader, reusing the already-opened file reader
211        let mut record_batch_stream_builder =
212            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);
213
214        // Filter out metadata fields for Parquet projection (they don't exist in files)
215        let project_field_ids_without_metadata: Vec<i32> = task
216            .project_field_ids
217            .iter()
218            .filter(|&&id| !is_metadata_field(id))
219            .copied()
220            .collect();
221
222        // Create projection mask based on field IDs
223        // - If file has embedded IDs: field-ID-based projection
224        // - If name mapping applied: field-ID-based projection using the IDs the name
225        //   mapping assigned to the Arrow schema
226        // - Otherwise: position-based fallback projection
227        let projection_mask = ArrowReader::get_arrow_projection_mask(
228            &project_field_ids_without_metadata,
229            &task.schema,
230            record_batch_stream_builder.parquet_schema(),
231            record_batch_stream_builder.schema(),
232            use_position_fallback, // Whether to use position-based (true) or field-ID-based (false) projection
233        )?;
234
235        record_batch_stream_builder =
236            record_batch_stream_builder.with_projection(projection_mask.clone());
237
238        // RecordBatchTransformer performs any transformations required on the RecordBatches
239        // that come back from the file, such as type promotion, default column insertion,
240        // column re-ordering, partition constants, and virtual field addition (like _file)
241        let mut record_batch_transformer_builder =
242            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
243
244        // Add the _file metadata column if it's in the projected fields
245        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
246            let file_datum = Datum::string(task.data_file_path.clone());
247            record_batch_transformer_builder =
248                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
249        }
250
251        if let (Some(partition_spec), Some(partition_data)) =
252            (task.partition_spec.clone(), task.partition.clone())
253        {
254            record_batch_transformer_builder =
255                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
256        }
257
258        let mut record_batch_transformer = record_batch_transformer_builder.build();
259
260        if let Some(batch_size) = self.batch_size {
261            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
262        }
263
264        let delete_filter = delete_filter_rx.await.unwrap()?;
265        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
266
267        // In addition to the optional predicate supplied in the `FileScanTask`,
268        // we also have an optional predicate resulting from equality delete files.
269        // If both are present, we logical-AND them together to form a single filter
270        // predicate that we can pass to the `RecordBatchStreamBuilder`.
271        let final_predicate = match (&task.predicate, delete_predicate) {
272            (None, None) => None,
273            (Some(predicate), None) => Some(predicate.clone()),
274            (None, Some(ref predicate)) => Some(predicate.clone()),
275            (Some(filter_predicate), Some(delete_predicate)) => {
276                Some(filter_predicate.clone().and(delete_predicate))
277            }
278        };
279
280        // There are three possible sources for potential lists of selected RowGroup indices,
281        // and two for `RowSelection`s.
282        // Selected RowGroup index lists can come from three sources:
283        //   * When task.start and task.length specify a byte range (file splitting);
284        //   * When there are equality delete files that are applicable;
285        //   * When there is a scan predicate and row_group_filtering_enabled = true.
286        // `RowSelection`s can be created in either or both of the following cases:
287        //   * When there are positional delete files that are applicable;
288        //   * When there is a scan predicate and row_selection_enabled = true
289        // Note that row group filtering from predicates only happens when
290        // there is a scan predicate AND row_group_filtering_enabled = true,
291        // but we perform row selection filtering if there are applicable
292        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
293        // since the only implemented method of applying positional deletes is
294        // by using a `RowSelection`.
295        let mut selected_row_group_indices = None;
296        let mut row_selection = None;
297
298        // Filter row groups based on byte range from task.start and task.length.
299        // If both start and length are 0, read the entire file (backwards compatibility).
300        if task.start != 0 || task.length != 0 {
301            let byte_range_filtered_row_groups = ArrowReader::filter_row_groups_by_byte_range(
302                record_batch_stream_builder.metadata(),
303                task.start,
304                task.length,
305            )?;
306            selected_row_group_indices = Some(byte_range_filtered_row_groups);
307        }
308
309        if let Some(predicate) = final_predicate {
310            let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
311                record_batch_stream_builder.parquet_schema(),
312                record_batch_stream_builder.schema(),
313                &predicate,
314                use_position_fallback,
315            )?;
316
317            let row_filter = ArrowReader::get_row_filter(
318                &predicate,
319                record_batch_stream_builder.parquet_schema(),
320                &iceberg_field_ids,
321                &field_id_map,
322            )?;
323            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
324
325            if self.row_group_filtering_enabled {
326                let predicate_filtered_row_groups = ArrowReader::get_selected_row_group_indices(
327                    &predicate,
328                    record_batch_stream_builder.metadata(),
329                    &field_id_map,
330                    &task.schema,
331                )?;
332
333                // Merge predicate-based filtering with byte range filtering (if present)
334                // by taking the intersection of both filters
335                selected_row_group_indices = match selected_row_group_indices {
336                    Some(byte_range_filtered) => {
337                        // Keep only row groups that are in both filters
338                        let intersection: Vec<usize> = byte_range_filtered
339                            .into_iter()
340                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
341                            .collect();
342                        Some(intersection)
343                    }
344                    None => Some(predicate_filtered_row_groups),
345                };
346            }
347
348            if self.row_selection_enabled {
349                row_selection = ArrowReader::get_row_selection_for_filter_predicate(
350                    &predicate,
351                    record_batch_stream_builder.metadata(),
352                    &selected_row_group_indices,
353                    &field_id_map,
354                    &task.schema,
355                )?;
356            }
357        }
358
359        let positional_delete_indexes = delete_filter.get_delete_vector(&task);
360
361        if let Some(positional_delete_indexes) = positional_delete_indexes {
362            let delete_row_selection = {
363                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
364
365                ArrowReader::build_deletes_row_selection(
366                    record_batch_stream_builder.metadata().row_groups(),
367                    &selected_row_group_indices,
368                    &positional_delete_indexes,
369                )
370            }?;
371
372            // merge the row selection from the delete files with the row selection
373            // from the filter predicate, if there is one from the filter predicate
374            row_selection = match row_selection {
375                None => Some(delete_row_selection),
376                Some(filter_row_selection) => {
377                    Some(filter_row_selection.intersection(&delete_row_selection))
378                }
379            };
380        }
381
382        if let Some(row_selection) = row_selection {
383            record_batch_stream_builder =
384                record_batch_stream_builder.with_row_selection(row_selection);
385        }
386
387        if let Some(selected_row_group_indices) = selected_row_group_indices {
388            record_batch_stream_builder =
389                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
390        }
391
392        // Build the batch stream and send all the RecordBatches that it generates
393        // to the requester.
394        let record_batch_stream =
395            record_batch_stream_builder
396                .build()?
397                .map(move |batch| match batch {
398                    Ok(batch) => {
399                        // Process the record batch (type promotion, column reordering, virtual fields, etc.)
400                        record_batch_transformer.process_record_batch(batch)
401                    }
402                    Err(err) => Err(err.into()),
403                });
404
405        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
406    }
407}
408
409impl ArrowReader {
410    /// Opens a Parquet file and loads its metadata, wrapping the reader with
411    /// [`CountingFileRead`] so all I/O is accumulated into `bytes_read`.
412    pub(crate) async fn open_parquet_file(
413        data_file_path: &str,
414        file_io: &FileIO,
415        file_size_in_bytes: u64,
416        parquet_read_options: ParquetReadOptions,
417        bytes_read: &Arc<AtomicU64>,
418    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
419        let parquet_file = file_io.new_input(data_file_path)?;
420        let counting_reader =
421            CountingFileRead::new(parquet_file.reader().await?, Arc::clone(bytes_read));
422        Self::build_parquet_reader(
423            Box::new(counting_reader),
424            file_size_in_bytes,
425            parquet_read_options,
426        )
427        .await
428    }
429
430    async fn build_parquet_reader(
431        parquet_reader: Box<dyn FileRead>,
432        file_size_in_bytes: u64,
433        parquet_read_options: ParquetReadOptions,
434    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
435        let mut reader = ArrowFileReader::new(
436            FileMetadata {
437                size: file_size_in_bytes,
438            },
439            parquet_reader,
440        )
441        .with_parquet_read_options(parquet_read_options);
442
443        let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
444            .await
445            .map_err(|e| {
446                Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
447            })?;
448
449        Ok((reader, arrow_metadata))
450    }
451}
452
453#[cfg(test)]
454mod tests {
455    use std::collections::HashMap;
456    use std::fs::File;
457    use std::sync::Arc;
458
459    use arrow_array::cast::AsArray;
460    use arrow_array::{Array, ArrayRef, RecordBatch};
461    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
462    use futures::TryStreamExt;
463    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
464    use parquet::basic::Compression;
465    use parquet::file::properties::WriterProperties;
466    use tempfile::TempDir;
467
468    use crate::Runtime;
469    use crate::arrow::ArrowReaderBuilder;
470    use crate::io::FileIO;
471    use crate::scan::{FileScanTask, FileScanTaskStream};
472    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, SchemaRef, Type};
473
    // INT96 encoding: [nanos_low_u32, nanos_high_u32, julian_day_u32]
    // i.e. the nanoseconds within the day split into low/high 32-bit halves,
    // followed by the Julian day number.
    //
    // Julian day 2_440_588 = Unix epoch (1970-01-01)
    const UNIX_EPOCH_JULIAN: i64 = 2_440_588;
    // Microseconds in one day (86_400 s * 1_000_000 µs/s).
    const MICROS_PER_DAY: i64 = 86_400_000_000;
    // Shared INT96 test timestamp: noon on 3333-01-01 (Julian day 2_953_529) —
    // outside the i64 nanosecond range (~1677-2262).
    // Nanoseconds since midnight for 12:00:00 (12 h * 3_600 s/h * 1e9 ns/s).
    const INT96_TEST_NANOS_WITHIN_DAY: u64 = 43_200_000_000_000;
    // Julian day number of the test date.
    const INT96_TEST_JULIAN_DAY: u32 = 2_953_529;
481
482    fn make_int96_test_value() -> (parquet::data_type::Int96, i64) {
483        let mut val = parquet::data_type::Int96::new();
484        val.set_data(
485            (INT96_TEST_NANOS_WITHIN_DAY & 0xFFFFFFFF) as u32,
486            (INT96_TEST_NANOS_WITHIN_DAY >> 32) as u32,
487            INT96_TEST_JULIAN_DAY,
488        );
489        let expected_micros = (INT96_TEST_JULIAN_DAY as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
490            + (INT96_TEST_NANOS_WITHIN_DAY / 1_000) as i64;
491        (val, expected_micros)
492    }
493
494    async fn read_int96_batches(
495        file_path: &str,
496        schema: SchemaRef,
497        project_field_ids: Vec<i32>,
498    ) -> Vec<RecordBatch> {
499        let file_io = FileIO::new_with_fs();
500        let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build();
501
502        let file_size = std::fs::metadata(file_path).unwrap().len();
503        let task = FileScanTask::builder()
504            .with_file_size_in_bytes(file_size)
505            .with_start(0)
506            .with_length(file_size)
507            .with_data_file_path(file_path.to_string())
508            .with_data_file_format(DataFileFormat::Parquet)
509            .with_schema(schema)
510            .with_project_field_ids(project_field_ids)
511            .with_case_sensitive(false)
512            .build();
513
514        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
515        reader
516            .read(tasks)
517            .unwrap()
518            .stream()
519            .try_collect()
520            .await
521            .unwrap()
522    }
523
524    // ArrowWriter cannot write INT96, so we use SerializedFileWriter directly.
525    fn write_int96_parquet_file(
526        table_location: &str,
527        filename: &str,
528        with_field_ids: bool,
529    ) -> (String, Vec<i64>) {
530        use parquet::basic::{Repetition, Type as PhysicalType};
531        use parquet::data_type::{Int32Type, Int96, Int96Type};
532        use parquet::file::writer::SerializedFileWriter;
533        use parquet::schema::types::Type as SchemaType;
534
535        let file_path = format!("{table_location}/{filename}");
536
537        let mut ts_builder = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
538            .with_repetition(Repetition::OPTIONAL);
539        let mut id_builder = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
540            .with_repetition(Repetition::REQUIRED);
541
542        if with_field_ids {
543            ts_builder = ts_builder.with_id(Some(1));
544            id_builder = id_builder.with_id(Some(2));
545        }
546
547        let schema = SchemaType::group_type_builder("schema")
548            .with_fields(vec![
549                Arc::new(ts_builder.build().unwrap()),
550                Arc::new(id_builder.build().unwrap()),
551            ])
552            .build()
553            .unwrap();
554
555        // Dates outside the i64 nanosecond range (~1677-2262) overflow without coercion.
556        const NOON_NANOS: u64 = INT96_TEST_NANOS_WITHIN_DAY;
557        const JULIAN_3333: u32 = INT96_TEST_JULIAN_DAY;
558        const JULIAN_2100: u32 = 2_488_070;
559
560        let test_data: Vec<(u32, u32, u32, i64)> = vec![
561            // 3333-01-01 00:00:00
562            (
563                0,
564                0,
565                JULIAN_3333,
566                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
567            ),
568            // 3333-01-01 12:00:00
569            (
570                (NOON_NANOS & 0xFFFFFFFF) as u32,
571                (NOON_NANOS >> 32) as u32,
572                JULIAN_3333,
573                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
574                    + (NOON_NANOS / 1_000) as i64,
575            ),
576            // 2100-01-01 00:00:00
577            (
578                0,
579                0,
580                JULIAN_2100,
581                (JULIAN_2100 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
582            ),
583        ];
584
585        let int96_values: Vec<Int96> = test_data
586            .iter()
587            .map(|(lo, hi, day, _)| {
588                let mut v = Int96::new();
589                v.set_data(*lo, *hi, *day);
590                v
591            })
592            .collect();
593
594        let id_values: Vec<i32> = (0..test_data.len() as i32).collect();
595        let expected_micros: Vec<i64> = test_data.iter().map(|(_, _, _, m)| *m).collect();
596
597        let file = File::create(&file_path).unwrap();
598        let mut writer =
599            SerializedFileWriter::new(file, Arc::new(schema), Default::default()).unwrap();
600
601        let mut row_group = writer.next_row_group().unwrap();
602        {
603            // def=1: ts is OPTIONAL and present. No repetition levels (top-level columns).
604            let mut col = row_group.next_column().unwrap().unwrap();
605            col.typed::<Int96Type>()
606                .write_batch(&int96_values, Some(&vec![1; test_data.len()]), None)
607                .unwrap();
608            col.close().unwrap();
609        }
610        {
611            let mut col = row_group.next_column().unwrap().unwrap();
612            col.typed::<Int32Type>()
613                .write_batch(&id_values, None, None)
614                .unwrap();
615            col.close().unwrap();
616        }
617        row_group.close().unwrap();
618        writer.close().unwrap();
619
620        (file_path, expected_micros)
621    }
622
623    async fn assert_int96_read_matches(
624        file_path: &str,
625        schema: SchemaRef,
626        project_field_ids: Vec<i32>,
627        expected_micros: &[i64],
628    ) {
629        use arrow_array::TimestampMicrosecondArray;
630
631        let batches = read_int96_batches(file_path, schema, project_field_ids).await;
632
633        assert_eq!(batches.len(), 1);
634        let ts_array = batches[0]
635            .column(0)
636            .as_any()
637            .downcast_ref::<TimestampMicrosecondArray>()
638            .expect("Expected TimestampMicrosecondArray");
639
640        for (i, expected) in expected_micros.iter().enumerate() {
641            assert_eq!(
642                ts_array.value(i),
643                *expected,
644                "Row {i}: got {}, expected {expected}",
645                ts_array.value(i)
646            );
647        }
648    }
649
650    /// Test that concurrency=1 reads all files correctly and in deterministic order.
651    /// This verifies the fast-path optimization for single concurrency.
652    #[tokio::test]
653    async fn test_read_with_concurrency_one() {
654        use arrow_array::Int32Array;
655
656        let schema = Arc::new(
657            Schema::builder()
658                .with_schema_id(1)
659                .with_fields(vec![
660                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
661                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
662                        .into(),
663                ])
664                .build()
665                .unwrap(),
666        );
667
668        let arrow_schema = Arc::new(ArrowSchema::new(vec![
669            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
670                PARQUET_FIELD_ID_META_KEY.to_string(),
671                "1".to_string(),
672            )])),
673            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
674                PARQUET_FIELD_ID_META_KEY.to_string(),
675                "2".to_string(),
676            )])),
677        ]));
678
679        let tmp_dir = TempDir::new().unwrap();
680        let table_location = tmp_dir.path().to_str().unwrap().to_string();
681        let file_io = FileIO::new_with_fs();
682
683        // Create 3 parquet files with different data
684        let props = WriterProperties::builder()
685            .set_compression(Compression::SNAPPY)
686            .build();
687
688        for file_num in 0..3 {
689            let id_data = Arc::new(Int32Array::from_iter_values(
690                file_num * 10..(file_num + 1) * 10,
691            )) as ArrayRef;
692            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
693
694            let to_write =
695                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
696
697            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
698            let mut writer =
699                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
700            writer.write(&to_write).expect("Writing batch");
701            writer.close().unwrap();
702        }
703
704        // Read with concurrency=1 (fast-path)
705        let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
706            .with_data_file_concurrency_limit(1)
707            .build();
708
709        // Create tasks in a specific order: file_0, file_1, file_2
710        let tasks = vec![
711            Ok(FileScanTask::builder()
712                .with_file_size_in_bytes(
713                    std::fs::metadata(format!("{table_location}/file_0.parquet"))
714                        .unwrap()
715                        .len(),
716                )
717                .with_start(0)
718                .with_length(0)
719                .with_data_file_path(format!("{table_location}/file_0.parquet"))
720                .with_data_file_format(DataFileFormat::Parquet)
721                .with_schema(schema.clone())
722                .with_project_field_ids(vec![1, 2])
723                .with_case_sensitive(false)
724                .build()),
725            Ok(FileScanTask::builder()
726                .with_file_size_in_bytes(
727                    std::fs::metadata(format!("{table_location}/file_1.parquet"))
728                        .unwrap()
729                        .len(),
730                )
731                .with_start(0)
732                .with_length(0)
733                .with_data_file_path(format!("{table_location}/file_1.parquet"))
734                .with_data_file_format(DataFileFormat::Parquet)
735                .with_schema(schema.clone())
736                .with_project_field_ids(vec![1, 2])
737                .with_case_sensitive(false)
738                .build()),
739            Ok(FileScanTask::builder()
740                .with_file_size_in_bytes(
741                    std::fs::metadata(format!("{table_location}/file_2.parquet"))
742                        .unwrap()
743                        .len(),
744                )
745                .with_start(0)
746                .with_length(0)
747                .with_data_file_path(format!("{table_location}/file_2.parquet"))
748                .with_data_file_format(DataFileFormat::Parquet)
749                .with_schema(schema.clone())
750                .with_project_field_ids(vec![1, 2])
751                .with_case_sensitive(false)
752                .build()),
753        ];
754
755        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
756
757        let result = reader
758            .read(tasks_stream)
759            .unwrap()
760            .stream()
761            .try_collect::<Vec<RecordBatch>>()
762            .await
763            .unwrap();
764
765        // Verify we got all 30 rows (10 from each file)
766        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
767        assert_eq!(total_rows, 30, "Should have 30 total rows");
768
769        // Collect all ids and file_nums to verify data
770        let mut all_ids = Vec::new();
771        let mut all_file_nums = Vec::new();
772
773        for batch in &result {
774            let id_col = batch
775                .column(0)
776                .as_primitive::<arrow_array::types::Int32Type>();
777            let file_num_col = batch
778                .column(1)
779                .as_primitive::<arrow_array::types::Int32Type>();
780
781            for i in 0..batch.num_rows() {
782                all_ids.push(id_col.value(i));
783                all_file_nums.push(file_num_col.value(i));
784            }
785        }
786
787        assert_eq!(all_ids.len(), 30);
788        assert_eq!(all_file_nums.len(), 30);
789
790        // With concurrency=1 and sequential processing, files should be processed in order
791        // file_0: ids 0-9, file_num=0
792        // file_1: ids 10-19, file_num=1
793        // file_2: ids 20-29, file_num=2
794        for i in 0..10 {
795            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
796            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
797        }
798        for i in 10..20 {
799            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
800            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
801        }
802        for i in 20..30 {
803            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
804            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
805        }
806    }
807
808    #[tokio::test]
809    async fn test_read_int96_timestamps_with_field_ids() {
810        let schema = Arc::new(
811            Schema::builder()
812                .with_schema_id(1)
813                .with_fields(vec![
814                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
815                        .into(),
816                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
817                ])
818                .build()
819                .unwrap(),
820        );
821
822        let tmp_dir = TempDir::new().unwrap();
823        let table_location = tmp_dir.path().to_str().unwrap().to_string();
824        let (file_path, expected_micros) =
825            write_int96_parquet_file(&table_location, "with_ids.parquet", true);
826
827        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
828    }
829
830    #[tokio::test]
831    async fn test_read_int96_timestamps_without_field_ids() {
832        let schema = Arc::new(
833            Schema::builder()
834                .with_schema_id(1)
835                .with_fields(vec![
836                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
837                        .into(),
838                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
839                ])
840                .build()
841                .unwrap(),
842        );
843
844        let tmp_dir = TempDir::new().unwrap();
845        let table_location = tmp_dir.path().to_str().unwrap().to_string();
846        let (file_path, expected_micros) =
847            write_int96_parquet_file(&table_location, "no_ids.parquet", false);
848
849        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
850    }
851
852    #[tokio::test]
853    async fn test_read_int96_timestamps_in_struct() {
854        use arrow_array::{StructArray, TimestampMicrosecondArray};
855        use parquet::basic::{Repetition, Type as PhysicalType};
856        use parquet::data_type::Int96Type;
857        use parquet::file::writer::SerializedFileWriter;
858        use parquet::schema::types::Type as SchemaType;
859
860        let tmp_dir = TempDir::new().unwrap();
861        let table_location = tmp_dir.path().to_str().unwrap().to_string();
862        let file_path = format!("{table_location}/struct_int96.parquet");
863
864        let ts_type = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
865            .with_repetition(Repetition::OPTIONAL)
866            .with_id(Some(2))
867            .build()
868            .unwrap();
869
870        let struct_type = SchemaType::group_type_builder("data")
871            .with_repetition(Repetition::REQUIRED)
872            .with_id(Some(1))
873            .with_fields(vec![Arc::new(ts_type)])
874            .build()
875            .unwrap();
876
877        let parquet_schema = SchemaType::group_type_builder("schema")
878            .with_fields(vec![Arc::new(struct_type)])
879            .build()
880            .unwrap();
881
882        let (int96_val, expected_micros) = make_int96_test_value();
883
884        let file = File::create(&file_path).unwrap();
885        let mut writer =
886            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
887
888        // def=1: struct is REQUIRED so no level, ts is OPTIONAL and present (1).
889        // No repetition levels needed (no repeated groups).
890        let mut row_group = writer.next_row_group().unwrap();
891        {
892            let mut col = row_group.next_column().unwrap().unwrap();
893            col.typed::<Int96Type>()
894                .write_batch(&[int96_val], Some(&[1]), None)
895                .unwrap();
896            col.close().unwrap();
897        }
898        row_group.close().unwrap();
899        writer.close().unwrap();
900
901        let iceberg_schema = Arc::new(
902            Schema::builder()
903                .with_schema_id(1)
904                .with_fields(vec![
905                    NestedField::required(
906                        1,
907                        "data",
908                        Type::Struct(crate::spec::StructType::new(vec![
909                            NestedField::optional(
910                                2,
911                                "ts",
912                                Type::Primitive(PrimitiveType::Timestamp),
913                            )
914                            .into(),
915                        ])),
916                    )
917                    .into(),
918                ])
919                .build()
920                .unwrap(),
921        );
922
923        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
924
925        assert_eq!(batches.len(), 1);
926        let struct_array = batches[0]
927            .column(0)
928            .as_any()
929            .downcast_ref::<StructArray>()
930            .expect("Expected StructArray");
931        let ts_array = struct_array
932            .column(0)
933            .as_any()
934            .downcast_ref::<TimestampMicrosecondArray>()
935            .expect("Expected TimestampMicrosecondArray inside struct");
936
937        assert_eq!(
938            ts_array.value(0),
939            expected_micros,
940            "INT96 in struct: got {}, expected {expected_micros}",
941            ts_array.value(0)
942        );
943    }
944
945    #[tokio::test]
946    async fn test_read_int96_timestamps_in_list() {
947        use arrow_array::{ListArray, TimestampMicrosecondArray};
948        use parquet::basic::{Repetition, Type as PhysicalType};
949        use parquet::data_type::Int96Type;
950        use parquet::file::writer::SerializedFileWriter;
951        use parquet::schema::types::Type as SchemaType;
952
953        let tmp_dir = TempDir::new().unwrap();
954        let table_location = tmp_dir.path().to_str().unwrap().to_string();
955        let file_path = format!("{table_location}/list_int96.parquet");
956
957        // 3-level LIST encoding:
958        //   optional group timestamps (LIST) {
959        //     repeated group list {
960        //       optional int96 element;
961        //     }
962        //   }
963        let element_type = SchemaType::primitive_type_builder("element", PhysicalType::INT96)
964            .with_repetition(Repetition::OPTIONAL)
965            .with_id(Some(2))
966            .build()
967            .unwrap();
968
969        let list_group = SchemaType::group_type_builder("list")
970            .with_repetition(Repetition::REPEATED)
971            .with_fields(vec![Arc::new(element_type)])
972            .build()
973            .unwrap();
974
975        let list_type = SchemaType::group_type_builder("timestamps")
976            .with_repetition(Repetition::OPTIONAL)
977            .with_id(Some(1))
978            .with_logical_type(Some(parquet::basic::LogicalType::List))
979            .with_fields(vec![Arc::new(list_group)])
980            .build()
981            .unwrap();
982
983        let parquet_schema = SchemaType::group_type_builder("schema")
984            .with_fields(vec![Arc::new(list_type)])
985            .build()
986            .unwrap();
987
988        let (int96_val, expected_micros) = make_int96_test_value();
989
990        let file = File::create(&file_path).unwrap();
991        let mut writer =
992            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
993
994        // Write a single row with a list containing one INT96 element.
995        // def=3: list present (1) + repeated group (2) + element present (3)
996        // rep=0: start of a new list
997        let mut row_group = writer.next_row_group().unwrap();
998        {
999            let mut col = row_group.next_column().unwrap().unwrap();
1000            col.typed::<Int96Type>()
1001                .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
1002                .unwrap();
1003            col.close().unwrap();
1004        }
1005        row_group.close().unwrap();
1006        writer.close().unwrap();
1007
1008        let iceberg_schema = Arc::new(
1009            Schema::builder()
1010                .with_schema_id(1)
1011                .with_fields(vec![
1012                    NestedField::optional(
1013                        1,
1014                        "timestamps",
1015                        Type::List(crate::spec::ListType {
1016                            element_field: NestedField::optional(
1017                                2,
1018                                "element",
1019                                Type::Primitive(PrimitiveType::Timestamp),
1020                            )
1021                            .into(),
1022                        }),
1023                    )
1024                    .into(),
1025                ])
1026                .build()
1027                .unwrap(),
1028        );
1029
1030        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
1031
1032        assert_eq!(batches.len(), 1);
1033        let list_array = batches[0]
1034            .column(0)
1035            .as_any()
1036            .downcast_ref::<ListArray>()
1037            .expect("Expected ListArray");
1038        let ts_array = list_array
1039            .values()
1040            .as_any()
1041            .downcast_ref::<TimestampMicrosecondArray>()
1042            .expect("Expected TimestampMicrosecondArray inside list");
1043
1044        assert_eq!(
1045            ts_array.value(0),
1046            expected_micros,
1047            "INT96 in list: got {}, expected {expected_micros}",
1048            ts_array.value(0)
1049        );
1050    }
1051
1052    #[tokio::test]
1053    async fn test_read_int96_timestamps_in_map() {
1054        use arrow_array::{MapArray, TimestampMicrosecondArray};
1055        use parquet::basic::{Repetition, Type as PhysicalType};
1056        use parquet::data_type::{ByteArrayType, Int96Type};
1057        use parquet::file::writer::SerializedFileWriter;
1058        use parquet::schema::types::Type as SchemaType;
1059
1060        let tmp_dir = TempDir::new().unwrap();
1061        let table_location = tmp_dir.path().to_str().unwrap().to_string();
1062        let file_path = format!("{table_location}/map_int96.parquet");
1063
1064        // MAP encoding:
1065        //   optional group ts_map (MAP) {
1066        //     repeated group key_value {
1067        //       required binary key (UTF8);
1068        //       optional int96 value;
1069        //     }
1070        //   }
1071        let key_type = SchemaType::primitive_type_builder("key", PhysicalType::BYTE_ARRAY)
1072            .with_repetition(Repetition::REQUIRED)
1073            .with_logical_type(Some(parquet::basic::LogicalType::String))
1074            .with_id(Some(2))
1075            .build()
1076            .unwrap();
1077
1078        let value_type = SchemaType::primitive_type_builder("value", PhysicalType::INT96)
1079            .with_repetition(Repetition::OPTIONAL)
1080            .with_id(Some(3))
1081            .build()
1082            .unwrap();
1083
1084        let key_value_group = SchemaType::group_type_builder("key_value")
1085            .with_repetition(Repetition::REPEATED)
1086            .with_fields(vec![Arc::new(key_type), Arc::new(value_type)])
1087            .build()
1088            .unwrap();
1089
1090        let map_type = SchemaType::group_type_builder("ts_map")
1091            .with_repetition(Repetition::OPTIONAL)
1092            .with_id(Some(1))
1093            .with_logical_type(Some(parquet::basic::LogicalType::Map))
1094            .with_fields(vec![Arc::new(key_value_group)])
1095            .build()
1096            .unwrap();
1097
1098        let parquet_schema = SchemaType::group_type_builder("schema")
1099            .with_fields(vec![Arc::new(map_type)])
1100            .build()
1101            .unwrap();
1102
1103        let (int96_val, expected_micros) = make_int96_test_value();
1104
1105        let file = File::create(&file_path).unwrap();
1106        let mut writer =
1107            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
1108
1109        // Write a single row with a map containing one key-value pair.
1110        // rep=0 for both columns: start of a new map.
1111        // key def=2: map present (1) + key_value entry present (2), key is REQUIRED.
1112        // value def=3: map present (1) + key_value entry present (2) + value present (3).
1113        let mut row_group = writer.next_row_group().unwrap();
1114        {
1115            let mut col = row_group.next_column().unwrap().unwrap();
1116            col.typed::<ByteArrayType>()
1117                .write_batch(
1118                    &[parquet::data_type::ByteArray::from("event_time")],
1119                    Some(&[2]),
1120                    Some(&[0]),
1121                )
1122                .unwrap();
1123            col.close().unwrap();
1124        }
1125        {
1126            let mut col = row_group.next_column().unwrap().unwrap();
1127            col.typed::<Int96Type>()
1128                .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
1129                .unwrap();
1130            col.close().unwrap();
1131        }
1132        row_group.close().unwrap();
1133        writer.close().unwrap();
1134
1135        let iceberg_schema = Arc::new(
1136            Schema::builder()
1137                .with_schema_id(1)
1138                .with_fields(vec![
1139                    NestedField::optional(
1140                        1,
1141                        "ts_map",
1142                        Type::Map(crate::spec::MapType {
1143                            key_field: NestedField::required(
1144                                2,
1145                                "key",
1146                                Type::Primitive(PrimitiveType::String),
1147                            )
1148                            .into(),
1149                            value_field: NestedField::optional(
1150                                3,
1151                                "value",
1152                                Type::Primitive(PrimitiveType::Timestamp),
1153                            )
1154                            .into(),
1155                        }),
1156                    )
1157                    .into(),
1158                ])
1159                .build()
1160                .unwrap(),
1161        );
1162
1163        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
1164
1165        assert_eq!(batches.len(), 1);
1166        let map_array = batches[0]
1167            .column(0)
1168            .as_any()
1169            .downcast_ref::<MapArray>()
1170            .expect("Expected MapArray");
1171        let ts_array = map_array
1172            .values()
1173            .as_any()
1174            .downcast_ref::<TimestampMicrosecondArray>()
1175            .expect("Expected TimestampMicrosecondArray as map values");
1176
1177        assert_eq!(
1178            ts_array.value(0),
1179            expected_micros,
1180            "INT96 in map: got {}, expected {expected_micros}",
1181            ts_array.value(0)
1182        );
1183    }
1184}