iceberg/arrow/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet file data reader
19
20use std::collections::{HashMap, HashSet};
21use std::ops::Range;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
26use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
27use arrow_cast::cast::cast;
28use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
29use arrow_schema::{
30    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
31};
32use arrow_string::like::starts_with;
33use bytes::Bytes;
34use fnv::FnvHashSet;
35use futures::future::BoxFuture;
36use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
37use parquet::arrow::arrow_reader::{
38    ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
39};
40use parquet::arrow::async_reader::AsyncFileReader;
41use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
42use parquet::file::metadata::{
43    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
44};
45use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
46use typed_builder::TypedBuilder;
47
48use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
49use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
50use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
51use crate::delete_vector::DeleteVector;
52use crate::error::Result;
53use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
54use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
55use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
56use crate::expr::{BoundPredicate, BoundReference};
57use crate::io::{FileIO, FileMetadata, FileRead};
58use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
59use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
60use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
61use crate::utils::available_parallelism;
62use crate::{Error, ErrorKind};
63
64/// Default gap between byte ranges below which they are coalesced into a
65/// single request. Matches object_store's `OBJECT_STORE_COALESCE_DEFAULT`.
66const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
67
68/// Default maximum number of coalesced byte ranges fetched concurrently.
69/// Matches object_store's `OBJECT_STORE_COALESCE_PARALLEL`.
70const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10;
71
72/// Default number of bytes to prefetch when parsing Parquet footer metadata.
73/// Matches DataFusion's default `ParquetOptions::metadata_size_hint`.
74const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024;
75
/// Options for tuning Parquet file I/O.
///
/// Built via `ParquetReadOptions::builder()` (TypedBuilder, setters prefixed
/// with `with_`); all fields have sensible defaults so `builder().build()`
/// yields a usable configuration.
#[derive(Clone, Copy, Debug, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub(crate) struct ParquetReadOptions {
    /// Number of bytes to prefetch for parsing the Parquet metadata.
    ///
    /// This hint can help reduce the number of fetch requests. For more details see the
    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
    ///
    /// Defaults to 512 KiB, matching DataFusion's default `ParquetOptions::metadata_size_hint`.
    #[builder(default = Some(DEFAULT_METADATA_SIZE_HINT))]
    pub(crate) metadata_size_hint: Option<usize>,
    /// Gap threshold for merging nearby byte ranges into a single request.
    /// Ranges with gaps smaller than this value will be coalesced.
    ///
    /// Defaults to 1 MiB, matching object_store's `OBJECT_STORE_COALESCE_DEFAULT`.
    #[builder(default = DEFAULT_RANGE_COALESCE_BYTES)]
    pub(crate) range_coalesce_bytes: u64,
    /// Maximum number of merged byte ranges to fetch concurrently.
    ///
    /// Defaults to 10, matching object_store's `OBJECT_STORE_COALESCE_PARALLEL`.
    #[builder(default = DEFAULT_RANGE_FETCH_CONCURRENCY)]
    pub(crate) range_fetch_concurrency: usize,
    /// Whether to preload the column index when reading Parquet metadata.
    #[builder(default = true)]
    pub(crate) preload_column_index: bool,
    /// Whether to preload the offset index when reading Parquet metadata.
    #[builder(default = true)]
    pub(crate) preload_offset_index: bool,
    /// Whether to preload the page index when reading Parquet metadata.
    ///
    /// NOTE: `ArrowReader::process_file_scan_task` overrides this per task,
    /// enabling it only when row selection or positional deletes require it.
    #[builder(default = false)]
    pub(crate) preload_page_index: bool,
}
109
110impl ParquetReadOptions {
111    pub(crate) fn metadata_size_hint(&self) -> Option<usize> {
112        self.metadata_size_hint
113    }
114
115    pub(crate) fn range_coalesce_bytes(&self) -> u64 {
116        self.range_coalesce_bytes
117    }
118
119    pub(crate) fn range_fetch_concurrency(&self) -> usize {
120        self.range_fetch_concurrency
121    }
122
123    pub(crate) fn preload_column_index(&self) -> bool {
124        self.preload_column_index
125    }
126
127    pub(crate) fn preload_offset_index(&self) -> bool {
128        self.preload_offset_index
129    }
130
131    pub(crate) fn preload_page_index(&self) -> bool {
132        self.preload_page_index
133    }
134}
135
/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
    /// Desired number of rows per output `RecordBatch`; `None` keeps the
    /// Parquet reader's default batch size.
    batch_size: Option<usize>,
    /// File IO abstraction used to open data and delete files.
    file_io: FileIO,
    /// Maximum number of data files fetched/processed concurrently
    /// (defaults to available parallelism; see `new`).
    concurrency_limit_data_files: usize,
    /// Enables statistics-based row-group pruning (defaults to `true`).
    row_group_filtering_enabled: bool,
    /// Enables page-index-based row selection (defaults to `false`).
    row_selection_enabled: bool,
    /// Tuning options for Parquet byte-range fetching and metadata prefetch.
    parquet_read_options: ParquetReadOptions,
}
145
146impl ArrowReaderBuilder {
147    /// Create a new ArrowReaderBuilder
148    pub fn new(file_io: FileIO) -> Self {
149        let num_cpus = available_parallelism().get();
150
151        ArrowReaderBuilder {
152            batch_size: None,
153            file_io,
154            concurrency_limit_data_files: num_cpus,
155            row_group_filtering_enabled: true,
156            row_selection_enabled: false,
157            parquet_read_options: ParquetReadOptions::builder().build(),
158        }
159    }
160
161    /// Sets the max number of in flight data files that are being fetched
162    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
163        self.concurrency_limit_data_files = val;
164        self
165    }
166
167    /// Sets the desired size of batches in the response
168    /// to something other than the default
169    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
170        self.batch_size = Some(batch_size);
171        self
172    }
173
174    /// Determines whether to enable row group filtering.
175    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
176        self.row_group_filtering_enabled = row_group_filtering_enabled;
177        self
178    }
179
180    /// Determines whether to enable row selection.
181    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
182        self.row_selection_enabled = row_selection_enabled;
183        self
184    }
185
186    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
187    ///
188    /// This hint can help reduce the number of fetch requests. For more details see the
189    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
190    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
191        self.parquet_read_options.metadata_size_hint = Some(metadata_size_hint);
192        self
193    }
194
195    /// Sets the gap threshold for merging nearby byte ranges into a single request.
196    /// Ranges with gaps smaller than this value will be coalesced.
197    ///
198    /// Defaults to 1 MiB, matching object_store's OBJECT_STORE_COALESCE_DEFAULT.
199    pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self {
200        self.parquet_read_options.range_coalesce_bytes = range_coalesce_bytes;
201        self
202    }
203
204    /// Sets the maximum number of merged byte ranges to fetch concurrently.
205    ///
206    /// Defaults to 10, matching object_store's OBJECT_STORE_COALESCE_PARALLEL.
207    pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self {
208        self.parquet_read_options.range_fetch_concurrency = range_fetch_concurrency;
209        self
210    }
211
212    /// Build the ArrowReader.
213    pub fn build(self) -> ArrowReader {
214        ArrowReader {
215            batch_size: self.batch_size,
216            file_io: self.file_io.clone(),
217            delete_file_loader: CachingDeleteFileLoader::new(
218                self.file_io.clone(),
219                self.concurrency_limit_data_files,
220            ),
221            concurrency_limit_data_files: self.concurrency_limit_data_files,
222            row_group_filtering_enabled: self.row_group_filtering_enabled,
223            row_selection_enabled: self.row_selection_enabled,
224            parquet_read_options: self.parquet_read_options,
225        }
226    }
227}
228
/// Reads data from Parquet files
#[derive(Clone)]
pub struct ArrowReader {
    /// Desired number of rows per output `RecordBatch`; `None` keeps the
    /// Parquet reader's default.
    batch_size: Option<usize>,
    /// File IO abstraction used to open data files.
    file_io: FileIO,
    /// Loads (and caches) delete files referenced by scan tasks.
    delete_file_loader: CachingDeleteFileLoader,

    /// the maximum number of data files that can be fetched at the same time
    concurrency_limit_data_files: usize,

    /// When true, entire row groups are pruned using predicate evaluation
    /// against row-group metrics.
    row_group_filtering_enabled: bool,
    /// When true, fine-grained `RowSelection`s are built from the page index
    /// for scan predicates.
    row_selection_enabled: bool,
    /// Tuning options for Parquet byte-range fetching and metadata prefetch.
    parquet_read_options: ParquetReadOptions,
}
243
244impl ArrowReader {
    /// Take a stream of FileScanTasks and reads all the files.
    /// Returns a stream of Arrow RecordBatches containing the data from the files
    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
        // Copy out the pieces of `self` that the `move` closures below need,
        // so each closure captures plain values rather than all of `self`.
        let file_io = self.file_io.clone();
        let batch_size = self.batch_size;
        let concurrency_limit_data_files = self.concurrency_limit_data_files;
        let row_group_filtering_enabled = self.row_group_filtering_enabled;
        let row_selection_enabled = self.row_selection_enabled;
        let parquet_read_options = self.parquet_read_options;

        // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
            Box::pin(
                tasks
                    // `and_then` awaits each task's batch stream inline, so
                    // files are processed strictly one at a time, in order.
                    .and_then(move |task| {
                        let file_io = file_io.clone();

                        Self::process_file_scan_task(
                            task,
                            batch_size,
                            file_io,
                            self.delete_file_loader.clone(),
                            row_group_filtering_enabled,
                            row_selection_enabled,
                            parquet_read_options,
                        )
                    })
                    .map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
                            .with_source(err)
                    })
                    .try_flatten(),
            )
        } else {
            Box::pin(
                tasks
                    // `map_ok` yields the (unawaited) future for each task so
                    // that `try_buffer_unordered` can drive up to
                    // `concurrency_limit_data_files` of them concurrently;
                    // batch order across files is therefore not guaranteed.
                    .map_ok(move |task| {
                        let file_io = file_io.clone();

                        Self::process_file_scan_task(
                            task,
                            batch_size,
                            file_io,
                            self.delete_file_loader.clone(),
                            row_group_filtering_enabled,
                            row_selection_enabled,
                            parquet_read_options,
                        )
                    })
                    .map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
                            .with_source(err)
                    })
                    .try_buffer_unordered(concurrency_limit_data_files)
                    .try_flatten_unordered(concurrency_limit_data_files),
            )
        };

        Ok(stream)
    }
305
306    async fn process_file_scan_task(
307        task: FileScanTask,
308        batch_size: Option<usize>,
309        file_io: FileIO,
310        delete_file_loader: CachingDeleteFileLoader,
311        row_group_filtering_enabled: bool,
312        row_selection_enabled: bool,
313        parquet_read_options: ParquetReadOptions,
314    ) -> Result<ArrowRecordBatchStream> {
315        let should_load_page_index =
316            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
317        let mut parquet_read_options = parquet_read_options;
318        parquet_read_options.preload_page_index = should_load_page_index;
319
320        let delete_filter_rx =
321            delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
322
323        // Open the Parquet file once, loading its metadata
324        let (parquet_file_reader, arrow_metadata) = Self::open_parquet_file(
325            &task.data_file_path,
326            &file_io,
327            task.file_size_in_bytes,
328            parquet_read_options,
329        )
330        .await?;
331
332        // Check if Parquet file has embedded field IDs
333        // Corresponds to Java's ParquetSchemaUtil.hasIds()
334        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
335        let missing_field_ids = arrow_metadata
336            .schema()
337            .fields()
338            .iter()
339            .next()
340            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
341
342        // Three-branch schema resolution strategy matching Java's ReadConf constructor
343        //
344        // Per Iceberg spec Column Projection rules:
345        // "Columns in Iceberg data files are selected by field id. The table schema's column
346        //  names and order may change after a data file is written, and projection must be done
347        //  using field ids."
348        // https://iceberg.apache.org/spec/#column-projection
349        //
350        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
351        // we must assign field IDs BEFORE reading data to enable correct projection.
352        //
353        // Java's ReadConf determines field ID strategy:
354        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
355        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
356        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
357        let arrow_metadata = if missing_field_ids {
358            // Parquet file lacks field IDs - must assign them before reading
359            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
360                // Branch 2: Apply name mapping to assign correct Iceberg field IDs
361                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
362                // to columns without field id"
363                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
364                apply_name_mapping_to_arrow_schema(
365                    Arc::clone(arrow_metadata.schema()),
366                    name_mapping,
367                )?
368            } else {
369                // Branch 3: No name mapping - use position-based fallback IDs
370                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
371                add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
372            };
373
374            let options = ArrowReaderOptions::new().with_schema(arrow_schema);
375            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
376                |e| {
377                    Error::new(
378                        ErrorKind::Unexpected,
379                        "Failed to create ArrowReaderMetadata with field ID schema",
380                    )
381                    .with_source(e)
382                },
383            )?
384        } else {
385            // Branch 1: File has embedded field IDs - trust them
386            arrow_metadata
387        };
388
389        // Build the stream reader, reusing the already-opened file reader
390        let mut record_batch_stream_builder =
391            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);
392
393        // Filter out metadata fields for Parquet projection (they don't exist in files)
394        let project_field_ids_without_metadata: Vec<i32> = task
395            .project_field_ids
396            .iter()
397            .filter(|&&id| !is_metadata_field(id))
398            .copied()
399            .collect();
400
401        // Create projection mask based on field IDs
402        // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
403        // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
404        // - If fallback IDs: position-based projection (missing_field_ids=true)
405        let projection_mask = Self::get_arrow_projection_mask(
406            &project_field_ids_without_metadata,
407            &task.schema,
408            record_batch_stream_builder.parquet_schema(),
409            record_batch_stream_builder.schema(),
410            missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
411        )?;
412
413        record_batch_stream_builder =
414            record_batch_stream_builder.with_projection(projection_mask.clone());
415
416        // RecordBatchTransformer performs any transformations required on the RecordBatches
417        // that come back from the file, such as type promotion, default column insertion,
418        // column re-ordering, partition constants, and virtual field addition (like _file)
419        let mut record_batch_transformer_builder =
420            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
421
422        // Add the _file metadata column if it's in the projected fields
423        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
424            let file_datum = Datum::string(task.data_file_path.clone());
425            record_batch_transformer_builder =
426                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
427        }
428
429        if let (Some(partition_spec), Some(partition_data)) =
430            (task.partition_spec.clone(), task.partition.clone())
431        {
432            record_batch_transformer_builder =
433                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
434        }
435
436        let mut record_batch_transformer = record_batch_transformer_builder.build();
437
438        if let Some(batch_size) = batch_size {
439            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
440        }
441
442        let delete_filter = delete_filter_rx.await.unwrap()?;
443        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
444
445        // In addition to the optional predicate supplied in the `FileScanTask`,
446        // we also have an optional predicate resulting from equality delete files.
447        // If both are present, we logical-AND them together to form a single filter
448        // predicate that we can pass to the `RecordBatchStreamBuilder`.
449        let final_predicate = match (&task.predicate, delete_predicate) {
450            (None, None) => None,
451            (Some(predicate), None) => Some(predicate.clone()),
452            (None, Some(ref predicate)) => Some(predicate.clone()),
453            (Some(filter_predicate), Some(delete_predicate)) => {
454                Some(filter_predicate.clone().and(delete_predicate))
455            }
456        };
457
458        // There are three possible sources for potential lists of selected RowGroup indices,
459        // and two for `RowSelection`s.
460        // Selected RowGroup index lists can come from three sources:
461        //   * When task.start and task.length specify a byte range (file splitting);
462        //   * When there are equality delete files that are applicable;
463        //   * When there is a scan predicate and row_group_filtering_enabled = true.
464        // `RowSelection`s can be created in either or both of the following cases:
465        //   * When there are positional delete files that are applicable;
466        //   * When there is a scan predicate and row_selection_enabled = true
467        // Note that row group filtering from predicates only happens when
468        // there is a scan predicate AND row_group_filtering_enabled = true,
469        // but we perform row selection filtering if there are applicable
470        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
471        // since the only implemented method of applying positional deletes is
472        // by using a `RowSelection`.
473        let mut selected_row_group_indices = None;
474        let mut row_selection = None;
475
476        // Filter row groups based on byte range from task.start and task.length.
477        // If both start and length are 0, read the entire file (backwards compatibility).
478        if task.start != 0 || task.length != 0 {
479            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
480                record_batch_stream_builder.metadata(),
481                task.start,
482                task.length,
483            )?;
484            selected_row_group_indices = Some(byte_range_filtered_row_groups);
485        }
486
487        if let Some(predicate) = final_predicate {
488            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
489                record_batch_stream_builder.parquet_schema(),
490                &predicate,
491            )?;
492
493            let row_filter = Self::get_row_filter(
494                &predicate,
495                record_batch_stream_builder.parquet_schema(),
496                &iceberg_field_ids,
497                &field_id_map,
498            )?;
499            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
500
501            if row_group_filtering_enabled {
502                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
503                    &predicate,
504                    record_batch_stream_builder.metadata(),
505                    &field_id_map,
506                    &task.schema,
507                )?;
508
509                // Merge predicate-based filtering with byte range filtering (if present)
510                // by taking the intersection of both filters
511                selected_row_group_indices = match selected_row_group_indices {
512                    Some(byte_range_filtered) => {
513                        // Keep only row groups that are in both filters
514                        let intersection: Vec<usize> = byte_range_filtered
515                            .into_iter()
516                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
517                            .collect();
518                        Some(intersection)
519                    }
520                    None => Some(predicate_filtered_row_groups),
521                };
522            }
523
524            if row_selection_enabled {
525                row_selection = Some(Self::get_row_selection_for_filter_predicate(
526                    &predicate,
527                    record_batch_stream_builder.metadata(),
528                    &selected_row_group_indices,
529                    &field_id_map,
530                    &task.schema,
531                )?);
532            }
533        }
534
535        let positional_delete_indexes = delete_filter.get_delete_vector(&task);
536
537        if let Some(positional_delete_indexes) = positional_delete_indexes {
538            let delete_row_selection = {
539                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
540
541                Self::build_deletes_row_selection(
542                    record_batch_stream_builder.metadata().row_groups(),
543                    &selected_row_group_indices,
544                    &positional_delete_indexes,
545                )
546            }?;
547
548            // merge the row selection from the delete files with the row selection
549            // from the filter predicate, if there is one from the filter predicate
550            row_selection = match row_selection {
551                None => Some(delete_row_selection),
552                Some(filter_row_selection) => {
553                    Some(filter_row_selection.intersection(&delete_row_selection))
554                }
555            };
556        }
557
558        if let Some(row_selection) = row_selection {
559            record_batch_stream_builder =
560                record_batch_stream_builder.with_row_selection(row_selection);
561        }
562
563        if let Some(selected_row_group_indices) = selected_row_group_indices {
564            record_batch_stream_builder =
565                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
566        }
567
568        // Build the batch stream and send all the RecordBatches that it generates
569        // to the requester.
570        let record_batch_stream =
571            record_batch_stream_builder
572                .build()?
573                .map(move |batch| match batch {
574                    Ok(batch) => {
575                        // Process the record batch (type promotion, column reordering, virtual fields, etc.)
576                        record_batch_transformer.process_record_batch(batch)
577                    }
578                    Err(err) => Err(err.into()),
579                });
580
581        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
582    }
583
584    /// Opens a Parquet file and loads its metadata, returning both the reader and metadata.
585    /// The reader can be reused to build a `ParquetRecordBatchStreamBuilder` without
586    /// reopening the file.
587    pub(crate) async fn open_parquet_file(
588        data_file_path: &str,
589        file_io: &FileIO,
590        file_size_in_bytes: u64,
591        parquet_read_options: ParquetReadOptions,
592    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
593        let parquet_file = file_io.new_input(data_file_path)?;
594        let parquet_reader = parquet_file.reader().await?;
595        let mut reader = ArrowFileReader::new(
596            FileMetadata {
597                size: file_size_in_bytes,
598            },
599            parquet_reader,
600        )
601        .with_parquet_read_options(parquet_read_options);
602
603        let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
604            .await
605            .map_err(|e| {
606                Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
607            })?;
608
609        Ok((reader, arrow_metadata))
610    }
611
612    /// computes a `RowSelection` from positional delete indices.
613    ///
614    /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
615    /// as having been deleted by a positional delete, taking into account any row groups that have
616    /// been skipped entirely by the filter predicate
617    fn build_deletes_row_selection(
618        row_group_metadata_list: &[RowGroupMetaData],
619        selected_row_groups: &Option<Vec<usize>>,
620        positional_deletes: &DeleteVector,
621    ) -> Result<RowSelection> {
622        let mut results: Vec<RowSelector> = Vec::new();
623        let mut selected_row_groups_idx = 0;
624        let mut current_row_group_base_idx: u64 = 0;
625        let mut delete_vector_iter = positional_deletes.iter();
626        let mut next_deleted_row_idx_opt = delete_vector_iter.next();
627
628        for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
629            let row_group_num_rows = row_group_metadata.num_rows() as u64;
630            let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
631
632            // if row group selection is enabled,
633            if let Some(selected_row_groups) = selected_row_groups {
634                // if we've consumed all the selected row groups, we're done
635                if selected_row_groups_idx == selected_row_groups.len() {
636                    break;
637                }
638
639                if idx == selected_row_groups[selected_row_groups_idx] {
640                    // we're in a selected row group. Increment selected_row_groups_idx
641                    // so that next time around the for loop we're looking for the next
642                    // selected row group
643                    selected_row_groups_idx += 1;
644                } else {
645                    // Advance iterator past all deletes in the skipped row group.
646                    // advance_to() positions the iterator to the first delete >= next_row_group_base_idx.
647                    // However, if our cached next_deleted_row_idx_opt is in the skipped range,
648                    // we need to call next() to update the cache with the newly positioned value.
649                    delete_vector_iter.advance_to(next_row_group_base_idx);
650                    // Only update the cache if the cached value is stale (in the skipped range)
651                    if let Some(cached_idx) = next_deleted_row_idx_opt
652                        && cached_idx < next_row_group_base_idx
653                    {
654                        next_deleted_row_idx_opt = delete_vector_iter.next();
655                    }
656
657                    // still increment the current page base index but then skip to the next row group
658                    // in the file
659                    current_row_group_base_idx += row_group_num_rows;
660                    continue;
661                }
662            }
663
664            let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
665                Some(next_deleted_row_idx) => {
666                    // if the index of the next deleted row is beyond this row group, add a selection for
667                    // the remainder of this row group and skip to the next row group
668                    if next_deleted_row_idx >= next_row_group_base_idx {
669                        results.push(RowSelector::select(row_group_num_rows as usize));
670                        current_row_group_base_idx += row_group_num_rows;
671                        continue;
672                    }
673
674                    next_deleted_row_idx
675                }
676
677                // If there are no more pos deletes, add a selector for the entirety of this row group.
678                _ => {
679                    results.push(RowSelector::select(row_group_num_rows as usize));
680                    current_row_group_base_idx += row_group_num_rows;
681                    continue;
682                }
683            };
684
685            let mut current_idx = current_row_group_base_idx;
686            'chunks: while next_deleted_row_idx < next_row_group_base_idx {
687                // `select` all rows that precede the next delete index
688                if current_idx < next_deleted_row_idx {
689                    let run_length = next_deleted_row_idx - current_idx;
690                    results.push(RowSelector::select(run_length as usize));
691                    current_idx += run_length;
692                }
693
694                // `skip` all consecutive deleted rows in the current row group
695                let mut run_length = 0;
696                while next_deleted_row_idx == current_idx
697                    && next_deleted_row_idx < next_row_group_base_idx
698                {
699                    run_length += 1;
700                    current_idx += 1;
701
702                    next_deleted_row_idx_opt = delete_vector_iter.next();
703                    next_deleted_row_idx = match next_deleted_row_idx_opt {
704                        Some(next_deleted_row_idx) => next_deleted_row_idx,
705                        _ => {
706                            // We've processed the final positional delete.
707                            // Conclude the skip and then break so that we select the remaining
708                            // rows in the row group and move on to the next row group
709                            results.push(RowSelector::skip(run_length));
710                            break 'chunks;
711                        }
712                    };
713                }
714                if run_length > 0 {
715                    results.push(RowSelector::skip(run_length));
716                }
717            }
718
719            if current_idx < next_row_group_base_idx {
720                results.push(RowSelector::select(
721                    (next_row_group_base_idx - current_idx) as usize,
722                ));
723            }
724
725            current_row_group_base_idx += row_group_num_rows;
726        }
727
728        Ok(results.into())
729    }
730
731    fn build_field_id_set_and_map(
732        parquet_schema: &SchemaDescriptor,
733        predicate: &BoundPredicate,
734    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
735        // Collects all Iceberg field IDs referenced in the filter predicate
736        let mut collector = CollectFieldIdVisitor {
737            field_ids: HashSet::default(),
738        };
739        visit(&mut collector, predicate)?;
740
741        let iceberg_field_ids = collector.field_ids();
742
743        // Without embedded field IDs, we fall back to position-based mapping for compatibility
744        let field_id_map = match build_field_id_map(parquet_schema)? {
745            Some(map) => map,
746            None => build_fallback_field_id_map(parquet_schema),
747        };
748
749        Ok((iceberg_field_ids, field_id_map))
750    }
751
752    /// Recursively extract leaf field IDs because Parquet projection works at the leaf column level.
753    /// Nested types (struct/list/map) are flattened in Parquet's columnar format.
754    fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
755        match field.field_type.as_ref() {
756            Type::Primitive(_) => {
757                field_ids.push(field.id);
758            }
759            Type::Struct(struct_type) => {
760                for nested_field in struct_type.fields() {
761                    Self::include_leaf_field_id(nested_field, field_ids);
762                }
763            }
764            Type::List(list_type) => {
765                Self::include_leaf_field_id(&list_type.element_field, field_ids);
766            }
767            Type::Map(map_type) => {
768                Self::include_leaf_field_id(&map_type.key_field, field_ids);
769                Self::include_leaf_field_id(&map_type.value_field, field_ids);
770            }
771        }
772    }
773
    /// Builds the Arrow `ProjectionMask` selecting the Parquet columns that
    /// correspond to the requested Iceberg `field_ids`.
    ///
    /// # Arguments
    /// * `field_ids` - Top-level Iceberg field IDs requested by the scan task
    /// * `iceberg_schema_of_task` - Iceberg schema the task projects against
    /// * `parquet_schema` - Parquet schema descriptor of the file being read
    /// * `arrow_schema` - Arrow schema derived from the Parquet file
    /// * `use_fallback` - Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark)
    fn get_arrow_projection_mask(
        field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
        use_fallback: bool, // Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark)
    ) -> Result<ProjectionMask> {
        // Returns true when reading a value of `file_type` as `projected_type`
        // is an allowed (lossless) promotion: identical types, Int->Long,
        // Float->Double, Decimal widening with same scale, or Fixed(16)->Uuid.
        fn type_promotion_is_valid(
            file_type: Option<&PrimitiveType>,
            projected_type: Option<&PrimitiveType>,
        ) -> bool {
            match (file_type, projected_type) {
                (Some(lhs), Some(rhs)) if lhs == rhs => true,
                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
                (
                    Some(PrimitiveType::Decimal {
                        precision: file_precision,
                        scale: file_scale,
                    }),
                    Some(PrimitiveType::Decimal {
                        precision: requested_precision,
                        scale: requested_scale,
                    }),
                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
                // Uuid is stored as Fixed(16) in the parquet file, so the read-back type is Fixed(16).
                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
                _ => false,
            }
        }

        // No explicit selection: read every column.
        if field_ids.is_empty() {
            return Ok(ProjectionMask::all());
        }

        if use_fallback {
            // Position-based projection necessary because file lacks embedded field IDs
            Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
        } else {
            // Field-ID-based projection using embedded field IDs from Parquet metadata

            // Parquet's columnar format requires leaf-level (not top-level struct/list/map) projection
            let mut leaf_field_ids = vec![];
            for field_id in field_ids {
                let field = iceberg_schema_of_task.field_by_id(*field_id);
                if let Some(field) = field {
                    Self::include_leaf_field_id(field, &mut leaf_field_ids);
                }
                // Field IDs unknown to the task schema are silently skipped here.
            }

            Self::get_arrow_projection_mask_with_field_ids(
                &leaf_field_ids,
                iceberg_schema_of_task,
                parquet_schema,
                arrow_schema,
                type_promotion_is_valid,
            )
        }
    }
833
    /// Standard projection using embedded field IDs from Parquet metadata.
    /// For iceberg-java compatibility with ParquetSchemaUtil.pruneColumns().
    ///
    /// Selects the leaf columns whose embedded field IDs appear in
    /// `leaf_field_ids` AND whose file type can be promoted to the task
    /// schema's type; missing columns are left to RecordBatchTransformer.
    fn get_arrow_projection_mask_with_field_ids(
        leaf_field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
        type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
    ) -> Result<ProjectionMask> {
        let mut column_map = HashMap::new();
        let fields = arrow_schema.fields();

        // Pre-project only the fields that have been selected, possibly avoiding converting
        // some Arrow types that are not yet supported.
        //
        // NOTE: the closure below has a deliberate side effect — it records
        // the field-ID of EVERY leaf that carries a parseable
        // PARQUET_FIELD_ID_META_KEY into `projected_fields` (the insert happens
        // before the `contains` check), not just the selected ones. The second
        // `filter_leaves` pass relies on this map being populated.
        let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
        let projected_arrow_schema = ArrowSchema::new_with_metadata(
            fields.filter_leaves(|_, f| {
                f.metadata()
                    .get(PARQUET_FIELD_ID_META_KEY)
                    .and_then(|field_id| i32::from_str(field_id).ok())
                    .is_some_and(|field_id| {
                        projected_fields.insert((*f).clone(), field_id);
                        leaf_field_ids.contains(&field_id)
                    })
            }),
            arrow_schema.metadata().clone(),
        );
        // Convert the pruned Arrow schema to Iceberg so file types can be
        // compared against the task schema's types below.
        let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;

        // Second pass: build `column_map` (field ID -> leaf column index).
        // The filtered result is discarded; this call is used for iteration
        // with access to each leaf's index.
        fields.filter_leaves(|idx, field| {
            let Some(field_id) = projected_fields.get(field).cloned() else {
                return false;
            };

            let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
            let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);

            // Skip fields absent from either schema (schema evolution).
            if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
                return false;
            }

            // Skip fields whose file type cannot be promoted to the requested type.
            if !type_promotion_is_valid(
                parquet_iceberg_field
                    .unwrap()
                    .field_type
                    .as_primitive_type(),
                iceberg_field.unwrap().field_type.as_primitive_type(),
            ) {
                return false;
            }

            column_map.insert(field_id, idx);
            true
        });

        // Schema evolution: New columns may not exist in old Parquet files.
        // We only project existing columns; RecordBatchTransformer adds default/NULL values.
        let mut indices = vec![];
        for field_id in leaf_field_ids {
            if let Some(col_idx) = column_map.get(field_id) {
                indices.push(*col_idx);
            }
        }

        if indices.is_empty() {
            // Edge case: All requested columns are new (don't exist in file).
            // Project all columns so RecordBatchTransformer has a batch to transform.
            Ok(ProjectionMask::all())
        } else {
            Ok(ProjectionMask::leaves(parquet_schema, indices))
        }
    }
906
907    /// Fallback projection for Parquet files without field IDs.
908    /// Uses position-based matching: field ID N → column position N-1.
909    /// Projects entire top-level columns (including nested content) for iceberg-java compatibility.
910    fn get_arrow_projection_mask_fallback(
911        field_ids: &[i32],
912        parquet_schema: &SchemaDescriptor,
913    ) -> Result<ProjectionMask> {
914        // Position-based: field_id N → column N-1 (field IDs are 1-indexed)
915        let parquet_root_fields = parquet_schema.root_schema().get_fields();
916        let mut root_indices = vec![];
917
918        for field_id in field_ids.iter() {
919            let parquet_pos = (*field_id - 1) as usize;
920
921            if parquet_pos < parquet_root_fields.len() {
922                root_indices.push(parquet_pos);
923            }
924            // RecordBatchTransformer adds missing columns with NULL values
925        }
926
927        if root_indices.is_empty() {
928            Ok(ProjectionMask::all())
929        } else {
930            Ok(ProjectionMask::roots(parquet_schema, root_indices))
931        }
932    }
933
934    fn get_row_filter(
935        predicates: &BoundPredicate,
936        parquet_schema: &SchemaDescriptor,
937        iceberg_field_ids: &HashSet<i32>,
938        field_id_map: &HashMap<i32, usize>,
939    ) -> Result<RowFilter> {
940        // Collect Parquet column indices from field ids.
941        // If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
942        let mut column_indices = iceberg_field_ids
943            .iter()
944            .filter_map(|field_id| field_id_map.get(field_id).cloned())
945            .collect::<Vec<_>>();
946        column_indices.sort();
947
948        // The converter that converts `BoundPredicates` to `ArrowPredicates`
949        let mut converter = PredicateConverter {
950            parquet_schema,
951            column_map: field_id_map,
952            column_indices: &column_indices,
953        };
954
955        // After collecting required leaf column indices used in the predicate,
956        // creates the projection mask for the Arrow predicates.
957        let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
958        let predicate_func = visit(&mut converter, predicates)?;
959        let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
960        Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
961    }
962
963    fn get_selected_row_group_indices(
964        predicate: &BoundPredicate,
965        parquet_metadata: &Arc<ParquetMetaData>,
966        field_id_map: &HashMap<i32, usize>,
967        snapshot_schema: &Schema,
968    ) -> Result<Vec<usize>> {
969        let row_groups_metadata = parquet_metadata.row_groups();
970        let mut results = Vec::with_capacity(row_groups_metadata.len());
971
972        for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
973            if RowGroupMetricsEvaluator::eval(
974                predicate,
975                row_group_metadata,
976                field_id_map,
977                snapshot_schema,
978            )? {
979                results.push(idx);
980            }
981        }
982
983        Ok(results)
984    }
985
    /// Builds a `RowSelection` from the file's page index by evaluating
    /// `predicate` against each (optionally pre-selected) row group.
    ///
    /// # Errors
    /// Returns `ErrorKind::Unexpected` if the file metadata lacks a column
    /// index or an offset index (both are required for page-level pruning).
    fn get_row_selection_for_filter_predicate(
        predicate: &BoundPredicate,
        parquet_metadata: &Arc<ParquetMetaData>,
        selected_row_groups: &Option<Vec<usize>>,
        field_id_map: &HashMap<i32, usize>,
        snapshot_schema: &Schema,
    ) -> Result<RowSelection> {
        let Some(column_index) = parquet_metadata.column_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain a column index",
            ));
        };

        let Some(offset_index) = parquet_metadata.offset_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain an offset index",
            ));
        };

        // If all row groups were filtered out, return an empty RowSelection (select no rows)
        if let Some(selected_row_groups) = selected_row_groups
            && selected_row_groups.is_empty()
        {
            return Ok(RowSelection::from(Vec::new()));
        }

        // Cursor into `selected_row_groups`. The early return above guarantees
        // the list is non-empty whenever it is Some, and the break at the
        // bottom of the loop fires once the cursor reaches the end, so the
        // indexing below stays in bounds.
        let mut selected_row_groups_idx = 0;

        // Per-row-group iterator pairing each row group's column index,
        // offset index, and metadata.
        let page_index = column_index
            .iter()
            .enumerate()
            .zip(offset_index)
            .zip(parquet_metadata.row_groups());

        let mut results = Vec::new();
        for (((idx, column_index), offset_index), row_group_metadata) in page_index {
            if let Some(selected_row_groups) = selected_row_groups {
                // skip row groups that aren't present in selected_row_groups
                if idx == selected_row_groups[selected_row_groups_idx] {
                    selected_row_groups_idx += 1;
                } else {
                    continue;
                }
            }

            // Evaluate the predicate against this row group's page statistics.
            let selections_for_page = PageIndexEvaluator::eval(
                predicate,
                column_index,
                offset_index,
                row_group_metadata,
                field_id_map,
                snapshot_schema,
            )?;

            results.push(selections_for_page);

            // Stop early once every selected row group has been processed.
            if let Some(selected_row_groups) = selected_row_groups
                && selected_row_groups_idx == selected_row_groups.len()
            {
                break;
            }
        }

        // Concatenate the per-row-group selections into one file-level selection.
        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
    }
1053
1054    /// Filters row groups by byte range to support Iceberg's file splitting.
1055    ///
1056    /// Iceberg splits large files at row group boundaries, so we only read row groups
1057    /// whose byte ranges overlap with [start, start+length).
1058    fn filter_row_groups_by_byte_range(
1059        parquet_metadata: &Arc<ParquetMetaData>,
1060        start: u64,
1061        length: u64,
1062    ) -> Result<Vec<usize>> {
1063        let row_groups = parquet_metadata.row_groups();
1064        let mut selected = Vec::new();
1065        let end = start + length;
1066
1067        // Row groups are stored sequentially after the 4-byte magic header.
1068        let mut current_byte_offset = 4u64;
1069
1070        for (idx, row_group) in row_groups.iter().enumerate() {
1071            let row_group_size = row_group.compressed_size() as u64;
1072            let row_group_end = current_byte_offset + row_group_size;
1073
1074            if current_byte_offset < end && start < row_group_end {
1075                selected.push(idx);
1076            }
1077
1078            current_byte_offset = row_group_end;
1079        }
1080
1081        Ok(selected)
1082    }
1083}
1084
1085/// Build the map of parquet field id to Parquet column index in the schema.
1086/// Returns None if the Parquet file doesn't have field IDs embedded (e.g., migrated tables).
1087fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
1088    let mut column_map = HashMap::new();
1089
1090    for (idx, field) in parquet_schema.columns().iter().enumerate() {
1091        let field_type = field.self_type();
1092        match field_type {
1093            ParquetType::PrimitiveType { basic_info, .. } => {
1094                if !basic_info.has_id() {
1095                    return Ok(None);
1096                }
1097                column_map.insert(basic_info.id(), idx);
1098            }
1099            ParquetType::GroupType { .. } => {
1100                return Err(Error::new(
1101                    ErrorKind::DataInvalid,
1102                    format!(
1103                        "Leave column in schema should be primitive type but got {field_type:?}"
1104                    ),
1105                ));
1106            }
1107        };
1108    }
1109
1110    Ok(Some(column_map))
1111}
1112
1113/// Build a fallback field ID map for Parquet files without embedded field IDs.
1114/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java migrations.
1115fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
1116    let mut column_map = HashMap::new();
1117
1118    // 1-indexed to match iceberg-java's convention
1119    for (idx, _field) in parquet_schema.columns().iter().enumerate() {
1120        let field_id = (idx + 1) as i32;
1121        column_map.insert(field_id, idx);
1122    }
1123
1124    column_map
1125}
1126
1127/// Apply name mapping to Arrow schema for Parquet files lacking field IDs.
1128///
1129/// Assigns Iceberg field IDs based on column names using the name mapping,
1130/// enabling correct projection on migrated files (e.g., from Hive/Spark via add_files).
1131///
1132/// Per Iceberg spec Column Projection rule #2:
1133/// "Use schema.name-mapping.default metadata to map field id to columns without field id"
1134/// https://iceberg.apache.org/spec/#column-projection
1135///
1136/// Corresponds to Java's ParquetSchemaUtil.applyNameMapping() and ApplyNameMapping visitor.
1137/// The key difference is Java operates on Parquet MessageType, while we operate on Arrow Schema.
1138///
1139/// # Arguments
1140/// * `arrow_schema` - Arrow schema from Parquet file (without field IDs)
1141/// * `name_mapping` - Name mapping from table metadata (TableProperties.DEFAULT_NAME_MAPPING)
1142///
1143/// # Returns
1144/// Arrow schema with field IDs assigned based on name mapping
fn apply_name_mapping_to_arrow_schema(
    arrow_schema: ArrowSchemaRef,
    name_mapping: &NameMapping,
) -> Result<Arc<ArrowSchema>> {
    // Guard: this function must only run on schemas without field IDs; the
    // check inspects the first field only (cheap heuristic, debug builds only).
    debug_assert!(
        arrow_schema
            .fields()
            .iter()
            .next()
            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
        "Schema already has field IDs - name mapping should not be applied"
    );

    use arrow_schema::Field;

    // NOTE(review): only TOP-LEVEL fields get mapped IDs here — nested child
    // fields inside struct/list/map data types are carried over unchanged.
    // Confirm this matches the fallback projection's root-level granularity.
    let fields_with_mapped_ids: Vec<_> = arrow_schema
        .fields()
        .iter()
        .map(|field| {
            // Look up this column name in name mapping to get the Iceberg field ID.
            // Corresponds to Java's ApplyNameMapping visitor which calls
            // nameMapping.find(currentPath()) and returns field.withId() if found.
            //
            // If the field isn't in the mapping, leave it WITHOUT assigning an ID
            // (matching Java's behavior of returning the field unchanged).
            // Later, during projection, fields without IDs are filtered out.
            //
            // NOTE(review): linear scan of the mapping per column, and
            // `to_string()` allocates per comparison — fine for typical schema
            // sizes, but worth revisiting for very wide tables.
            let mapped_field_opt = name_mapping
                .fields()
                .iter()
                .find(|f| f.names().contains(&field.name().to_string()));

            let mut metadata = field.metadata().clone();

            if let Some(mapped_field) = mapped_field_opt
                && let Some(field_id) = mapped_field.field_id()
            {
                // Field found in mapping with a field_id → assign it
                metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
            }
            // If field_id is None, leave the field without an ID (will be filtered by projection)

            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    Ok(Arc::new(ArrowSchema::new_with_metadata(
        fields_with_mapped_ids,
        arrow_schema.metadata().clone(),
    )))
}
1196
1197/// Add position-based fallback field IDs to Arrow schema for Parquet files lacking them.
1198/// Enables projection on migrated files (e.g., from Hive/Spark).
1199///
1200/// Why at schema level (not per-batch): Efficiency - avoids repeated schema modification.
1201/// Why only top-level: Nested projection uses leaf column indices, not parent struct IDs.
1202/// Why 1-indexed: Compatibility with iceberg-java's ParquetSchemaUtil.addFallbackIds().
1203fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
1204    debug_assert!(
1205        arrow_schema
1206            .fields()
1207            .iter()
1208            .next()
1209            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1210        "Schema already has field IDs"
1211    );
1212
1213    use arrow_schema::Field;
1214
1215    let fields_with_fallback_ids: Vec<_> = arrow_schema
1216        .fields()
1217        .iter()
1218        .enumerate()
1219        .map(|(pos, field)| {
1220            let mut metadata = field.metadata().clone();
1221            let field_id = (pos + 1) as i32; // 1-indexed for Java compatibility
1222            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1223
1224            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1225                .with_metadata(metadata)
1226        })
1227        .collect();
1228
1229    Arc::new(ArrowSchema::new_with_metadata(
1230        fields_with_fallback_ids,
1231        arrow_schema.metadata().clone(),
1232    ))
1233}
1234
/// A visitor to collect field ids from bound predicates.
///
/// Walks a predicate tree (via `visit`) and records the Iceberg field ID of
/// every leaf expression it encounters.
struct CollectFieldIdVisitor {
    // Distinct field IDs collected so far.
    field_ids: HashSet<i32>,
}
1239
impl CollectFieldIdVisitor {
    /// Consumes the visitor and returns the collected set of field IDs.
    fn field_ids(self) -> HashSet<i32> {
        self.field_ids
    }
}
1245
1246impl BoundPredicateVisitor for CollectFieldIdVisitor {
1247    type T = ();
1248
1249    fn always_true(&mut self) -> Result<()> {
1250        Ok(())
1251    }
1252
1253    fn always_false(&mut self) -> Result<()> {
1254        Ok(())
1255    }
1256
1257    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1258        Ok(())
1259    }
1260
1261    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1262        Ok(())
1263    }
1264
1265    fn not(&mut self, _inner: ()) -> Result<()> {
1266        Ok(())
1267    }
1268
1269    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1270        self.field_ids.insert(reference.field().id);
1271        Ok(())
1272    }
1273
1274    fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1275        self.field_ids.insert(reference.field().id);
1276        Ok(())
1277    }
1278
1279    fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1280        self.field_ids.insert(reference.field().id);
1281        Ok(())
1282    }
1283
1284    fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1285        self.field_ids.insert(reference.field().id);
1286        Ok(())
1287    }
1288
1289    fn less_than(
1290        &mut self,
1291        reference: &BoundReference,
1292        _literal: &Datum,
1293        _predicate: &BoundPredicate,
1294    ) -> Result<()> {
1295        self.field_ids.insert(reference.field().id);
1296        Ok(())
1297    }
1298
1299    fn less_than_or_eq(
1300        &mut self,
1301        reference: &BoundReference,
1302        _literal: &Datum,
1303        _predicate: &BoundPredicate,
1304    ) -> Result<()> {
1305        self.field_ids.insert(reference.field().id);
1306        Ok(())
1307    }
1308
1309    fn greater_than(
1310        &mut self,
1311        reference: &BoundReference,
1312        _literal: &Datum,
1313        _predicate: &BoundPredicate,
1314    ) -> Result<()> {
1315        self.field_ids.insert(reference.field().id);
1316        Ok(())
1317    }
1318
1319    fn greater_than_or_eq(
1320        &mut self,
1321        reference: &BoundReference,
1322        _literal: &Datum,
1323        _predicate: &BoundPredicate,
1324    ) -> Result<()> {
1325        self.field_ids.insert(reference.field().id);
1326        Ok(())
1327    }
1328
1329    fn eq(
1330        &mut self,
1331        reference: &BoundReference,
1332        _literal: &Datum,
1333        _predicate: &BoundPredicate,
1334    ) -> Result<()> {
1335        self.field_ids.insert(reference.field().id);
1336        Ok(())
1337    }
1338
1339    fn not_eq(
1340        &mut self,
1341        reference: &BoundReference,
1342        _literal: &Datum,
1343        _predicate: &BoundPredicate,
1344    ) -> Result<()> {
1345        self.field_ids.insert(reference.field().id);
1346        Ok(())
1347    }
1348
1349    fn starts_with(
1350        &mut self,
1351        reference: &BoundReference,
1352        _literal: &Datum,
1353        _predicate: &BoundPredicate,
1354    ) -> Result<()> {
1355        self.field_ids.insert(reference.field().id);
1356        Ok(())
1357    }
1358
1359    fn not_starts_with(
1360        &mut self,
1361        reference: &BoundReference,
1362        _literal: &Datum,
1363        _predicate: &BoundPredicate,
1364    ) -> Result<()> {
1365        self.field_ids.insert(reference.field().id);
1366        Ok(())
1367    }
1368
1369    fn r#in(
1370        &mut self,
1371        reference: &BoundReference,
1372        _literals: &FnvHashSet<Datum>,
1373        _predicate: &BoundPredicate,
1374    ) -> Result<()> {
1375        self.field_ids.insert(reference.field().id);
1376        Ok(())
1377    }
1378
1379    fn not_in(
1380        &mut self,
1381        reference: &BoundReference,
1382        _literals: &FnvHashSet<Datum>,
1383        _predicate: &BoundPredicate,
1384    ) -> Result<()> {
1385        self.field_ids.insert(reference.field().id);
1386        Ok(())
1387    }
1388}
1389
/// A visitor to convert Iceberg bound predicates to Arrow predicates.
struct PredicateConverter<'a> {
    /// The Parquet schema descriptor.
    pub parquet_schema: &'a SchemaDescriptor,
    /// The map between field id and leaf column index in Parquet schema.
    pub column_map: &'a HashMap<i32, usize>,
    /// The required column indices in Parquet schema for the predicates.
    /// Predicate closures index into the projected record batch via the
    /// position of a column within this list.
    pub column_indices: &'a Vec<usize>,
}
1399
1400impl PredicateConverter<'_> {
1401    /// When visiting a bound reference, we return index of the leaf column in the
1402    /// required column indices which is used to project the column in the record batch.
1403    /// Return None if the field id is not found in the column map, which is possible
1404    /// due to schema evolution.
1405    fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1406        // The leaf column's index in Parquet schema.
1407        if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1408            if self.parquet_schema.get_column_root(*column_idx).is_group() {
1409                return Err(Error::new(
1410                    ErrorKind::DataInvalid,
1411                    format!(
1412                        "Leave column `{}` in predicates isn't a root column in Parquet schema.",
1413                        reference.field().name
1414                    ),
1415                ));
1416            }
1417
1418            // The leaf column's index in the required column indices.
1419            let index = self
1420                .column_indices
1421                .iter()
1422                .position(|&idx| idx == *column_idx)
1423                .ok_or(Error::new(
1424                    ErrorKind::DataInvalid,
1425                    format!(
1426                "Leave column `{}` in predicates cannot be found in the required column indices.",
1427                reference.field().name
1428            ),
1429                ))?;
1430
1431            Ok(Some(index))
1432        } else {
1433            Ok(None)
1434        }
1435    }
1436
1437    /// Build an Arrow predicate that always returns true.
1438    fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1439        Ok(Box::new(|batch| {
1440            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1441        }))
1442    }
1443
1444    /// Build an Arrow predicate that always returns false.
1445    fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1446        Ok(Box::new(|batch| {
1447            Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1448        }))
1449    }
1450}
1451
1452/// Gets the leaf column from the record batch for the required column index. Only
1453/// supports top-level columns for now.
1454fn project_column(
1455    batch: &RecordBatch,
1456    column_idx: usize,
1457) -> std::result::Result<ArrayRef, ArrowError> {
1458    let column = batch.column(column_idx);
1459
1460    match column.data_type() {
1461        DataType::Struct(_) => Err(ArrowError::SchemaError(
1462            "Does not support struct column yet.".to_string(),
1463        )),
1464        _ => Ok(column.clone()),
1465    }
1466}
1467
/// Closure type produced by `PredicateConverter`: evaluates one record batch
/// and yields a boolean mask selecting the rows that satisfy the predicate.
type PredicateResult =
    dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1470
1471impl BoundPredicateVisitor for PredicateConverter<'_> {
1472    type T = Box<PredicateResult>;
1473
1474    fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1475        self.build_always_true()
1476    }
1477
1478    fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1479        self.build_always_false()
1480    }
1481
1482    fn and(
1483        &mut self,
1484        mut lhs: Box<PredicateResult>,
1485        mut rhs: Box<PredicateResult>,
1486    ) -> Result<Box<PredicateResult>> {
1487        Ok(Box::new(move |batch| {
1488            let left = lhs(batch.clone())?;
1489            let right = rhs(batch)?;
1490            and_kleene(&left, &right)
1491        }))
1492    }
1493
1494    fn or(
1495        &mut self,
1496        mut lhs: Box<PredicateResult>,
1497        mut rhs: Box<PredicateResult>,
1498    ) -> Result<Box<PredicateResult>> {
1499        Ok(Box::new(move |batch| {
1500            let left = lhs(batch.clone())?;
1501            let right = rhs(batch)?;
1502            or_kleene(&left, &right)
1503        }))
1504    }
1505
1506    fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1507        Ok(Box::new(move |batch| {
1508            let pred_ret = inner(batch)?;
1509            not(&pred_ret)
1510        }))
1511    }
1512
1513    fn is_null(
1514        &mut self,
1515        reference: &BoundReference,
1516        _predicate: &BoundPredicate,
1517    ) -> Result<Box<PredicateResult>> {
1518        if let Some(idx) = self.bound_reference(reference)? {
1519            Ok(Box::new(move |batch| {
1520                let column = project_column(&batch, idx)?;
1521                is_null(&column)
1522            }))
1523        } else {
1524            // A missing column, treating it as null.
1525            self.build_always_true()
1526        }
1527    }
1528
1529    fn not_null(
1530        &mut self,
1531        reference: &BoundReference,
1532        _predicate: &BoundPredicate,
1533    ) -> Result<Box<PredicateResult>> {
1534        if let Some(idx) = self.bound_reference(reference)? {
1535            Ok(Box::new(move |batch| {
1536                let column = project_column(&batch, idx)?;
1537                is_not_null(&column)
1538            }))
1539        } else {
1540            // A missing column, treating it as null.
1541            self.build_always_false()
1542        }
1543    }
1544
1545    fn is_nan(
1546        &mut self,
1547        reference: &BoundReference,
1548        _predicate: &BoundPredicate,
1549    ) -> Result<Box<PredicateResult>> {
1550        if self.bound_reference(reference)?.is_some() {
1551            self.build_always_true()
1552        } else {
1553            // A missing column, treating it as null.
1554            self.build_always_false()
1555        }
1556    }
1557
1558    fn not_nan(
1559        &mut self,
1560        reference: &BoundReference,
1561        _predicate: &BoundPredicate,
1562    ) -> Result<Box<PredicateResult>> {
1563        if self.bound_reference(reference)?.is_some() {
1564            self.build_always_false()
1565        } else {
1566            // A missing column, treating it as null.
1567            self.build_always_true()
1568        }
1569    }
1570
1571    fn less_than(
1572        &mut self,
1573        reference: &BoundReference,
1574        literal: &Datum,
1575        _predicate: &BoundPredicate,
1576    ) -> Result<Box<PredicateResult>> {
1577        if let Some(idx) = self.bound_reference(reference)? {
1578            let literal = get_arrow_datum(literal)?;
1579
1580            Ok(Box::new(move |batch| {
1581                let left = project_column(&batch, idx)?;
1582                let literal = try_cast_literal(&literal, left.data_type())?;
1583                lt(&left, literal.as_ref())
1584            }))
1585        } else {
1586            // A missing column, treating it as null.
1587            self.build_always_true()
1588        }
1589    }
1590
1591    fn less_than_or_eq(
1592        &mut self,
1593        reference: &BoundReference,
1594        literal: &Datum,
1595        _predicate: &BoundPredicate,
1596    ) -> Result<Box<PredicateResult>> {
1597        if let Some(idx) = self.bound_reference(reference)? {
1598            let literal = get_arrow_datum(literal)?;
1599
1600            Ok(Box::new(move |batch| {
1601                let left = project_column(&batch, idx)?;
1602                let literal = try_cast_literal(&literal, left.data_type())?;
1603                lt_eq(&left, literal.as_ref())
1604            }))
1605        } else {
1606            // A missing column, treating it as null.
1607            self.build_always_true()
1608        }
1609    }
1610
1611    fn greater_than(
1612        &mut self,
1613        reference: &BoundReference,
1614        literal: &Datum,
1615        _predicate: &BoundPredicate,
1616    ) -> Result<Box<PredicateResult>> {
1617        if let Some(idx) = self.bound_reference(reference)? {
1618            let literal = get_arrow_datum(literal)?;
1619
1620            Ok(Box::new(move |batch| {
1621                let left = project_column(&batch, idx)?;
1622                let literal = try_cast_literal(&literal, left.data_type())?;
1623                gt(&left, literal.as_ref())
1624            }))
1625        } else {
1626            // A missing column, treating it as null.
1627            self.build_always_false()
1628        }
1629    }
1630
1631    fn greater_than_or_eq(
1632        &mut self,
1633        reference: &BoundReference,
1634        literal: &Datum,
1635        _predicate: &BoundPredicate,
1636    ) -> Result<Box<PredicateResult>> {
1637        if let Some(idx) = self.bound_reference(reference)? {
1638            let literal = get_arrow_datum(literal)?;
1639
1640            Ok(Box::new(move |batch| {
1641                let left = project_column(&batch, idx)?;
1642                let literal = try_cast_literal(&literal, left.data_type())?;
1643                gt_eq(&left, literal.as_ref())
1644            }))
1645        } else {
1646            // A missing column, treating it as null.
1647            self.build_always_false()
1648        }
1649    }
1650
1651    fn eq(
1652        &mut self,
1653        reference: &BoundReference,
1654        literal: &Datum,
1655        _predicate: &BoundPredicate,
1656    ) -> Result<Box<PredicateResult>> {
1657        if let Some(idx) = self.bound_reference(reference)? {
1658            let literal = get_arrow_datum(literal)?;
1659
1660            Ok(Box::new(move |batch| {
1661                let left = project_column(&batch, idx)?;
1662                let literal = try_cast_literal(&literal, left.data_type())?;
1663                eq(&left, literal.as_ref())
1664            }))
1665        } else {
1666            // A missing column, treating it as null.
1667            self.build_always_false()
1668        }
1669    }
1670
1671    fn not_eq(
1672        &mut self,
1673        reference: &BoundReference,
1674        literal: &Datum,
1675        _predicate: &BoundPredicate,
1676    ) -> Result<Box<PredicateResult>> {
1677        if let Some(idx) = self.bound_reference(reference)? {
1678            let literal = get_arrow_datum(literal)?;
1679
1680            Ok(Box::new(move |batch| {
1681                let left = project_column(&batch, idx)?;
1682                let literal = try_cast_literal(&literal, left.data_type())?;
1683                neq(&left, literal.as_ref())
1684            }))
1685        } else {
1686            // A missing column, treating it as null.
1687            self.build_always_false()
1688        }
1689    }
1690
1691    fn starts_with(
1692        &mut self,
1693        reference: &BoundReference,
1694        literal: &Datum,
1695        _predicate: &BoundPredicate,
1696    ) -> Result<Box<PredicateResult>> {
1697        if let Some(idx) = self.bound_reference(reference)? {
1698            let literal = get_arrow_datum(literal)?;
1699
1700            Ok(Box::new(move |batch| {
1701                let left = project_column(&batch, idx)?;
1702                let literal = try_cast_literal(&literal, left.data_type())?;
1703                starts_with(&left, literal.as_ref())
1704            }))
1705        } else {
1706            // A missing column, treating it as null.
1707            self.build_always_false()
1708        }
1709    }
1710
1711    fn not_starts_with(
1712        &mut self,
1713        reference: &BoundReference,
1714        literal: &Datum,
1715        _predicate: &BoundPredicate,
1716    ) -> Result<Box<PredicateResult>> {
1717        if let Some(idx) = self.bound_reference(reference)? {
1718            let literal = get_arrow_datum(literal)?;
1719
1720            Ok(Box::new(move |batch| {
1721                let left = project_column(&batch, idx)?;
1722                let literal = try_cast_literal(&literal, left.data_type())?;
1723                // update here if arrow ever adds a native not_starts_with
1724                not(&starts_with(&left, literal.as_ref())?)
1725            }))
1726        } else {
1727            // A missing column, treating it as null.
1728            self.build_always_true()
1729        }
1730    }
1731
1732    fn r#in(
1733        &mut self,
1734        reference: &BoundReference,
1735        literals: &FnvHashSet<Datum>,
1736        _predicate: &BoundPredicate,
1737    ) -> Result<Box<PredicateResult>> {
1738        if let Some(idx) = self.bound_reference(reference)? {
1739            let literals: Vec<_> = literals
1740                .iter()
1741                .map(|lit| get_arrow_datum(lit).unwrap())
1742                .collect();
1743
1744            Ok(Box::new(move |batch| {
1745                // update this if arrow ever adds a native is_in kernel
1746                let left = project_column(&batch, idx)?;
1747
1748                let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1749                for literal in &literals {
1750                    let literal = try_cast_literal(literal, left.data_type())?;
1751                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
1752                }
1753
1754                Ok(acc)
1755            }))
1756        } else {
1757            // A missing column, treating it as null.
1758            self.build_always_false()
1759        }
1760    }
1761
1762    fn not_in(
1763        &mut self,
1764        reference: &BoundReference,
1765        literals: &FnvHashSet<Datum>,
1766        _predicate: &BoundPredicate,
1767    ) -> Result<Box<PredicateResult>> {
1768        if let Some(idx) = self.bound_reference(reference)? {
1769            let literals: Vec<_> = literals
1770                .iter()
1771                .map(|lit| get_arrow_datum(lit).unwrap())
1772                .collect();
1773
1774            Ok(Box::new(move |batch| {
1775                // update this if arrow ever adds a native not_in kernel
1776                let left = project_column(&batch, idx)?;
1777                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1778                for literal in &literals {
1779                    let literal = try_cast_literal(literal, left.data_type())?;
1780                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
1781                }
1782
1783                Ok(acc)
1784            }))
1785        } else {
1786            // A missing column, treating it as null.
1787            self.build_always_true()
1788        }
1789    }
1790}
1791
/// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
pub struct ArrowFileReader {
    /// Iceberg file metadata; its `size` is used to locate the Parquet footer.
    meta: FileMetadata,
    /// Tuning knobs: metadata prefetch hint, page/column/offset index preload,
    /// and byte-range coalescing/concurrency for reads.
    parquet_read_options: ParquetReadOptions,
    /// Underlying positioned reader that serves the raw byte ranges.
    r: Box<dyn FileRead>,
}
1798
1799impl ArrowFileReader {
1800    /// Create a new ArrowFileReader
1801    pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
1802        Self {
1803            meta,
1804            parquet_read_options: ParquetReadOptions::builder().build(),
1805            r,
1806        }
1807    }
1808
1809    /// Configure all Parquet read options.
1810    pub(crate) fn with_parquet_read_options(mut self, options: ParquetReadOptions) -> Self {
1811        self.parquet_read_options = options;
1812        self
1813    }
1814}
1815
impl AsyncFileReader for ArrowFileReader {
    /// Fetch a single byte range from the underlying file, mapping any I/O
    /// error into a Parquet `External` error.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start..range.end)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    /// Override the default `get_byte_ranges` which calls `get_bytes` sequentially.
    /// The parquet reader calls this to fetch column chunks for a row group, so
    /// without this override each column chunk is a serial round-trip to object storage.
    /// Adapted from object_store's `coalesce_ranges` in `util.rs`.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
        let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes();
        // Guarantee at least one in-flight fetch even if configured as 0.
        let concurrency = self.parquet_read_options.range_fetch_concurrency().max(1);

        async move {
            // Merge nearby ranges to reduce the number of object store requests.
            let fetch_ranges = merge_ranges(&ranges, coalesce_bytes);
            let r = &self.r;

            // Fetch merged ranges concurrently.
            let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
                .map(|range| async move {
                    r.read(range)
                        .await
                        .map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
                })
                .buffered(concurrency)
                .try_collect()
                .await?;

            // Slice the fetched data back into the originally requested ranges.
            Ok(ranges
                .iter()
                .map(|range| {
                    // `merge_ranges` returns ranges sorted by start, so the
                    // merged range covering this request is the last one whose
                    // start is <= range.start.
                    let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
                    let fetch_range = &fetch_ranges[idx];
                    let fetch_bytes = &fetched[idx];
                    // Re-base the requested offsets onto the merged buffer.
                    let start = (range.start - fetch_range.start) as usize;
                    let end = (range.end - fetch_range.start) as usize;
                    fetch_bytes.slice(start..end.min(fetch_bytes.len()))
                })
                .collect())
        }
        .boxed()
    }

    // TODO: currently we don't respect `ArrowReaderOptions` cause it don't expose any method to access the option field
    // we will fix it after `v55.1.0` is released in https://github.com/apache/arrow-rs/issues/7393
    /// Load and parse the Parquet footer metadata, honoring the configured
    /// prefetch hint and page/column/offset index preload policies.
    fn get_metadata(
        &mut self,
        _options: Option<&'_ ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new()
                .with_prefetch_hint(self.parquet_read_options.metadata_size_hint())
                // Set the page policy first because it updates both column and offset policies.
                .with_page_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_page_index(),
                ))
                .with_column_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_column_index(),
                ))
                .with_offset_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_offset_index(),
                ));
            let size = self.meta.size;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}
1895
/// Merge overlapping or nearby byte ranges, combining ranges with gaps <= `coalesce` bytes.
/// Adapted from object_store's `merge_ranges` in `util.rs`.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    // Work on a copy sorted by start offset so merge candidates are adjacent.
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for range in sorted {
        match merged.last_mut() {
            // Extend the previous merged range when this one overlaps it
            // (checked_sub yields None) or the gap is within `coalesce`.
            Some(last)
                if range
                    .start
                    .checked_sub(last.end)
                    .map(|gap| gap <= coalesce)
                    .unwrap_or(true) =>
            {
                // A contained range must not shrink the merged end.
                last.end = last.end.max(range.end);
            }
            // Otherwise start a fresh merged range.
            _ => merged.push(range),
        }
    }

    merged
}
1931
1932/// The Arrow type of an array that the Parquet reader reads may not match the exact Arrow type
1933/// that Iceberg uses for literals - but they are effectively the same logical type,
1934/// i.e. LargeUtf8 and Utf8 or Utf8View and Utf8 or Utf8View and LargeUtf8.
1935///
1936/// The Arrow compute kernels that we use must match the type exactly, so first cast the literal
1937/// into the type of the batch we read from Parquet before sending it to the compute kernel.
1938fn try_cast_literal(
1939    literal: &Arc<dyn ArrowDatum + Send + Sync>,
1940    column_type: &DataType,
1941) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1942    let literal_array = literal.get().0;
1943
1944    // No cast required
1945    if literal_array.data_type() == column_type {
1946        return Ok(Arc::clone(literal));
1947    }
1948
1949    let literal_array = cast(literal_array, column_type)?;
1950    Ok(Arc::new(Scalar::new(literal_array)))
1951}
1952
1953#[cfg(test)]
1954mod tests {
1955    use std::collections::{HashMap, HashSet};
1956    use std::fs::File;
1957    use std::ops::Range;
1958    use std::sync::Arc;
1959
1960    use arrow_array::cast::AsArray;
1961    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1962    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1963    use futures::TryStreamExt;
1964    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1965    use parquet::arrow::{ArrowWriter, ProjectionMask};
1966    use parquet::basic::Compression;
1967    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1968    use parquet::file::properties::WriterProperties;
1969    use parquet::schema::parser::parse_message_type;
1970    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1971    use roaring::RoaringTreemap;
1972    use tempfile::TempDir;
1973
1974    use crate::ErrorKind;
1975    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1976    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1977    use crate::delete_vector::DeleteVector;
1978    use crate::expr::visitors::bound_predicate_visitor::visit;
1979    use crate::expr::{Bind, Predicate, Reference};
1980    use crate::io::FileIO;
1981    use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1982    use crate::spec::{
1983        DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1984    };
1985
1986    fn table_schema_simple() -> SchemaRef {
1987        Arc::new(
1988            Schema::builder()
1989                .with_schema_id(1)
1990                .with_identifier_field_ids(vec![2])
1991                .with_fields(vec![
1992                    NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1993                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1994                    NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1995                    NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1996                ])
1997                .build()
1998                .unwrap(),
1999        )
2000    }
2001
2002    #[test]
2003    fn test_collect_field_id() {
2004        let schema = table_schema_simple();
2005        let expr = Reference::new("qux").is_null();
2006        let bound_expr = expr.bind(schema, true).unwrap();
2007
2008        let mut visitor = CollectFieldIdVisitor {
2009            field_ids: HashSet::default(),
2010        };
2011        visit(&mut visitor, &bound_expr).unwrap();
2012
2013        let mut expected = HashSet::default();
2014        expected.insert(4_i32);
2015
2016        assert_eq!(visitor.field_ids, expected);
2017    }
2018
2019    #[test]
2020    fn test_collect_field_id_with_and() {
2021        let schema = table_schema_simple();
2022        let expr = Reference::new("qux")
2023            .is_null()
2024            .and(Reference::new("baz").is_null());
2025        let bound_expr = expr.bind(schema, true).unwrap();
2026
2027        let mut visitor = CollectFieldIdVisitor {
2028            field_ids: HashSet::default(),
2029        };
2030        visit(&mut visitor, &bound_expr).unwrap();
2031
2032        let mut expected = HashSet::default();
2033        expected.insert(4_i32);
2034        expected.insert(3);
2035
2036        assert_eq!(visitor.field_ids, expected);
2037    }
2038
2039    #[test]
2040    fn test_collect_field_id_with_or() {
2041        let schema = table_schema_simple();
2042        let expr = Reference::new("qux")
2043            .is_null()
2044            .or(Reference::new("baz").is_null());
2045        let bound_expr = expr.bind(schema, true).unwrap();
2046
2047        let mut visitor = CollectFieldIdVisitor {
2048            field_ids: HashSet::default(),
2049        };
2050        visit(&mut visitor, &bound_expr).unwrap();
2051
2052        let mut expected = HashSet::default();
2053        expected.insert(4_i32);
2054        expected.insert(3);
2055
2056        assert_eq!(visitor.field_ids, expected);
2057    }
2058
    /// Projecting fields whose Arrow types are unsupported (Duration,
    /// Decimal with precision > 38) must fail with `DataInvalid`; restricting
    /// the projection to supported fields yields a leaf-level mask.
    #[test]
    fn test_arrow_projection_mask() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_identifier_field_ids(vec![1])
                .with_fields(vec![
                    NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
                    NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(
                        3,
                        "c3",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 38,
                            scale: 3,
                        }),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            // Type not supported
            Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
            ),
            // Precision is beyond the supported range
            Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "3".to_string(),
            )])),
        ]));

        let message_type = "
message schema {
  required binary c1 (STRING) = 1;
  optional int32 c2 (INTEGER(8,true)) = 2;
  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
}
    ";
        let parquet_type = parse_message_type(message_type).expect("should parse schema");
        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));

        // Try projecting the fields c2 and c3 with the unsupported data types
        let err = ArrowReader::get_arrow_projection_mask(
            &[1, 2, 3],
            &schema,
            &parquet_schema,
            &arrow_schema,
            false,
        )
        .unwrap_err();

        assert_eq!(err.kind(), ErrorKind::DataInvalid);
        assert_eq!(
            err.to_string(),
            "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
        );

        // Omitting field c2, we still get an error due to c3 being selected
        let err = ArrowReader::get_arrow_projection_mask(
            &[1, 3],
            &schema,
            &parquet_schema,
            &arrow_schema,
            false,
        )
        .unwrap_err();

        assert_eq!(err.kind(), ErrorKind::DataInvalid);
        assert_eq!(
            err.to_string(),
            "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
        );

        // Finally avoid selecting fields with unsupported data types
        let mask = ArrowReader::get_arrow_projection_mask(
            &[1],
            &schema,
            &parquet_schema,
            &arrow_schema,
            false,
        )
        .expect("Some ProjectionMask");
        // Leaf index 0 corresponds to c1, the only selected (and supported) column.
        assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
    }
2150
2151    #[tokio::test]
2152    async fn test_kleene_logic_or_behaviour() {
2153        // a IS NULL OR a = 'foo'
2154        let predicate = Reference::new("a")
2155            .is_null()
2156            .or(Reference::new("a").equal_to(Datum::string("foo")));
2157
2158        // Table data: [NULL, "foo", "bar"]
2159        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
2160
2161        // Expected: [NULL, "foo"].
2162        let expected = vec![None, Some("foo".to_string())];
2163
2164        let (file_io, schema, table_location, _temp_dir) =
2165            setup_kleene_logic(data_for_col_a, DataType::Utf8);
2166        let reader = ArrowReaderBuilder::new(file_io).build();
2167
2168        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
2169
2170        assert_eq!(result_data, expected);
2171    }
2172
2173    #[tokio::test]
2174    async fn test_kleene_logic_and_behaviour() {
2175        // a IS NOT NULL AND a != 'foo'
2176        let predicate = Reference::new("a")
2177            .is_not_null()
2178            .and(Reference::new("a").not_equal_to(Datum::string("foo")));
2179
2180        // Table data: [NULL, "foo", "bar"]
2181        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
2182
2183        // Expected: ["bar"].
2184        let expected = vec![Some("bar".to_string())];
2185
2186        let (file_io, schema, table_location, _temp_dir) =
2187            setup_kleene_logic(data_for_col_a, DataType::Utf8);
2188        let reader = ArrowReaderBuilder::new(file_io).build();
2189
2190        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
2191
2192        assert_eq!(result_data, expected);
2193    }
2194
    /// The file's column `a` is written as LargeUtf8 while the literals are
    /// Utf8, so every comparison exercises the literal cast in
    /// `try_cast_literal` across all predicate operators.
    #[tokio::test]
    async fn test_predicate_cast_literal() {
        let predicates = vec![
            // a == 'foo'
            (Reference::new("a").equal_to(Datum::string("foo")), vec![
                Some("foo".to_string()),
            ]),
            // a != 'foo'
            (
                Reference::new("a").not_equal_to(Datum::string("foo")),
                vec![Some("bar".to_string())],
            ),
            // STARTS_WITH(a, 'f')
            (Reference::new("a").starts_with(Datum::string("f")), vec![
                Some("foo".to_string()),
            ]),
            // NOT STARTS_WITH(a, 'f')
            (
                Reference::new("a").not_starts_with(Datum::string("f")),
                vec![Some("bar".to_string())],
            ),
            // a < 'foo'
            (Reference::new("a").less_than(Datum::string("foo")), vec![
                Some("bar".to_string()),
            ]),
            // a <= 'foo'
            (
                Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
                vec![Some("foo".to_string()), Some("bar".to_string())],
            ),
            // a > 'bar'
            (
                Reference::new("a").greater_than(Datum::string("bar")),
                vec![Some("foo".to_string())],
            ),
            // a >= 'foo'
            (
                Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
                vec![Some("foo".to_string())],
            ),
            // a IN ('foo', 'baz')
            (
                Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
                vec![Some("foo".to_string())],
            ),
            // a NOT IN ('foo', 'baz')
            (
                Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
                vec![Some("bar".to_string())],
            ),
        ];

        // Table data: ["foo", "bar"]
        let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];

        let (file_io, schema, table_location, _temp_dir) =
            setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
        let reader = ArrowReaderBuilder::new(file_io).build();

        for (predicate, expected) in predicates {
            println!("testing predicate {predicate}");
            let result_data = test_perform_read(
                predicate.clone(),
                schema.clone(),
                table_location.clone(),
                reader.clone(),
            )
            .await;

            assert_eq!(result_data, expected, "predicate={predicate}");
        }
    }
2267
    /// Scan `1.parquet` under `table_location` with `predicate` bound against
    /// `schema`, projecting only field id 1, and return column `a`'s values
    /// as owned optional strings.
    async fn test_perform_read(
        predicate: Predicate,
        schema: SchemaRef,
        table_location: String,
        reader: ArrowReader,
    ) -> Vec<Option<String>> {
        // A single scan task over the whole file.
        // NOTE(review): start=0 / length=0 appears to mean "scan the entire
        // file" — confirm the length semantics against FileScanTask.
        let tasks = Box::pin(futures::stream::iter(
            vec![Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/1.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1],
                predicate: Some(predicate.bind(schema, true).unwrap()),
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            })]
            .into_iter(),
        )) as FileScanTaskStream;

        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Extract the first (only) column of the first batch as Option<String>s.
        result[0].columns()[0]
            .as_string_opt::<i32>()
            .unwrap()
            .iter()
            .map(|v| v.map(ToOwned::to_owned))
            .collect::<Vec<_>>()
    }
2310
2311    fn setup_kleene_logic(
2312        data_for_col_a: Vec<Option<String>>,
2313        col_a_type: DataType,
2314    ) -> (FileIO, SchemaRef, String, TempDir) {
2315        let schema = Arc::new(
2316            Schema::builder()
2317                .with_schema_id(1)
2318                .with_fields(vec![
2319                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2320                ])
2321                .build()
2322                .unwrap(),
2323        );
2324
2325        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2326            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2327                PARQUET_FIELD_ID_META_KEY.to_string(),
2328                "1".to_string(),
2329            )])),
2330        ]));
2331
2332        let tmp_dir = TempDir::new().unwrap();
2333        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2334
2335        let file_io = FileIO::new_with_fs();
2336
2337        let col = match col_a_type {
2338            DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2339            DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2340            _ => panic!("unexpected col_a_type"),
2341        };
2342
2343        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2344
2345        // Write the Parquet files
2346        let props = WriterProperties::builder()
2347            .set_compression(Compression::SNAPPY)
2348            .build();
2349
2350        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
2351        let mut writer =
2352            ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2353
2354        writer.write(&to_write).expect("Writing batch");
2355
2356        // writer must be closed to write footer
2357        writer.close().unwrap();
2358
2359        (file_io, schema, table_location, tmp_dir)
2360    }
2361
    /// Exercises `ArrowReader::build_deletes_row_selection` against five
    /// synthetic row groups (1000/500/500/1000/500 rows) and a hand-built
    /// positional delete vector, both with an explicit row-group selection
    /// (`Some([1, 3])`) and with selection disabled (`None`).
    #[test]
    fn test_build_deletes_row_selection() {
        let schema_descr = get_test_schema_descr();

        // Minimal column-chunk metadata for each column of the test schema;
        // the row-selection logic only needs row counts, not real statistics.
        let mut columns = vec![];
        for ptr in schema_descr.columns() {
            let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
            columns.push(column);
        }

        // Row-group layout in global row indices:
        //   rg0: 0..=999, rg1: 1000..=1499, rg2: 1500..=1999,
        //   rg3: 2000..=2999, rg4: 3000..=3499
        let row_groups_metadata = vec![
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
        ];

        let selected_row_groups = Some(vec![1, 3]);

        /* cases to cover:
           * {skip|select} {first|intermediate|last} {one row|multiple rows} in
             {first|intermediate|last} {skipped|selected} row group
           * row group selection disabled
        */

        // The (skip, select) pairs in the trailing comments below refer to the
        // expected RowSelectors produced for the selected-row-groups case.
        let positional_deletes = RoaringTreemap::from_iter(&[
            1, // in skipped rg 0, should be ignored
            3, // run of three consecutive items in skipped rg0
            4, 5, 998, // two consecutive items at end of skipped rg0
            999, 1000, // solitary row at start of selected rg1 (1, 9)
            1010, // run of 3 rows in selected rg1
            1011, 1012, // (3, 485)
            1498, // run of two items at end of selected rg1
            1499, 1500, // run of two items at start of skipped rg2
            1501, 1600, // should ignore, in skipped rg2
            1999, // single row at end of skipped rg2
            2000, // run of two items at start of selected rg3
            2001, // (4, 98)
            2100, // single row in selected row group 3 (1, 99)
            2200, // run of 3 consecutive rows in selected row group 3
            2201, 2202, // (3, 796)
            2999, // single item at end of selected rg3 (1)
            3000, // single item at start of skipped rg4
        ]);

        let positional_deletes = DeleteVector::new(positional_deletes);

        // using selected row groups 1 and 3
        let result = ArrowReader::build_deletes_row_selection(
            &row_groups_metadata,
            &selected_row_groups,
            &positional_deletes,
        )
        .unwrap();

        // Selectors are relative to the concatenation of the *selected* row
        // groups only (rg1 followed by rg3); skipped groups contribute nothing.
        let expected = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(9),
            RowSelector::skip(3),
            RowSelector::select(485),
            RowSelector::skip(4),
            RowSelector::select(98),
            RowSelector::skip(1),
            RowSelector::select(99),
            RowSelector::skip(3),
            RowSelector::select(796),
            RowSelector::skip(1),
        ]);

        assert_eq!(result, expected);

        // selecting all row groups
        let result = ArrowReader::build_deletes_row_selection(
            &row_groups_metadata,
            &None,
            &positional_deletes,
        )
        .unwrap();

        // With no row-group filter the selectors span all 3500 rows, so every
        // delete in the vector (including those in rg0/rg2/rg4) appears as a skip.
        let expected = RowSelection::from(vec![
            RowSelector::select(1),
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(3),
            RowSelector::select(992),
            RowSelector::skip(3),
            RowSelector::select(9),
            RowSelector::skip(3),
            RowSelector::select(485),
            RowSelector::skip(4),
            RowSelector::select(98),
            RowSelector::skip(1),
            RowSelector::select(398),
            RowSelector::skip(3),
            RowSelector::select(98),
            RowSelector::skip(1),
            RowSelector::select(99),
            RowSelector::skip(3),
            RowSelector::select(796),
            RowSelector::skip(2),
            RowSelector::select(499),
        ]);

        assert_eq!(result, expected);
    }
2468
2469    fn build_test_row_group_meta(
2470        schema_descr: SchemaDescPtr,
2471        columns: Vec<ColumnChunkMetaData>,
2472        num_rows: i64,
2473        ordinal: i16,
2474    ) -> RowGroupMetaData {
2475        RowGroupMetaData::builder(schema_descr.clone())
2476            .set_num_rows(num_rows)
2477            .set_total_byte_size(2000)
2478            .set_column_metadata(columns)
2479            .set_ordinal(ordinal)
2480            .build()
2481            .unwrap()
2482    }
2483
2484    fn get_test_schema_descr() -> SchemaDescPtr {
2485        use parquet::schema::types::Type as SchemaType;
2486
2487        let schema = SchemaType::group_type_builder("schema")
2488            .with_fields(vec![
2489                Arc::new(
2490                    SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2491                        .build()
2492                        .unwrap(),
2493                ),
2494                Arc::new(
2495                    SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2496                        .build()
2497                        .unwrap(),
2498                ),
2499            ])
2500            .build()
2501            .unwrap();
2502
2503        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2504    }
2505
    /// Verifies that file splits respect byte ranges and only read specific row groups.
    ///
    /// Writes a 3-row-group file (100 rows each), computes per-row-group byte
    /// offsets from the footer metadata, then issues two `FileScanTask`s whose
    /// `start`/`length` cover (a) only row group 0 and (b) row groups 1+2, and
    /// asserts both the row counts and the actual id values returned.
    #[tokio::test]
    async fn test_file_splits_respect_byte_ranges() {
        use arrow_array::Int32Array;
        use parquet::file::reader::{FileReader, SerializedFileReader};

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Arrow field carries the Iceberg field id so the reader can map the
        // Parquet column to field 1 of the table schema.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
        ]));

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_path = format!("{table_location}/multi_row_group.parquet");

        // Force each batch into its own row group for testing byte range filtering.
        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
            (0..100).collect::<Vec<i32>>(),
        ))])
        .unwrap();
        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
            (100..200).collect::<Vec<i32>>(),
        ))])
        .unwrap();
        let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
            (200..300).collect::<Vec<i32>>(),
        ))])
        .unwrap();

        // max_row_group_size=100 guarantees each 100-row batch closes a row group.
        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_max_row_group_size(100)
            .build();

        let file = File::create(&file_path).unwrap();
        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
        writer.write(&batch1).expect("Writing batch 1");
        writer.write(&batch2).expect("Writing batch 2");
        writer.write(&batch3).expect("Writing batch 3");
        writer.close().unwrap();

        // Read the file metadata to get row group byte positions
        let file = File::open(&file_path).unwrap();
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();

        println!("File has {} row groups", metadata.num_row_groups());
        assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");

        // Get byte positions for each row group
        let row_group_0 = metadata.row_group(0);
        let row_group_1 = metadata.row_group(1);
        let row_group_2 = metadata.row_group(2);

        // NOTE(review): the offsets below assume row groups are laid out
        // contiguously immediately after the 4-byte magic. That holds for
        // files produced by ArrowWriter as above; confirm if the writer or
        // its settings ever change.
        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
        let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
        let file_end = rg2_start + row_group_2.compressed_size() as u64;

        println!(
            "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
            row_group_0.num_rows(),
            rg0_start,
            row_group_0.compressed_size()
        );
        println!(
            "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
            row_group_1.num_rows(),
            rg1_start,
            row_group_1.compressed_size()
        );
        println!(
            "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
            row_group_2.num_rows(),
            rg2_start,
            row_group_2.compressed_size()
        );

        let file_io = FileIO::new_with_fs();
        let reader = ArrowReaderBuilder::new(file_io).build();

        // Task 1: read only the first row group
        let task1 = FileScanTask {
            file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
            start: rg0_start,
            length: row_group_0.compressed_size() as u64,
            record_count: Some(100),
            data_file_path: file_path.clone(),
            data_file_format: DataFileFormat::Parquet,
            schema: schema.clone(),
            project_field_ids: vec![1],
            predicate: None,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };

        // Task 2: read the second and third row groups
        let task2 = FileScanTask {
            file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
            start: rg1_start,
            length: file_end - rg1_start,
            record_count: Some(200),
            data_file_path: file_path.clone(),
            data_file_format: DataFileFormat::Parquet,
            schema: schema.clone(),
            project_field_ids: vec![1],
            predicate: None,
            deletes: vec![],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };

        let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
        let result1 = reader
            .clone()
            .read(tasks1)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
        println!(
            "Task 1 (bytes {}-{}) returned {} rows",
            rg0_start,
            rg0_start + row_group_0.compressed_size() as u64,
            total_rows_task1
        );

        let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
        let result2 = reader
            .read(tasks2)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
        println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");

        assert_eq!(
            total_rows_task1, 100,
            "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
        );

        assert_eq!(
            total_rows_task2, 200,
            "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
        );

        // Verify the actual data values are correct (not just the row count)
        if total_rows_task1 > 0 {
            let first_batch = &result1[0];
            let id_col = first_batch
                .column(0)
                .as_primitive::<arrow_array::types::Int32Type>();
            let first_val = id_col.value(0);
            let last_val = id_col.value(id_col.len() - 1);
            println!("Task 1 data range: {first_val} to {last_val}");

            assert_eq!(first_val, 0, "Task 1 should start with id=0");
            assert_eq!(last_val, 99, "Task 1 should end with id=99");
        }

        if total_rows_task2 > 0 {
            let first_batch = &result2[0];
            let id_col = first_batch
                .column(0)
                .as_primitive::<arrow_array::types::Int32Type>();
            let first_val = id_col.value(0);
            println!("Task 2 first value: {first_val}");

            assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
        }
    }
2698
2699    /// Test schema evolution: reading old Parquet file (with only column 'a')
2700    /// using a newer table schema (with columns 'a' and 'b').
2701    /// This tests that:
2702    /// 1. get_arrow_projection_mask allows missing columns
2703    /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
2704    #[tokio::test]
2705    async fn test_schema_evolution_add_column() {
2706        use arrow_array::{Array, Int32Array};
2707
2708        // New table schema: columns 'a' and 'b' (b was added later, file only has 'a')
2709        let new_schema = Arc::new(
2710            Schema::builder()
2711                .with_schema_id(2)
2712                .with_fields(vec![
2713                    NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2714                    NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2715                ])
2716                .build()
2717                .unwrap(),
2718        );
2719
2720        // Create Arrow schema for old Parquet file (only has column 'a')
2721        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2722            Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2723                PARQUET_FIELD_ID_META_KEY.to_string(),
2724                "1".to_string(),
2725            )])),
2726        ]));
2727
2728        // Write old Parquet file with only column 'a'
2729        let tmp_dir = TempDir::new().unwrap();
2730        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2731        let file_io = FileIO::new_with_fs();
2732
2733        let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2734        let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2735
2736        let props = WriterProperties::builder()
2737            .set_compression(Compression::SNAPPY)
2738            .build();
2739        let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
2740        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2741        writer.write(&to_write).expect("Writing batch");
2742        writer.close().unwrap();
2743
2744        // Read the old Parquet file using the NEW schema (with column 'b')
2745        let reader = ArrowReaderBuilder::new(file_io).build();
2746        let tasks = Box::pin(futures::stream::iter(
2747            vec![Ok(FileScanTask {
2748                file_size_in_bytes: std::fs::metadata(format!("{table_location}/old_file.parquet"))
2749                    .unwrap()
2750                    .len(),
2751                start: 0,
2752                length: 0,
2753                record_count: None,
2754                data_file_path: format!("{table_location}/old_file.parquet"),
2755                data_file_format: DataFileFormat::Parquet,
2756                schema: new_schema.clone(),
2757                project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
2758                predicate: None,
2759                deletes: vec![],
2760                partition: None,
2761                partition_spec: None,
2762                name_mapping: None,
2763                case_sensitive: false,
2764            })]
2765            .into_iter(),
2766        )) as FileScanTaskStream;
2767
2768        let result = reader
2769            .read(tasks)
2770            .unwrap()
2771            .try_collect::<Vec<RecordBatch>>()
2772            .await
2773            .unwrap();
2774
2775        // Verify we got the correct data
2776        assert_eq!(result.len(), 1);
2777        let batch = &result[0];
2778
2779        // Should have 2 columns now
2780        assert_eq!(batch.num_columns(), 2);
2781        assert_eq!(batch.num_rows(), 3);
2782
2783        // Column 'a' should have the original data
2784        let col_a = batch
2785            .column(0)
2786            .as_primitive::<arrow_array::types::Int32Type>();
2787        assert_eq!(col_a.values(), &[1, 2, 3]);
2788
2789        // Column 'b' should be all NULLs (it didn't exist in the old file)
2790        let col_b = batch
2791            .column(1)
2792            .as_primitive::<arrow_array::types::Int32Type>();
2793        assert_eq!(col_b.null_count(), 3);
2794        assert!(col_b.is_null(0));
2795        assert!(col_b.is_null(1));
2796        assert!(col_b.is_null(2));
2797    }
2798
    /// Test for bug where position deletes in later row groups are not applied correctly.
    ///
    /// When a file has multiple row groups and a position delete targets a row in a later
    /// row group, the `build_deletes_row_selection` function had a bug where it would
    /// fail to increment `current_row_group_base_idx` when skipping row groups.
    ///
    /// This test creates:
    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
    /// - A position delete file that deletes row 199 (last row in second row group)
    ///
    /// Expected behavior: Should return 199 rows (with id=200 deleted)
    /// Bug behavior: Returns 200 rows (delete is not applied)
    ///
    /// This bug was discovered while running Apache Spark + Apache Iceberg integration tests
    /// through DataFusion Comet. The following Iceberg Java tests failed due to this bug:
    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet`
    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet`
    #[tokio::test]
    async fn test_position_delete_across_multiple_row_groups() {
        use arrow_array::{Int32Array, Int64Array};
        use parquet::file::reader::{FileReader, SerializedFileReader};

        // Field IDs for positional delete schema
        // (the reserved Iceberg field ids for the file_path/pos delete columns).
        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();

        // Create table schema with a single 'id' column
        let table_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Arrow field carries the Iceberg field id so the reader can map the
        // Parquet column to field 1 of the table schema.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
        ]));

        // Step 1: Create data file with 200 rows in 2 row groups
        // Row group 0: rows 0-99 (ids 1-100)
        // Row group 1: rows 100-199 (ids 101-200)
        let data_file_path = format!("{table_location}/data.parquet");

        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(1..=100),
        )])
        .unwrap();

        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(101..=200),
        )])
        .unwrap();

        // Force each batch into its own row group
        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_max_row_group_size(100)
            .build();

        let file = File::create(&data_file_path).unwrap();
        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
        writer.write(&batch1).expect("Writing batch 1");
        writer.write(&batch2).expect("Writing batch 2");
        writer.close().unwrap();

        // Verify we created 2 row groups
        let verify_file = File::open(&data_file_path).unwrap();
        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
        assert_eq!(
            verify_reader.metadata().num_row_groups(),
            2,
            "Should have 2 row groups"
        );

        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
        let delete_file_path = format!("{table_location}/deletes.parquet");

        // Positional delete schema: (file_path: Utf8, pos: Int64) with the
        // reserved field ids attached as Arrow metadata.
        let delete_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
            )])),
            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
            )])),
        ]));

        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
            Arc::new(Int64Array::from_iter_values(vec![199i64])),
        ])
        .unwrap();

        let delete_props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let delete_file = File::create(&delete_file_path).unwrap();
        let mut delete_writer =
            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
        delete_writer.write(&delete_batch).unwrap();
        delete_writer.close().unwrap();

        // Step 3: Read the data file with the delete applied
        let file_io = FileIO::new_with_fs();
        let reader = ArrowReaderBuilder::new(file_io).build();

        let task = FileScanTask {
            file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
            start: 0,
            length: 0,
            record_count: Some(200),
            data_file_path: data_file_path.clone(),
            data_file_format: DataFileFormat::Parquet,
            schema: table_schema.clone(),
            project_field_ids: vec![1],
            predicate: None,
            deletes: vec![FileScanTaskDeleteFile {
                file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
                file_path: delete_file_path,
                file_type: DataContentType::PositionDeletes,
                partition_spec_id: 0,
                equality_ids: None,
            }],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };

        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Step 4: Verify we got 199 rows (not 200)
        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();

        println!("Total rows read: {total_rows}");
        println!("Expected: 199 rows (deleted row 199 which had id=200)");

        // This assertion will FAIL before the fix and PASS after the fix
        assert_eq!(
            total_rows, 199,
            "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
             The bug causes position deletes in later row groups to be ignored."
        );

        // Verify the deleted row (id=200) is not present
        let all_ids: Vec<i32> = result
            .iter()
            .flat_map(|batch| {
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values()
                    .iter()
                    .copied()
            })
            .collect();

        assert!(
            !all_ids.contains(&200),
            "Row with id=200 should be deleted but was found in results"
        );

        // Verify we have all other ids (1-199)
        let expected_ids: Vec<i32> = (1..=199).collect();
        assert_eq!(
            all_ids, expected_ids,
            "Should have ids 1-199 but got different values"
        );
    }
2986
2987    /// Test for bug where position deletes are lost when skipping unselected row groups.
2988    ///
2989    /// This is a variant of `test_position_delete_across_multiple_row_groups` that exercises
2990    /// the row group selection code path (`selected_row_groups: Some([...])`).
2991    ///
2992    /// When a file has multiple row groups and only some are selected for reading,
2993    /// the `build_deletes_row_selection` function must correctly skip over deletes in
2994    /// unselected row groups WITHOUT consuming deletes that belong to selected row groups.
2995    ///
2996    /// This test creates:
2997    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2998    /// - A position delete file that deletes row 199 (last row in second row group)
2999    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
3000    ///
3001    /// Expected behavior: Should return 99 rows (with row 199 deleted)
3002    /// Bug behavior: Returns 100 rows (delete is lost when skipping row group 0)
3003    ///
3004    /// The bug occurs when processing row group 0 (unselected):
3005    /// ```rust
3006    /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100
3007    /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199!
3008    /// ```
3009    ///
3010    /// The fix is to NOT call `next()` after `advance_to()` when skipping unselected row groups,
3011    /// because `advance_to()` already positions the iterator correctly without consuming elements.
    #[tokio::test]
    async fn test_position_delete_with_row_group_selection() {
        use arrow_array::{Int32Array, Int64Array};
        use parquet::file::reader::{FileReader, SerializedFileReader};

        // Field IDs for positional delete schema
        // (Iceberg's reserved field IDs for the file_path and pos columns)
        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();

        // Create table schema with a single 'id' column
        let table_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Matching Arrow schema; the field-id metadata ties column 'id' to field 1.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
        ]));

        // Step 1: Create data file with 200 rows in 2 row groups
        // Row group 0: rows 0-99 (ids 1-100)
        // Row group 1: rows 100-199 (ids 101-200)
        let data_file_path = format!("{table_location}/data.parquet");

        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(1..=100),
        )])
        .unwrap();

        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(101..=200),
        )])
        .unwrap();

        // Force each batch into its own row group
        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_max_row_group_size(100)
            .build();

        let file = File::create(&data_file_path).unwrap();
        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
        writer.write(&batch1).expect("Writing batch 1");
        writer.write(&batch2).expect("Writing batch 2");
        writer.close().unwrap();

        // Verify we created 2 row groups
        let verify_file = File::open(&data_file_path).unwrap();
        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
        assert_eq!(
            verify_reader.metadata().num_row_groups(),
            2,
            "Should have 2 row groups"
        );

        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
        let delete_file_path = format!("{table_location}/deletes.parquet");

        let delete_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
            )])),
            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
            )])),
        ]));

        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
            Arc::new(Int64Array::from_iter_values(vec![199i64])),
        ])
        .unwrap();

        let delete_props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let delete_file = File::create(&delete_file_path).unwrap();
        let mut delete_writer =
            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
        delete_writer.write(&delete_batch).unwrap();
        delete_writer.close().unwrap();

        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
        // This exercises the row group selection code path where row group 0 is skipped
        let metadata_file = File::open(&data_file_path).unwrap();
        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
        let metadata = metadata_reader.metadata();

        let row_group_0 = metadata.row_group(0);
        let row_group_1 = metadata.row_group(1);

        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
        // NOTE(review): assumes row group 1 begins immediately after row group 0's
        // compressed bytes (no padding between row groups) — holds for the file
        // ArrowWriter just produced; confirm if the writer configuration changes.
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
        let rg1_length = row_group_1.compressed_size() as u64;

        println!(
            "Row group 0: starts at byte {}, {} bytes compressed",
            rg0_start,
            row_group_0.compressed_size()
        );
        println!(
            "Row group 1: starts at byte {}, {} bytes compressed",
            rg1_start,
            row_group_1.compressed_size()
        );

        // Local-filesystem FileIO so the reader can open the tmp-dir files written above.
        let file_io = FileIO::new_with_fs();
        let reader = ArrowReaderBuilder::new(file_io).build();

        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
        let task = FileScanTask {
            file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
            start: rg1_start,
            length: rg1_length,
            record_count: Some(100), // Row group 1 has 100 rows
            data_file_path: data_file_path.clone(),
            data_file_format: DataFileFormat::Parquet,
            schema: table_schema.clone(),
            project_field_ids: vec![1],
            predicate: None,
            deletes: vec![FileScanTaskDeleteFile {
                file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
                file_path: delete_file_path,
                file_type: DataContentType::PositionDeletes,
                partition_spec_id: 0,
                equality_ids: None,
            }],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };

        // Single-task stream; the cast pins it to the FileScanTaskStream trait object.
        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Step 4: Verify we got 99 rows (not 100)
        // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 99 rows
        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();

        println!("Total rows read from row group 1: {total_rows}");
        println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");

        // This assertion will FAIL before the fix and PASS after the fix
        assert_eq!(
            total_rows, 99,
            "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
             The bug causes position deletes to be lost when advance_to() is followed by next() \
             when skipping unselected row groups."
        );

        // Verify the deleted row (id=200) is not present
        let all_ids: Vec<i32> = result
            .iter()
            .flat_map(|batch| {
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values()
                    .iter()
                    .copied()
            })
            .collect();

        assert!(
            !all_ids.contains(&200),
            "Row with id=200 should be deleted but was found in results"
        );

        // Verify we have ids 101-199 (not 101-200)
        let expected_ids: Vec<i32> = (101..=199).collect();
        assert_eq!(
            all_ids, expected_ids,
            "Should have ids 101-199 but got different values"
        );
    }
    /// Test for bug where stale cached delete causes infinite loop when skipping row groups.
    ///
    /// This test exposes the inverse scenario of `test_position_delete_with_row_group_selection`:
    /// - Position delete targets a row in the SKIPPED row group (not the selected one)
    /// - After calling advance_to(), the cached delete index is stale
    /// - Without updating the cache, the code enters an infinite loop
    ///
    /// This test creates:
    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
    /// - A position delete file that deletes row 0 (first row in SKIPPED row group 0)
    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
    ///
    /// The bug occurs when skipping row group 0:
    /// ```rust
    /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // Some(0)
    /// // ... skip to row group 1 ...
    /// delete_vector_iter.advance_to(100); // Iterator advances past delete at 0
    /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE!
    /// // When processing row group 1:
    /// //   current_idx = 100, next_deleted_row_idx = 0, next_row_group_base_idx = 200
    /// //   Loop condition: 0 < 200 (true)
    /// //   But: current_idx (100) > next_deleted_row_idx (0)
    /// //   And: current_idx (100) != next_deleted_row_idx (0)
    /// //   Neither branch executes -> INFINITE LOOP!
    /// ```
    ///
    /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect row group 1)
    /// Bug behavior: Infinite loop in build_deletes_row_selection
    #[tokio::test]
    async fn test_position_delete_in_skipped_row_group() {
        use arrow_array::{Int32Array, Int64Array};
        use parquet::file::reader::{FileReader, SerializedFileReader};

        // Field IDs for positional delete schema
        // (Iceberg's reserved field IDs for the file_path and pos columns)
        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();

        // Create table schema with a single 'id' column
        let table_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Matching Arrow schema; the field-id metadata ties column 'id' to field 1.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
        ]));

        // Step 1: Create data file with 200 rows in 2 row groups
        // Row group 0: rows 0-99 (ids 1-100)
        // Row group 1: rows 100-199 (ids 101-200)
        let data_file_path = format!("{table_location}/data.parquet");

        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(1..=100),
        )])
        .unwrap();

        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(101..=200),
        )])
        .unwrap();

        // Force each batch into its own row group
        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_max_row_group_size(100)
            .build();

        let file = File::create(&data_file_path).unwrap();
        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
        writer.write(&batch1).expect("Writing batch 1");
        writer.write(&batch2).expect("Writing batch 2");
        writer.close().unwrap();

        // Verify we created 2 row groups
        let verify_file = File::open(&data_file_path).unwrap();
        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
        assert_eq!(
            verify_reader.metadata().num_row_groups(),
            2,
            "Should have 2 row groups"
        );

        // Step 2: Create position delete file that deletes row 0 (id=1, first row in row group 0)
        let delete_file_path = format!("{table_location}/deletes.parquet");

        let delete_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
            )])),
            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
            )])),
        ]));

        // Delete row at position 0 (0-indexed, so it's the first row: id=1)
        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
            Arc::new(Int64Array::from_iter_values(vec![0i64])),
        ])
        .unwrap();

        let delete_props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let delete_file = File::create(&delete_file_path).unwrap();
        let mut delete_writer =
            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
        delete_writer.write(&delete_batch).unwrap();
        delete_writer.close().unwrap();

        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
        // This exercises the row group selection code path where row group 0 is skipped
        let metadata_file = File::open(&data_file_path).unwrap();
        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
        let metadata = metadata_reader.metadata();

        let row_group_0 = metadata.row_group(0);
        let row_group_1 = metadata.row_group(1);

        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
        // NOTE(review): assumes row group 1 begins immediately after row group 0's
        // compressed bytes (no padding between row groups) — holds for the file
        // ArrowWriter just produced; confirm if the writer configuration changes.
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
        let rg1_length = row_group_1.compressed_size() as u64;

        // Local-filesystem FileIO so the reader can open the tmp-dir files written above.
        let file_io = FileIO::new_with_fs();
        let reader = ArrowReaderBuilder::new(file_io).build();

        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
        let task = FileScanTask {
            file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
            start: rg1_start,
            length: rg1_length,
            record_count: Some(100), // Row group 1 has 100 rows
            data_file_path: data_file_path.clone(),
            data_file_format: DataFileFormat::Parquet,
            schema: table_schema.clone(),
            project_field_ids: vec![1],
            predicate: None,
            deletes: vec![FileScanTaskDeleteFile {
                file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
                file_path: delete_file_path,
                file_type: DataContentType::PositionDeletes,
                partition_spec_id: 0,
                equality_ids: None,
            }],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };

        // Single-task stream; the cast pins it to the FileScanTaskStream trait object.
        // If the stale-cache bug is present this await never completes (infinite loop).
        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Step 4: Verify we got 100 rows (all of row group 1)
        // The delete at position 0 is in row group 0, which is skipped, so it doesn't affect us
        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();

        assert_eq!(
            total_rows, 100,
            "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
             If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
        );

        // Verify we have all ids from row group 1 (101-200)
        let all_ids: Vec<i32> = result
            .iter()
            .flat_map(|batch| {
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values()
                    .iter()
                    .copied()
            })
            .collect();

        let expected_ids: Vec<i32> = (101..=200).collect();
        assert_eq!(
            all_ids, expected_ids,
            "Should have ids 101-200 (all of row group 1)"
        );
    }
3410
3411    /// Test reading Parquet files without field ID metadata (e.g., migrated tables).
3412    /// This exercises the position-based fallback path.
3413    ///
3414    /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + pruneColumnsFallback()
3415    /// in /parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
3416    #[tokio::test]
3417    async fn test_read_parquet_file_without_field_ids() {
3418        let schema = Arc::new(
3419            Schema::builder()
3420                .with_schema_id(1)
3421                .with_fields(vec![
3422                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3423                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3424                ])
3425                .build()
3426                .unwrap(),
3427        );
3428
3429        // Parquet file from a migrated table - no field ID metadata
3430        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3431            Field::new("name", DataType::Utf8, false),
3432            Field::new("age", DataType::Int32, false),
3433        ]));
3434
3435        let tmp_dir = TempDir::new().unwrap();
3436        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3437        let file_io = FileIO::new_with_fs();
3438
3439        let name_data = vec!["Alice", "Bob", "Charlie"];
3440        let age_data = vec![30, 25, 35];
3441
3442        use arrow_array::Int32Array;
3443        let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3444        let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3445
3446        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3447
3448        let props = WriterProperties::builder()
3449            .set_compression(Compression::SNAPPY)
3450            .build();
3451
3452        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3453        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3454
3455        writer.write(&to_write).expect("Writing batch");
3456        writer.close().unwrap();
3457
3458        let reader = ArrowReaderBuilder::new(file_io).build();
3459
3460        let tasks = Box::pin(futures::stream::iter(
3461            vec![Ok(FileScanTask {
3462                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3463                    .unwrap()
3464                    .len(),
3465                start: 0,
3466                length: 0,
3467                record_count: None,
3468                data_file_path: format!("{table_location}/1.parquet"),
3469                data_file_format: DataFileFormat::Parquet,
3470                schema: schema.clone(),
3471                project_field_ids: vec![1, 2],
3472                predicate: None,
3473                deletes: vec![],
3474                partition: None,
3475                partition_spec: None,
3476                name_mapping: None,
3477                case_sensitive: false,
3478            })]
3479            .into_iter(),
3480        )) as FileScanTaskStream;
3481
3482        let result = reader
3483            .read(tasks)
3484            .unwrap()
3485            .try_collect::<Vec<RecordBatch>>()
3486            .await
3487            .unwrap();
3488
3489        assert_eq!(result.len(), 1);
3490        let batch = &result[0];
3491        assert_eq!(batch.num_rows(), 3);
3492        assert_eq!(batch.num_columns(), 2);
3493
3494        // Verify position-based mapping: field_id 1 → position 0, field_id 2 → position 1
3495        let name_array = batch.column(0).as_string::<i32>();
3496        assert_eq!(name_array.value(0), "Alice");
3497        assert_eq!(name_array.value(1), "Bob");
3498        assert_eq!(name_array.value(2), "Charlie");
3499
3500        let age_array = batch
3501            .column(1)
3502            .as_primitive::<arrow_array::types::Int32Type>();
3503        assert_eq!(age_array.value(0), 30);
3504        assert_eq!(age_array.value(1), 25);
3505        assert_eq!(age_array.value(2), 35);
3506    }
3507
3508    /// Test reading Parquet files without field IDs with partial projection.
3509    /// Only a subset of columns are requested, verifying position-based fallback
3510    /// handles column selection correctly.
3511    #[tokio::test]
3512    async fn test_read_parquet_without_field_ids_partial_projection() {
3513        use arrow_array::Int32Array;
3514
3515        let schema = Arc::new(
3516            Schema::builder()
3517                .with_schema_id(1)
3518                .with_fields(vec![
3519                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3520                    NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3521                    NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3522                    NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3523                ])
3524                .build()
3525                .unwrap(),
3526        );
3527
3528        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3529            Field::new("col1", DataType::Utf8, false),
3530            Field::new("col2", DataType::Int32, false),
3531            Field::new("col3", DataType::Utf8, false),
3532            Field::new("col4", DataType::Int32, false),
3533        ]));
3534
3535        let tmp_dir = TempDir::new().unwrap();
3536        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3537        let file_io = FileIO::new_with_fs();
3538
3539        let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3540        let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3541        let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3542        let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3543
3544        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3545            col1_data, col2_data, col3_data, col4_data,
3546        ])
3547        .unwrap();
3548
3549        let props = WriterProperties::builder()
3550            .set_compression(Compression::SNAPPY)
3551            .build();
3552
3553        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3554        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3555
3556        writer.write(&to_write).expect("Writing batch");
3557        writer.close().unwrap();
3558
3559        let reader = ArrowReaderBuilder::new(file_io).build();
3560
3561        let tasks = Box::pin(futures::stream::iter(
3562            vec![Ok(FileScanTask {
3563                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3564                    .unwrap()
3565                    .len(),
3566                start: 0,
3567                length: 0,
3568                record_count: None,
3569                data_file_path: format!("{table_location}/1.parquet"),
3570                data_file_format: DataFileFormat::Parquet,
3571                schema: schema.clone(),
3572                project_field_ids: vec![1, 3],
3573                predicate: None,
3574                deletes: vec![],
3575                partition: None,
3576                partition_spec: None,
3577                name_mapping: None,
3578                case_sensitive: false,
3579            })]
3580            .into_iter(),
3581        )) as FileScanTaskStream;
3582
3583        let result = reader
3584            .read(tasks)
3585            .unwrap()
3586            .try_collect::<Vec<RecordBatch>>()
3587            .await
3588            .unwrap();
3589
3590        assert_eq!(result.len(), 1);
3591        let batch = &result[0];
3592        assert_eq!(batch.num_rows(), 2);
3593        assert_eq!(batch.num_columns(), 2);
3594
3595        let col1_array = batch.column(0).as_string::<i32>();
3596        assert_eq!(col1_array.value(0), "a");
3597        assert_eq!(col1_array.value(1), "b");
3598
3599        let col3_array = batch.column(1).as_string::<i32>();
3600        assert_eq!(col3_array.value(0), "c");
3601        assert_eq!(col3_array.value(1), "d");
3602    }
3603
3604    /// Test reading Parquet files without field IDs with schema evolution.
3605    /// The Iceberg schema has more fields than the Parquet file, testing that
3606    /// missing columns are filled with NULLs.
3607    #[tokio::test]
3608    async fn test_read_parquet_without_field_ids_schema_evolution() {
3609        use arrow_array::{Array, Int32Array};
3610
3611        // Schema with field 3 added after the file was written
3612        let schema = Arc::new(
3613            Schema::builder()
3614                .with_schema_id(1)
3615                .with_fields(vec![
3616                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3617                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3618                    NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3619                ])
3620                .build()
3621                .unwrap(),
3622        );
3623
3624        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3625            Field::new("name", DataType::Utf8, false),
3626            Field::new("age", DataType::Int32, false),
3627        ]));
3628
3629        let tmp_dir = TempDir::new().unwrap();
3630        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3631        let file_io = FileIO::new_with_fs();
3632
3633        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3634        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3635
3636        let to_write =
3637            RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3638
3639        let props = WriterProperties::builder()
3640            .set_compression(Compression::SNAPPY)
3641            .build();
3642
3643        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3644        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3645
3646        writer.write(&to_write).expect("Writing batch");
3647        writer.close().unwrap();
3648
3649        let reader = ArrowReaderBuilder::new(file_io).build();
3650
3651        let tasks = Box::pin(futures::stream::iter(
3652            vec![Ok(FileScanTask {
3653                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3654                    .unwrap()
3655                    .len(),
3656                start: 0,
3657                length: 0,
3658                record_count: None,
3659                data_file_path: format!("{table_location}/1.parquet"),
3660                data_file_format: DataFileFormat::Parquet,
3661                schema: schema.clone(),
3662                project_field_ids: vec![1, 2, 3],
3663                predicate: None,
3664                deletes: vec![],
3665                partition: None,
3666                partition_spec: None,
3667                name_mapping: None,
3668                case_sensitive: false,
3669            })]
3670            .into_iter(),
3671        )) as FileScanTaskStream;
3672
3673        let result = reader
3674            .read(tasks)
3675            .unwrap()
3676            .try_collect::<Vec<RecordBatch>>()
3677            .await
3678            .unwrap();
3679
3680        assert_eq!(result.len(), 1);
3681        let batch = &result[0];
3682        assert_eq!(batch.num_rows(), 2);
3683        assert_eq!(batch.num_columns(), 3);
3684
3685        let name_array = batch.column(0).as_string::<i32>();
3686        assert_eq!(name_array.value(0), "Alice");
3687        assert_eq!(name_array.value(1), "Bob");
3688
3689        let age_array = batch
3690            .column(1)
3691            .as_primitive::<arrow_array::types::Int32Type>();
3692        assert_eq!(age_array.value(0), 30);
3693        assert_eq!(age_array.value(1), 25);
3694
3695        // Verify missing column filled with NULLs
3696        let city_array = batch.column(2).as_string::<i32>();
3697        assert_eq!(city_array.null_count(), 2);
3698        assert!(city_array.is_null(0));
3699        assert!(city_array.is_null(1));
3700    }
3701
3702    /// Test reading Parquet files without field IDs that have multiple row groups.
3703    /// This ensures the position-based fallback works correctly across row group boundaries.
3704    #[tokio::test]
3705    async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3706        use arrow_array::Int32Array;
3707
3708        let schema = Arc::new(
3709            Schema::builder()
3710                .with_schema_id(1)
3711                .with_fields(vec![
3712                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3713                    NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3714                ])
3715                .build()
3716                .unwrap(),
3717        );
3718
3719        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3720            Field::new("name", DataType::Utf8, false),
3721            Field::new("value", DataType::Int32, false),
3722        ]));
3723
3724        let tmp_dir = TempDir::new().unwrap();
3725        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3726        let file_io = FileIO::new_with_fs();
3727
3728        // Small row group size to create multiple row groups
3729        let props = WriterProperties::builder()
3730            .set_compression(Compression::SNAPPY)
3731            .set_write_batch_size(2)
3732            .set_max_row_group_size(2)
3733            .build();
3734
3735        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3736        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3737
3738        // Write 6 rows in 3 batches (will create 3 row groups)
3739        for batch_num in 0..3 {
3740            let name_data = Arc::new(StringArray::from(vec![
3741                format!("name_{}", batch_num * 2),
3742                format!("name_{}", batch_num * 2 + 1),
3743            ])) as ArrayRef;
3744            let value_data =
3745                Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3746
3747            let batch =
3748                RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3749            writer.write(&batch).expect("Writing batch");
3750        }
3751        writer.close().unwrap();
3752
3753        let reader = ArrowReaderBuilder::new(file_io).build();
3754
3755        let tasks = Box::pin(futures::stream::iter(
3756            vec![Ok(FileScanTask {
3757                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3758                    .unwrap()
3759                    .len(),
3760                start: 0,
3761                length: 0,
3762                record_count: None,
3763                data_file_path: format!("{table_location}/1.parquet"),
3764                data_file_format: DataFileFormat::Parquet,
3765                schema: schema.clone(),
3766                project_field_ids: vec![1, 2],
3767                predicate: None,
3768                deletes: vec![],
3769                partition: None,
3770                partition_spec: None,
3771                name_mapping: None,
3772                case_sensitive: false,
3773            })]
3774            .into_iter(),
3775        )) as FileScanTaskStream;
3776
3777        let result = reader
3778            .read(tasks)
3779            .unwrap()
3780            .try_collect::<Vec<RecordBatch>>()
3781            .await
3782            .unwrap();
3783
3784        assert!(!result.is_empty());
3785
3786        let mut all_names = Vec::new();
3787        let mut all_values = Vec::new();
3788
3789        for batch in &result {
3790            let name_array = batch.column(0).as_string::<i32>();
3791            let value_array = batch
3792                .column(1)
3793                .as_primitive::<arrow_array::types::Int32Type>();
3794
3795            for i in 0..batch.num_rows() {
3796                all_names.push(name_array.value(i).to_string());
3797                all_values.push(value_array.value(i));
3798            }
3799        }
3800
3801        assert_eq!(all_names.len(), 6);
3802        assert_eq!(all_values.len(), 6);
3803
3804        for i in 0..6 {
3805            assert_eq!(all_names[i], format!("name_{i}"));
3806            assert_eq!(all_values[i], i as i32);
3807        }
3808    }
3809
3810    /// Test reading Parquet files without field IDs with nested types (struct).
3811    /// Java's pruneColumnsFallback() projects entire top-level columns including nested content.
3812    /// This test verifies that a top-level struct field is projected correctly with all its nested fields.
3813    #[tokio::test]
3814    async fn test_read_parquet_without_field_ids_with_struct() {
3815        use arrow_array::{Int32Array, StructArray};
3816        use arrow_schema::Fields;
3817
3818        let schema = Arc::new(
3819            Schema::builder()
3820                .with_schema_id(1)
3821                .with_fields(vec![
3822                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3823                    NestedField::required(
3824                        2,
3825                        "person",
3826                        Type::Struct(crate::spec::StructType::new(vec![
3827                            NestedField::required(
3828                                3,
3829                                "name",
3830                                Type::Primitive(PrimitiveType::String),
3831                            )
3832                            .into(),
3833                            NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3834                                .into(),
3835                        ])),
3836                    )
3837                    .into(),
3838                ])
3839                .build()
3840                .unwrap(),
3841        );
3842
3843        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3844            Field::new("id", DataType::Int32, false),
3845            Field::new(
3846                "person",
3847                DataType::Struct(Fields::from(vec![
3848                    Field::new("name", DataType::Utf8, false),
3849                    Field::new("age", DataType::Int32, false),
3850                ])),
3851                false,
3852            ),
3853        ]));
3854
3855        let tmp_dir = TempDir::new().unwrap();
3856        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3857        let file_io = FileIO::new_with_fs();
3858
3859        let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3860        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3861        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3862        let person_data = Arc::new(StructArray::from(vec![
3863            (
3864                Arc::new(Field::new("name", DataType::Utf8, false)),
3865                name_data,
3866            ),
3867            (
3868                Arc::new(Field::new("age", DataType::Int32, false)),
3869                age_data,
3870            ),
3871        ])) as ArrayRef;
3872
3873        let to_write =
3874            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3875
3876        let props = WriterProperties::builder()
3877            .set_compression(Compression::SNAPPY)
3878            .build();
3879
3880        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3881        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3882
3883        writer.write(&to_write).expect("Writing batch");
3884        writer.close().unwrap();
3885
3886        let reader = ArrowReaderBuilder::new(file_io).build();
3887
3888        let tasks = Box::pin(futures::stream::iter(
3889            vec![Ok(FileScanTask {
3890                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3891                    .unwrap()
3892                    .len(),
3893                start: 0,
3894                length: 0,
3895                record_count: None,
3896                data_file_path: format!("{table_location}/1.parquet"),
3897                data_file_format: DataFileFormat::Parquet,
3898                schema: schema.clone(),
3899                project_field_ids: vec![1, 2],
3900                predicate: None,
3901                deletes: vec![],
3902                partition: None,
3903                partition_spec: None,
3904                name_mapping: None,
3905                case_sensitive: false,
3906            })]
3907            .into_iter(),
3908        )) as FileScanTaskStream;
3909
3910        let result = reader
3911            .read(tasks)
3912            .unwrap()
3913            .try_collect::<Vec<RecordBatch>>()
3914            .await
3915            .unwrap();
3916
3917        assert_eq!(result.len(), 1);
3918        let batch = &result[0];
3919        assert_eq!(batch.num_rows(), 2);
3920        assert_eq!(batch.num_columns(), 2);
3921
3922        let id_array = batch
3923            .column(0)
3924            .as_primitive::<arrow_array::types::Int32Type>();
3925        assert_eq!(id_array.value(0), 1);
3926        assert_eq!(id_array.value(1), 2);
3927
3928        let person_array = batch.column(1).as_struct();
3929        assert_eq!(person_array.num_columns(), 2);
3930
3931        let name_array = person_array.column(0).as_string::<i32>();
3932        assert_eq!(name_array.value(0), "Alice");
3933        assert_eq!(name_array.value(1), "Bob");
3934
3935        let age_array = person_array
3936            .column(1)
3937            .as_primitive::<arrow_array::types::Int32Type>();
3938        assert_eq!(age_array.value(0), 30);
3939        assert_eq!(age_array.value(1), 25);
3940    }
3941
3942    /// Test reading Parquet files without field IDs with schema evolution - column added in the middle.
3943    /// When a new column is inserted between existing columns in the schema order,
3944    /// the fallback projection must correctly map field IDs to output positions.
3945    #[tokio::test]
3946    async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3947        use arrow_array::{Array, Int32Array};
3948
3949        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3950            Field::new("col0", DataType::Int32, true),
3951            Field::new("col1", DataType::Int32, true),
3952        ]));
3953
3954        // New column added between existing columns: col0 (id=1), newCol (id=5), col1 (id=2)
3955        let schema = Arc::new(
3956            Schema::builder()
3957                .with_schema_id(1)
3958                .with_fields(vec![
3959                    NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3960                    NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3961                    NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3962                ])
3963                .build()
3964                .unwrap(),
3965        );
3966
3967        let tmp_dir = TempDir::new().unwrap();
3968        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3969        let file_io = FileIO::new_with_fs();
3970
3971        let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3972        let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3973
3974        let to_write =
3975            RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3976
3977        let props = WriterProperties::builder()
3978            .set_compression(Compression::SNAPPY)
3979            .build();
3980
3981        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3982        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3983        writer.write(&to_write).expect("Writing batch");
3984        writer.close().unwrap();
3985
3986        let reader = ArrowReaderBuilder::new(file_io).build();
3987
3988        let tasks = Box::pin(futures::stream::iter(
3989            vec![Ok(FileScanTask {
3990                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3991                    .unwrap()
3992                    .len(),
3993                start: 0,
3994                length: 0,
3995                record_count: None,
3996                data_file_path: format!("{table_location}/1.parquet"),
3997                data_file_format: DataFileFormat::Parquet,
3998                schema: schema.clone(),
3999                project_field_ids: vec![1, 5, 2],
4000                predicate: None,
4001                deletes: vec![],
4002                partition: None,
4003                partition_spec: None,
4004                name_mapping: None,
4005                case_sensitive: false,
4006            })]
4007            .into_iter(),
4008        )) as FileScanTaskStream;
4009
4010        let result = reader
4011            .read(tasks)
4012            .unwrap()
4013            .try_collect::<Vec<RecordBatch>>()
4014            .await
4015            .unwrap();
4016
4017        assert_eq!(result.len(), 1);
4018        let batch = &result[0];
4019        assert_eq!(batch.num_rows(), 2);
4020        assert_eq!(batch.num_columns(), 3);
4021
4022        let result_col0 = batch
4023            .column(0)
4024            .as_primitive::<arrow_array::types::Int32Type>();
4025        assert_eq!(result_col0.value(0), 1);
4026        assert_eq!(result_col0.value(1), 2);
4027
4028        // New column should be NULL (doesn't exist in old file)
4029        let result_newcol = batch
4030            .column(1)
4031            .as_primitive::<arrow_array::types::Int32Type>();
4032        assert_eq!(result_newcol.null_count(), 2);
4033        assert!(result_newcol.is_null(0));
4034        assert!(result_newcol.is_null(1));
4035
4036        let result_col1 = batch
4037            .column(2)
4038            .as_primitive::<arrow_array::types::Int32Type>();
4039        assert_eq!(result_col1.value(0), 10);
4040        assert_eq!(result_col1.value(1), 20);
4041    }
4042
    /// Test reading Parquet files without field IDs with a filter that eliminates all row groups.
    /// During development of field ID mapping, we saw a panic when row_selection_enabled=true and
    /// all row groups are filtered out.
    #[tokio::test]
    async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
        use arrow_array::{Float64Array, Int32Array};

        // Schema with fields that will use fallback IDs 1, 2, 3
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                    NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
                        .into(),
                ])
                .build()
                .unwrap(),
        );

        // No PARQUET_FIELD_ID_META_KEY metadata on any field: this exercises the
        // fallback field-ID assignment path the test name refers to.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Float64, false),
        ]));

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_io = FileIO::new_with_fs();

        // Write data where all ids are >= 10
        let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
        let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
        let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;

        let to_write =
            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
                .unwrap();

        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
        writer.write(&to_write).expect("Writing batch");
        writer.close().unwrap();

        // Filter that eliminates all row groups: id < 5
        let predicate = Reference::new("id").less_than(Datum::int(5));

        // Enable both row_group_filtering and row_selection - triggered the panic
        let reader = ArrowReaderBuilder::new(file_io)
            .with_row_group_filtering_enabled(true)
            .with_row_selection_enabled(true)
            .build();

        let tasks = Box::pin(futures::stream::iter(
            vec![Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/1.parquet"),
                data_file_format: DataFileFormat::Parquet,
                // Struct-literal fields evaluate in source order, so this clone
                // runs before `schema` is moved into `bind` two lines below.
                schema: schema.clone(),
                project_field_ids: vec![1, 2, 3],
                // NOTE(review): the `true` flag presumably selects case-sensitive
                // binding — confirm against PredicateBuilder/bind docs.
                predicate: Some(predicate.bind(schema, true).unwrap()),
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            })]
            .into_iter(),
        )) as FileScanTaskStream;

        // Should no longer panic
        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Should return empty results
        assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
    }
4134
4135    /// Test that concurrency=1 reads all files correctly and in deterministic order.
4136    /// This verifies the fast-path optimization for single concurrency.
4137    #[tokio::test]
4138    async fn test_read_with_concurrency_one() {
4139        use arrow_array::Int32Array;
4140
4141        let schema = Arc::new(
4142            Schema::builder()
4143                .with_schema_id(1)
4144                .with_fields(vec![
4145                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
4146                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
4147                        .into(),
4148                ])
4149                .build()
4150                .unwrap(),
4151        );
4152
4153        let arrow_schema = Arc::new(ArrowSchema::new(vec![
4154            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
4155                PARQUET_FIELD_ID_META_KEY.to_string(),
4156                "1".to_string(),
4157            )])),
4158            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
4159                PARQUET_FIELD_ID_META_KEY.to_string(),
4160                "2".to_string(),
4161            )])),
4162        ]));
4163
4164        let tmp_dir = TempDir::new().unwrap();
4165        let table_location = tmp_dir.path().to_str().unwrap().to_string();
4166        let file_io = FileIO::new_with_fs();
4167
4168        // Create 3 parquet files with different data
4169        let props = WriterProperties::builder()
4170            .set_compression(Compression::SNAPPY)
4171            .build();
4172
4173        for file_num in 0..3 {
4174            let id_data = Arc::new(Int32Array::from_iter_values(
4175                file_num * 10..(file_num + 1) * 10,
4176            )) as ArrayRef;
4177            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
4178
4179            let to_write =
4180                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
4181
4182            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
4183            let mut writer =
4184                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
4185            writer.write(&to_write).expect("Writing batch");
4186            writer.close().unwrap();
4187        }
4188
4189        // Read with concurrency=1 (fast-path)
4190        let reader = ArrowReaderBuilder::new(file_io)
4191            .with_data_file_concurrency_limit(1)
4192            .build();
4193
4194        // Create tasks in a specific order: file_0, file_1, file_2
4195        let tasks = vec![
4196            Ok(FileScanTask {
4197                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet"))
4198                    .unwrap()
4199                    .len(),
4200                start: 0,
4201                length: 0,
4202                record_count: None,
4203                data_file_path: format!("{table_location}/file_0.parquet"),
4204                data_file_format: DataFileFormat::Parquet,
4205                schema: schema.clone(),
4206                project_field_ids: vec![1, 2],
4207                predicate: None,
4208                deletes: vec![],
4209                partition: None,
4210                partition_spec: None,
4211                name_mapping: None,
4212                case_sensitive: false,
4213            }),
4214            Ok(FileScanTask {
4215                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
4216                    .unwrap()
4217                    .len(),
4218                start: 0,
4219                length: 0,
4220                record_count: None,
4221                data_file_path: format!("{table_location}/file_1.parquet"),
4222                data_file_format: DataFileFormat::Parquet,
4223                schema: schema.clone(),
4224                project_field_ids: vec![1, 2],
4225                predicate: None,
4226                deletes: vec![],
4227                partition: None,
4228                partition_spec: None,
4229                name_mapping: None,
4230                case_sensitive: false,
4231            }),
4232            Ok(FileScanTask {
4233                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
4234                    .unwrap()
4235                    .len(),
4236                start: 0,
4237                length: 0,
4238                record_count: None,
4239                data_file_path: format!("{table_location}/file_2.parquet"),
4240                data_file_format: DataFileFormat::Parquet,
4241                schema: schema.clone(),
4242                project_field_ids: vec![1, 2],
4243                predicate: None,
4244                deletes: vec![],
4245                partition: None,
4246                partition_spec: None,
4247                name_mapping: None,
4248                case_sensitive: false,
4249            }),
4250        ];
4251
4252        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
4253
4254        let result = reader
4255            .read(tasks_stream)
4256            .unwrap()
4257            .try_collect::<Vec<RecordBatch>>()
4258            .await
4259            .unwrap();
4260
4261        // Verify we got all 30 rows (10 from each file)
4262        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
4263        assert_eq!(total_rows, 30, "Should have 30 total rows");
4264
4265        // Collect all ids and file_nums to verify data
4266        let mut all_ids = Vec::new();
4267        let mut all_file_nums = Vec::new();
4268
4269        for batch in &result {
4270            let id_col = batch
4271                .column(0)
4272                .as_primitive::<arrow_array::types::Int32Type>();
4273            let file_num_col = batch
4274                .column(1)
4275                .as_primitive::<arrow_array::types::Int32Type>();
4276
4277            for i in 0..batch.num_rows() {
4278                all_ids.push(id_col.value(i));
4279                all_file_nums.push(file_num_col.value(i));
4280            }
4281        }
4282
4283        assert_eq!(all_ids.len(), 30);
4284        assert_eq!(all_file_nums.len(), 30);
4285
4286        // With concurrency=1 and sequential processing, files should be processed in order
4287        // file_0: ids 0-9, file_num=0
4288        // file_1: ids 10-19, file_num=1
4289        // file_2: ids 20-29, file_num=2
4290        for i in 0..10 {
4291            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
4292            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
4293        }
4294        for i in 10..20 {
4295            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
4296            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
4297        }
4298        for i in 20..30 {
4299            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
4300            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
4301        }
4302    }
4303
    /// Test bucket partitioning reads source column from data file (not partition metadata).
    ///
    /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
    /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable).
    ///
    /// # Iceberg Spec Requirements
    ///
    /// Per the Iceberg spec "Column Projection" section:
    /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
    ///
    /// This means:
    /// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata
    /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files
    /// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values
    ///
    /// Java's PartitionUtil.constantsMap() implements this via:
    /// ```java
    /// if (field.transform().isIdentity()) {
    ///     idToConstant.put(field.sourceId(), converted);
    /// }
    /// ```
    ///
    /// # What This Test Verifies
    ///
    /// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles
    /// bucket partitioning when FileScanTask provides partition_spec and partition_data:
    ///
    /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13]
    /// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1
    /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants
    /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
    /// - Values are NOT replaced with constant 1 from partition metadata
    ///
    /// # Why This Matters
    ///
    /// Without correct handling:
    /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
    /// - Query results would be incorrect (all rows would have id=1)
    /// - Bucket partitioning would be unusable for query optimization
    ///
    /// # References
    /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
    #[tokio::test]
    async fn test_bucket_partitioning_reads_source_column_from_file() {
        use arrow_array::Int32Array;

        use crate::spec::{Literal, PartitionSpec, Struct, Transform};

        // Iceberg schema with id and name columns
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Partition spec: bucket(4, id) — a non-identity transform, so the
        // source column "id" must NOT be served from partition constants.
        let partition_spec = Arc::new(
            PartitionSpec::builder(schema.clone())
                .with_spec_id(0)
                .add_partition_field("id", "id_bucket", Transform::Bucket(4))
                .unwrap()
                .build()
                .unwrap(),
        );

        // Partition data: bucket value is 1 (a bucket ordinal, not a source value)
        let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);

        // Create Arrow schema with field IDs for Parquet file
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "2".to_string(),
            )])),
        ]));

        // Write Parquet file with data
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_io = FileIO::new_with_fs();

        // Distinct id values [1, 5, 9, 13]: if the reader wrongly substituted the
        // partition constant, every row would read back as 1.
        let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
        let name_data =
            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;

        let to_write =
            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();

        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();
        let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
        writer.write(&to_write).expect("Writing batch");
        writer.close().unwrap();

        // Read the Parquet file with partition spec and data
        let reader = ArrowReaderBuilder::new(file_io).build();
        let tasks = Box::pin(futures::stream::iter(
            vec![Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/data.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/data.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: Some(partition_data), // bucket number, not a source value
                partition_spec: Some(partition_spec),
                name_mapping: None,
                case_sensitive: false,
            })]
            .into_iter(),
        )) as FileScanTaskStream;

        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Verify we got the correct data
        assert_eq!(result.len(), 1);
        let batch = &result[0];

        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        // The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
        // NOT the constant partition value 1
        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 5);
        assert_eq!(id_col.value(2), 9);
        assert_eq!(id_col.value(3), 13);

        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert_eq!(name_col.value(3), "Dave");
    }
4465
4466    #[test]
4467    fn test_merge_ranges_empty() {
4468        assert_eq!(super::merge_ranges(&[], 1024), Vec::<Range<u64>>::new());
4469    }
4470
4471    #[test]
4472    fn test_merge_ranges_no_coalesce() {
4473        // Ranges far apart should not be merged
4474        let ranges = vec![0..100, 1_000_000..1_000_100];
4475        let merged = super::merge_ranges(&ranges, 1024);
4476        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
4477    }
4478
4479    #[test]
4480    fn test_merge_ranges_coalesce() {
4481        // Ranges within the gap threshold should be merged
4482        let ranges = vec![0..100, 200..300, 500..600];
4483        let merged = super::merge_ranges(&ranges, 1024);
4484        assert_eq!(merged, vec![0..600]);
4485    }
4486
4487    #[test]
4488    fn test_merge_ranges_overlapping() {
4489        let ranges = vec![0..200, 100..300];
4490        let merged = super::merge_ranges(&ranges, 0);
4491        assert_eq!(merged, vec![0..300]);
4492    }
4493
4494    #[test]
4495    fn test_merge_ranges_unsorted() {
4496        let ranges = vec![500..600, 0..100, 200..300];
4497        let merged = super::merge_ranges(&ranges, 1024);
4498        assert_eq!(merged, vec![0..600]);
4499    }
4500
    /// Mock FileRead backed by a flat byte buffer.
    ///
    /// Serves slices of an in-memory buffer so the `get_byte_ranges`
    /// tests below can verify fetched bytes without real storage I/O.
    struct MockFileRead {
        // Entire "file" contents; `read` returns sub-slices of this buffer.
        data: bytes::Bytes,
    }
4505
4506    impl MockFileRead {
4507        fn new(size: usize) -> Self {
4508            // Fill with sequential byte values so slices are verifiable.
4509            let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
4510            Self {
4511                data: bytes::Bytes::from(data),
4512            }
4513        }
4514    }
4515
4516    #[async_trait::async_trait]
4517    impl crate::io::FileRead for MockFileRead {
4518        async fn read(&self, range: Range<u64>) -> crate::Result<bytes::Bytes> {
4519            Ok(self.data.slice(range.start as usize..range.end as usize))
4520        }
4521    }
4522
4523    #[tokio::test]
4524    async fn test_get_byte_ranges_no_coalesce() {
4525        use parquet::arrow::async_reader::AsyncFileReader;
4526
4527        let mock = MockFileRead::new(2048);
4528        let expected_0 = mock.data.slice(0..100);
4529        let expected_1 = mock.data.slice(1500..1600);
4530
4531        let mut reader =
4532            super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
4533                .with_parquet_read_options(
4534                    super::ParquetReadOptions::builder()
4535                        .with_range_coalesce_bytes(0)
4536                        .build(),
4537                );
4538
4539        let result = reader
4540            .get_byte_ranges(vec![0..100, 1500..1600])
4541            .await
4542            .unwrap();
4543
4544        assert_eq!(result.len(), 2);
4545        assert_eq!(result[0], expected_0);
4546        assert_eq!(result[1], expected_1);
4547    }
4548
4549    #[tokio::test]
4550    async fn test_get_byte_ranges_with_coalesce() {
4551        use parquet::arrow::async_reader::AsyncFileReader;
4552
4553        let mock = MockFileRead::new(1024);
4554        let expected_0 = mock.data.slice(0..100);
4555        let expected_1 = mock.data.slice(200..300);
4556        let expected_2 = mock.data.slice(500..600);
4557
4558        let mut reader =
4559            super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock))
4560                .with_parquet_read_options(
4561                    super::ParquetReadOptions::builder()
4562                        .with_range_coalesce_bytes(1024)
4563                        .build(),
4564                );
4565
4566        // All ranges within coalesce threshold — should merge into one fetch.
4567        let result = reader
4568            .get_byte_ranges(vec![0..100, 200..300, 500..600])
4569            .await
4570            .unwrap();
4571
4572        assert_eq!(result.len(), 3);
4573        assert_eq!(result[0], expected_0);
4574        assert_eq!(result[1], expected_1);
4575        assert_eq!(result[2], expected_2);
4576    }
4577
4578    #[tokio::test]
4579    async fn test_get_byte_ranges_empty() {
4580        use parquet::arrow::async_reader::AsyncFileReader;
4581
4582        let mock = MockFileRead::new(1024);
4583        let mut reader =
4584            super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock));
4585
4586        let result = reader.get_byte_ranges(vec![]).await.unwrap();
4587        assert!(result.is_empty());
4588    }
4589
4590    #[tokio::test]
4591    async fn test_get_byte_ranges_coalesce_max() {
4592        use parquet::arrow::async_reader::AsyncFileReader;
4593
4594        let mock = MockFileRead::new(2048);
4595        let expected_0 = mock.data.slice(0..100);
4596        let expected_1 = mock.data.slice(1500..1600);
4597
4598        let mut reader =
4599            super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
4600                .with_parquet_read_options(
4601                    super::ParquetReadOptions::builder()
4602                        .with_range_coalesce_bytes(u64::MAX)
4603                        .build(),
4604                );
4605
4606        // u64::MAX coalesce — all ranges merge into a single fetch.
4607        let result = reader
4608            .get_byte_ranges(vec![0..100, 1500..1600])
4609            .await
4610            .unwrap();
4611
4612        assert_eq!(result.len(), 2);
4613        assert_eq!(result[0], expected_0);
4614        assert_eq!(result[1], expected_1);
4615    }
4616
4617    #[tokio::test]
4618    async fn test_get_byte_ranges_concurrency_zero() {
4619        use parquet::arrow::async_reader::AsyncFileReader;
4620
4621        // concurrency=0 is clamped to 1, so this should not hang.
4622        let mock = MockFileRead::new(1024);
4623        let expected = mock.data.slice(0..100);
4624
4625        let mut reader =
4626            super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock))
4627                .with_parquet_read_options(
4628                    super::ParquetReadOptions::builder()
4629                        .with_range_fetch_concurrency(0)
4630                        .build(),
4631                );
4632
4633        let result = reader
4634            .get_byte_ranges(vec![0..100, 200..300])
4635            .await
4636            .unwrap();
4637        assert_eq!(result.len(), 2);
4638        assert_eq!(result[0], expected);
4639    }
4640
4641    #[tokio::test]
4642    async fn test_get_byte_ranges_concurrency_one() {
4643        use parquet::arrow::async_reader::AsyncFileReader;
4644
4645        let mock = MockFileRead::new(2048);
4646        let expected_0 = mock.data.slice(0..100);
4647        let expected_1 = mock.data.slice(500..600);
4648        let expected_2 = mock.data.slice(1500..1600);
4649
4650        let mut reader =
4651            super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
4652                .with_parquet_read_options(
4653                    super::ParquetReadOptions::builder()
4654                        .with_range_coalesce_bytes(0)
4655                        .with_range_fetch_concurrency(1)
4656                        .build(),
4657                );
4658
4659        // concurrency=1 with no coalescing — sequential fetches.
4660        let result = reader
4661            .get_byte_ranges(vec![0..100, 500..600, 1500..1600])
4662            .await
4663            .unwrap();
4664
4665        assert_eq!(result.len(), 3);
4666        assert_eq!(result[0], expected_0);
4667        assert_eq!(result[1], expected_1);
4668        assert_eq!(result[2], expected_2);
4669    }
4670}