iceberg/arrow/reader/
pipeline.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! The main `ArrowReader` pipeline: reading a stream of `FileScanTask`s,
19//! opening Parquet files and resolving schemas, then wiring projection,
20//! predicates, row-group / row selection, and delete handling into a stream
21//! of transformed Arrow `RecordBatch`es.
22
23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use futures::{StreamExt, TryStreamExt};
27use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
28use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder};
29
30use super::{
31    ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema,
32    apply_name_mapping_to_arrow_schema,
33};
34use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
35use crate::arrow::int96::coerce_int96_timestamps;
36use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
37use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
38use crate::error::Result;
39use crate::io::{FileIO, FileMetadata, FileRead};
40use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
41use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
42use crate::spec::Datum;
43use crate::{Error, ErrorKind};
44
45impl ArrowReader {
46    /// Take a stream of FileScanTasks and reads all the files.
47    /// Returns a [`ScanResult`] containing the record batch stream and scan metrics.
48    pub fn read(self, tasks: FileScanTaskStream) -> Result<ScanResult> {
49        let concurrency_limit_data_files = self.concurrency_limit_data_files;
50        let scan_metrics = ScanMetrics::new();
51
52        let task_reader = FileScanTaskReader {
53            batch_size: self.batch_size,
54            file_io: self.file_io,
55            delete_file_loader: self
56                .delete_file_loader
57                .with_scan_metrics(scan_metrics.clone()),
58            row_group_filtering_enabled: self.row_group_filtering_enabled,
59            row_selection_enabled: self.row_selection_enabled,
60            parquet_read_options: self.parquet_read_options,
61            scan_metrics: scan_metrics.clone(),
62        };
63
64        // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
65        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
66            Box::pin(
67                tasks
68                    .and_then(move |task| task_reader.clone().process(task))
69                    .map_err(|err| {
70                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
71                            .with_source(err)
72                    })
73                    .try_flatten(),
74            )
75        } else {
76            Box::pin(
77                tasks
78                    .map_ok(move |task| task_reader.clone().process(task))
79                    .map_err(|err| {
80                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
81                            .with_source(err)
82                    })
83                    .try_buffer_unordered(concurrency_limit_data_files)
84                    .try_flatten_unordered(concurrency_limit_data_files),
85            )
86        };
87
88        Ok(ScanResult::new(stream, scan_metrics))
89    }
90}
91
/// Per-scan state for processing [`FileScanTask`]s. Created once per
/// [`ArrowReader::read`] call and cloned per task.
#[derive(Clone)]
struct FileScanTaskReader {
    /// Batch size handed to the Parquet stream builder; `None` keeps the builder's default.
    batch_size: Option<usize>,
    /// Used to open each task's data file.
    file_io: FileIO,
    /// Loads the task's delete files; configured in `ArrowReader::read` with a clone of
    /// the scan's `ScanMetrics`.
    delete_file_loader: CachingDeleteFileLoader,
    /// When true and the combined predicate is present, prune row groups with it (in
    /// addition to any byte-range filtering).
    row_group_filtering_enabled: bool,
    /// When true and the combined predicate is present, build a predicate-based
    /// `RowSelection`; together with a task predicate it also enables page-index preloading.
    row_selection_enabled: bool,
    /// Base Parquet read options; copied per task with `preload_page_index` overridden.
    parquet_read_options: ParquetReadOptions,
    /// Scan-wide metrics; its bytes-read counter is passed to `open_parquet_file`.
    scan_metrics: ScanMetrics,
}
104
105impl FileScanTaskReader {
106    async fn process(self, task: FileScanTask) -> Result<ArrowRecordBatchStream> {
107        let should_load_page_index =
108            (self.row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
109        let mut parquet_read_options = self.parquet_read_options;
110        parquet_read_options.preload_page_index = should_load_page_index;
111
112        let delete_filter_rx = self
113            .delete_file_loader
114            .load_deletes(&task.deletes, Arc::clone(&task.schema));
115
116        // Open the Parquet file once, loading its metadata
117        let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
118            &task.data_file_path,
119            &self.file_io,
120            task.file_size_in_bytes,
121            parquet_read_options,
122            self.scan_metrics.bytes_read_counter(),
123        )
124        .await?;
125
126        // Check if Parquet file has embedded field IDs
127        // Corresponds to Java's ParquetSchemaUtil.hasIds()
128        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
129        let missing_field_ids = arrow_metadata
130            .schema()
131            .fields()
132            .iter()
133            .next()
134            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
135
136        // Three-branch schema resolution strategy matching Java's ReadConf constructor
137        //
138        // Per Iceberg spec Column Projection rules:
139        // "Columns in Iceberg data files are selected by field id. The table schema's column
140        //  names and order may change after a data file is written, and projection must be done
141        //  using field ids."
142        // https://iceberg.apache.org/spec/#column-projection
143        //
144        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
145        // we must assign field IDs BEFORE reading data to enable correct projection.
146        //
147        // Java's ReadConf determines field ID strategy:
148        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
149        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
150        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
151        let arrow_metadata = if missing_field_ids {
152            // Parquet file lacks field IDs - must assign them before reading
153            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
154                // Branch 2: Apply name mapping to assign correct Iceberg field IDs
155                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
156                // to columns without field id"
157                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
158                apply_name_mapping_to_arrow_schema(
159                    Arc::clone(arrow_metadata.schema()),
160                    name_mapping,
161                )?
162            } else {
163                // Branch 3: No name mapping - use position-based fallback IDs
164                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
165                add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
166            };
167
168            let options = ArrowReaderOptions::new().with_schema(arrow_schema);
169            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
170                |e| {
171                    Error::new(
172                        ErrorKind::Unexpected,
173                        "Failed to create ArrowReaderMetadata with field ID schema",
174                    )
175                    .with_source(e)
176                },
177            )?
178        } else {
179            // Branch 1: File has embedded field IDs - trust them
180            arrow_metadata
181        };
182
183        // Coerce INT96 timestamp columns to the resolution specified by the Iceberg schema.
184        // This must happen before building the stream reader to avoid i64 overflow in arrow-rs.
185        let arrow_metadata = if let Some(coerced_schema) =
186            coerce_int96_timestamps(arrow_metadata.schema(), &task.schema)
187        {
188            let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema));
189            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
190                |e| {
191                    Error::new(
192                        ErrorKind::Unexpected,
193                        format!(
194                            "Failed to create ArrowReaderMetadata with INT96-coerced schema: {coerced_schema}"
195                        ),
196                    )
197                    .with_source(e)
198                },
199            )?
200        } else {
201            arrow_metadata
202        };
203
204        // Build the stream reader, reusing the already-opened file reader
205        let mut record_batch_stream_builder =
206            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);
207
208        // Filter out metadata fields for Parquet projection (they don't exist in files)
209        let project_field_ids_without_metadata: Vec<i32> = task
210            .project_field_ids
211            .iter()
212            .filter(|&&id| !is_metadata_field(id))
213            .copied()
214            .collect();
215
216        // Create projection mask based on field IDs
217        // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
218        // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
219        // - If fallback IDs: position-based projection (missing_field_ids=true)
220        let projection_mask = ArrowReader::get_arrow_projection_mask(
221            &project_field_ids_without_metadata,
222            &task.schema,
223            record_batch_stream_builder.parquet_schema(),
224            record_batch_stream_builder.schema(),
225            missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
226        )?;
227
228        record_batch_stream_builder =
229            record_batch_stream_builder.with_projection(projection_mask.clone());
230
231        // RecordBatchTransformer performs any transformations required on the RecordBatches
232        // that come back from the file, such as type promotion, default column insertion,
233        // column re-ordering, partition constants, and virtual field addition (like _file)
234        let mut record_batch_transformer_builder =
235            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
236
237        // Add the _file metadata column if it's in the projected fields
238        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
239            let file_datum = Datum::string(task.data_file_path.clone());
240            record_batch_transformer_builder =
241                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
242        }
243
244        if let (Some(partition_spec), Some(partition_data)) =
245            (task.partition_spec.clone(), task.partition.clone())
246        {
247            record_batch_transformer_builder =
248                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
249        }
250
251        let mut record_batch_transformer = record_batch_transformer_builder.build();
252
253        if let Some(batch_size) = self.batch_size {
254            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
255        }
256
257        let delete_filter = delete_filter_rx.await.unwrap()?;
258        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
259
260        // In addition to the optional predicate supplied in the `FileScanTask`,
261        // we also have an optional predicate resulting from equality delete files.
262        // If both are present, we logical-AND them together to form a single filter
263        // predicate that we can pass to the `RecordBatchStreamBuilder`.
264        let final_predicate = match (&task.predicate, delete_predicate) {
265            (None, None) => None,
266            (Some(predicate), None) => Some(predicate.clone()),
267            (None, Some(ref predicate)) => Some(predicate.clone()),
268            (Some(filter_predicate), Some(delete_predicate)) => {
269                Some(filter_predicate.clone().and(delete_predicate))
270            }
271        };
272
273        // There are three possible sources for potential lists of selected RowGroup indices,
274        // and two for `RowSelection`s.
275        // Selected RowGroup index lists can come from three sources:
276        //   * When task.start and task.length specify a byte range (file splitting);
277        //   * When there are equality delete files that are applicable;
278        //   * When there is a scan predicate and row_group_filtering_enabled = true.
279        // `RowSelection`s can be created in either or both of the following cases:
280        //   * When there are positional delete files that are applicable;
281        //   * When there is a scan predicate and row_selection_enabled = true
282        // Note that row group filtering from predicates only happens when
283        // there is a scan predicate AND row_group_filtering_enabled = true,
284        // but we perform row selection filtering if there are applicable
285        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
286        // since the only implemented method of applying positional deletes is
287        // by using a `RowSelection`.
288        let mut selected_row_group_indices = None;
289        let mut row_selection = None;
290
291        // Filter row groups based on byte range from task.start and task.length.
292        // If both start and length are 0, read the entire file (backwards compatibility).
293        if task.start != 0 || task.length != 0 {
294            let byte_range_filtered_row_groups = ArrowReader::filter_row_groups_by_byte_range(
295                record_batch_stream_builder.metadata(),
296                task.start,
297                task.length,
298            )?;
299            selected_row_group_indices = Some(byte_range_filtered_row_groups);
300        }
301
302        if let Some(predicate) = final_predicate {
303            let (iceberg_field_ids, field_id_map) = ArrowReader::build_field_id_set_and_map(
304                record_batch_stream_builder.parquet_schema(),
305                &predicate,
306            )?;
307
308            let row_filter = ArrowReader::get_row_filter(
309                &predicate,
310                record_batch_stream_builder.parquet_schema(),
311                &iceberg_field_ids,
312                &field_id_map,
313            )?;
314            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
315
316            if self.row_group_filtering_enabled {
317                let predicate_filtered_row_groups = ArrowReader::get_selected_row_group_indices(
318                    &predicate,
319                    record_batch_stream_builder.metadata(),
320                    &field_id_map,
321                    &task.schema,
322                )?;
323
324                // Merge predicate-based filtering with byte range filtering (if present)
325                // by taking the intersection of both filters
326                selected_row_group_indices = match selected_row_group_indices {
327                    Some(byte_range_filtered) => {
328                        // Keep only row groups that are in both filters
329                        let intersection: Vec<usize> = byte_range_filtered
330                            .into_iter()
331                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
332                            .collect();
333                        Some(intersection)
334                    }
335                    None => Some(predicate_filtered_row_groups),
336                };
337            }
338
339            if self.row_selection_enabled {
340                row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate(
341                    &predicate,
342                    record_batch_stream_builder.metadata(),
343                    &selected_row_group_indices,
344                    &field_id_map,
345                    &task.schema,
346                )?);
347            }
348        }
349
350        let positional_delete_indexes = delete_filter.get_delete_vector(&task);
351
352        if let Some(positional_delete_indexes) = positional_delete_indexes {
353            let delete_row_selection = {
354                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
355
356                ArrowReader::build_deletes_row_selection(
357                    record_batch_stream_builder.metadata().row_groups(),
358                    &selected_row_group_indices,
359                    &positional_delete_indexes,
360                )
361            }?;
362
363            // merge the row selection from the delete files with the row selection
364            // from the filter predicate, if there is one from the filter predicate
365            row_selection = match row_selection {
366                None => Some(delete_row_selection),
367                Some(filter_row_selection) => {
368                    Some(filter_row_selection.intersection(&delete_row_selection))
369                }
370            };
371        }
372
373        if let Some(row_selection) = row_selection {
374            record_batch_stream_builder =
375                record_batch_stream_builder.with_row_selection(row_selection);
376        }
377
378        if let Some(selected_row_group_indices) = selected_row_group_indices {
379            record_batch_stream_builder =
380                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
381        }
382
383        // Build the batch stream and send all the RecordBatches that it generates
384        // to the requester.
385        let record_batch_stream =
386            record_batch_stream_builder
387                .build()?
388                .map(move |batch| match batch {
389                    Ok(batch) => {
390                        // Process the record batch (type promotion, column reordering, virtual fields, etc.)
391                        record_batch_transformer.process_record_batch(batch)
392                    }
393                    Err(err) => Err(err.into()),
394                });
395
396        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
397    }
398}
399
400impl ArrowReader {
401    /// Opens a Parquet file and loads its metadata, wrapping the reader with
402    /// [`CountingFileRead`] so all I/O is accumulated into `bytes_read`.
403    pub(crate) async fn open_parquet_file(
404        data_file_path: &str,
405        file_io: &FileIO,
406        file_size_in_bytes: u64,
407        parquet_read_options: ParquetReadOptions,
408        bytes_read: &Arc<AtomicU64>,
409    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
410        let parquet_file = file_io.new_input(data_file_path)?;
411        let counting_reader =
412            CountingFileRead::new(parquet_file.reader().await?, Arc::clone(bytes_read));
413        Self::build_parquet_reader(
414            Box::new(counting_reader),
415            file_size_in_bytes,
416            parquet_read_options,
417        )
418        .await
419    }
420
421    async fn build_parquet_reader(
422        parquet_reader: Box<dyn FileRead>,
423        file_size_in_bytes: u64,
424        parquet_read_options: ParquetReadOptions,
425    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
426        let mut reader = ArrowFileReader::new(
427            FileMetadata {
428                size: file_size_in_bytes,
429            },
430            parquet_reader,
431        )
432        .with_parquet_read_options(parquet_read_options);
433
434        let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
435            .await
436            .map_err(|e| {
437                Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
438            })?;
439
440        Ok((reader, arrow_metadata))
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use std::collections::HashMap;
447    use std::fs::File;
448    use std::sync::Arc;
449
450    use arrow_array::cast::AsArray;
451    use arrow_array::{Array, ArrayRef, RecordBatch};
452    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
453    use futures::TryStreamExt;
454    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
455    use parquet::basic::Compression;
456    use parquet::file::properties::WriterProperties;
457    use tempfile::TempDir;
458
459    use crate::Runtime;
460    use crate::arrow::ArrowReaderBuilder;
461    use crate::io::FileIO;
462    use crate::scan::{FileScanTask, FileScanTaskStream};
463    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, SchemaRef, Type};
464
465    // INT96 encoding: [nanos_low_u32, nanos_high_u32, julian_day_u32]
466    // Julian day 2_440_588 = Unix epoch (1970-01-01)
467    const UNIX_EPOCH_JULIAN: i64 = 2_440_588;
468    const MICROS_PER_DAY: i64 = 86_400_000_000;
469    // Noon on 3333-01-01 (Julian day 2_953_529) — outside the i64 nanosecond range (~1677-2262).
470    const INT96_TEST_NANOS_WITHIN_DAY: u64 = 43_200_000_000_000;
471    const INT96_TEST_JULIAN_DAY: u32 = 2_953_529;
472
473    fn make_int96_test_value() -> (parquet::data_type::Int96, i64) {
474        let mut val = parquet::data_type::Int96::new();
475        val.set_data(
476            (INT96_TEST_NANOS_WITHIN_DAY & 0xFFFFFFFF) as u32,
477            (INT96_TEST_NANOS_WITHIN_DAY >> 32) as u32,
478            INT96_TEST_JULIAN_DAY,
479        );
480        let expected_micros = (INT96_TEST_JULIAN_DAY as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
481            + (INT96_TEST_NANOS_WITHIN_DAY / 1_000) as i64;
482        (val, expected_micros)
483    }
484
485    async fn read_int96_batches(
486        file_path: &str,
487        schema: SchemaRef,
488        project_field_ids: Vec<i32>,
489    ) -> Vec<RecordBatch> {
490        let file_io = FileIO::new_with_fs();
491        let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build();
492
493        let file_size = std::fs::metadata(file_path).unwrap().len();
494        let task = FileScanTask {
495            file_size_in_bytes: file_size,
496            start: 0,
497            length: file_size,
498            record_count: None,
499            data_file_path: file_path.to_string(),
500            data_file_format: DataFileFormat::Parquet,
501            schema,
502            project_field_ids,
503            predicate: None,
504            deletes: vec![],
505            partition: None,
506            partition_spec: None,
507            name_mapping: None,
508            case_sensitive: false,
509        };
510
511        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
512        reader
513            .read(tasks)
514            .unwrap()
515            .stream()
516            .try_collect()
517            .await
518            .unwrap()
519    }
520
521    // ArrowWriter cannot write INT96, so we use SerializedFileWriter directly.
522    fn write_int96_parquet_file(
523        table_location: &str,
524        filename: &str,
525        with_field_ids: bool,
526    ) -> (String, Vec<i64>) {
527        use parquet::basic::{Repetition, Type as PhysicalType};
528        use parquet::data_type::{Int32Type, Int96, Int96Type};
529        use parquet::file::writer::SerializedFileWriter;
530        use parquet::schema::types::Type as SchemaType;
531
532        let file_path = format!("{table_location}/{filename}");
533
534        let mut ts_builder = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
535            .with_repetition(Repetition::OPTIONAL);
536        let mut id_builder = SchemaType::primitive_type_builder("id", PhysicalType::INT32)
537            .with_repetition(Repetition::REQUIRED);
538
539        if with_field_ids {
540            ts_builder = ts_builder.with_id(Some(1));
541            id_builder = id_builder.with_id(Some(2));
542        }
543
544        let schema = SchemaType::group_type_builder("schema")
545            .with_fields(vec![
546                Arc::new(ts_builder.build().unwrap()),
547                Arc::new(id_builder.build().unwrap()),
548            ])
549            .build()
550            .unwrap();
551
552        // Dates outside the i64 nanosecond range (~1677-2262) overflow without coercion.
553        const NOON_NANOS: u64 = INT96_TEST_NANOS_WITHIN_DAY;
554        const JULIAN_3333: u32 = INT96_TEST_JULIAN_DAY;
555        const JULIAN_2100: u32 = 2_488_070;
556
557        let test_data: Vec<(u32, u32, u32, i64)> = vec![
558            // 3333-01-01 00:00:00
559            (
560                0,
561                0,
562                JULIAN_3333,
563                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
564            ),
565            // 3333-01-01 12:00:00
566            (
567                (NOON_NANOS & 0xFFFFFFFF) as u32,
568                (NOON_NANOS >> 32) as u32,
569                JULIAN_3333,
570                (JULIAN_3333 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY
571                    + (NOON_NANOS / 1_000) as i64,
572            ),
573            // 2100-01-01 00:00:00
574            (
575                0,
576                0,
577                JULIAN_2100,
578                (JULIAN_2100 as i64 - UNIX_EPOCH_JULIAN) * MICROS_PER_DAY,
579            ),
580        ];
581
582        let int96_values: Vec<Int96> = test_data
583            .iter()
584            .map(|(lo, hi, day, _)| {
585                let mut v = Int96::new();
586                v.set_data(*lo, *hi, *day);
587                v
588            })
589            .collect();
590
591        let id_values: Vec<i32> = (0..test_data.len() as i32).collect();
592        let expected_micros: Vec<i64> = test_data.iter().map(|(_, _, _, m)| *m).collect();
593
594        let file = File::create(&file_path).unwrap();
595        let mut writer =
596            SerializedFileWriter::new(file, Arc::new(schema), Default::default()).unwrap();
597
598        let mut row_group = writer.next_row_group().unwrap();
599        {
600            // def=1: ts is OPTIONAL and present. No repetition levels (top-level columns).
601            let mut col = row_group.next_column().unwrap().unwrap();
602            col.typed::<Int96Type>()
603                .write_batch(&int96_values, Some(&vec![1; test_data.len()]), None)
604                .unwrap();
605            col.close().unwrap();
606        }
607        {
608            let mut col = row_group.next_column().unwrap().unwrap();
609            col.typed::<Int32Type>()
610                .write_batch(&id_values, None, None)
611                .unwrap();
612            col.close().unwrap();
613        }
614        row_group.close().unwrap();
615        writer.close().unwrap();
616
617        (file_path, expected_micros)
618    }
619
620    async fn assert_int96_read_matches(
621        file_path: &str,
622        schema: SchemaRef,
623        project_field_ids: Vec<i32>,
624        expected_micros: &[i64],
625    ) {
626        use arrow_array::TimestampMicrosecondArray;
627
628        let batches = read_int96_batches(file_path, schema, project_field_ids).await;
629
630        assert_eq!(batches.len(), 1);
631        let ts_array = batches[0]
632            .column(0)
633            .as_any()
634            .downcast_ref::<TimestampMicrosecondArray>()
635            .expect("Expected TimestampMicrosecondArray");
636
637        for (i, expected) in expected_micros.iter().enumerate() {
638            assert_eq!(
639                ts_array.value(i),
640                *expected,
641                "Row {i}: got {}, expected {expected}",
642                ts_array.value(i)
643            );
644        }
645    }
646
647    /// Test that concurrency=1 reads all files correctly and in deterministic order.
648    /// This verifies the fast-path optimization for single concurrency.
649    #[tokio::test]
650    async fn test_read_with_concurrency_one() {
651        use arrow_array::Int32Array;
652
653        let schema = Arc::new(
654            Schema::builder()
655                .with_schema_id(1)
656                .with_fields(vec![
657                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
658                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
659                        .into(),
660                ])
661                .build()
662                .unwrap(),
663        );
664
665        let arrow_schema = Arc::new(ArrowSchema::new(vec![
666            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
667                PARQUET_FIELD_ID_META_KEY.to_string(),
668                "1".to_string(),
669            )])),
670            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
671                PARQUET_FIELD_ID_META_KEY.to_string(),
672                "2".to_string(),
673            )])),
674        ]));
675
676        let tmp_dir = TempDir::new().unwrap();
677        let table_location = tmp_dir.path().to_str().unwrap().to_string();
678        let file_io = FileIO::new_with_fs();
679
680        // Create 3 parquet files with different data
681        let props = WriterProperties::builder()
682            .set_compression(Compression::SNAPPY)
683            .build();
684
685        for file_num in 0..3 {
686            let id_data = Arc::new(Int32Array::from_iter_values(
687                file_num * 10..(file_num + 1) * 10,
688            )) as ArrayRef;
689            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
690
691            let to_write =
692                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
693
694            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
695            let mut writer =
696                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
697            writer.write(&to_write).expect("Writing batch");
698            writer.close().unwrap();
699        }
700
701        // Read with concurrency=1 (fast-path)
702        let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
703            .with_data_file_concurrency_limit(1)
704            .build();
705
706        // Create tasks in a specific order: file_0, file_1, file_2
707        let tasks = vec![
708            Ok(FileScanTask {
709                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet"))
710                    .unwrap()
711                    .len(),
712                start: 0,
713                length: 0,
714                record_count: None,
715                data_file_path: format!("{table_location}/file_0.parquet"),
716                data_file_format: DataFileFormat::Parquet,
717                schema: schema.clone(),
718                project_field_ids: vec![1, 2],
719                predicate: None,
720                deletes: vec![],
721                partition: None,
722                partition_spec: None,
723                name_mapping: None,
724                case_sensitive: false,
725            }),
726            Ok(FileScanTask {
727                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
728                    .unwrap()
729                    .len(),
730                start: 0,
731                length: 0,
732                record_count: None,
733                data_file_path: format!("{table_location}/file_1.parquet"),
734                data_file_format: DataFileFormat::Parquet,
735                schema: schema.clone(),
736                project_field_ids: vec![1, 2],
737                predicate: None,
738                deletes: vec![],
739                partition: None,
740                partition_spec: None,
741                name_mapping: None,
742                case_sensitive: false,
743            }),
744            Ok(FileScanTask {
745                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
746                    .unwrap()
747                    .len(),
748                start: 0,
749                length: 0,
750                record_count: None,
751                data_file_path: format!("{table_location}/file_2.parquet"),
752                data_file_format: DataFileFormat::Parquet,
753                schema: schema.clone(),
754                project_field_ids: vec![1, 2],
755                predicate: None,
756                deletes: vec![],
757                partition: None,
758                partition_spec: None,
759                name_mapping: None,
760                case_sensitive: false,
761            }),
762        ];
763
764        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
765
766        let result = reader
767            .read(tasks_stream)
768            .unwrap()
769            .stream()
770            .try_collect::<Vec<RecordBatch>>()
771            .await
772            .unwrap();
773
774        // Verify we got all 30 rows (10 from each file)
775        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
776        assert_eq!(total_rows, 30, "Should have 30 total rows");
777
778        // Collect all ids and file_nums to verify data
779        let mut all_ids = Vec::new();
780        let mut all_file_nums = Vec::new();
781
782        for batch in &result {
783            let id_col = batch
784                .column(0)
785                .as_primitive::<arrow_array::types::Int32Type>();
786            let file_num_col = batch
787                .column(1)
788                .as_primitive::<arrow_array::types::Int32Type>();
789
790            for i in 0..batch.num_rows() {
791                all_ids.push(id_col.value(i));
792                all_file_nums.push(file_num_col.value(i));
793            }
794        }
795
796        assert_eq!(all_ids.len(), 30);
797        assert_eq!(all_file_nums.len(), 30);
798
799        // With concurrency=1 and sequential processing, files should be processed in order
800        // file_0: ids 0-9, file_num=0
801        // file_1: ids 10-19, file_num=1
802        // file_2: ids 20-29, file_num=2
803        for i in 0..10 {
804            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
805            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
806        }
807        for i in 10..20 {
808            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
809            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
810        }
811        for i in 20..30 {
812            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
813            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
814        }
815    }
816
817    #[tokio::test]
818    async fn test_read_int96_timestamps_with_field_ids() {
819        let schema = Arc::new(
820            Schema::builder()
821                .with_schema_id(1)
822                .with_fields(vec![
823                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
824                        .into(),
825                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
826                ])
827                .build()
828                .unwrap(),
829        );
830
831        let tmp_dir = TempDir::new().unwrap();
832        let table_location = tmp_dir.path().to_str().unwrap().to_string();
833        let (file_path, expected_micros) =
834            write_int96_parquet_file(&table_location, "with_ids.parquet", true);
835
836        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
837    }
838
839    #[tokio::test]
840    async fn test_read_int96_timestamps_without_field_ids() {
841        let schema = Arc::new(
842            Schema::builder()
843                .with_schema_id(1)
844                .with_fields(vec![
845                    NestedField::optional(1, "ts", Type::Primitive(PrimitiveType::Timestamp))
846                        .into(),
847                    NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(),
848                ])
849                .build()
850                .unwrap(),
851        );
852
853        let tmp_dir = TempDir::new().unwrap();
854        let table_location = tmp_dir.path().to_str().unwrap().to_string();
855        let (file_path, expected_micros) =
856            write_int96_parquet_file(&table_location, "no_ids.parquet", false);
857
858        assert_int96_read_matches(&file_path, schema, vec![1, 2], &expected_micros).await;
859    }
860
861    #[tokio::test]
862    async fn test_read_int96_timestamps_in_struct() {
863        use arrow_array::{StructArray, TimestampMicrosecondArray};
864        use parquet::basic::{Repetition, Type as PhysicalType};
865        use parquet::data_type::Int96Type;
866        use parquet::file::writer::SerializedFileWriter;
867        use parquet::schema::types::Type as SchemaType;
868
869        let tmp_dir = TempDir::new().unwrap();
870        let table_location = tmp_dir.path().to_str().unwrap().to_string();
871        let file_path = format!("{table_location}/struct_int96.parquet");
872
873        let ts_type = SchemaType::primitive_type_builder("ts", PhysicalType::INT96)
874            .with_repetition(Repetition::OPTIONAL)
875            .with_id(Some(2))
876            .build()
877            .unwrap();
878
879        let struct_type = SchemaType::group_type_builder("data")
880            .with_repetition(Repetition::REQUIRED)
881            .with_id(Some(1))
882            .with_fields(vec![Arc::new(ts_type)])
883            .build()
884            .unwrap();
885
886        let parquet_schema = SchemaType::group_type_builder("schema")
887            .with_fields(vec![Arc::new(struct_type)])
888            .build()
889            .unwrap();
890
891        let (int96_val, expected_micros) = make_int96_test_value();
892
893        let file = File::create(&file_path).unwrap();
894        let mut writer =
895            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
896
897        // def=1: struct is REQUIRED so no level, ts is OPTIONAL and present (1).
898        // No repetition levels needed (no repeated groups).
899        let mut row_group = writer.next_row_group().unwrap();
900        {
901            let mut col = row_group.next_column().unwrap().unwrap();
902            col.typed::<Int96Type>()
903                .write_batch(&[int96_val], Some(&[1]), None)
904                .unwrap();
905            col.close().unwrap();
906        }
907        row_group.close().unwrap();
908        writer.close().unwrap();
909
910        let iceberg_schema = Arc::new(
911            Schema::builder()
912                .with_schema_id(1)
913                .with_fields(vec![
914                    NestedField::required(
915                        1,
916                        "data",
917                        Type::Struct(crate::spec::StructType::new(vec![
918                            NestedField::optional(
919                                2,
920                                "ts",
921                                Type::Primitive(PrimitiveType::Timestamp),
922                            )
923                            .into(),
924                        ])),
925                    )
926                    .into(),
927                ])
928                .build()
929                .unwrap(),
930        );
931
932        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
933
934        assert_eq!(batches.len(), 1);
935        let struct_array = batches[0]
936            .column(0)
937            .as_any()
938            .downcast_ref::<StructArray>()
939            .expect("Expected StructArray");
940        let ts_array = struct_array
941            .column(0)
942            .as_any()
943            .downcast_ref::<TimestampMicrosecondArray>()
944            .expect("Expected TimestampMicrosecondArray inside struct");
945
946        assert_eq!(
947            ts_array.value(0),
948            expected_micros,
949            "INT96 in struct: got {}, expected {expected_micros}",
950            ts_array.value(0)
951        );
952    }
953
954    #[tokio::test]
955    async fn test_read_int96_timestamps_in_list() {
956        use arrow_array::{ListArray, TimestampMicrosecondArray};
957        use parquet::basic::{Repetition, Type as PhysicalType};
958        use parquet::data_type::Int96Type;
959        use parquet::file::writer::SerializedFileWriter;
960        use parquet::schema::types::Type as SchemaType;
961
962        let tmp_dir = TempDir::new().unwrap();
963        let table_location = tmp_dir.path().to_str().unwrap().to_string();
964        let file_path = format!("{table_location}/list_int96.parquet");
965
966        // 3-level LIST encoding:
967        //   optional group timestamps (LIST) {
968        //     repeated group list {
969        //       optional int96 element;
970        //     }
971        //   }
972        let element_type = SchemaType::primitive_type_builder("element", PhysicalType::INT96)
973            .with_repetition(Repetition::OPTIONAL)
974            .with_id(Some(2))
975            .build()
976            .unwrap();
977
978        let list_group = SchemaType::group_type_builder("list")
979            .with_repetition(Repetition::REPEATED)
980            .with_fields(vec![Arc::new(element_type)])
981            .build()
982            .unwrap();
983
984        let list_type = SchemaType::group_type_builder("timestamps")
985            .with_repetition(Repetition::OPTIONAL)
986            .with_id(Some(1))
987            .with_logical_type(Some(parquet::basic::LogicalType::List))
988            .with_fields(vec![Arc::new(list_group)])
989            .build()
990            .unwrap();
991
992        let parquet_schema = SchemaType::group_type_builder("schema")
993            .with_fields(vec![Arc::new(list_type)])
994            .build()
995            .unwrap();
996
997        let (int96_val, expected_micros) = make_int96_test_value();
998
999        let file = File::create(&file_path).unwrap();
1000        let mut writer =
1001            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
1002
1003        // Write a single row with a list containing one INT96 element.
1004        // def=3: list present (1) + repeated group (2) + element present (3)
1005        // rep=0: start of a new list
1006        let mut row_group = writer.next_row_group().unwrap();
1007        {
1008            let mut col = row_group.next_column().unwrap().unwrap();
1009            col.typed::<Int96Type>()
1010                .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
1011                .unwrap();
1012            col.close().unwrap();
1013        }
1014        row_group.close().unwrap();
1015        writer.close().unwrap();
1016
1017        let iceberg_schema = Arc::new(
1018            Schema::builder()
1019                .with_schema_id(1)
1020                .with_fields(vec![
1021                    NestedField::optional(
1022                        1,
1023                        "timestamps",
1024                        Type::List(crate::spec::ListType {
1025                            element_field: NestedField::optional(
1026                                2,
1027                                "element",
1028                                Type::Primitive(PrimitiveType::Timestamp),
1029                            )
1030                            .into(),
1031                        }),
1032                    )
1033                    .into(),
1034                ])
1035                .build()
1036                .unwrap(),
1037        );
1038
1039        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
1040
1041        assert_eq!(batches.len(), 1);
1042        let list_array = batches[0]
1043            .column(0)
1044            .as_any()
1045            .downcast_ref::<ListArray>()
1046            .expect("Expected ListArray");
1047        let ts_array = list_array
1048            .values()
1049            .as_any()
1050            .downcast_ref::<TimestampMicrosecondArray>()
1051            .expect("Expected TimestampMicrosecondArray inside list");
1052
1053        assert_eq!(
1054            ts_array.value(0),
1055            expected_micros,
1056            "INT96 in list: got {}, expected {expected_micros}",
1057            ts_array.value(0)
1058        );
1059    }
1060
1061    #[tokio::test]
1062    async fn test_read_int96_timestamps_in_map() {
1063        use arrow_array::{MapArray, TimestampMicrosecondArray};
1064        use parquet::basic::{Repetition, Type as PhysicalType};
1065        use parquet::data_type::{ByteArrayType, Int96Type};
1066        use parquet::file::writer::SerializedFileWriter;
1067        use parquet::schema::types::Type as SchemaType;
1068
1069        let tmp_dir = TempDir::new().unwrap();
1070        let table_location = tmp_dir.path().to_str().unwrap().to_string();
1071        let file_path = format!("{table_location}/map_int96.parquet");
1072
1073        // MAP encoding:
1074        //   optional group ts_map (MAP) {
1075        //     repeated group key_value {
1076        //       required binary key (UTF8);
1077        //       optional int96 value;
1078        //     }
1079        //   }
1080        let key_type = SchemaType::primitive_type_builder("key", PhysicalType::BYTE_ARRAY)
1081            .with_repetition(Repetition::REQUIRED)
1082            .with_logical_type(Some(parquet::basic::LogicalType::String))
1083            .with_id(Some(2))
1084            .build()
1085            .unwrap();
1086
1087        let value_type = SchemaType::primitive_type_builder("value", PhysicalType::INT96)
1088            .with_repetition(Repetition::OPTIONAL)
1089            .with_id(Some(3))
1090            .build()
1091            .unwrap();
1092
1093        let key_value_group = SchemaType::group_type_builder("key_value")
1094            .with_repetition(Repetition::REPEATED)
1095            .with_fields(vec![Arc::new(key_type), Arc::new(value_type)])
1096            .build()
1097            .unwrap();
1098
1099        let map_type = SchemaType::group_type_builder("ts_map")
1100            .with_repetition(Repetition::OPTIONAL)
1101            .with_id(Some(1))
1102            .with_logical_type(Some(parquet::basic::LogicalType::Map))
1103            .with_fields(vec![Arc::new(key_value_group)])
1104            .build()
1105            .unwrap();
1106
1107        let parquet_schema = SchemaType::group_type_builder("schema")
1108            .with_fields(vec![Arc::new(map_type)])
1109            .build()
1110            .unwrap();
1111
1112        let (int96_val, expected_micros) = make_int96_test_value();
1113
1114        let file = File::create(&file_path).unwrap();
1115        let mut writer =
1116            SerializedFileWriter::new(file, Arc::new(parquet_schema), Default::default()).unwrap();
1117
1118        // Write a single row with a map containing one key-value pair.
1119        // rep=0 for both columns: start of a new map.
1120        // key def=2: map present (1) + key_value entry present (2), key is REQUIRED.
1121        // value def=3: map present (1) + key_value entry present (2) + value present (3).
1122        let mut row_group = writer.next_row_group().unwrap();
1123        {
1124            let mut col = row_group.next_column().unwrap().unwrap();
1125            col.typed::<ByteArrayType>()
1126                .write_batch(
1127                    &[parquet::data_type::ByteArray::from("event_time")],
1128                    Some(&[2]),
1129                    Some(&[0]),
1130                )
1131                .unwrap();
1132            col.close().unwrap();
1133        }
1134        {
1135            let mut col = row_group.next_column().unwrap().unwrap();
1136            col.typed::<Int96Type>()
1137                .write_batch(&[int96_val], Some(&[3]), Some(&[0]))
1138                .unwrap();
1139            col.close().unwrap();
1140        }
1141        row_group.close().unwrap();
1142        writer.close().unwrap();
1143
1144        let iceberg_schema = Arc::new(
1145            Schema::builder()
1146                .with_schema_id(1)
1147                .with_fields(vec![
1148                    NestedField::optional(
1149                        1,
1150                        "ts_map",
1151                        Type::Map(crate::spec::MapType {
1152                            key_field: NestedField::required(
1153                                2,
1154                                "key",
1155                                Type::Primitive(PrimitiveType::String),
1156                            )
1157                            .into(),
1158                            value_field: NestedField::optional(
1159                                3,
1160                                "value",
1161                                Type::Primitive(PrimitiveType::Timestamp),
1162                            )
1163                            .into(),
1164                        }),
1165                    )
1166                    .into(),
1167                ])
1168                .build()
1169                .unwrap(),
1170        );
1171
1172        let batches = read_int96_batches(&file_path, iceberg_schema, vec![1]).await;
1173
1174        assert_eq!(batches.len(), 1);
1175        let map_array = batches[0]
1176            .column(0)
1177            .as_any()
1178            .downcast_ref::<MapArray>()
1179            .expect("Expected MapArray");
1180        let ts_array = map_array
1181            .values()
1182            .as_any()
1183            .downcast_ref::<TimestampMicrosecondArray>()
1184            .expect("Expected TimestampMicrosecondArray as map values");
1185
1186        assert_eq!(
1187            ts_array.value(0),
1188            expected_micros,
1189            "INT96 in map: got {}, expected {expected_micros}",
1190            ts_array.value(0)
1191        );
1192    }
1193}