iceberg/arrow/reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet file data reader
19
20use std::collections::{HashMap, HashSet};
21use std::ops::Range;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
26use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
27use arrow_cast::cast::cast;
28use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
29use arrow_schema::{
30    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
31};
32use arrow_string::like::starts_with;
33use bytes::Bytes;
34use fnv::FnvHashSet;
35use futures::future::BoxFuture;
36use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
37use parquet::arrow::arrow_reader::{
38    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
39};
40use parquet::arrow::async_reader::AsyncFileReader;
41use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
42use parquet::file::metadata::{
43    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
44};
45use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
46
47use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
48use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
49use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
50use crate::delete_vector::DeleteVector;
51use crate::error::Result;
52use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
53use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
54use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
55use crate::expr::{BoundPredicate, BoundReference};
56use crate::io::{FileIO, FileMetadata, FileRead};
57use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
58use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
59use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
60use crate::utils::available_parallelism;
61use crate::{Error, ErrorKind};
62
63/// Builder to create ArrowReader
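///
/// A minimal usage sketch (assumes `file_io: FileIO` is already constructed and that
/// `tasks` is an existing `FileScanTaskStream`; illustrative, not exhaustive):
///
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .with_data_file_concurrency_limit(4)
///     .with_row_group_filtering_enabled(true)
///     .with_row_selection_enabled(true)
///     .build();
/// let batch_stream = reader.read(tasks)?;
/// ```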
64pub struct ArrowReaderBuilder {
65    batch_size: Option<usize>,
66    file_io: FileIO,
67    concurrency_limit_data_files: usize,
68    row_group_filtering_enabled: bool,
69    row_selection_enabled: bool,
70}
71
72impl ArrowReaderBuilder {
73    /// Create a new ArrowReaderBuilder
74    pub fn new(file_io: FileIO) -> Self {
75        let num_cpus = available_parallelism().get();
76
77        ArrowReaderBuilder {
78            batch_size: None,
79            file_io,
80            concurrency_limit_data_files: num_cpus,
81            row_group_filtering_enabled: true,
82            row_selection_enabled: false,
83        }
84    }
85
86    /// Sets the maximum number of in-flight data files that are being fetched
87    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
88        self.concurrency_limit_data_files = val;
89        self
90    }
91
92    /// Sets the desired size of batches in the response
93    /// to something other than the default
94    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
95        self.batch_size = Some(batch_size);
96        self
97    }
98
99    /// Sets whether row group filtering is enabled.
100    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
101        self.row_group_filtering_enabled = row_group_filtering_enabled;
102        self
103    }
104
105    /// Sets whether row selection is enabled.
106    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
107        self.row_selection_enabled = row_selection_enabled;
108        self
109    }
110
111    /// Build the ArrowReader.
112    pub fn build(self) -> ArrowReader {
113        ArrowReader {
114            batch_size: self.batch_size,
115            file_io: self.file_io.clone(),
116            delete_file_loader: CachingDeleteFileLoader::new(
117                self.file_io.clone(),
118                self.concurrency_limit_data_files,
119            ),
120            concurrency_limit_data_files: self.concurrency_limit_data_files,
121            row_group_filtering_enabled: self.row_group_filtering_enabled,
122            row_selection_enabled: self.row_selection_enabled,
123        }
124    }
125}
126
127/// Reads data from Parquet files
128#[derive(Clone)]
129pub struct ArrowReader {
130    batch_size: Option<usize>,
131    file_io: FileIO,
132    delete_file_loader: CachingDeleteFileLoader,
133
134    /// the maximum number of data files that can be fetched at the same time
135    concurrency_limit_data_files: usize,
136
137    row_group_filtering_enabled: bool,
138    row_selection_enabled: bool,
139}
140
141impl ArrowReader {
142    /// Takes a stream of FileScanTasks and reads all the files.
143    /// Returns a stream of Arrow RecordBatches containing the data from the files.
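    ///
    /// A sketch of consuming the returned stream (assumes an existing `reader: ArrowReader`,
    /// a `tasks: FileScanTaskStream`, and an async context; illustrative only):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let mut batches = reader.read(tasks)?;
    /// while let Some(batch) = batches.try_next().await? {
    ///     println!("read {} rows", batch.num_rows());
    /// }
    /// ```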
144    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
145        let file_io = self.file_io.clone();
146        let batch_size = self.batch_size;
147        let concurrency_limit_data_files = self.concurrency_limit_data_files;
148        let row_group_filtering_enabled = self.row_group_filtering_enabled;
149        let row_selection_enabled = self.row_selection_enabled;
150
151        // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
152        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
153            Box::pin(
154                tasks
155                    .and_then(move |task| {
156                        let file_io = file_io.clone();
157
158                        Self::process_file_scan_task(
159                            task,
160                            batch_size,
161                            file_io,
162                            self.delete_file_loader.clone(),
163                            row_group_filtering_enabled,
164                            row_selection_enabled,
165                        )
166                    })
167                    .map_err(|err| {
168                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
169                            .with_source(err)
170                    })
171                    .try_flatten(),
172            )
173        } else {
174            Box::pin(
175                tasks
176                    .map_ok(move |task| {
177                        let file_io = file_io.clone();
178
179                        Self::process_file_scan_task(
180                            task,
181                            batch_size,
182                            file_io,
183                            self.delete_file_loader.clone(),
184                            row_group_filtering_enabled,
185                            row_selection_enabled,
186                        )
187                    })
188                    .map_err(|err| {
189                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
190                            .with_source(err)
191                    })
192                    .try_buffer_unordered(concurrency_limit_data_files)
193                    .try_flatten_unordered(concurrency_limit_data_files),
194            )
195        };
196
197        Ok(stream)
198    }
199
200    #[allow(clippy::too_many_arguments)]
201    async fn process_file_scan_task(
202        task: FileScanTask,
203        batch_size: Option<usize>,
204        file_io: FileIO,
205        delete_file_loader: CachingDeleteFileLoader,
206        row_group_filtering_enabled: bool,
207        row_selection_enabled: bool,
208    ) -> Result<ArrowRecordBatchStream> {
209        let should_load_page_index =
210            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
211
212        let delete_filter_rx =
213            delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
214
215        // Migrated tables lack field IDs, requiring us to inspect the schema to choose
216        // between field-ID-based and position-based projection.
217        let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
218            &task.data_file_path,
219            file_io.clone(),
220            should_load_page_index,
221            None,
222        )
223        .await?;
224
225        // Check if Parquet file has embedded field IDs
226        // Corresponds to Java's ParquetSchemaUtil.hasIds()
227        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
228        let missing_field_ids = initial_stream_builder
229            .schema()
230            .fields()
231            .iter()
232            .next()
233            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
234
235        // Three-branch schema resolution strategy matching Java's ReadConf constructor
236        //
237        // Per Iceberg spec Column Projection rules:
238        // "Columns in Iceberg data files are selected by field id. The table schema's column
239        //  names and order may change after a data file is written, and projection must be done
240        //  using field ids."
241        // https://iceberg.apache.org/spec/#column-projection
242        //
243        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
244        // we must assign field IDs BEFORE reading data to enable correct projection.
245        //
246        // Java's ReadConf determines field ID strategy:
247        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
248        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
249        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
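        //
        // Illustrative example (hypothetical file with columns `name` and `age`, no embedded
        // field IDs): with a name mapping of `name -> 11, age -> 12`, Branch 2 assigns those
        // IDs before reading; without a mapping, Branch 3 assigns the positional fallback
        // IDs `name -> 1, age -> 2`.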
250        let mut record_batch_stream_builder = if missing_field_ids {
251            // Parquet file lacks field IDs - must assign them before reading
252            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
253                // Branch 2: Apply name mapping to assign correct Iceberg field IDs
254                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
255                // to columns without field id"
256                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
257                apply_name_mapping_to_arrow_schema(
258                    Arc::clone(initial_stream_builder.schema()),
259                    name_mapping,
260                )?
261            } else {
262                // Branch 3: No name mapping - use position-based fallback IDs
263                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
264                add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
265            };
266
267            let options = ArrowReaderOptions::new().with_schema(arrow_schema);
268
269            Self::create_parquet_record_batch_stream_builder(
270                &task.data_file_path,
271                file_io.clone(),
272                should_load_page_index,
273                Some(options),
274            )
275            .await?
276        } else {
277            // Branch 1: File has embedded field IDs - trust them
278            initial_stream_builder
279        };
280
281        // Filter out metadata fields for Parquet projection (they don't exist in files)
282        let project_field_ids_without_metadata: Vec<i32> = task
283            .project_field_ids
284            .iter()
285            .filter(|&&id| !is_metadata_field(id))
286            .copied()
287            .collect();
288
289        // Create projection mask based on field IDs
290        // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
291        // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
292        // - If fallback IDs: position-based projection (missing_field_ids=true)
293        let projection_mask = Self::get_arrow_projection_mask(
294            &project_field_ids_without_metadata,
295            &task.schema,
296            record_batch_stream_builder.parquet_schema(),
297            record_batch_stream_builder.schema(),
298            missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
299        )?;
300
301        record_batch_stream_builder =
302            record_batch_stream_builder.with_projection(projection_mask.clone());
303
304        // RecordBatchTransformer performs any transformations required on the RecordBatches
305        // that come back from the file, such as type promotion, default column insertion,
306        // column re-ordering, partition constants, and virtual field addition (like _file)
307        let mut record_batch_transformer_builder =
308            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
309
310        // Add the _file metadata column if it's in the projected fields
311        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
312            let file_datum = Datum::string(task.data_file_path.clone());
313            record_batch_transformer_builder =
314                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
315        }
316
317        if let (Some(partition_spec), Some(partition_data)) =
318            (task.partition_spec.clone(), task.partition.clone())
319        {
320            record_batch_transformer_builder =
321                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
322        }
323
324        let mut record_batch_transformer = record_batch_transformer_builder.build();
325
326        if let Some(batch_size) = batch_size {
327            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
328        }
329
330        let delete_filter = delete_filter_rx.await.unwrap()?;
331        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
332
333        // In addition to the optional predicate supplied in the `FileScanTask`,
334        // we also have an optional predicate resulting from equality delete files.
335        // If both are present, we logical-AND them together to form a single filter
336        // predicate that we can pass to the `RecordBatchStreamBuilder`.
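        //
        // For example (hypothetical columns), a scan predicate `a > 5` combined with a
        // predicate derived from equality deletes such as `b != 1` yields the single
        // filter `a > 5 AND b != 1`.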
337        let final_predicate = match (&task.predicate, delete_predicate) {
338            (None, None) => None,
339            (Some(predicate), None) => Some(predicate.clone()),
340            (None, Some(ref predicate)) => Some(predicate.clone()),
341            (Some(filter_predicate), Some(delete_predicate)) => {
342                Some(filter_predicate.clone().and(delete_predicate))
343            }
344        };
345
346        // There are three possible sources for lists of selected RowGroup indices,
347        // and two for `RowSelection`s.
348        // Selected RowGroup index lists can come from:
349        //   * When task.start and task.length specify a byte range (file splitting);
350        //   * When there are equality delete files that are applicable;
351        //   * When there is a scan predicate and row_group_filtering_enabled = true.
352        // `RowSelection`s can be created in either or both of the following cases:
353        //   * When there are positional delete files that are applicable;
354        //   * When there is a scan predicate and row_selection_enabled = true
355        // Note that row group filtering from predicates only happens when
356        // there is a scan predicate AND row_group_filtering_enabled = true,
357        // but we perform row selection filtering if there are applicable
358        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
359        // since the only implemented method of applying positional deletes is
360        // by using a `RowSelection`.
361        let mut selected_row_group_indices = None;
362        let mut row_selection = None;
363
364        // Filter row groups based on byte range from task.start and task.length.
365        // If both start and length are 0, read the entire file (backwards compatibility).
366        if task.start != 0 || task.length != 0 {
367            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
368                record_batch_stream_builder.metadata(),
369                task.start,
370                task.length,
371            )?;
372            selected_row_group_indices = Some(byte_range_filtered_row_groups);
373        }
374
375        if let Some(predicate) = final_predicate {
376            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
377                record_batch_stream_builder.parquet_schema(),
378                &predicate,
379            )?;
380
381            let row_filter = Self::get_row_filter(
382                &predicate,
383                record_batch_stream_builder.parquet_schema(),
384                &iceberg_field_ids,
385                &field_id_map,
386            )?;
387            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
388
389            if row_group_filtering_enabled {
390                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
391                    &predicate,
392                    record_batch_stream_builder.metadata(),
393                    &field_id_map,
394                    &task.schema,
395                )?;
396
397                // Merge predicate-based filtering with byte range filtering (if present)
398                // by taking the intersection of both filters
399                selected_row_group_indices = match selected_row_group_indices {
400                    Some(byte_range_filtered) => {
401                        // Keep only row groups that are in both filters
402                        let intersection: Vec<usize> = byte_range_filtered
403                            .into_iter()
404                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
405                            .collect();
406                        Some(intersection)
407                    }
408                    None => Some(predicate_filtered_row_groups),
409                };
410            }
411
412            if row_selection_enabled {
413                row_selection = Some(Self::get_row_selection_for_filter_predicate(
414                    &predicate,
415                    record_batch_stream_builder.metadata(),
416                    &selected_row_group_indices,
417                    &field_id_map,
418                    &task.schema,
419                )?);
420            }
421        }
422
423        let positional_delete_indexes = delete_filter.get_delete_vector(&task);
424
425        if let Some(positional_delete_indexes) = positional_delete_indexes {
426            let delete_row_selection = {
427                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
428
429                Self::build_deletes_row_selection(
430                    record_batch_stream_builder.metadata().row_groups(),
431                    &selected_row_group_indices,
432                    &positional_delete_indexes,
433                )
434            }?;
435
436            // Merge the row selection from the delete files with the row selection
437            // from the filter predicate, if the latter is present.
438            row_selection = match row_selection {
439                None => Some(delete_row_selection),
440                Some(filter_row_selection) => {
441                    Some(filter_row_selection.intersection(&delete_row_selection))
442                }
443            };
444        }
445
446        if let Some(row_selection) = row_selection {
447            record_batch_stream_builder =
448                record_batch_stream_builder.with_row_selection(row_selection);
449        }
450
451        if let Some(selected_row_group_indices) = selected_row_group_indices {
452            record_batch_stream_builder =
453                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
454        }
455
456        // Build the batch stream and send all the RecordBatches that it generates
457        // to the requester.
458        let record_batch_stream =
459            record_batch_stream_builder
460                .build()?
461                .map(move |batch| match batch {
462                    Ok(batch) => {
463                        // Process the record batch (type promotion, column reordering, virtual fields, etc.)
464                        record_batch_transformer.process_record_batch(batch)
465                    }
466                    Err(err) => Err(err.into()),
467                });
468
469        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
470    }
471
472    pub(crate) async fn create_parquet_record_batch_stream_builder(
473        data_file_path: &str,
474        file_io: FileIO,
475        should_load_page_index: bool,
476        arrow_reader_options: Option<ArrowReaderOptions>,
477    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
478        // Get the metadata for the Parquet file we need to read and build
479        // a reader for the data within
480        let parquet_file = file_io.new_input(data_file_path)?;
481        let (parquet_metadata, parquet_reader) =
482            try_join!(parquet_file.metadata(), parquet_file.reader())?;
483        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
484            .with_preload_column_index(true)
485            .with_preload_offset_index(true)
486            .with_preload_page_index(should_load_page_index);
487
488        // Create the record batch stream builder, which wraps the parquet file reader
489        let options = arrow_reader_options.unwrap_or_default();
490        let record_batch_stream_builder =
491            ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
492        Ok(record_batch_stream_builder)
493    }
494
495    /// Computes a `RowSelection` from positional delete indices.
496    ///
497    /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
498    /// as having been deleted by a positional delete, taking into account any row groups that have
499    /// been skipped entirely by the filter predicate.
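    ///
    /// Illustrative example (hypothetical figures): for a single selected row group of
    /// 6 rows (global row indices 0..=5) with positional deletes at {1, 2, 5}, the
    /// resulting selection is `select(1), skip(2), select(2), skip(1)`.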
500    fn build_deletes_row_selection(
501        row_group_metadata_list: &[RowGroupMetaData],
502        selected_row_groups: &Option<Vec<usize>>,
503        positional_deletes: &DeleteVector,
504    ) -> Result<RowSelection> {
505        let mut results: Vec<RowSelector> = Vec::new();
506        let mut selected_row_groups_idx = 0;
507        let mut current_row_group_base_idx: u64 = 0;
508        let mut delete_vector_iter = positional_deletes.iter();
509        let mut next_deleted_row_idx_opt = delete_vector_iter.next();
510
511        for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
512            let row_group_num_rows = row_group_metadata.num_rows() as u64;
513            let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
514
515            // If row group selection is enabled:
516            if let Some(selected_row_groups) = selected_row_groups {
517                // if we've consumed all the selected row groups, we're done
518                if selected_row_groups_idx == selected_row_groups.len() {
519                    break;
520                }
521
522                if idx == selected_row_groups[selected_row_groups_idx] {
523                    // we're in a selected row group. Increment selected_row_groups_idx
524                    // so that next time around the for loop we're looking for the next
525                    // selected row group
526                    selected_row_groups_idx += 1;
527                } else {
528                    // Advance iterator past all deletes in the skipped row group.
529                    // advance_to() positions the iterator to the first delete >= next_row_group_base_idx.
530                    // However, if our cached next_deleted_row_idx_opt is in the skipped range,
531                    // we need to call next() to update the cache with the newly positioned value.
532                    delete_vector_iter.advance_to(next_row_group_base_idx);
533                    // Only update the cache if the cached value is stale (in the skipped range)
534                    if let Some(cached_idx) = next_deleted_row_idx_opt
535                        && cached_idx < next_row_group_base_idx
536                    {
537                        next_deleted_row_idx_opt = delete_vector_iter.next();
538                    }
539
540                    // still increment the current page base index but then skip to the next row group
541                    // in the file
542                    current_row_group_base_idx += row_group_num_rows;
543                    continue;
544                }
545            }
546
547            let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
548                Some(next_deleted_row_idx) => {
549                    // if the index of the next deleted row is beyond this row group, add a selection for
550                    // the remainder of this row group and skip to the next row group
551                    if next_deleted_row_idx >= next_row_group_base_idx {
552                        results.push(RowSelector::select(row_group_num_rows as usize));
553                        current_row_group_base_idx += row_group_num_rows;
554                        continue;
555                    }
556
557                    next_deleted_row_idx
558                }
559
560                // If there are no more positional deletes, add a selector for the entirety of this row group.
561                _ => {
562                    results.push(RowSelector::select(row_group_num_rows as usize));
563                    current_row_group_base_idx += row_group_num_rows;
564                    continue;
565                }
566            };
567
568            let mut current_idx = current_row_group_base_idx;
569            'chunks: while next_deleted_row_idx < next_row_group_base_idx {
570                // `select` all rows that precede the next delete index
571                if current_idx < next_deleted_row_idx {
572                    let run_length = next_deleted_row_idx - current_idx;
573                    results.push(RowSelector::select(run_length as usize));
574                    current_idx += run_length;
575                }
576
577                // `skip` all consecutive deleted rows in the current row group
578                let mut run_length = 0;
579                while next_deleted_row_idx == current_idx
580                    && next_deleted_row_idx < next_row_group_base_idx
581                {
582                    run_length += 1;
583                    current_idx += 1;
584
585                    next_deleted_row_idx_opt = delete_vector_iter.next();
586                    next_deleted_row_idx = match next_deleted_row_idx_opt {
587                        Some(next_deleted_row_idx) => next_deleted_row_idx,
588                        _ => {
589                            // We've processed the final positional delete.
590                            // Conclude the skip and then break so that we select the remaining
591                            // rows in the row group and move on to the next row group
592                            results.push(RowSelector::skip(run_length));
593                            break 'chunks;
594                        }
595                    };
596                }
597                if run_length > 0 {
598                    results.push(RowSelector::skip(run_length));
599                }
600            }
601
602            if current_idx < next_row_group_base_idx {
603                results.push(RowSelector::select(
604                    (next_row_group_base_idx - current_idx) as usize,
605                ));
606            }
607
608            current_row_group_base_idx += row_group_num_rows;
609        }
610
611        Ok(results.into())
612    }
613
614    fn build_field_id_set_and_map(
615        parquet_schema: &SchemaDescriptor,
616        predicate: &BoundPredicate,
617    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
618        // Collects all Iceberg field IDs referenced in the filter predicate
619        let mut collector = CollectFieldIdVisitor {
620            field_ids: HashSet::default(),
621        };
622        visit(&mut collector, predicate)?;
623
624        let iceberg_field_ids = collector.field_ids();
625
626        // Without embedded field IDs, we fall back to position-based mapping for compatibility
627        let field_id_map = match build_field_id_map(parquet_schema)? {
628            Some(map) => map,
629            None => build_fallback_field_id_map(parquet_schema),
630        };
631
632        Ok((iceberg_field_ids, field_id_map))
633    }
634
635    /// Recursively extract leaf field IDs because Parquet projection works at the leaf column level.
636    /// Nested types (struct/list/map) are flattened in Parquet's columnar format.
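    ///
    /// Illustrative example (hypothetical IDs): a struct field with id 1 whose children
    /// are primitive fields with ids 2 and 3 contributes the leaf ids `[2, 3]`, not `1`.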
637    fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
638        match field.field_type.as_ref() {
639            Type::Primitive(_) => {
640                field_ids.push(field.id);
641            }
642            Type::Struct(struct_type) => {
643                for nested_field in struct_type.fields() {
644                    Self::include_leaf_field_id(nested_field, field_ids);
645                }
646            }
647            Type::List(list_type) => {
648                Self::include_leaf_field_id(&list_type.element_field, field_ids);
649            }
650            Type::Map(map_type) => {
651                Self::include_leaf_field_id(&map_type.key_field, field_ids);
652                Self::include_leaf_field_id(&map_type.value_field, field_ids);
653            }
654        }
655    }
656
657    fn get_arrow_projection_mask(
658        field_ids: &[i32],
659        iceberg_schema_of_task: &Schema,
660        parquet_schema: &SchemaDescriptor,
661        arrow_schema: &ArrowSchemaRef,
662        use_fallback: bool, // Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark)
663    ) -> Result<ProjectionMask> {
664        fn type_promotion_is_valid(
665            file_type: Option<&PrimitiveType>,
666            projected_type: Option<&PrimitiveType>,
667        ) -> bool {
668            match (file_type, projected_type) {
669                (Some(lhs), Some(rhs)) if lhs == rhs => true,
670                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
671                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
672                (
673                    Some(PrimitiveType::Decimal {
674                        precision: file_precision,
675                        scale: file_scale,
676                    }),
677                    Some(PrimitiveType::Decimal {
678                        precision: requested_precision,
679                        scale: requested_scale,
680                    }),
681                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
682                // Uuid is stored as Fixed(16) in the Parquet file, so the type read back will be Fixed(16).
683                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
684                _ => false,
685            }
686        }
687
688        if field_ids.is_empty() {
689            return Ok(ProjectionMask::all());
690        }
691
692        if use_fallback {
693            // Position-based projection necessary because file lacks embedded field IDs
694            Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
695        } else {
696            // Field-ID-based projection using embedded field IDs from Parquet metadata
697
698            // Parquet's columnar format requires leaf-level (not top-level struct/list/map) projection
699            let mut leaf_field_ids = vec![];
700            for field_id in field_ids {
701                let field = iceberg_schema_of_task.field_by_id(*field_id);
702                if let Some(field) = field {
703                    Self::include_leaf_field_id(field, &mut leaf_field_ids);
704                }
705            }
706
707            Self::get_arrow_projection_mask_with_field_ids(
708                &leaf_field_ids,
709                iceberg_schema_of_task,
710                parquet_schema,
711                arrow_schema,
712                type_promotion_is_valid,
713            )
714        }
715    }
716
717    /// Standard projection using embedded field IDs from Parquet metadata.
718    /// For iceberg-java compatibility with ParquetSchemaUtil.pruneColumns().
719    fn get_arrow_projection_mask_with_field_ids(
720        leaf_field_ids: &[i32],
721        iceberg_schema_of_task: &Schema,
722        parquet_schema: &SchemaDescriptor,
723        arrow_schema: &ArrowSchemaRef,
724        type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
725    ) -> Result<ProjectionMask> {
726        let mut column_map = HashMap::new();
727        let fields = arrow_schema.fields();
728
729        // Pre-project only the fields that have been selected, possibly avoiding converting
730        // some Arrow types that are not yet supported.
731        let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
732        let projected_arrow_schema = ArrowSchema::new_with_metadata(
733            fields.filter_leaves(|_, f| {
734                f.metadata()
735                    .get(PARQUET_FIELD_ID_META_KEY)
736                    .and_then(|field_id| i32::from_str(field_id).ok())
737                    .is_some_and(|field_id| {
738                        projected_fields.insert((*f).clone(), field_id);
739                        leaf_field_ids.contains(&field_id)
740                    })
741            }),
742            arrow_schema.metadata().clone(),
743        );
744        let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
745
746        fields.filter_leaves(|idx, field| {
747            let Some(field_id) = projected_fields.get(field).cloned() else {
748                return false;
749            };
750
751            let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
752            let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
753
754            if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
755                return false;
756            }
757
758            if !type_promotion_is_valid(
759                parquet_iceberg_field
760                    .unwrap()
761                    .field_type
762                    .as_primitive_type(),
763                iceberg_field.unwrap().field_type.as_primitive_type(),
764            ) {
765                return false;
766            }
767
768            column_map.insert(field_id, idx);
769            true
770        });
771
772        // Schema evolution: New columns may not exist in old Parquet files.
773        // We only project existing columns; RecordBatchTransformer adds default/NULL values.
774        let mut indices = vec![];
775        for field_id in leaf_field_ids {
776            if let Some(col_idx) = column_map.get(field_id) {
777                indices.push(*col_idx);
778            }
779        }
780
781        if indices.is_empty() {
782            // Edge case: All requested columns are new (don't exist in file).
783            // Project all columns so RecordBatchTransformer has a batch to transform.
784            Ok(ProjectionMask::all())
785        } else {
786            Ok(ProjectionMask::leaves(parquet_schema, indices))
787        }
788    }
789
790    /// Fallback projection for Parquet files without field IDs.
791    /// Uses position-based matching: field ID N → column position N-1.
792    /// Projects entire top-level columns (including nested content) for iceberg-java compatibility.
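    ///
    /// Illustrative example (hypothetical schema): requesting field IDs `[1, 3]` against a
    /// file with top-level columns `(a, b, c)` projects root columns `0` and `2`.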
793    fn get_arrow_projection_mask_fallback(
794        field_ids: &[i32],
795        parquet_schema: &SchemaDescriptor,
796    ) -> Result<ProjectionMask> {
797        // Position-based: field_id N → column N-1 (field IDs are 1-indexed)
798        let parquet_root_fields = parquet_schema.root_schema().get_fields();
799        let mut root_indices = vec![];
800
801        for field_id in field_ids.iter() {
802            let parquet_pos = (*field_id - 1) as usize;
803
804            if parquet_pos < parquet_root_fields.len() {
805                root_indices.push(parquet_pos);
806            }
807            // RecordBatchTransformer adds missing columns with NULL values
808        }
809
810        if root_indices.is_empty() {
811            Ok(ProjectionMask::all())
812        } else {
813            Ok(ProjectionMask::roots(parquet_schema, root_indices))
814        }
815    }
816
817    fn get_row_filter(
818        predicates: &BoundPredicate,
819        parquet_schema: &SchemaDescriptor,
820        iceberg_field_ids: &HashSet<i32>,
821        field_id_map: &HashMap<i32, usize>,
822    ) -> Result<RowFilter> {
823        // Collect Parquet column indices from field ids.
824        // If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
825        let mut column_indices = iceberg_field_ids
826            .iter()
827            .filter_map(|field_id| field_id_map.get(field_id).cloned())
828            .collect::<Vec<_>>();
829        column_indices.sort();
830
831        // The converter that converts `BoundPredicates` to `ArrowPredicates`
832        let mut converter = PredicateConverter {
833            parquet_schema,
834            column_map: field_id_map,
835            column_indices: &column_indices,
836        };
837
838        // After collecting required leaf column indices used in the predicate,
839        // creates the projection mask for the Arrow predicates.
840        let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
841        let predicate_func = visit(&mut converter, predicates)?;
842        let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
843        Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
844    }
845
846    fn get_selected_row_group_indices(
847        predicate: &BoundPredicate,
848        parquet_metadata: &Arc<ParquetMetaData>,
849        field_id_map: &HashMap<i32, usize>,
850        snapshot_schema: &Schema,
851    ) -> Result<Vec<usize>> {
852        let row_groups_metadata = parquet_metadata.row_groups();
853        let mut results = Vec::with_capacity(row_groups_metadata.len());
854
855        for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
856            if RowGroupMetricsEvaluator::eval(
857                predicate,
858                row_group_metadata,
859                field_id_map,
860                snapshot_schema,
861            )? {
862                results.push(idx);
863            }
864        }
865
866        Ok(results)
867    }
868
869    fn get_row_selection_for_filter_predicate(
870        predicate: &BoundPredicate,
871        parquet_metadata: &Arc<ParquetMetaData>,
872        selected_row_groups: &Option<Vec<usize>>,
873        field_id_map: &HashMap<i32, usize>,
874        snapshot_schema: &Schema,
875    ) -> Result<RowSelection> {
876        let Some(column_index) = parquet_metadata.column_index() else {
877            return Err(Error::new(
878                ErrorKind::Unexpected,
879                "Parquet file metadata does not contain a column index",
880            ));
881        };
882
883        let Some(offset_index) = parquet_metadata.offset_index() else {
884            return Err(Error::new(
885                ErrorKind::Unexpected,
886                "Parquet file metadata does not contain an offset index",
887            ));
888        };
889
890        // If all row groups were filtered out, return an empty RowSelection (select no rows)
891        if let Some(selected_row_groups) = selected_row_groups
892            && selected_row_groups.is_empty()
893        {
894            return Ok(RowSelection::from(Vec::new()));
895        }
896
897        let mut selected_row_groups_idx = 0;
898
899        let page_index = column_index
900            .iter()
901            .enumerate()
902            .zip(offset_index)
903            .zip(parquet_metadata.row_groups());
904
905        let mut results = Vec::new();
906        for (((idx, column_index), offset_index), row_group_metadata) in page_index {
907            if let Some(selected_row_groups) = selected_row_groups {
908                // skip row groups that aren't present in selected_row_groups
909                if idx == selected_row_groups[selected_row_groups_idx] {
910                    selected_row_groups_idx += 1;
911                } else {
912                    continue;
913                }
914            }
915
916            let selections_for_page = PageIndexEvaluator::eval(
917                predicate,
918                column_index,
919                offset_index,
920                row_group_metadata,
921                field_id_map,
922                snapshot_schema,
923            )?;
924
925            results.push(selections_for_page);
926
927            if let Some(selected_row_groups) = selected_row_groups
928                && selected_row_groups_idx == selected_row_groups.len()
929            {
930                break;
931            }
932        }
933
934        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
935    }
936
937    /// Filters row groups by byte range to support Iceberg's file splitting.
938    ///
939    /// Iceberg splits large files at row group boundaries, so we only read row groups
940    /// whose byte ranges overlap with [start, start+length).
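    ///
    /// Illustrative example (hypothetical sizes): with two row groups of 100 compressed
    /// bytes each (occupying byte ranges [4, 104) and [104, 204)), a task with
    /// `start = 104` and `length = 100` selects only the second row group.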
941    fn filter_row_groups_by_byte_range(
942        parquet_metadata: &Arc<ParquetMetaData>,
943        start: u64,
944        length: u64,
945    ) -> Result<Vec<usize>> {
946        let row_groups = parquet_metadata.row_groups();
947        let mut selected = Vec::new();
948        let end = start + length;
949
950        // Row groups are stored sequentially after the 4-byte magic header.
951        let mut current_byte_offset = 4u64;
952
953        for (idx, row_group) in row_groups.iter().enumerate() {
954            let row_group_size = row_group.compressed_size() as u64;
955            let row_group_end = current_byte_offset + row_group_size;
956
957            if current_byte_offset < end && start < row_group_end {
958                selected.push(idx);
959            }
960
961            current_byte_offset = row_group_end;
962        }
963
964        Ok(selected)
965    }
966}
967
968/// Build the map of parquet field id to Parquet column index in the schema.
969/// Returns None if the Parquet file doesn't have field IDs embedded (e.g., migrated tables).
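///
/// Illustrative example (hypothetical IDs): a file whose two leaf columns carry field IDs
/// 5 and 7 yields `Some({5: 0, 7: 1})`; if any leaf column lacks an ID, `None` is returned.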
970fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
971    let mut column_map = HashMap::new();
972
973    for (idx, field) in parquet_schema.columns().iter().enumerate() {
974        let field_type = field.self_type();
975        match field_type {
976            ParquetType::PrimitiveType { basic_info, .. } => {
977                if !basic_info.has_id() {
978                    return Ok(None);
979                }
980                column_map.insert(basic_info.id(), idx);
981            }
982            ParquetType::GroupType { .. } => {
983                return Err(Error::new(
984                    ErrorKind::DataInvalid,
985                    format!(
986                        "Leaf column in schema should be primitive type but got {field_type:?}"
987                    ),
988                ));
989            }
990        };
991    }
992
993    Ok(Some(column_map))
994}
995
996/// Build a fallback field ID map for Parquet files without embedded field IDs.
997/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java migrations.
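///
/// Illustrative example: a file with three leaf columns yields `{1: 0, 2: 1, 3: 2}`.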
998fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
999    let mut column_map = HashMap::new();
1000
1001    // 1-indexed to match iceberg-java's convention
1002    for (idx, _field) in parquet_schema.columns().iter().enumerate() {
1003        let field_id = (idx + 1) as i32;
1004        column_map.insert(field_id, idx);
1005    }
1006
1007    column_map
1008}
1009
1010/// Apply name mapping to Arrow schema for Parquet files lacking field IDs.
1011///
1012/// Assigns Iceberg field IDs based on column names using the name mapping,
1013/// enabling correct projection on migrated files (e.g., from Hive/Spark via add_files).
1014///
1015/// Per Iceberg spec Column Projection rule #2:
1016/// "Use schema.name-mapping.default metadata to map field id to columns without field id"
1017/// https://iceberg.apache.org/spec/#column-projection
1018///
1019/// Corresponds to Java's ParquetSchemaUtil.applyNameMapping() and ApplyNameMapping visitor.
1020/// The key difference is Java operates on Parquet MessageType, while we operate on Arrow Schema.
1021///
1022/// # Arguments
1023/// * `arrow_schema` - Arrow schema from Parquet file (without field IDs)
1024/// * `name_mapping` - Name mapping from table metadata (TableProperties.DEFAULT_NAME_MAPPING)
1025///
1026/// # Returns
1027/// Arrow schema with field IDs assigned based on name mapping
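///
/// # Example
///
/// Illustrative sketch (hypothetical mapping): with a name mapping of `id -> 1` and
/// `data -> 2`, the Arrow field named `id` receives `"1"` under `PARQUET_FIELD_ID_META_KEY`
/// in its metadata, `data` receives `"2"`, and any unmapped column is left without an ID.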
1028fn apply_name_mapping_to_arrow_schema(
1029    arrow_schema: ArrowSchemaRef,
1030    name_mapping: &NameMapping,
1031) -> Result<Arc<ArrowSchema>> {
1032    debug_assert!(
1033        arrow_schema
1034            .fields()
1035            .iter()
1036            .next()
1037            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1038        "Schema already has field IDs - name mapping should not be applied"
1039    );
1040
1041    use arrow_schema::Field;
1042
1043    let fields_with_mapped_ids: Vec<_> = arrow_schema
1044        .fields()
1045        .iter()
1046        .map(|field| {
1047            // Look up this column name in name mapping to get the Iceberg field ID.
1048            // Corresponds to Java's ApplyNameMapping visitor which calls
1049            // nameMapping.find(currentPath()) and returns field.withId() if found.
1050            //
1051            // If the field isn't in the mapping, leave it WITHOUT assigning an ID
1052            // (matching Java's behavior of returning the field unchanged).
1053            // Later, during projection, fields without IDs are filtered out.
1054            let mapped_field_opt = name_mapping
1055                .fields()
1056                .iter()
1057                .find(|f| f.names().contains(&field.name().to_string()));
1058
1059            let mut metadata = field.metadata().clone();
1060
1061            if let Some(mapped_field) = mapped_field_opt
1062                && let Some(field_id) = mapped_field.field_id()
1063            {
1064                // Field found in mapping with a field_id → assign it
1065                metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1066            }
1067            // If field_id is None, leave the field without an ID (will be filtered by projection)
1068
1069            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1070                .with_metadata(metadata)
1071        })
1072        .collect();
1073
1074    Ok(Arc::new(ArrowSchema::new_with_metadata(
1075        fields_with_mapped_ids,
1076        arrow_schema.metadata().clone(),
1077    )))
1078}
1079
1080/// Add position-based fallback field IDs to Arrow schema for Parquet files lacking them.
1081/// Enables projection on migrated files (e.g., from Hive/Spark).
1082///
1083/// Why at schema level (not per-batch): Efficiency - avoids repeated schema modification.
1084/// Why only top-level: Nested projection uses leaf column indices, not parent struct IDs.
1085/// Why 1-indexed: Compatibility with iceberg-java's ParquetSchemaUtil.addFallbackIds().
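///
/// Illustrative example: top-level columns `(a, b, c)` receive field-id metadata
/// `"1"`, `"2"`, and `"3"` respectively.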
1086fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
1087    debug_assert!(
1088        arrow_schema
1089            .fields()
1090            .iter()
1091            .next()
1092            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1093        "Schema already has field IDs"
1094    );
1095
1096    use arrow_schema::Field;
1097
1098    let fields_with_fallback_ids: Vec<_> = arrow_schema
1099        .fields()
1100        .iter()
1101        .enumerate()
1102        .map(|(pos, field)| {
1103            let mut metadata = field.metadata().clone();
1104            let field_id = (pos + 1) as i32; // 1-indexed for Java compatibility
1105            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1106
1107            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1108                .with_metadata(metadata)
1109        })
1110        .collect();
1111
1112    Arc::new(ArrowSchema::new_with_metadata(
1113        fields_with_fallback_ids,
1114        arrow_schema.metadata().clone(),
1115    ))
1116}
1117
1118/// A visitor to collect field ids from bound predicates.
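///
/// Illustrative example (hypothetical columns): visiting the bound predicate
/// `a > 5 AND b IS NULL` collects the field IDs bound to `a` and `b`.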
1119struct CollectFieldIdVisitor {
1120    field_ids: HashSet<i32>,
1121}
1122
1123impl CollectFieldIdVisitor {
1124    fn field_ids(self) -> HashSet<i32> {
1125        self.field_ids
1126    }
1127}
1128
1129impl BoundPredicateVisitor for CollectFieldIdVisitor {
1130    type T = ();
1131
1132    fn always_true(&mut self) -> Result<()> {
1133        Ok(())
1134    }
1135
1136    fn always_false(&mut self) -> Result<()> {
1137        Ok(())
1138    }
1139
1140    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1141        Ok(())
1142    }
1143
1144    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1145        Ok(())
1146    }
1147
1148    fn not(&mut self, _inner: ()) -> Result<()> {
1149        Ok(())
1150    }
1151
1152    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1153        self.field_ids.insert(reference.field().id);
1154        Ok(())
1155    }
1156
1157    fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1158        self.field_ids.insert(reference.field().id);
1159        Ok(())
1160    }
1161
1162    fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1163        self.field_ids.insert(reference.field().id);
1164        Ok(())
1165    }
1166
1167    fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1168        self.field_ids.insert(reference.field().id);
1169        Ok(())
1170    }
1171
1172    fn less_than(
1173        &mut self,
1174        reference: &BoundReference,
1175        _literal: &Datum,
1176        _predicate: &BoundPredicate,
1177    ) -> Result<()> {
1178        self.field_ids.insert(reference.field().id);
1179        Ok(())
1180    }
1181
1182    fn less_than_or_eq(
1183        &mut self,
1184        reference: &BoundReference,
1185        _literal: &Datum,
1186        _predicate: &BoundPredicate,
1187    ) -> Result<()> {
1188        self.field_ids.insert(reference.field().id);
1189        Ok(())
1190    }
1191
1192    fn greater_than(
1193        &mut self,
1194        reference: &BoundReference,
1195        _literal: &Datum,
1196        _predicate: &BoundPredicate,
1197    ) -> Result<()> {
1198        self.field_ids.insert(reference.field().id);
1199        Ok(())
1200    }
1201
1202    fn greater_than_or_eq(
1203        &mut self,
1204        reference: &BoundReference,
1205        _literal: &Datum,
1206        _predicate: &BoundPredicate,
1207    ) -> Result<()> {
1208        self.field_ids.insert(reference.field().id);
1209        Ok(())
1210    }
1211
1212    fn eq(
1213        &mut self,
1214        reference: &BoundReference,
1215        _literal: &Datum,
1216        _predicate: &BoundPredicate,
1217    ) -> Result<()> {
1218        self.field_ids.insert(reference.field().id);
1219        Ok(())
1220    }
1221
1222    fn not_eq(
1223        &mut self,
1224        reference: &BoundReference,
1225        _literal: &Datum,
1226        _predicate: &BoundPredicate,
1227    ) -> Result<()> {
1228        self.field_ids.insert(reference.field().id);
1229        Ok(())
1230    }
1231
1232    fn starts_with(
1233        &mut self,
1234        reference: &BoundReference,
1235        _literal: &Datum,
1236        _predicate: &BoundPredicate,
1237    ) -> Result<()> {
1238        self.field_ids.insert(reference.field().id);
1239        Ok(())
1240    }
1241
1242    fn not_starts_with(
1243        &mut self,
1244        reference: &BoundReference,
1245        _literal: &Datum,
1246        _predicate: &BoundPredicate,
1247    ) -> Result<()> {
1248        self.field_ids.insert(reference.field().id);
1249        Ok(())
1250    }
1251
1252    fn r#in(
1253        &mut self,
1254        reference: &BoundReference,
1255        _literals: &FnvHashSet<Datum>,
1256        _predicate: &BoundPredicate,
1257    ) -> Result<()> {
1258        self.field_ids.insert(reference.field().id);
1259        Ok(())
1260    }
1261
1262    fn not_in(
1263        &mut self,
1264        reference: &BoundReference,
1265        _literals: &FnvHashSet<Datum>,
1266        _predicate: &BoundPredicate,
1267    ) -> Result<()> {
1268        self.field_ids.insert(reference.field().id);
1269        Ok(())
1270    }
1271}
1272
1273/// A visitor to convert Iceberg bound predicates to Arrow predicates.
1274struct PredicateConverter<'a> {
1275    /// The Parquet schema descriptor.
1276    pub parquet_schema: &'a SchemaDescriptor,
1277    /// The map between field id and leaf column index in Parquet schema.
1278    pub column_map: &'a HashMap<i32, usize>,
1279    /// The required column indices in Parquet schema for the predicates.
1280    pub column_indices: &'a Vec<usize>,
1281}
1282
1283impl PredicateConverter<'_> {
1284    /// When visiting a bound reference, we return the index of the leaf column in the
1285    /// required column indices, which is used to project the column in the record batch.
1286    /// Returns `None` if the field id is not found in the column map, which is possible
1287    /// due to schema evolution.
1288    fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1289        // The leaf column's index in Parquet schema.
1290        if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1291            if self.parquet_schema.get_column_root(*column_idx).is_group() {
1292                return Err(Error::new(
1293                    ErrorKind::DataInvalid,
1294                    format!(
1295                        "Leaf column `{}` in predicates isn't a root column in Parquet schema.",
1296                        reference.field().name
1297                    ),
1298                ));
1299            }
1300
1301            // The leaf column's index in the required column indices.
1302            let index = self
1303                .column_indices
1304                .iter()
1305                .position(|&idx| idx == *column_idx)
1306                .ok_or(Error::new(
1307                    ErrorKind::DataInvalid,
1308                    format!(
1309                "Leaf column `{}` in predicates cannot be found in the required column indices.",
1310                reference.field().name
1311            ),
1312                ))?;
1313
1314            Ok(Some(index))
1315        } else {
1316            Ok(None)
1317        }
1318    }
1319
1320    /// Build an Arrow predicate that always returns true.
1321    fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1322        Ok(Box::new(|batch| {
1323            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1324        }))
1325    }
1326
1327    /// Build an Arrow predicate that always returns false.
1328    fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1329        Ok(Box::new(|batch| {
1330            Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1331        }))
1332    }
1333}
1334
1335/// Gets the leaf column from the record batch for the required column index. Only
1336/// supports top-level columns for now.
1337fn project_column(
1338    batch: &RecordBatch,
1339    column_idx: usize,
1340) -> std::result::Result<ArrayRef, ArrowError> {
1341    let column = batch.column(column_idx);
1342
1343    match column.data_type() {
1344        DataType::Struct(_) => Err(ArrowError::SchemaError(
1345            "Does not support struct column yet.".to_string(),
1346        )),
1347        _ => Ok(column.clone()),
1348    }
1349}
1350
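/// Boxed filter closure produced by [`PredicateConverter`]: given a `RecordBatch`, it returns a
/// `BooleanArray` mask selecting the rows that satisfy the predicate (or an `ArrowError`).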
1351type PredicateResult =
1352    dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1353
1354impl BoundPredicateVisitor for PredicateConverter<'_> {
1355    type T = Box<PredicateResult>;
1356
1357    fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1358        self.build_always_true()
1359    }
1360
1361    fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1362        self.build_always_false()
1363    }
1364
1365    fn and(
1366        &mut self,
1367        mut lhs: Box<PredicateResult>,
1368        mut rhs: Box<PredicateResult>,
1369    ) -> Result<Box<PredicateResult>> {
1370        Ok(Box::new(move |batch| {
1371            let left = lhs(batch.clone())?;
1372            let right = rhs(batch)?;
1373            and_kleene(&left, &right)
1374        }))
1375    }
1376
1377    fn or(
1378        &mut self,
1379        mut lhs: Box<PredicateResult>,
1380        mut rhs: Box<PredicateResult>,
1381    ) -> Result<Box<PredicateResult>> {
1382        Ok(Box::new(move |batch| {
1383            let left = lhs(batch.clone())?;
1384            let right = rhs(batch)?;
1385            or_kleene(&left, &right)
1386        }))
1387    }
1388
1389    fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1390        Ok(Box::new(move |batch| {
1391            let pred_ret = inner(batch)?;
1392            not(&pred_ret)
1393        }))
1394    }
1395
1396    fn is_null(
1397        &mut self,
1398        reference: &BoundReference,
1399        _predicate: &BoundPredicate,
1400    ) -> Result<Box<PredicateResult>> {
1401        if let Some(idx) = self.bound_reference(reference)? {
1402            Ok(Box::new(move |batch| {
1403                let column = project_column(&batch, idx)?;
1404                is_null(&column)
1405            }))
1406        } else {
1407            // A missing column, treating it as null.
1408            self.build_always_true()
1409        }
1410    }
1411
1412    fn not_null(
1413        &mut self,
1414        reference: &BoundReference,
1415        _predicate: &BoundPredicate,
1416    ) -> Result<Box<PredicateResult>> {
1417        if let Some(idx) = self.bound_reference(reference)? {
1418            Ok(Box::new(move |batch| {
1419                let column = project_column(&batch, idx)?;
1420                is_not_null(&column)
1421            }))
1422        } else {
1423            // A missing column, treating it as null.
1424            self.build_always_false()
1425        }
1426    }
1427
1428    fn is_nan(
1429        &mut self,
1430        reference: &BoundReference,
1431        _predicate: &BoundPredicate,
1432    ) -> Result<Box<PredicateResult>> {
1433        if self.bound_reference(reference)?.is_some() {
1434            self.build_always_true()
1435        } else {
1436            // A missing column, treating it as null.
1437            self.build_always_false()
1438        }
1439    }
1440
1441    fn not_nan(
1442        &mut self,
1443        reference: &BoundReference,
1444        _predicate: &BoundPredicate,
1445    ) -> Result<Box<PredicateResult>> {
1446        if self.bound_reference(reference)?.is_some() {
1447            self.build_always_false()
1448        } else {
1449            // A missing column, treating it as null.
1450            self.build_always_true()
1451        }
1452    }
1453
1454    fn less_than(
1455        &mut self,
1456        reference: &BoundReference,
1457        literal: &Datum,
1458        _predicate: &BoundPredicate,
1459    ) -> Result<Box<PredicateResult>> {
1460        if let Some(idx) = self.bound_reference(reference)? {
1461            let literal = get_arrow_datum(literal)?;
1462
1463            Ok(Box::new(move |batch| {
1464                let left = project_column(&batch, idx)?;
1465                let literal = try_cast_literal(&literal, left.data_type())?;
1466                lt(&left, literal.as_ref())
1467            }))
1468        } else {
1469            // A missing column, treating it as null.
1470            self.build_always_true()
1471        }
1472    }
1473
1474    fn less_than_or_eq(
1475        &mut self,
1476        reference: &BoundReference,
1477        literal: &Datum,
1478        _predicate: &BoundPredicate,
1479    ) -> Result<Box<PredicateResult>> {
1480        if let Some(idx) = self.bound_reference(reference)? {
1481            let literal = get_arrow_datum(literal)?;
1482
1483            Ok(Box::new(move |batch| {
1484                let left = project_column(&batch, idx)?;
1485                let literal = try_cast_literal(&literal, left.data_type())?;
1486                lt_eq(&left, literal.as_ref())
1487            }))
1488        } else {
1489            // A missing column, treating it as null.
1490            self.build_always_true()
1491        }
1492    }
1493
1494    fn greater_than(
1495        &mut self,
1496        reference: &BoundReference,
1497        literal: &Datum,
1498        _predicate: &BoundPredicate,
1499    ) -> Result<Box<PredicateResult>> {
1500        if let Some(idx) = self.bound_reference(reference)? {
1501            let literal = get_arrow_datum(literal)?;
1502
1503            Ok(Box::new(move |batch| {
1504                let left = project_column(&batch, idx)?;
1505                let literal = try_cast_literal(&literal, left.data_type())?;
1506                gt(&left, literal.as_ref())
1507            }))
1508        } else {
1509            // A missing column, treating it as null.
1510            self.build_always_false()
1511        }
1512    }
1513
1514    fn greater_than_or_eq(
1515        &mut self,
1516        reference: &BoundReference,
1517        literal: &Datum,
1518        _predicate: &BoundPredicate,
1519    ) -> Result<Box<PredicateResult>> {
1520        if let Some(idx) = self.bound_reference(reference)? {
1521            let literal = get_arrow_datum(literal)?;
1522
1523            Ok(Box::new(move |batch| {
1524                let left = project_column(&batch, idx)?;
1525                let literal = try_cast_literal(&literal, left.data_type())?;
1526                gt_eq(&left, literal.as_ref())
1527            }))
1528        } else {
1529            // A missing column, treating it as null.
1530            self.build_always_false()
1531        }
1532    }
1533
1534    fn eq(
1535        &mut self,
1536        reference: &BoundReference,
1537        literal: &Datum,
1538        _predicate: &BoundPredicate,
1539    ) -> Result<Box<PredicateResult>> {
1540        if let Some(idx) = self.bound_reference(reference)? {
1541            let literal = get_arrow_datum(literal)?;
1542
1543            Ok(Box::new(move |batch| {
1544                let left = project_column(&batch, idx)?;
1545                let literal = try_cast_literal(&literal, left.data_type())?;
1546                eq(&left, literal.as_ref())
1547            }))
1548        } else {
1549            // A missing column, treating it as null.
1550            self.build_always_false()
1551        }
1552    }
1553
1554    fn not_eq(
1555        &mut self,
1556        reference: &BoundReference,
1557        literal: &Datum,
1558        _predicate: &BoundPredicate,
1559    ) -> Result<Box<PredicateResult>> {
1560        if let Some(idx) = self.bound_reference(reference)? {
1561            let literal = get_arrow_datum(literal)?;
1562
1563            Ok(Box::new(move |batch| {
1564                let left = project_column(&batch, idx)?;
1565                let literal = try_cast_literal(&literal, left.data_type())?;
1566                neq(&left, literal.as_ref())
1567            }))
1568        } else {
1569            // A missing column, treating it as null.
1570            self.build_always_false()
1571        }
1572    }
1573
1574    fn starts_with(
1575        &mut self,
1576        reference: &BoundReference,
1577        literal: &Datum,
1578        _predicate: &BoundPredicate,
1579    ) -> Result<Box<PredicateResult>> {
1580        if let Some(idx) = self.bound_reference(reference)? {
1581            let literal = get_arrow_datum(literal)?;
1582
1583            Ok(Box::new(move |batch| {
1584                let left = project_column(&batch, idx)?;
1585                let literal = try_cast_literal(&literal, left.data_type())?;
1586                starts_with(&left, literal.as_ref())
1587            }))
1588        } else {
1589            // A missing column, treating it as null.
1590            self.build_always_false()
1591        }
1592    }
1593
1594    fn not_starts_with(
1595        &mut self,
1596        reference: &BoundReference,
1597        literal: &Datum,
1598        _predicate: &BoundPredicate,
1599    ) -> Result<Box<PredicateResult>> {
1600        if let Some(idx) = self.bound_reference(reference)? {
1601            let literal = get_arrow_datum(literal)?;
1602
1603            Ok(Box::new(move |batch| {
1604                let left = project_column(&batch, idx)?;
1605                let literal = try_cast_literal(&literal, left.data_type())?;
1606                // update here if arrow ever adds a native not_starts_with
1607                not(&starts_with(&left, literal.as_ref())?)
1608            }))
1609        } else {
1610            // A missing column, treating it as null.
1611            self.build_always_true()
1612        }
1613    }
1614
1615    fn r#in(
1616        &mut self,
1617        reference: &BoundReference,
1618        literals: &FnvHashSet<Datum>,
1619        _predicate: &BoundPredicate,
1620    ) -> Result<Box<PredicateResult>> {
1621        if let Some(idx) = self.bound_reference(reference)? {
1622            let literals: Vec<_> = literals
1623                .iter()
1624                .map(|lit| get_arrow_datum(lit).unwrap())
1625                .collect();
1626
1627            Ok(Box::new(move |batch| {
1628                // update this if arrow ever adds a native is_in kernel
1629                let left = project_column(&batch, idx)?;
1630
1631                let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1632                for literal in &literals {
1633                    let literal = try_cast_literal(literal, left.data_type())?;
1634                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
1635                }
1636
1637                Ok(acc)
1638            }))
1639        } else {
1640            // A missing column, treating it as null.
1641            self.build_always_false()
1642        }
1643    }
1644
1645    fn not_in(
1646        &mut self,
1647        reference: &BoundReference,
1648        literals: &FnvHashSet<Datum>,
1649        _predicate: &BoundPredicate,
1650    ) -> Result<Box<PredicateResult>> {
1651        if let Some(idx) = self.bound_reference(reference)? {
1652            let literals: Vec<_> = literals
1653                .iter()
1654                .map(|lit| get_arrow_datum(lit).unwrap())
1655                .collect();
1656
1657            Ok(Box::new(move |batch| {
1658                // update this if arrow ever adds a native not_in kernel
1659                let left = project_column(&batch, idx)?;
1660                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1661                for literal in &literals {
1662                    let literal = try_cast_literal(literal, left.data_type())?;
1663                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
1664                }
1665
1666                Ok(acc)
1667            }))
1668        } else {
1669            // A missing column, treating it as null.
1670            self.build_always_true()
1671        }
1672    }
1673}
1674
1675/// ArrowFileReader is a wrapper around a [`FileRead`] that implements parquet's [`AsyncFileReader`].
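///
/// A minimal usage sketch (it assumes `meta: FileMetadata` and `file_read: Box<dyn FileRead>` were
/// obtained through this crate's [`FileIO`] machinery, and that an async context is available):
///
/// ```ignore
/// let reader = ArrowFileReader::new(meta, file_read)
///     .with_preload_column_index(true)
///     .with_preload_offset_index(true)
///     .with_metadata_size_hint(64 * 1024);
///
/// // The reader plugs into parquet's async record batch stream builder.
/// let stream = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;
/// ```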
1676pub struct ArrowFileReader {
1677    meta: FileMetadata,
1678    preload_column_index: bool,
1679    preload_offset_index: bool,
1680    preload_page_index: bool,
1681    metadata_size_hint: Option<usize>,
1682    r: Box<dyn FileRead>,
1683}
1684
1685impl ArrowFileReader {
1686    /// Create a new ArrowFileReader
1687    pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
1688        Self {
1689            meta,
1690            preload_column_index: false,
1691            preload_offset_index: false,
1692            preload_page_index: false,
1693            metadata_size_hint: None,
1694            r,
1695        }
1696    }
1697
1698    /// Enable or disable preloading of the column index
1699    pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1700        self.preload_column_index = preload;
1701        self
1702    }
1703
1704    /// Enable or disable preloading of the offset index
1705    pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1706        self.preload_offset_index = preload;
1707        self
1708    }
1709
1710    /// Enable or disable preloading of the page index
1711    pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1712        self.preload_page_index = preload;
1713        self
1714    }
1715
1716    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
1717    ///
1718    /// This hint can help reduce the number of fetch requests. For more details see the
1719    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
1720    pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1721        self.metadata_size_hint = Some(hint);
1722        self
1723    }
1724}
1725
1726impl AsyncFileReader for ArrowFileReader {
1727    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
1728        Box::pin(
1729            self.r
1730                .read(range.start..range.end)
1731                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
1732        )
1733    }
1734
1735    // TODO: currently we don't respect `ArrowReaderOptions` because it doesn't expose any method to access its option fields;
1736    // we will fix this once `v55.1.0` is released, see https://github.com/apache/arrow-rs/issues/7393
1737    fn get_metadata(
1738        &mut self,
1739        _options: Option<&'_ ArrowReaderOptions>,
1740    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
1741        async move {
1742            let reader = ParquetMetaDataReader::new()
1743                .with_prefetch_hint(self.metadata_size_hint)
1744                // Set the page policy first because it updates both column and offset policies.
1745                .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
1746                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
1747                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
1748            let size = self.meta.size;
1749            let meta = reader.load_and_finish(self, size).await?;
1750
1751            Ok(Arc::new(meta))
1752        }
1753        .boxed()
1754    }
1755}
1756
1757/// The Arrow type of an array produced by the Parquet reader may not match the exact Arrow type
1758/// that Iceberg uses for literals, even though the two are effectively the same logical type,
1759/// e.g. LargeUtf8 and Utf8, Utf8View and Utf8, or Utf8View and LargeUtf8.
1760///
1761/// The Arrow compute kernels we use require operand types to match exactly, so we first cast the
1762/// literal to the type of the batch read from Parquet before handing it to the compute kernel.
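///
/// For example (a sketch; an Iceberg string literal typically materializes as a `Utf8` scalar,
/// while the column read from Parquet may be `LargeUtf8`):
///
/// ```ignore
/// let literal = get_arrow_datum(&Datum::string("foo")).unwrap();
/// let casted = try_cast_literal(&literal, &DataType::LargeUtf8).unwrap();
/// assert_eq!(casted.get().0.data_type(), &DataType::LargeUtf8);
/// ```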
1763fn try_cast_literal(
1764    literal: &Arc<dyn ArrowDatum + Send + Sync>,
1765    column_type: &DataType,
1766) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1767    let literal_array = literal.get().0;
1768
1769    // No cast required
1770    if literal_array.data_type() == column_type {
1771        return Ok(Arc::clone(literal));
1772    }
1773
1774    let literal_array = cast(literal_array, column_type)?;
1775    Ok(Arc::new(Scalar::new(literal_array)))
1776}
1777
1778#[cfg(test)]
1779mod tests {
1780    use std::collections::{HashMap, HashSet};
1781    use std::fs::File;
1782    use std::sync::Arc;
1783
1784    use arrow_array::cast::AsArray;
1785    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1786    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1787    use futures::TryStreamExt;
1788    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1789    use parquet::arrow::{ArrowWriter, ProjectionMask};
1790    use parquet::basic::Compression;
1791    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1792    use parquet::file::properties::WriterProperties;
1793    use parquet::schema::parser::parse_message_type;
1794    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1795    use roaring::RoaringTreemap;
1796    use tempfile::TempDir;
1797
1798    use crate::ErrorKind;
1799    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1800    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1801    use crate::delete_vector::DeleteVector;
1802    use crate::expr::visitors::bound_predicate_visitor::visit;
1803    use crate::expr::{Bind, Predicate, Reference};
1804    use crate::io::FileIO;
1805    use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1806    use crate::spec::{
1807        DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1808    };
1809
1810    fn table_schema_simple() -> SchemaRef {
1811        Arc::new(
1812            Schema::builder()
1813                .with_schema_id(1)
1814                .with_identifier_field_ids(vec![2])
1815                .with_fields(vec![
1816                    NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1817                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1818                    NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1819                    NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1820                ])
1821                .build()
1822                .unwrap(),
1823        )
1824    }
1825
1826    #[test]
1827    fn test_collect_field_id() {
1828        let schema = table_schema_simple();
1829        let expr = Reference::new("qux").is_null();
1830        let bound_expr = expr.bind(schema, true).unwrap();
1831
1832        let mut visitor = CollectFieldIdVisitor {
1833            field_ids: HashSet::default(),
1834        };
1835        visit(&mut visitor, &bound_expr).unwrap();
1836
1837        let mut expected = HashSet::default();
1838        expected.insert(4_i32);
1839
1840        assert_eq!(visitor.field_ids, expected);
1841    }
1842
1843    #[test]
1844    fn test_collect_field_id_with_and() {
1845        let schema = table_schema_simple();
1846        let expr = Reference::new("qux")
1847            .is_null()
1848            .and(Reference::new("baz").is_null());
1849        let bound_expr = expr.bind(schema, true).unwrap();
1850
1851        let mut visitor = CollectFieldIdVisitor {
1852            field_ids: HashSet::default(),
1853        };
1854        visit(&mut visitor, &bound_expr).unwrap();
1855
1856        let mut expected = HashSet::default();
1857        expected.insert(4_i32);
1858        expected.insert(3);
1859
1860        assert_eq!(visitor.field_ids, expected);
1861    }
1862
1863    #[test]
1864    fn test_collect_field_id_with_or() {
1865        let schema = table_schema_simple();
1866        let expr = Reference::new("qux")
1867            .is_null()
1868            .or(Reference::new("baz").is_null());
1869        let bound_expr = expr.bind(schema, true).unwrap();
1870
1871        let mut visitor = CollectFieldIdVisitor {
1872            field_ids: HashSet::default(),
1873        };
1874        visit(&mut visitor, &bound_expr).unwrap();
1875
1876        let mut expected = HashSet::default();
1877        expected.insert(4_i32);
1878        expected.insert(3);
1879
1880        assert_eq!(visitor.field_ids, expected);
1881    }
1882
1883    #[test]
1884    fn test_arrow_projection_mask() {
1885        let schema = Arc::new(
1886            Schema::builder()
1887                .with_schema_id(1)
1888                .with_identifier_field_ids(vec![1])
1889                .with_fields(vec![
1890                    NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
1891                    NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
1892                    NestedField::optional(
1893                        3,
1894                        "c3",
1895                        Type::Primitive(PrimitiveType::Decimal {
1896                            precision: 38,
1897                            scale: 3,
1898                        }),
1899                    )
1900                    .into(),
1901                ])
1902                .build()
1903                .unwrap(),
1904        );
1905        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1906            Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
1907                PARQUET_FIELD_ID_META_KEY.to_string(),
1908                "1".to_string(),
1909            )])),
1910            // Type not supported
1911            Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
1912                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1913            ),
1914            // Precision is beyond the supported range
1915            Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
1916                PARQUET_FIELD_ID_META_KEY.to_string(),
1917                "3".to_string(),
1918            )])),
1919        ]));
1920
1921        let message_type = "
1922message schema {
1923  required binary c1 (STRING) = 1;
1924  optional int32 c2 (INTEGER(8,true)) = 2;
1925  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
1926}
1927    ";
1928        let parquet_type = parse_message_type(message_type).expect("should parse schema");
1929        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
1930
1931        // Try projecting the fields c2 and c3 with the unsupported data types
1932        let err = ArrowReader::get_arrow_projection_mask(
1933            &[1, 2, 3],
1934            &schema,
1935            &parquet_schema,
1936            &arrow_schema,
1937            false,
1938        )
1939        .unwrap_err();
1940
1941        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1942        assert_eq!(
1943            err.to_string(),
1944            "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
1945        );
1946
1947        // Omitting field c2, we still get an error due to c3 being selected
1948        let err = ArrowReader::get_arrow_projection_mask(
1949            &[1, 3],
1950            &schema,
1951            &parquet_schema,
1952            &arrow_schema,
1953            false,
1954        )
1955        .unwrap_err();
1956
1957        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1958        assert_eq!(
1959            err.to_string(),
1960            "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
1961        );
1962
1963        // Finally avoid selecting fields with unsupported data types
1964        let mask = ArrowReader::get_arrow_projection_mask(
1965            &[1],
1966            &schema,
1967            &parquet_schema,
1968            &arrow_schema,
1969            false,
1970        )
1971        .expect("Some ProjectionMask");
1972        assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
1973    }
1974
1975    #[tokio::test]
1976    async fn test_kleene_logic_or_behaviour() {
1977        // a IS NULL OR a = 'foo'
1978        let predicate = Reference::new("a")
1979            .is_null()
1980            .or(Reference::new("a").equal_to(Datum::string("foo")));
1981
1982        // Table data: [NULL, "foo", "bar"]
1983        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1984
1985        // Expected: [NULL, "foo"].
1986        let expected = vec![None, Some("foo".to_string())];
1987
1988        let (file_io, schema, table_location, _temp_dir) =
1989            setup_kleene_logic(data_for_col_a, DataType::Utf8);
1990        let reader = ArrowReaderBuilder::new(file_io).build();
1991
1992        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1993
1994        assert_eq!(result_data, expected);
1995    }
1996
1997    #[tokio::test]
1998    async fn test_kleene_logic_and_behaviour() {
1999        // a IS NOT NULL AND a != 'foo'
2000        let predicate = Reference::new("a")
2001            .is_not_null()
2002            .and(Reference::new("a").not_equal_to(Datum::string("foo")));
2003
2004        // Table data: [NULL, "foo", "bar"]
2005        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
2006
2007        // Expected: ["bar"].
2008        let expected = vec![Some("bar".to_string())];
2009
2010        let (file_io, schema, table_location, _temp_dir) =
2011            setup_kleene_logic(data_for_col_a, DataType::Utf8);
2012        let reader = ArrowReaderBuilder::new(file_io).build();
2013
2014        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
2015
2016        assert_eq!(result_data, expected);
2017    }
2018
2019    #[tokio::test]
2020    async fn test_predicate_cast_literal() {
2021        let predicates = vec![
2022            // a == 'foo'
2023            (Reference::new("a").equal_to(Datum::string("foo")), vec![
2024                Some("foo".to_string()),
2025            ]),
2026            // a != 'foo'
2027            (
2028                Reference::new("a").not_equal_to(Datum::string("foo")),
2029                vec![Some("bar".to_string())],
2030            ),
2031            // STARTS_WITH(a, 'f')
2032            (Reference::new("a").starts_with(Datum::string("f")), vec![
2033                Some("foo".to_string()),
2034            ]),
2035            // NOT STARTS_WITH(a, 'f')
2036            (
2037                Reference::new("a").not_starts_with(Datum::string("f")),
2038                vec![Some("bar".to_string())],
2039            ),
2040            // a < 'foo'
2041            (Reference::new("a").less_than(Datum::string("foo")), vec![
2042                Some("bar".to_string()),
2043            ]),
2044            // a <= 'foo'
2045            (
2046                Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
2047                vec![Some("foo".to_string()), Some("bar".to_string())],
2048            ),
2049            // a > 'bar'
2050            (
2051                Reference::new("a").greater_than(Datum::string("bar")),
2052                vec![Some("foo".to_string())],
2053            ),
2054            // a >= 'foo'
2055            (
2056                Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
2057                vec![Some("foo".to_string())],
2058            ),
2059            // a IN ('foo', 'baz')
2060            (
2061                Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
2062                vec![Some("foo".to_string())],
2063            ),
2064            // a NOT IN ('foo', 'baz')
2065            (
2066                Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
2067                vec![Some("bar".to_string())],
2068            ),
2069        ];
2070
2071        // Table data: ["foo", "bar"]
2072        let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
2073
2074        let (file_io, schema, table_location, _temp_dir) =
2075            setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
2076        let reader = ArrowReaderBuilder::new(file_io).build();
2077
2078        for (predicate, expected) in predicates {
2079            println!("testing predicate {predicate}");
2080            let result_data = test_perform_read(
2081                predicate.clone(),
2082                schema.clone(),
2083                table_location.clone(),
2084                reader.clone(),
2085            )
2086            .await;
2087
2088            assert_eq!(result_data, expected, "predicate={predicate}");
2089        }
2090    }
2091
2092    async fn test_perform_read(
2093        predicate: Predicate,
2094        schema: SchemaRef,
2095        table_location: String,
2096        reader: ArrowReader,
2097    ) -> Vec<Option<String>> {
2098        let tasks = Box::pin(futures::stream::iter(
2099            vec![Ok(FileScanTask {
2100                start: 0,
2101                length: 0,
2102                record_count: None,
2103                data_file_path: format!("{table_location}/1.parquet"),
2104                data_file_format: DataFileFormat::Parquet,
2105                schema: schema.clone(),
2106                project_field_ids: vec![1],
2107                predicate: Some(predicate.bind(schema, true).unwrap()),
2108                deletes: vec![],
2109                partition: None,
2110                partition_spec: None,
2111                name_mapping: None,
2112                case_sensitive: false,
2113            })]
2114            .into_iter(),
2115        )) as FileScanTaskStream;
2116
2117        let result = reader
2118            .read(tasks)
2119            .unwrap()
2120            .try_collect::<Vec<RecordBatch>>()
2121            .await
2122            .unwrap();
2123
2124        result[0].columns()[0]
2125            .as_string_opt::<i32>()
2126            .unwrap()
2127            .iter()
2128            .map(|v| v.map(ToOwned::to_owned))
2129            .collect::<Vec<_>>()
2130    }
2131
2132    fn setup_kleene_logic(
2133        data_for_col_a: Vec<Option<String>>,
2134        col_a_type: DataType,
2135    ) -> (FileIO, SchemaRef, String, TempDir) {
2136        let schema = Arc::new(
2137            Schema::builder()
2138                .with_schema_id(1)
2139                .with_fields(vec![
2140                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2141                ])
2142                .build()
2143                .unwrap(),
2144        );
2145
2146        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2147            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2148                PARQUET_FIELD_ID_META_KEY.to_string(),
2149                "1".to_string(),
2150            )])),
2151        ]));
2152
2153        let tmp_dir = TempDir::new().unwrap();
2154        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2155
2156        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2157
2158        let col = match col_a_type {
2159            DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2160            DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2161            _ => panic!("unexpected col_a_type"),
2162        };
2163
2164        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2165
2166        // Write the Parquet files
2167        let props = WriterProperties::builder()
2168            .set_compression(Compression::SNAPPY)
2169            .build();
2170
2171        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
2172        let mut writer =
2173            ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2174
2175        writer.write(&to_write).expect("Writing batch");
2176
2177        // writer must be closed to write footer
2178        writer.close().unwrap();
2179
2180        (file_io, schema, table_location, tmp_dir)
2181    }
2182
2183    #[test]
2184    fn test_build_deletes_row_selection() {
2185        let schema_descr = get_test_schema_descr();
2186
2187        let mut columns = vec![];
2188        for ptr in schema_descr.columns() {
2189            let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
2190            columns.push(column);
2191        }
2192
2193        let row_groups_metadata = vec![
2194            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
2195            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
2196            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
2197            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
2198            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
2199        ];
2200
2201        let selected_row_groups = Some(vec![1, 3]);
2202
2203        /* cases to cover:
2204           * {skip|select} {first|intermediate|last} {one row|multiple rows} in
2205             {first|intermediate|last} {skipped|selected} row group
2206           * row group selection disabled
2207        */
2208
2209        let positional_deletes = RoaringTreemap::from_iter(&[
2210            1, // in skipped rg 0, should be ignored
2211            3, // run of three consecutive items in skipped rg0
2212            4, 5, 998, // two consecutive items at end of skipped rg0
2213            999, 1000, // solitary row at start of selected rg1 (1, 9)
2214            1010, // run of 3 rows in selected rg1
2215            1011, 1012, // (3, 485)
2216            1498, // run of two items at end of selected rg1
2217            1499, 1500, // run of two items at start of skipped rg2
2218            1501, 1600, // should ignore, in skipped rg2
2219            1999, // single row at end of skipped rg2
2220            2000, // run of two items at start of selected rg3
2221            2001, // (4, 98)
2222            2100, // single row in selected row group 3 (1, 99)
2223            2200, // run of 3 consecutive rows in selected row group 3
2224            2201, 2202, // (3, 796)
2225            2999, // single item at end of selected rg3 (1)
2226            3000, // single item at start of skipped rg4
2227        ]);
2228
2229        let positional_deletes = DeleteVector::new(positional_deletes);
2230
2231        // using selected row groups 1 and 3
2232        let result = ArrowReader::build_deletes_row_selection(
2233            &row_groups_metadata,
2234            &selected_row_groups,
2235            &positional_deletes,
2236        )
2237        .unwrap();
2238
2239        let expected = RowSelection::from(vec![
2240            RowSelector::skip(1),
2241            RowSelector::select(9),
2242            RowSelector::skip(3),
2243            RowSelector::select(485),
2244            RowSelector::skip(4),
2245            RowSelector::select(98),
2246            RowSelector::skip(1),
2247            RowSelector::select(99),
2248            RowSelector::skip(3),
2249            RowSelector::select(796),
2250            RowSelector::skip(1),
2251        ]);
2252
2253        assert_eq!(result, expected);
2254
2255        // selecting all row groups
2256        let result = ArrowReader::build_deletes_row_selection(
2257            &row_groups_metadata,
2258            &None,
2259            &positional_deletes,
2260        )
2261        .unwrap();
2262
2263        let expected = RowSelection::from(vec![
2264            RowSelector::select(1),
2265            RowSelector::skip(1),
2266            RowSelector::select(1),
2267            RowSelector::skip(3),
2268            RowSelector::select(992),
2269            RowSelector::skip(3),
2270            RowSelector::select(9),
2271            RowSelector::skip(3),
2272            RowSelector::select(485),
2273            RowSelector::skip(4),
2274            RowSelector::select(98),
2275            RowSelector::skip(1),
2276            RowSelector::select(398),
2277            RowSelector::skip(3),
2278            RowSelector::select(98),
2279            RowSelector::skip(1),
2280            RowSelector::select(99),
2281            RowSelector::skip(3),
2282            RowSelector::select(796),
2283            RowSelector::skip(2),
2284            RowSelector::select(499),
2285        ]);
2286
2287        assert_eq!(result, expected);
2288    }
2289
2290    fn build_test_row_group_meta(
2291        schema_descr: SchemaDescPtr,
2292        columns: Vec<ColumnChunkMetaData>,
2293        num_rows: i64,
2294        ordinal: i16,
2295    ) -> RowGroupMetaData {
2296        RowGroupMetaData::builder(schema_descr.clone())
2297            .set_num_rows(num_rows)
2298            .set_total_byte_size(2000)
2299            .set_column_metadata(columns)
2300            .set_ordinal(ordinal)
2301            .build()
2302            .unwrap()
2303    }
2304
2305    fn get_test_schema_descr() -> SchemaDescPtr {
2306        use parquet::schema::types::Type as SchemaType;
2307
2308        let schema = SchemaType::group_type_builder("schema")
2309            .with_fields(vec![
2310                Arc::new(
2311                    SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2312                        .build()
2313                        .unwrap(),
2314                ),
2315                Arc::new(
2316                    SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2317                        .build()
2318                        .unwrap(),
2319                ),
2320            ])
2321            .build()
2322            .unwrap();
2323
2324        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2325    }
2326
2327    /// Verifies that file splits respect byte ranges and only read specific row groups.
2328    #[tokio::test]
2329    async fn test_file_splits_respect_byte_ranges() {
2330        use arrow_array::Int32Array;
2331        use parquet::file::reader::{FileReader, SerializedFileReader};
2332
2333        let schema = Arc::new(
2334            Schema::builder()
2335                .with_schema_id(1)
2336                .with_fields(vec![
2337                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2338                ])
2339                .build()
2340                .unwrap(),
2341        );
2342
2343        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2344            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2345                PARQUET_FIELD_ID_META_KEY.to_string(),
2346                "1".to_string(),
2347            )])),
2348        ]));
2349
2350        let tmp_dir = TempDir::new().unwrap();
2351        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2352        let file_path = format!("{table_location}/multi_row_group.parquet");
2353
2354        // Force each batch into its own row group for testing byte range filtering.
2355        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2356            (0..100).collect::<Vec<i32>>(),
2357        ))])
2358        .unwrap();
2359        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2360            (100..200).collect::<Vec<i32>>(),
2361        ))])
2362        .unwrap();
2363        let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2364            (200..300).collect::<Vec<i32>>(),
2365        ))])
2366        .unwrap();
2367
2368        let props = WriterProperties::builder()
2369            .set_compression(Compression::SNAPPY)
2370            .set_max_row_group_size(100)
2371            .build();
2372
2373        let file = File::create(&file_path).unwrap();
2374        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2375        writer.write(&batch1).expect("Writing batch 1");
2376        writer.write(&batch2).expect("Writing batch 2");
2377        writer.write(&batch3).expect("Writing batch 3");
2378        writer.close().unwrap();
2379
2380        // Read the file metadata to get row group byte positions
2381        let file = File::open(&file_path).unwrap();
2382        let reader = SerializedFileReader::new(file).unwrap();
2383        let metadata = reader.metadata();
2384
2385        println!("File has {} row groups", metadata.num_row_groups());
2386        assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2387
2388        // Get byte positions for each row group
2389        let row_group_0 = metadata.row_group(0);
2390        let row_group_1 = metadata.row_group(1);
2391        let row_group_2 = metadata.row_group(2);
2392
2393        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
2394        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2395        let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
2396        let file_end = rg2_start + row_group_2.compressed_size() as u64;
2397
2398        println!(
2399            "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2400            row_group_0.num_rows(),
2401            rg0_start,
2402            row_group_0.compressed_size()
2403        );
2404        println!(
2405            "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2406            row_group_1.num_rows(),
2407            rg1_start,
2408            row_group_1.compressed_size()
2409        );
2410        println!(
2411            "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2412            row_group_2.num_rows(),
2413            rg2_start,
2414            row_group_2.compressed_size()
2415        );
2416
2417        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2418        let reader = ArrowReaderBuilder::new(file_io).build();
2419
2420        // Task 1: read only the first row group
2421        let task1 = FileScanTask {
2422            start: rg0_start,
2423            length: row_group_0.compressed_size() as u64,
2424            record_count: Some(100),
2425            data_file_path: file_path.clone(),
2426            data_file_format: DataFileFormat::Parquet,
2427            schema: schema.clone(),
2428            project_field_ids: vec![1],
2429            predicate: None,
2430            deletes: vec![],
2431            partition: None,
2432            partition_spec: None,
2433            name_mapping: None,
2434            case_sensitive: false,
2435        };
2436
2437        // Task 2: read the second and third row groups
2438        let task2 = FileScanTask {
2439            start: rg1_start,
2440            length: file_end - rg1_start,
2441            record_count: Some(200),
2442            data_file_path: file_path.clone(),
2443            data_file_format: DataFileFormat::Parquet,
2444            schema: schema.clone(),
2445            project_field_ids: vec![1],
2446            predicate: None,
2447            deletes: vec![],
2448            partition: None,
2449            partition_spec: None,
2450            name_mapping: None,
2451            case_sensitive: false,
2452        };
2453
2454        let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2455        let result1 = reader
2456            .clone()
2457            .read(tasks1)
2458            .unwrap()
2459            .try_collect::<Vec<RecordBatch>>()
2460            .await
2461            .unwrap();
2462
2463        let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2464        println!(
2465            "Task 1 (bytes {}-{}) returned {} rows",
2466            rg0_start,
2467            rg0_start + row_group_0.compressed_size() as u64,
2468            total_rows_task1
2469        );
2470
2471        let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2472        let result2 = reader
2473            .read(tasks2)
2474            .unwrap()
2475            .try_collect::<Vec<RecordBatch>>()
2476            .await
2477            .unwrap();
2478
2479        let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2480        println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2481
2482        assert_eq!(
2483            total_rows_task1, 100,
2484            "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2485        );
2486
2487        assert_eq!(
2488            total_rows_task2, 200,
2489            "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2490        );
2491
2492        // Verify the actual data values are correct (not just the row count)
2493        if total_rows_task1 > 0 {
2494            let first_batch = &result1[0];
2495            let id_col = first_batch
2496                .column(0)
2497                .as_primitive::<arrow_array::types::Int32Type>();
2498            let first_val = id_col.value(0);
2499            let last_val = id_col.value(id_col.len() - 1);
2500            println!("Task 1 data range: {first_val} to {last_val}");
2501
2502            assert_eq!(first_val, 0, "Task 1 should start with id=0");
2503            assert_eq!(last_val, 99, "Task 1 should end with id=99");
2504        }
2505
2506        if total_rows_task2 > 0 {
2507            let first_batch = &result2[0];
2508            let id_col = first_batch
2509                .column(0)
2510                .as_primitive::<arrow_array::types::Int32Type>();
2511            let first_val = id_col.value(0);
2512            println!("Task 2 first value: {first_val}");
2513
2514            assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2515        }
2516    }
2517
2518    /// Test schema evolution: reading old Parquet file (with only column 'a')
2519    /// using a newer table schema (with columns 'a' and 'b').
2520    /// This tests that:
2521    /// 1. get_arrow_projection_mask allows missing columns
2522    /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
2523    #[tokio::test]
2524    async fn test_schema_evolution_add_column() {
2525        use arrow_array::{Array, Int32Array};
2526
2527        // New table schema: columns 'a' and 'b' (b was added later, file only has 'a')
2528        let new_schema = Arc::new(
2529            Schema::builder()
2530                .with_schema_id(2)
2531                .with_fields(vec![
2532                    NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2533                    NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2534                ])
2535                .build()
2536                .unwrap(),
2537        );
2538
2539        // Create Arrow schema for old Parquet file (only has column 'a')
2540        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2541            Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2542                PARQUET_FIELD_ID_META_KEY.to_string(),
2543                "1".to_string(),
2544            )])),
2545        ]));
2546
2547        // Write old Parquet file with only column 'a'
2548        let tmp_dir = TempDir::new().unwrap();
2549        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2550        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2551
2552        let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2553        let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2554
2555        let props = WriterProperties::builder()
2556            .set_compression(Compression::SNAPPY)
2557            .build();
2558        let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
2559        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2560        writer.write(&to_write).expect("Writing batch");
2561        writer.close().unwrap();
2562
2563        // Read the old Parquet file using the NEW schema (with column 'b')
2564        let reader = ArrowReaderBuilder::new(file_io).build();
2565        let tasks = Box::pin(futures::stream::iter(
2566            vec![Ok(FileScanTask {
2567                start: 0,
2568                length: 0,
2569                record_count: None,
2570                data_file_path: format!("{table_location}/old_file.parquet"),
2571                data_file_format: DataFileFormat::Parquet,
2572                schema: new_schema.clone(),
2573                project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
2574                predicate: None,
2575                deletes: vec![],
2576                partition: None,
2577                partition_spec: None,
2578                name_mapping: None,
2579                case_sensitive: false,
2580            })]
2581            .into_iter(),
2582        )) as FileScanTaskStream;
2583
2584        let result = reader
2585            .read(tasks)
2586            .unwrap()
2587            .try_collect::<Vec<RecordBatch>>()
2588            .await
2589            .unwrap();
2590
2591        // Verify we got the correct data
2592        assert_eq!(result.len(), 1);
2593        let batch = &result[0];
2594
2595        // Should have 2 columns now
2596        assert_eq!(batch.num_columns(), 2);
2597        assert_eq!(batch.num_rows(), 3);
2598
2599        // Column 'a' should have the original data
2600        let col_a = batch
2601            .column(0)
2602            .as_primitive::<arrow_array::types::Int32Type>();
2603        assert_eq!(col_a.values(), &[1, 2, 3]);
2604
2605        // Column 'b' should be all NULLs (it didn't exist in the old file)
2606        let col_b = batch
2607            .column(1)
2608            .as_primitive::<arrow_array::types::Int32Type>();
2609        assert_eq!(col_b.null_count(), 3);
2610        assert!(col_b.is_null(0));
2611        assert!(col_b.is_null(1));
2612        assert!(col_b.is_null(2));
2613    }
2614
2615    /// Test for bug where position deletes in later row groups are not applied correctly.
2616    ///
2617    /// When a file has multiple row groups and a position delete targets a row in a later
2618    /// row group, the `build_deletes_row_selection` function had a bug where it would
2619    /// fail to increment `current_row_group_base_idx` when skipping row groups.
2620    ///
2621    /// This test creates:
2622    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2623    /// - A position delete file that deletes row 199 (last row in second row group)
2624    ///
2625    /// Expected behavior: Should return 199 rows (with id=200 deleted)
2626    /// Bug behavior: Returns 200 rows (delete is not applied)
2627    ///
2628    /// This bug was discovered while running Apache Spark + Apache Iceberg integration tests
2629    /// through DataFusion Comet. The following Iceberg Java tests failed due to this bug:
2630    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet`
2631    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet`
2632    #[tokio::test]
2633    async fn test_position_delete_across_multiple_row_groups() {
2634        use arrow_array::{Int32Array, Int64Array};
2635        use parquet::file::reader::{FileReader, SerializedFileReader};
2636
2637        // Field IDs for positional delete schema
2638        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2639        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2640
2641        let tmp_dir = TempDir::new().unwrap();
2642        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2643
2644        // Create table schema with a single 'id' column
2645        let table_schema = Arc::new(
2646            Schema::builder()
2647                .with_schema_id(1)
2648                .with_fields(vec![
2649                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2650                ])
2651                .build()
2652                .unwrap(),
2653        );
2654
2655        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2656            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2657                PARQUET_FIELD_ID_META_KEY.to_string(),
2658                "1".to_string(),
2659            )])),
2660        ]));
2661
2662        // Step 1: Create data file with 200 rows in 2 row groups
2663        // Row group 0: rows 0-99 (ids 1-100)
2664        // Row group 1: rows 100-199 (ids 101-200)
2665        let data_file_path = format!("{table_location}/data.parquet");
2666
2667        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2668            Int32Array::from_iter_values(1..=100),
2669        )])
2670        .unwrap();
2671
2672        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2673            Int32Array::from_iter_values(101..=200),
2674        )])
2675        .unwrap();
2676
2677        // Force each batch into its own row group
2678        let props = WriterProperties::builder()
2679            .set_compression(Compression::SNAPPY)
2680            .set_max_row_group_size(100)
2681            .build();
2682
2683        let file = File::create(&data_file_path).unwrap();
2684        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2685        writer.write(&batch1).expect("Writing batch 1");
2686        writer.write(&batch2).expect("Writing batch 2");
2687        writer.close().unwrap();
2688
2689        // Verify we created 2 row groups
2690        let verify_file = File::open(&data_file_path).unwrap();
2691        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2692        assert_eq!(
2693            verify_reader.metadata().num_row_groups(),
2694            2,
2695            "Should have 2 row groups"
2696        );
2697
2698        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
2699        let delete_file_path = format!("{table_location}/deletes.parquet");
2700
2701        let delete_schema = Arc::new(ArrowSchema::new(vec![
2702            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2703                PARQUET_FIELD_ID_META_KEY.to_string(),
2704                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2705            )])),
2706            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2707                PARQUET_FIELD_ID_META_KEY.to_string(),
2708                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2709            )])),
2710        ]));
2711
2712        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
2713        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2714            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2715            Arc::new(Int64Array::from_iter_values(vec![199i64])),
2716        ])
2717        .unwrap();
2718
2719        let delete_props = WriterProperties::builder()
2720            .set_compression(Compression::SNAPPY)
2721            .build();
2722
2723        let delete_file = File::create(&delete_file_path).unwrap();
2724        let mut delete_writer =
2725            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2726        delete_writer.write(&delete_batch).unwrap();
2727        delete_writer.close().unwrap();
2728
2729        // Step 3: Read the data file with the delete applied
2730        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2731        let reader = ArrowReaderBuilder::new(file_io).build();
2732
2733        let task = FileScanTask {
2734            start: 0,
2735            length: 0,
2736            record_count: Some(200),
2737            data_file_path: data_file_path.clone(),
2738            data_file_format: DataFileFormat::Parquet,
2739            schema: table_schema.clone(),
2740            project_field_ids: vec![1],
2741            predicate: None,
2742            deletes: vec![FileScanTaskDeleteFile {
2743                file_path: delete_file_path,
2744                file_type: DataContentType::PositionDeletes,
2745                partition_spec_id: 0,
2746                equality_ids: None,
2747            }],
2748            partition: None,
2749            partition_spec: None,
2750            name_mapping: None,
2751            case_sensitive: false,
2752        };
2753
2754        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2755        let result = reader
2756            .read(tasks)
2757            .unwrap()
2758            .try_collect::<Vec<RecordBatch>>()
2759            .await
2760            .unwrap();
2761
2762        // Step 4: Verify we got 199 rows (not 200)
2763        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2764
2765        println!("Total rows read: {total_rows}");
2766        println!("Expected: 199 rows (deleted row 199 which had id=200)");
2767
2768        // This assertion will FAIL before the fix and PASS after the fix
2769        assert_eq!(
2770            total_rows, 199,
2771            "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
2772             The bug causes position deletes in later row groups to be ignored."
2773        );
2774
2775        // Verify the deleted row (id=200) is not present
2776        let all_ids: Vec<i32> = result
2777            .iter()
2778            .flat_map(|batch| {
2779                batch
2780                    .column(0)
2781                    .as_primitive::<arrow_array::types::Int32Type>()
2782                    .values()
2783                    .iter()
2784                    .copied()
2785            })
2786            .collect();
2787
2788        assert!(
2789            !all_ids.contains(&200),
2790            "Row with id=200 should be deleted but was found in results"
2791        );
2792
2793        // Verify we have all other ids (1-199)
2794        let expected_ids: Vec<i32> = (1..=199).collect();
2795        assert_eq!(
2796            all_ids, expected_ids,
2797            "Should have ids 1-199 but got different values"
2798        );
2799    }
2800
2801    /// Test for a bug where position deletes are lost when skipping unselected row groups.
2802    ///
2803    /// This is a variant of `test_position_delete_across_multiple_row_groups` that exercises
2804    /// the row group selection code path (`selected_row_groups: Some([...])`).
2805    ///
2806    /// When a file has multiple row groups and only some are selected for reading,
2807    /// the `build_deletes_row_selection` function must skip over unselected row groups (and any
2808    /// deletes that fall within them) WITHOUT consuming deletes that belong to selected row groups.
2809    ///
2810    /// This test creates:
2811    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2812    /// - A position delete file that deletes row 199 (last row in second row group)
2813    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
2814    ///
2815    /// Expected behavior: Should return 99 rows (with row 199 deleted)
2816    /// Bug behavior: Returns 100 rows (delete is lost when skipping row group 0)
2817    ///
2818    /// The bug occurs when processing row group 0 (unselected):
2819    /// ```rust
2820    /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100
2821    /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199!
2822    /// ```
2823    ///
2824    /// The fix is to NOT call `next()` after `advance_to()` when skipping unselected row groups,
2825    /// because `advance_to()` already positions the iterator correctly without consuming elements.
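    ///
    /// A minimal, self-contained sketch of the intended skip behavior (the delete vector is
    /// modeled here as a peekable iterator over sorted positions; the real reader state and
    /// iterator type are simplified):
    /// ```rust
    /// let deletes: Vec<u64> = vec![199]; // one delete, belonging to row group 1
    /// let mut iter = deletes.into_iter().peekable();
    /// let next_row_group_base_idx = 100u64; // first row index of the selected row group
    ///
    /// // Skip unselected row group 0: drop only the deletes that fall inside it.
    /// while iter.peek().is_some_and(|&pos| pos < next_row_group_base_idx) {
    ///     iter.next();
    /// }
    /// // The delete at position 199 is still available for the selected row group.
    /// assert_eq!(iter.peek(), Some(&199));
    /// ```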
2826    #[tokio::test]
2827    async fn test_position_delete_with_row_group_selection() {
2828        use arrow_array::{Int32Array, Int64Array};
2829        use parquet::file::reader::{FileReader, SerializedFileReader};
2830
2831        // Field IDs for positional delete schema
2832        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2833        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2834
2835        let tmp_dir = TempDir::new().unwrap();
2836        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2837
2838        // Create table schema with a single 'id' column
2839        let table_schema = Arc::new(
2840            Schema::builder()
2841                .with_schema_id(1)
2842                .with_fields(vec![
2843                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2844                ])
2845                .build()
2846                .unwrap(),
2847        );
2848
2849        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2850            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2851                PARQUET_FIELD_ID_META_KEY.to_string(),
2852                "1".to_string(),
2853            )])),
2854        ]));
2855
2856        // Step 1: Create data file with 200 rows in 2 row groups
2857        // Row group 0: rows 0-99 (ids 1-100)
2858        // Row group 1: rows 100-199 (ids 101-200)
2859        let data_file_path = format!("{table_location}/data.parquet");
2860
2861        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2862            Int32Array::from_iter_values(1..=100),
2863        )])
2864        .unwrap();
2865
2866        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2867            Int32Array::from_iter_values(101..=200),
2868        )])
2869        .unwrap();
2870
2871        // Force each batch into its own row group
2872        let props = WriterProperties::builder()
2873            .set_compression(Compression::SNAPPY)
2874            .set_max_row_group_size(100)
2875            .build();
2876
2877        let file = File::create(&data_file_path).unwrap();
2878        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2879        writer.write(&batch1).expect("Writing batch 1");
2880        writer.write(&batch2).expect("Writing batch 2");
2881        writer.close().unwrap();
2882
2883        // Verify we created 2 row groups
2884        let verify_file = File::open(&data_file_path).unwrap();
2885        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2886        assert_eq!(
2887            verify_reader.metadata().num_row_groups(),
2888            2,
2889            "Should have 2 row groups"
2890        );
2891
2892        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
2893        let delete_file_path = format!("{table_location}/deletes.parquet");
2894
2895        let delete_schema = Arc::new(ArrowSchema::new(vec![
2896            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2897                PARQUET_FIELD_ID_META_KEY.to_string(),
2898                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2899            )])),
2900            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2901                PARQUET_FIELD_ID_META_KEY.to_string(),
2902                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2903            )])),
2904        ]));
2905
2906        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
2907        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2908            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2909            Arc::new(Int64Array::from_iter_values(vec![199i64])),
2910        ])
2911        .unwrap();
2912
2913        let delete_props = WriterProperties::builder()
2914            .set_compression(Compression::SNAPPY)
2915            .build();
2916
2917        let delete_file = File::create(&delete_file_path).unwrap();
2918        let mut delete_writer =
2919            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2920        delete_writer.write(&delete_batch).unwrap();
2921        delete_writer.close().unwrap();
2922
2923        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
2924        // This exercises the row group selection code path where row group 0 is skipped
2925        let metadata_file = File::open(&data_file_path).unwrap();
2926        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
2927        let metadata = metadata_reader.metadata();
2928
2929        let row_group_0 = metadata.row_group(0);
2930        let row_group_1 = metadata.row_group(1);
2931
2932        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
2933        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2934        let rg1_length = row_group_1.compressed_size() as u64;
2935
2936        println!(
2937            "Row group 0: starts at byte {}, {} bytes compressed",
2938            rg0_start,
2939            row_group_0.compressed_size()
2940        );
2941        println!(
2942            "Row group 1: starts at byte {}, {} bytes compressed",
2943            rg1_start,
2944            row_group_1.compressed_size()
2945        );
2946
2947        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2948        let reader = ArrowReaderBuilder::new(file_io).build();
2949
2950        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
2951        let task = FileScanTask {
2952            start: rg1_start,
2953            length: rg1_length,
2954            record_count: Some(100), // Row group 1 has 100 rows
2955            data_file_path: data_file_path.clone(),
2956            data_file_format: DataFileFormat::Parquet,
2957            schema: table_schema.clone(),
2958            project_field_ids: vec![1],
2959            predicate: None,
2960            deletes: vec![FileScanTaskDeleteFile {
2961                file_path: delete_file_path,
2962                file_type: DataContentType::PositionDeletes,
2963                partition_spec_id: 0,
2964                equality_ids: None,
2965            }],
2966            partition: None,
2967            partition_spec: None,
2968            name_mapping: None,
2969            case_sensitive: false,
2970        };
2971
2972        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2973        let result = reader
2974            .read(tasks)
2975            .unwrap()
2976            .try_collect::<Vec<RecordBatch>>()
2977            .await
2978            .unwrap();
2979
2980        // Step 4: Verify we got 99 rows (not 100)
2981        // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 99 rows
2982        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2983
2984        println!("Total rows read from row group 1: {total_rows}");
2985        println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
2986
2987        // This assertion will FAIL before the fix and PASS after the fix
2988        assert_eq!(
2989            total_rows, 99,
2990            "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
2991             The bug causes position deletes to be lost when advance_to() is followed by next() \
2992             when skipping unselected row groups."
2993        );
2994
2995        // Verify the deleted row (id=200) is not present
2996        let all_ids: Vec<i32> = result
2997            .iter()
2998            .flat_map(|batch| {
2999                batch
3000                    .column(0)
3001                    .as_primitive::<arrow_array::types::Int32Type>()
3002                    .values()
3003                    .iter()
3004                    .copied()
3005            })
3006            .collect();
3007
3008        assert!(
3009            !all_ids.contains(&200),
3010            "Row with id=200 should be deleted but was found in results"
3011        );
3012
3013        // Verify we have ids 101-199 (not 101-200)
3014        let expected_ids: Vec<i32> = (101..=199).collect();
3015        assert_eq!(
3016            all_ids, expected_ids,
3017            "Should have ids 101-199 but got different values"
3018        );
3019    }

3020    /// Test for a bug where a stale cached delete index causes an infinite loop when skipping row groups.
3021    ///
3022    /// This test exposes the inverse scenario of `test_position_delete_with_row_group_selection`:
3023    /// - Position delete targets a row in the SKIPPED row group (not the selected one)
3024    /// - After calling advance_to(), the cached delete index is stale
3025    /// - Without updating the cache, the code enters an infinite loop
3026    ///
3027    /// This test creates:
3028    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
3029    /// - A position delete file that deletes row 0 (first row in SKIPPED row group 0)
3030    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
3031    ///
3032    /// The bug occurs when skipping row group 0:
3033    /// ```rust
3034    /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // Some(0)
3035    /// // ... skip to row group 1 ...
3036    /// delete_vector_iter.advance_to(100); // Iterator advances past delete at 0
3037    /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE!
3038    /// // When processing row group 1:
3039    /// //   current_idx = 100, next_deleted_row_idx = 0, next_row_group_base_idx = 200
3040    /// //   Loop condition: 0 < 200 (true)
3041    /// //   But: current_idx (100) > next_deleted_row_idx (0)
3042    /// //   And: current_idx (100) != next_deleted_row_idx (0)
3043    /// //   Neither branch executes -> INFINITE LOOP!
3044    /// ```
3045    ///
3046    /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect row group 1)
3047    /// Bug behavior: Infinite loop in build_deletes_row_selection
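    ///
    /// A small self-contained model of the required cache refresh (a plain peekable iterator
    /// stands in for the real delete vector iterator; names mirror the snippet above):
    /// ```rust
    /// let deletes: Vec<u64> = vec![0]; // one delete, inside the skipped row group 0
    /// let mut iter = deletes.into_iter().peekable();
    /// let mut next_deleted_row_idx_opt = iter.peek().copied(); // Some(0)
    ///
    /// // Skip row group 0 (rows 0..100): step past its deletes and refresh the cached value
    /// // each time, so that it can never go stale.
    /// while next_deleted_row_idx_opt.is_some_and(|pos| pos < 100) {
    ///     iter.next();
    ///     next_deleted_row_idx_opt = iter.peek().copied();
    /// }
    /// // Row group 1 starts with no pending delete, so its row loop can terminate normally.
    /// assert_eq!(next_deleted_row_idx_opt, None);
    /// ```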
3048    #[tokio::test]
3049    async fn test_position_delete_in_skipped_row_group() {
3050        use arrow_array::{Int32Array, Int64Array};
3051        use parquet::file::reader::{FileReader, SerializedFileReader};
3052
3053        // Field IDs for positional delete schema
3054        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3055        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3056
3057        let tmp_dir = TempDir::new().unwrap();
3058        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3059
3060        // Create table schema with a single 'id' column
3061        let table_schema = Arc::new(
3062            Schema::builder()
3063                .with_schema_id(1)
3064                .with_fields(vec![
3065                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3066                ])
3067                .build()
3068                .unwrap(),
3069        );
3070
3071        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3072            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3073                PARQUET_FIELD_ID_META_KEY.to_string(),
3074                "1".to_string(),
3075            )])),
3076        ]));
3077
3078        // Step 1: Create data file with 200 rows in 2 row groups
3079        // Row group 0: rows 0-99 (ids 1-100)
3080        // Row group 1: rows 100-199 (ids 101-200)
3081        let data_file_path = format!("{table_location}/data.parquet");
3082
3083        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3084            Int32Array::from_iter_values(1..=100),
3085        )])
3086        .unwrap();
3087
3088        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3089            Int32Array::from_iter_values(101..=200),
3090        )])
3091        .unwrap();
3092
3093        // Force each batch into its own row group
3094        let props = WriterProperties::builder()
3095            .set_compression(Compression::SNAPPY)
3096            .set_max_row_group_size(100)
3097            .build();
3098
3099        let file = File::create(&data_file_path).unwrap();
3100        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3101        writer.write(&batch1).expect("Writing batch 1");
3102        writer.write(&batch2).expect("Writing batch 2");
3103        writer.close().unwrap();
3104
3105        // Verify we created 2 row groups
3106        let verify_file = File::open(&data_file_path).unwrap();
3107        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3108        assert_eq!(
3109            verify_reader.metadata().num_row_groups(),
3110            2,
3111            "Should have 2 row groups"
3112        );
3113
3114        // Step 2: Create position delete file that deletes row 0 (id=1, first row in row group 0)
3115        let delete_file_path = format!("{table_location}/deletes.parquet");
3116
3117        let delete_schema = Arc::new(ArrowSchema::new(vec![
3118            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3119                PARQUET_FIELD_ID_META_KEY.to_string(),
3120                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3121            )])),
3122            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3123                PARQUET_FIELD_ID_META_KEY.to_string(),
3124                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3125            )])),
3126        ]));
3127
3128        // Delete row at position 0 (0-indexed, so it's the first row: id=1)
3129        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3130            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3131            Arc::new(Int64Array::from_iter_values(vec![0i64])),
3132        ])
3133        .unwrap();
3134
3135        let delete_props = WriterProperties::builder()
3136            .set_compression(Compression::SNAPPY)
3137            .build();
3138
3139        let delete_file = File::create(&delete_file_path).unwrap();
3140        let mut delete_writer =
3141            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3142        delete_writer.write(&delete_batch).unwrap();
3143        delete_writer.close().unwrap();
3144
3145        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
3146        // This exercises the row group selection code path where row group 0 is skipped
3147        let metadata_file = File::open(&data_file_path).unwrap();
3148        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3149        let metadata = metadata_reader.metadata();
3150
3151        let row_group_0 = metadata.row_group(0);
3152        let row_group_1 = metadata.row_group(1);
3153
3154        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
3155        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
3156        let rg1_length = row_group_1.compressed_size() as u64;
3157
3158        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3159        let reader = ArrowReaderBuilder::new(file_io).build();
3160
3161        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
3162        let task = FileScanTask {
3163            start: rg1_start,
3164            length: rg1_length,
3165            record_count: Some(100), // Row group 1 has 100 rows
3166            data_file_path: data_file_path.clone(),
3167            data_file_format: DataFileFormat::Parquet,
3168            schema: table_schema.clone(),
3169            project_field_ids: vec![1],
3170            predicate: None,
3171            deletes: vec![FileScanTaskDeleteFile {
3172                file_path: delete_file_path,
3173                file_type: DataContentType::PositionDeletes,
3174                partition_spec_id: 0,
3175                equality_ids: None,
3176            }],
3177            partition: None,
3178            partition_spec: None,
3179            name_mapping: None,
3180            case_sensitive: false,
3181        };
3182
3183        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3184        let result = reader
3185            .read(tasks)
3186            .unwrap()
3187            .try_collect::<Vec<RecordBatch>>()
3188            .await
3189            .unwrap();
3190
3191        // Step 4: Verify we got 100 rows (all of row group 1)
3192        // The delete at position 0 is in row group 0, which is skipped, so it doesn't affect us
3193        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3194
3195        assert_eq!(
3196            total_rows, 100,
3197            "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
3198             If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
3199        );
3200
3201        // Verify we have all ids from row group 1 (101-200)
3202        let all_ids: Vec<i32> = result
3203            .iter()
3204            .flat_map(|batch| {
3205                batch
3206                    .column(0)
3207                    .as_primitive::<arrow_array::types::Int32Type>()
3208                    .values()
3209                    .iter()
3210                    .copied()
3211            })
3212            .collect();
3213
3214        let expected_ids: Vec<i32> = (101..=200).collect();
3215        assert_eq!(
3216            all_ids, expected_ids,
3217            "Should have ids 101-200 (all of row group 1)"
3218        );
3219    }
3220
3221    /// Test reading Parquet files without field ID metadata (e.g., migrated tables).
3222    /// This exercises the position-based fallback path.
3223    ///
3224    /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + pruneColumnsFallback()
3225    /// in /parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
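    ///
    /// The core idea of the fallback, sketched with a hypothetical helper (not the crate's
    /// actual API): when the file carries no field IDs, top-level columns are matched to
    /// Iceberg fields purely by position.
    /// ```rust
    /// // Hypothetical helper: assign 1-based fallback IDs to top-level columns by position.
    /// fn fallback_ids(num_columns: usize) -> Vec<i32> {
    ///     (1..=num_columns as i32).collect()
    /// }
    /// // A two-column file gets fallback IDs [1, 2], matching `name` (id 1) and `age` (id 2).
    /// assert_eq!(fallback_ids(2), vec![1, 2]);
    /// ```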
3226    #[tokio::test]
3227    async fn test_read_parquet_file_without_field_ids() {
3228        let schema = Arc::new(
3229            Schema::builder()
3230                .with_schema_id(1)
3231                .with_fields(vec![
3232                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3233                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3234                ])
3235                .build()
3236                .unwrap(),
3237        );
3238
3239        // Parquet file from a migrated table - no field ID metadata
3240        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3241            Field::new("name", DataType::Utf8, false),
3242            Field::new("age", DataType::Int32, false),
3243        ]));
3244
3245        let tmp_dir = TempDir::new().unwrap();
3246        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3247        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3248
3249        let name_data = vec!["Alice", "Bob", "Charlie"];
3250        let age_data = vec![30, 25, 35];
3251
3252        use arrow_array::Int32Array;
3253        let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3254        let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3255
3256        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3257
3258        let props = WriterProperties::builder()
3259            .set_compression(Compression::SNAPPY)
3260            .build();
3261
3262        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3263        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3264
3265        writer.write(&to_write).expect("Writing batch");
3266        writer.close().unwrap();
3267
3268        let reader = ArrowReaderBuilder::new(file_io).build();
3269
3270        let tasks = Box::pin(futures::stream::iter(
3271            vec![Ok(FileScanTask {
3272                start: 0,
3273                length: 0,
3274                record_count: None,
3275                data_file_path: format!("{table_location}/1.parquet"),
3276                data_file_format: DataFileFormat::Parquet,
3277                schema: schema.clone(),
3278                project_field_ids: vec![1, 2],
3279                predicate: None,
3280                deletes: vec![],
3281                partition: None,
3282                partition_spec: None,
3283                name_mapping: None,
3284                case_sensitive: false,
3285            })]
3286            .into_iter(),
3287        )) as FileScanTaskStream;
3288
3289        let result = reader
3290            .read(tasks)
3291            .unwrap()
3292            .try_collect::<Vec<RecordBatch>>()
3293            .await
3294            .unwrap();
3295
3296        assert_eq!(result.len(), 1);
3297        let batch = &result[0];
3298        assert_eq!(batch.num_rows(), 3);
3299        assert_eq!(batch.num_columns(), 2);
3300
3301        // Verify position-based mapping: field_id 1 → position 0, field_id 2 → position 1
3302        let name_array = batch.column(0).as_string::<i32>();
3303        assert_eq!(name_array.value(0), "Alice");
3304        assert_eq!(name_array.value(1), "Bob");
3305        assert_eq!(name_array.value(2), "Charlie");
3306
3307        let age_array = batch
3308            .column(1)
3309            .as_primitive::<arrow_array::types::Int32Type>();
3310        assert_eq!(age_array.value(0), 30);
3311        assert_eq!(age_array.value(1), 25);
3312        assert_eq!(age_array.value(2), 35);
3313    }
3314
3315    /// Test reading Parquet files without field IDs with partial projection.
3316    /// Only a subset of columns are requested, verifying position-based fallback
3317    /// handles column selection correctly.
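    ///
    /// Under the position-based fallback, projecting field IDs `[1, 3]` should resolve to the
    /// first and third file columns (leaf indices 0 and 2); a toy illustration of that mapping:
    /// ```rust
    /// let project_field_ids = [1, 3];
    /// let leaf_indices: Vec<usize> =
    ///     project_field_ids.iter().map(|id| (id - 1) as usize).collect();
    /// assert_eq!(leaf_indices, vec![0, 2]);
    /// ```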
3318    #[tokio::test]
3319    async fn test_read_parquet_without_field_ids_partial_projection() {
3320        use arrow_array::Int32Array;
3321
3322        let schema = Arc::new(
3323            Schema::builder()
3324                .with_schema_id(1)
3325                .with_fields(vec![
3326                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3327                    NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3328                    NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3329                    NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3330                ])
3331                .build()
3332                .unwrap(),
3333        );
3334
3335        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3336            Field::new("col1", DataType::Utf8, false),
3337            Field::new("col2", DataType::Int32, false),
3338            Field::new("col3", DataType::Utf8, false),
3339            Field::new("col4", DataType::Int32, false),
3340        ]));
3341
3342        let tmp_dir = TempDir::new().unwrap();
3343        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3344        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3345
3346        let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3347        let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3348        let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3349        let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3350
3351        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3352            col1_data, col2_data, col3_data, col4_data,
3353        ])
3354        .unwrap();
3355
3356        let props = WriterProperties::builder()
3357            .set_compression(Compression::SNAPPY)
3358            .build();
3359
3360        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3361        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3362
3363        writer.write(&to_write).expect("Writing batch");
3364        writer.close().unwrap();
3365
3366        let reader = ArrowReaderBuilder::new(file_io).build();
3367
3368        let tasks = Box::pin(futures::stream::iter(
3369            vec![Ok(FileScanTask {
3370                start: 0,
3371                length: 0,
3372                record_count: None,
3373                data_file_path: format!("{table_location}/1.parquet"),
3374                data_file_format: DataFileFormat::Parquet,
3375                schema: schema.clone(),
3376                project_field_ids: vec![1, 3],
3377                predicate: None,
3378                deletes: vec![],
3379                partition: None,
3380                partition_spec: None,
3381                name_mapping: None,
3382                case_sensitive: false,
3383            })]
3384            .into_iter(),
3385        )) as FileScanTaskStream;
3386
3387        let result = reader
3388            .read(tasks)
3389            .unwrap()
3390            .try_collect::<Vec<RecordBatch>>()
3391            .await
3392            .unwrap();
3393
3394        assert_eq!(result.len(), 1);
3395        let batch = &result[0];
3396        assert_eq!(batch.num_rows(), 2);
3397        assert_eq!(batch.num_columns(), 2);
3398
3399        let col1_array = batch.column(0).as_string::<i32>();
3400        assert_eq!(col1_array.value(0), "a");
3401        assert_eq!(col1_array.value(1), "b");
3402
3403        let col3_array = batch.column(1).as_string::<i32>();
3404        assert_eq!(col3_array.value(0), "c");
3405        assert_eq!(col3_array.value(1), "d");
3406    }
3407
3408    /// Test reading Parquet files without field IDs with schema evolution.
3409    /// The Iceberg schema has more fields than the Parquet file, testing that
3410    /// missing columns are filled with NULLs.
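    ///
    /// The expected shape of the missing column can be illustrated with a plain all-NULL Arrow
    /// array (a sketch of the idea only, not the reader's internal code):
    /// ```rust
    /// use std::sync::Arc;
    ///
    /// use arrow_array::{Array, StringArray};
    ///
    /// // A column absent from the file is materialized as all-NULL values of its Iceberg type.
    /// let city: Arc<dyn Array> = Arc::new(StringArray::from(vec![None::<&str>, None]));
    /// assert_eq!(city.null_count(), 2);
    /// ```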
3411    #[tokio::test]
3412    async fn test_read_parquet_without_field_ids_schema_evolution() {
3413        use arrow_array::{Array, Int32Array};
3414
3415        // Schema with field 3 added after the file was written
3416        let schema = Arc::new(
3417            Schema::builder()
3418                .with_schema_id(1)
3419                .with_fields(vec![
3420                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3421                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3422                    NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3423                ])
3424                .build()
3425                .unwrap(),
3426        );
3427
3428        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3429            Field::new("name", DataType::Utf8, false),
3430            Field::new("age", DataType::Int32, false),
3431        ]));
3432
3433        let tmp_dir = TempDir::new().unwrap();
3434        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3435        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3436
3437        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3438        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3439
3440        let to_write =
3441            RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3442
3443        let props = WriterProperties::builder()
3444            .set_compression(Compression::SNAPPY)
3445            .build();
3446
3447        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3448        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3449
3450        writer.write(&to_write).expect("Writing batch");
3451        writer.close().unwrap();
3452
3453        let reader = ArrowReaderBuilder::new(file_io).build();
3454
3455        let tasks = Box::pin(futures::stream::iter(
3456            vec![Ok(FileScanTask {
3457                start: 0,
3458                length: 0,
3459                record_count: None,
3460                data_file_path: format!("{table_location}/1.parquet"),
3461                data_file_format: DataFileFormat::Parquet,
3462                schema: schema.clone(),
3463                project_field_ids: vec![1, 2, 3],
3464                predicate: None,
3465                deletes: vec![],
3466                partition: None,
3467                partition_spec: None,
3468                name_mapping: None,
3469                case_sensitive: false,
3470            })]
3471            .into_iter(),
3472        )) as FileScanTaskStream;
3473
3474        let result = reader
3475            .read(tasks)
3476            .unwrap()
3477            .try_collect::<Vec<RecordBatch>>()
3478            .await
3479            .unwrap();
3480
3481        assert_eq!(result.len(), 1);
3482        let batch = &result[0];
3483        assert_eq!(batch.num_rows(), 2);
3484        assert_eq!(batch.num_columns(), 3);
3485
3486        let name_array = batch.column(0).as_string::<i32>();
3487        assert_eq!(name_array.value(0), "Alice");
3488        assert_eq!(name_array.value(1), "Bob");
3489
3490        let age_array = batch
3491            .column(1)
3492            .as_primitive::<arrow_array::types::Int32Type>();
3493        assert_eq!(age_array.value(0), 30);
3494        assert_eq!(age_array.value(1), 25);
3495
3496        // Verify missing column filled with NULLs
3497        let city_array = batch.column(2).as_string::<i32>();
3498        assert_eq!(city_array.null_count(), 2);
3499        assert!(city_array.is_null(0));
3500        assert!(city_array.is_null(1));
3501    }
3502
3503    /// Test reading Parquet files without field IDs that have multiple row groups.
3504    /// This ensures the position-based fallback works correctly across row group boundaries.
3505    #[tokio::test]
3506    async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3507        use arrow_array::Int32Array;
3508
3509        let schema = Arc::new(
3510            Schema::builder()
3511                .with_schema_id(1)
3512                .with_fields(vec![
3513                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3514                    NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3515                ])
3516                .build()
3517                .unwrap(),
3518        );
3519
3520        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3521            Field::new("name", DataType::Utf8, false),
3522            Field::new("value", DataType::Int32, false),
3523        ]));
3524
3525        let tmp_dir = TempDir::new().unwrap();
3526        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3527        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3528
3529        // Small row group size to create multiple row groups
3530        let props = WriterProperties::builder()
3531            .set_compression(Compression::SNAPPY)
3532            .set_write_batch_size(2)
3533            .set_max_row_group_size(2)
3534            .build();
3535
3536        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3537        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3538
3539        // Write 6 rows in 3 batches (will create 3 row groups)
3540        for batch_num in 0..3 {
3541            let name_data = Arc::new(StringArray::from(vec![
3542                format!("name_{}", batch_num * 2),
3543                format!("name_{}", batch_num * 2 + 1),
3544            ])) as ArrayRef;
3545            let value_data =
3546                Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3547
3548            let batch =
3549                RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3550            writer.write(&batch).expect("Writing batch");
3551        }
3552        writer.close().unwrap();
3553
3554        let reader = ArrowReaderBuilder::new(file_io).build();
3555
3556        let tasks = Box::pin(futures::stream::iter(
3557            vec![Ok(FileScanTask {
3558                start: 0,
3559                length: 0,
3560                record_count: None,
3561                data_file_path: format!("{table_location}/1.parquet"),
3562                data_file_format: DataFileFormat::Parquet,
3563                schema: schema.clone(),
3564                project_field_ids: vec![1, 2],
3565                predicate: None,
3566                deletes: vec![],
3567                partition: None,
3568                partition_spec: None,
3569                name_mapping: None,
3570                case_sensitive: false,
3571            })]
3572            .into_iter(),
3573        )) as FileScanTaskStream;
3574
3575        let result = reader
3576            .read(tasks)
3577            .unwrap()
3578            .try_collect::<Vec<RecordBatch>>()
3579            .await
3580            .unwrap();
3581
3582        assert!(!result.is_empty());
3583
3584        let mut all_names = Vec::new();
3585        let mut all_values = Vec::new();
3586
3587        for batch in &result {
3588            let name_array = batch.column(0).as_string::<i32>();
3589            let value_array = batch
3590                .column(1)
3591                .as_primitive::<arrow_array::types::Int32Type>();
3592
3593            for i in 0..batch.num_rows() {
3594                all_names.push(name_array.value(i).to_string());
3595                all_values.push(value_array.value(i));
3596            }
3597        }
3598
3599        assert_eq!(all_names.len(), 6);
3600        assert_eq!(all_values.len(), 6);
3601
3602        for i in 0..6 {
3603            assert_eq!(all_names[i], format!("name_{i}"));
3604            assert_eq!(all_values[i], i as i32);
3605        }
3606    }
3607
3608    /// Test reading Parquet files without field IDs with nested types (struct).
3609    /// Java's pruneColumnsFallback() projects entire top-level columns including nested content.
3610    /// This test verifies that a top-level struct field is projected correctly with all its nested fields.
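    ///
    /// The key property is that the fallback projects by *top-level* column: selecting the
    /// `person` root also selects every leaf underneath it. A toy mapping over an assumed
    /// column layout, for illustration only:
    /// ```rust
    /// // Top-level column -> leaf column indices in the Parquet schema (assumed layout).
    /// let roots: Vec<(&str, Vec<usize>)> = vec![("id", vec![0]), ("person", vec![1, 2])];
    /// // Selecting the `person` root pulls in all of its leaves.
    /// let selected: Vec<usize> = roots
    ///     .iter()
    ///     .filter(|(name, _)| *name == "person")
    ///     .flat_map(|(_, leaves)| leaves.iter().copied())
    ///     .collect();
    /// assert_eq!(selected, vec![1, 2]);
    /// ```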
3611    #[tokio::test]
3612    async fn test_read_parquet_without_field_ids_with_struct() {
3613        use arrow_array::{Int32Array, StructArray};
3614        use arrow_schema::Fields;
3615
3616        let schema = Arc::new(
3617            Schema::builder()
3618                .with_schema_id(1)
3619                .with_fields(vec![
3620                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3621                    NestedField::required(
3622                        2,
3623                        "person",
3624                        Type::Struct(crate::spec::StructType::new(vec![
3625                            NestedField::required(
3626                                3,
3627                                "name",
3628                                Type::Primitive(PrimitiveType::String),
3629                            )
3630                            .into(),
3631                            NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3632                                .into(),
3633                        ])),
3634                    )
3635                    .into(),
3636                ])
3637                .build()
3638                .unwrap(),
3639        );
3640
3641        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3642            Field::new("id", DataType::Int32, false),
3643            Field::new(
3644                "person",
3645                DataType::Struct(Fields::from(vec![
3646                    Field::new("name", DataType::Utf8, false),
3647                    Field::new("age", DataType::Int32, false),
3648                ])),
3649                false,
3650            ),
3651        ]));
3652
3653        let tmp_dir = TempDir::new().unwrap();
3654        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3655        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3656
3657        let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3658        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3659        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3660        let person_data = Arc::new(StructArray::from(vec![
3661            (
3662                Arc::new(Field::new("name", DataType::Utf8, false)),
3663                name_data,
3664            ),
3665            (
3666                Arc::new(Field::new("age", DataType::Int32, false)),
3667                age_data,
3668            ),
3669        ])) as ArrayRef;
3670
3671        let to_write =
3672            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3673
3674        let props = WriterProperties::builder()
3675            .set_compression(Compression::SNAPPY)
3676            .build();
3677
3678        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3679        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3680
3681        writer.write(&to_write).expect("Writing batch");
3682        writer.close().unwrap();
3683
3684        let reader = ArrowReaderBuilder::new(file_io).build();
3685
3686        let tasks = Box::pin(futures::stream::iter(
3687            vec![Ok(FileScanTask {
3688                start: 0,
3689                length: 0,
3690                record_count: None,
3691                data_file_path: format!("{table_location}/1.parquet"),
3692                data_file_format: DataFileFormat::Parquet,
3693                schema: schema.clone(),
3694                project_field_ids: vec![1, 2],
3695                predicate: None,
3696                deletes: vec![],
3697                partition: None,
3698                partition_spec: None,
3699                name_mapping: None,
3700                case_sensitive: false,
3701            })]
3702            .into_iter(),
3703        )) as FileScanTaskStream;
3704
3705        let result = reader
3706            .read(tasks)
3707            .unwrap()
3708            .try_collect::<Vec<RecordBatch>>()
3709            .await
3710            .unwrap();
3711
3712        assert_eq!(result.len(), 1);
3713        let batch = &result[0];
3714        assert_eq!(batch.num_rows(), 2);
3715        assert_eq!(batch.num_columns(), 2);
3716
3717        let id_array = batch
3718            .column(0)
3719            .as_primitive::<arrow_array::types::Int32Type>();
3720        assert_eq!(id_array.value(0), 1);
3721        assert_eq!(id_array.value(1), 2);
3722
3723        let person_array = batch.column(1).as_struct();
3724        assert_eq!(person_array.num_columns(), 2);
3725
3726        let name_array = person_array.column(0).as_string::<i32>();
3727        assert_eq!(name_array.value(0), "Alice");
3728        assert_eq!(name_array.value(1), "Bob");
3729
3730        let age_array = person_array
3731            .column(1)
3732            .as_primitive::<arrow_array::types::Int32Type>();
3733        assert_eq!(age_array.value(0), 30);
3734        assert_eq!(age_array.value(1), 25);
3735    }
3736
3737    /// Test reading Parquet files without field IDs with schema evolution - column added in the middle.
3738    /// When a new column is inserted between existing columns in the schema order,
3739    /// the fallback projection must correctly map field IDs to output positions.
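    ///
    /// The expected resolution, in miniature (a simplified stand-in, not the reader's real data
    /// structures): the file's fallback IDs are `1 -> col0` and `2 -> col1`, and the projected
    /// IDs `[1, 5, 2]` must come back in schema order, with the unknown ID 5 turning into an
    /// all-NULL column.
    /// ```rust
    /// use std::collections::HashMap;
    ///
    /// let file_columns: HashMap<i32, &str> = HashMap::from([(1, "col0"), (2, "col1")]);
    /// let projected = [1, 5, 2];
    /// let resolved: Vec<Option<&str>> = projected
    ///     .iter()
    ///     .map(|id| file_columns.get(id).copied())
    ///     .collect();
    /// // `None` marks a column that has to be filled with NULLs in the output batch.
    /// assert_eq!(resolved, vec![Some("col0"), None, Some("col1")]);
    /// ```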
3740    #[tokio::test]
3741    async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3742        use arrow_array::{Array, Int32Array};
3743
3744        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3745            Field::new("col0", DataType::Int32, true),
3746            Field::new("col1", DataType::Int32, true),
3747        ]));
3748
3749        // New column added between existing columns: col0 (id=1), newCol (id=5), col1 (id=2)
3750        let schema = Arc::new(
3751            Schema::builder()
3752                .with_schema_id(1)
3753                .with_fields(vec![
3754                    NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3755                    NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3756                    NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3757                ])
3758                .build()
3759                .unwrap(),
3760        );
3761
3762        let tmp_dir = TempDir::new().unwrap();
3763        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3764        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3765
3766        let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3767        let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3768
3769        let to_write =
3770            RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3771
3772        let props = WriterProperties::builder()
3773            .set_compression(Compression::SNAPPY)
3774            .build();
3775
3776        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3777        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3778        writer.write(&to_write).expect("Writing batch");
3779        writer.close().unwrap();
3780
3781        let reader = ArrowReaderBuilder::new(file_io).build();
3782
3783        let tasks = Box::pin(futures::stream::iter(
3784            vec![Ok(FileScanTask {
3785                start: 0,
3786                length: 0,
3787                record_count: None,
3788                data_file_path: format!("{table_location}/1.parquet"),
3789                data_file_format: DataFileFormat::Parquet,
3790                schema: schema.clone(),
3791                project_field_ids: vec![1, 5, 2],
3792                predicate: None,
3793                deletes: vec![],
3794                partition: None,
3795                partition_spec: None,
3796                name_mapping: None,
3797                case_sensitive: false,
3798            })]
3799            .into_iter(),
3800        )) as FileScanTaskStream;
3801
3802        let result = reader
3803            .read(tasks)
3804            .unwrap()
3805            .try_collect::<Vec<RecordBatch>>()
3806            .await
3807            .unwrap();
3808
3809        assert_eq!(result.len(), 1);
3810        let batch = &result[0];
3811        assert_eq!(batch.num_rows(), 2);
3812        assert_eq!(batch.num_columns(), 3);
3813
3814        let result_col0 = batch
3815            .column(0)
3816            .as_primitive::<arrow_array::types::Int32Type>();
3817        assert_eq!(result_col0.value(0), 1);
3818        assert_eq!(result_col0.value(1), 2);
3819
3820        // New column should be NULL (doesn't exist in old file)
3821        let result_newcol = batch
3822            .column(1)
3823            .as_primitive::<arrow_array::types::Int32Type>();
3824        assert_eq!(result_newcol.null_count(), 2);
3825        assert!(result_newcol.is_null(0));
3826        assert!(result_newcol.is_null(1));
3827
3828        let result_col1 = batch
3829            .column(2)
3830            .as_primitive::<arrow_array::types::Int32Type>();
3831        assert_eq!(result_col1.value(0), 10);
3832        assert_eq!(result_col1.value(1), 20);
3833    }
3834
3835    /// Test reading Parquet files without field IDs when a filter eliminates all row groups.
3836    /// During development of field ID mapping, we saw a panic when row_selection_enabled=true and
3837    /// all row groups are filtered out.
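    ///
    /// A toy version of the min/max check that prunes the only row group here (just the idea;
    /// the reader's real evaluation goes through `RowGroupMetricsEvaluator`):
    /// ```rust
    /// // Row-group stats for `id` in the file: min = 10, max = 12. Predicate: id < 5.
    /// let (min_id, _max_id) = (10, 12);
    /// let row_group_can_match = min_id < 5;
    /// // No row group can match, so the reader must return an empty result rather than panic.
    /// assert!(!row_group_can_match);
    /// ```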
3838    #[tokio::test]
3839    async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
3840        use arrow_array::{Float64Array, Int32Array};
3841
3842        // Schema with fields that will use fallback IDs 1, 2, 3
3843        let schema = Arc::new(
3844            Schema::builder()
3845                .with_schema_id(1)
3846                .with_fields(vec![
3847                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3848                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3849                    NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
3850                        .into(),
3851                ])
3852                .build()
3853                .unwrap(),
3854        );
3855
3856        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3857            Field::new("id", DataType::Int32, false),
3858            Field::new("name", DataType::Utf8, false),
3859            Field::new("value", DataType::Float64, false),
3860        ]));
3861
3862        let tmp_dir = TempDir::new().unwrap();
3863        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3864        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3865
3866        // Write data where all ids are >= 10
3867        let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
3868        let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3869        let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
3870
3871        let to_write =
3872            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
3873                .unwrap();
3874
3875        let props = WriterProperties::builder()
3876            .set_compression(Compression::SNAPPY)
3877            .build();
3878
3879        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3880        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3881        writer.write(&to_write).expect("Writing batch");
3882        writer.close().unwrap();
3883
3884        // Filter that eliminates all row groups: id < 5
3885        let predicate = Reference::new("id").less_than(Datum::int(5));
3886
3887        // Enabling both row_group_filtering and row_selection is what previously triggered the panic
3888        let reader = ArrowReaderBuilder::new(file_io)
3889            .with_row_group_filtering_enabled(true)
3890            .with_row_selection_enabled(true)
3891            .build();
3892
3893        let tasks = Box::pin(futures::stream::iter(
3894            vec![Ok(FileScanTask {
3895                start: 0,
3896                length: 0,
3897                record_count: None,
3898                data_file_path: format!("{table_location}/1.parquet"),
3899                data_file_format: DataFileFormat::Parquet,
3900                schema: schema.clone(),
3901                project_field_ids: vec![1, 2, 3],
3902                predicate: Some(predicate.bind(schema, true).unwrap()),
3903                deletes: vec![],
3904                partition: None,
3905                partition_spec: None,
3906                name_mapping: None,
3907                case_sensitive: false,
3908            })]
3909            .into_iter(),
3910        )) as FileScanTaskStream;
3911
3912        // Should no longer panic
3913        let result = reader
3914            .read(tasks)
3915            .unwrap()
3916            .try_collect::<Vec<RecordBatch>>()
3917            .await
3918            .unwrap();
3919
3920        // Should return empty results
3921        assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
3922    }
3923
3924    /// Test that concurrency=1 reads all files correctly and in deterministic order.
3925    /// This verifies the fast-path optimization for single concurrency.
3926    #[tokio::test]
3927    async fn test_read_with_concurrency_one() {
3928        use arrow_array::Int32Array;
3929
3930        let schema = Arc::new(
3931            Schema::builder()
3932                .with_schema_id(1)
3933                .with_fields(vec![
3934                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3935                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
3936                        .into(),
3937                ])
3938                .build()
3939                .unwrap(),
3940        );
3941
3942        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3943            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3944                PARQUET_FIELD_ID_META_KEY.to_string(),
3945                "1".to_string(),
3946            )])),
3947            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
3948                PARQUET_FIELD_ID_META_KEY.to_string(),
3949                "2".to_string(),
3950            )])),
3951        ]));
3952
3953        let tmp_dir = TempDir::new().unwrap();
3954        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3955        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3956
3957        // Create 3 parquet files with different data
3958        let props = WriterProperties::builder()
3959            .set_compression(Compression::SNAPPY)
3960            .build();
3961
3962        for file_num in 0..3 {
3963            let id_data = Arc::new(Int32Array::from_iter_values(
3964                file_num * 10..(file_num + 1) * 10,
3965            )) as ArrayRef;
3966            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
3967
3968            let to_write =
3969                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
3970
3971            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
3972            let mut writer =
3973                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
3974            writer.write(&to_write).expect("Writing batch");
3975            writer.close().unwrap();
3976        }
3977
3978        // Read with concurrency=1 (fast-path)
3979        let reader = ArrowReaderBuilder::new(file_io)
3980            .with_data_file_concurrency_limit(1)
3981            .build();
3982
        // Create tasks in a specific order: file_0, file_1, file_2
        let tasks = vec![
            Ok(FileScanTask {
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_0.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
            Ok(FileScanTask {
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_1.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
            Ok(FileScanTask {
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_2.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
        ];

        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;

        let result = reader
            .read(tasks_stream)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Verify we got all 30 rows (10 from each file)
        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 30, "Should have 30 total rows");

        // Collect all ids and file_nums to verify data
        let mut all_ids = Vec::new();
        let mut all_file_nums = Vec::new();

        for batch in &result {
            let id_col = batch
                .column(0)
                .as_primitive::<arrow_array::types::Int32Type>();
            let file_num_col = batch
                .column(1)
                .as_primitive::<arrow_array::types::Int32Type>();

            for i in 0..batch.num_rows() {
                all_ids.push(id_col.value(i));
                all_file_nums.push(file_num_col.value(i));
            }
        }

        assert_eq!(all_ids.len(), 30);
        assert_eq!(all_file_nums.len(), 30);

        // With concurrency=1 and sequential processing, files should be processed in order:
        // file_0: ids 0-9, file_num=0
        // file_1: ids 10-19, file_num=1
        // file_2: ids 20-29, file_num=2
        for i in 0..10 {
            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
        }
        for i in 10..20 {
            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
        }
        for i in 20..30 {
            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
        }
    }

    /// Tests that bucket partitioning reads the source column from the data file (not partition metadata).
    ///
    /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
    /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable).
    ///
    /// # Iceberg Spec Requirements
    ///
    /// Per the Iceberg spec "Column Projection" section:
    /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
    ///
    /// This means:
    /// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata
    /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files
    /// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values
    ///
    /// Java's PartitionUtil.constantsMap() implements this via:
    /// ```java
    /// if (field.transform().isIdentity()) {
    ///     idToConstant.put(field.sourceId(), converted);
    /// }
    /// ```
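    ///
    /// A minimal Rust sketch of the same rule (the `PartitionTransform` enum and
    /// `serves_constant` function below are illustrative stand-ins, not this crate's API):
    ///
    /// ```rust
    /// // Hypothetical, simplified transform type used only for this illustration.
    /// enum PartitionTransform {
    ///     Identity,
    ///     Bucket(u32),
    /// }
    ///
    /// // Only identity-transformed partition fields may be served as constants from
    /// // partition metadata; every other source column must be read from the data file.
    /// fn serves_constant(transform: &PartitionTransform) -> bool {
    ///     matches!(transform, PartitionTransform::Identity)
    /// }
    ///
    /// assert!(serves_constant(&PartitionTransform::Identity));
    /// assert!(!serves_constant(&PartitionTransform::Bucket(4)));
    /// ```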
    ///
    /// # What This Test Verifies
    ///
    /// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles
    /// bucket partitioning when FileScanTask provides partition_spec and partition_data:
    ///
    /// - The Parquet file has field_id=1, named "id", with actual data [1, 5, 9, 13]
    /// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1
    /// - RecordBatchTransformer.constants_map() excludes the bucket-partitioned field from its constants
    /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
    /// - Values are NOT replaced with the constant 1 from partition metadata
    ///
    /// # Why This Matters
    ///
    /// Without correct handling:
    /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
    /// - Query results would be incorrect (all rows would have id=1)
    /// - Bucket partitioning would be unusable for query optimization
    ///
    /// # References
    /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
    #[tokio::test]
    async fn test_bucket_partitioning_reads_source_column_from_file() {
        use arrow_array::Int32Array;

        use crate::spec::{Literal, PartitionSpec, Struct, Transform};

        // Iceberg schema with id and name columns
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Partition spec: bucket(4, id)
        let partition_spec = Arc::new(
            PartitionSpec::builder(schema.clone())
                .with_spec_id(0)
                .add_partition_field("id", "id_bucket", Transform::Bucket(4))
                .unwrap()
                .build()
                .unwrap(),
        );

        // Partition data: bucket value is 1
        let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
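        // Note: the 1 above is the bucket ordinal produced by bucket(4, id), not an `id`
        // value; it must never be projected into the `id` column.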

        // Create Arrow schema with field IDs for Parquet file
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "2".to_string(),
            )])),
        ]));

        // Write Parquet file with data
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();

        let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
        let name_data =
            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;

        let to_write =
            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();

        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();
        let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
        writer.write(&to_write).expect("Writing batch");
        writer.close().unwrap();

        // Read the Parquet file with partition spec and data
        let reader = ArrowReaderBuilder::new(file_io).build();
        let tasks = Box::pin(futures::stream::iter(
            vec![Ok(FileScanTask {
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/data.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
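                // Supplying both the partition tuple and its spec lets the reader decide,
                // per field, whether a constant can substitute for the column; with
                // bucket(4, id) it cannot, so `id` is read from the file.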
                partition: Some(partition_data),
                partition_spec: Some(partition_spec),
                name_mapping: None,
                case_sensitive: false,
            })]
            .into_iter(),
        )) as FileScanTaskStream;

        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Verify we got the correct data
        assert_eq!(result.len(), 1);
        let batch = &result[0];

        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        // The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
        // NOT the constant partition value 1
        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 5);
        assert_eq!(id_col.value(2), 9);
        assert_eq!(id_col.value(3), 13);

        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert_eq!(name_col.value(3), "Dave");
    }
}