// iceberg/arrow/reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet file data reader
19
20use std::collections::{HashMap, HashSet};
21use std::ops::Range;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
26use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
27use arrow_cast::cast::cast;
28use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
29use arrow_schema::{
30    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
31};
32use arrow_string::like::starts_with;
33use bytes::Bytes;
34use fnv::FnvHashSet;
35use futures::future::BoxFuture;
36use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
37use parquet::arrow::arrow_reader::{
38    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
39};
40use parquet::arrow::async_reader::AsyncFileReader;
41use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
42use parquet::file::metadata::{
43    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
44};
45use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
46
47use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
48use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
49use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
50use crate::delete_vector::DeleteVector;
51use crate::error::Result;
52use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
53use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
54use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
55use crate::expr::{BoundPredicate, BoundReference};
56use crate::io::{FileIO, FileMetadata, FileRead};
57use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
58use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
59use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
60use crate::utils::available_parallelism;
61use crate::{Error, ErrorKind};
62
/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
    // Desired max rows per RecordBatch; `None` defers to the parquet crate's default.
    batch_size: Option<usize>,
    // FileIO used to open data files (and, via the delete loader, delete files).
    file_io: FileIO,
    // Max number of data files fetched/processed concurrently.
    concurrency_limit_data_files: usize,
    // Whether to prune entire row groups using the scan predicate (default: true).
    row_group_filtering_enabled: bool,
    // Whether to derive page-index-based row selections from the scan predicate (default: false).
    row_selection_enabled: bool,
    // Prefetch hint (bytes) for parsing the Parquet footer metadata.
    metadata_size_hint: Option<usize>,
}
72
73impl ArrowReaderBuilder {
74    /// Create a new ArrowReaderBuilder
75    pub fn new(file_io: FileIO) -> Self {
76        let num_cpus = available_parallelism().get();
77
78        ArrowReaderBuilder {
79            batch_size: None,
80            file_io,
81            concurrency_limit_data_files: num_cpus,
82            row_group_filtering_enabled: true,
83            row_selection_enabled: false,
84            metadata_size_hint: None,
85        }
86    }
87
88    /// Sets the max number of in flight data files that are being fetched
89    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
90        self.concurrency_limit_data_files = val;
91        self
92    }
93
94    /// Sets the desired size of batches in the response
95    /// to something other than the default
96    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
97        self.batch_size = Some(batch_size);
98        self
99    }
100
101    /// Determines whether to enable row group filtering.
102    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
103        self.row_group_filtering_enabled = row_group_filtering_enabled;
104        self
105    }
106
107    /// Determines whether to enable row selection.
108    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
109        self.row_selection_enabled = row_selection_enabled;
110        self
111    }
112
113    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
114    ///
115    /// This hint can help reduce the number of fetch requests. For more details see the
116    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
117    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
118        self.metadata_size_hint = Some(metadata_size_hint);
119        self
120    }
121
122    /// Build the ArrowReader.
123    pub fn build(self) -> ArrowReader {
124        ArrowReader {
125            batch_size: self.batch_size,
126            file_io: self.file_io.clone(),
127            delete_file_loader: CachingDeleteFileLoader::new(
128                self.file_io.clone(),
129                self.concurrency_limit_data_files,
130            ),
131            concurrency_limit_data_files: self.concurrency_limit_data_files,
132            row_group_filtering_enabled: self.row_group_filtering_enabled,
133            row_selection_enabled: self.row_selection_enabled,
134            metadata_size_hint: self.metadata_size_hint,
135        }
136    }
137}
138
/// Reads data from Parquet files
#[derive(Clone)]
pub struct ArrowReader {
    // Optional per-batch row count forwarded to the parquet stream builder.
    batch_size: Option<usize>,
    // FileIO used to open data files.
    file_io: FileIO,
    // Loads (and caches) the delete files referenced by each scan task.
    delete_file_loader: CachingDeleteFileLoader,

    /// the maximum number of data files that can be fetched at the same time
    concurrency_limit_data_files: usize,

    // Copied from ArrowReaderBuilder; see its setters for semantics.
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
    metadata_size_hint: Option<usize>,
}
153
154impl ArrowReader {
155    /// Take a stream of FileScanTasks and reads all the files.
156    /// Returns a stream of Arrow RecordBatches containing the data from the files
157    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
158        let file_io = self.file_io.clone();
159        let batch_size = self.batch_size;
160        let concurrency_limit_data_files = self.concurrency_limit_data_files;
161        let row_group_filtering_enabled = self.row_group_filtering_enabled;
162        let row_selection_enabled = self.row_selection_enabled;
163        let metadata_size_hint = self.metadata_size_hint;
164
165        // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
166        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
167            Box::pin(
168                tasks
169                    .and_then(move |task| {
170                        let file_io = file_io.clone();
171
172                        Self::process_file_scan_task(
173                            task,
174                            batch_size,
175                            file_io,
176                            self.delete_file_loader.clone(),
177                            row_group_filtering_enabled,
178                            row_selection_enabled,
179                            metadata_size_hint,
180                        )
181                    })
182                    .map_err(|err| {
183                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
184                            .with_source(err)
185                    })
186                    .try_flatten(),
187            )
188        } else {
189            Box::pin(
190                tasks
191                    .map_ok(move |task| {
192                        let file_io = file_io.clone();
193
194                        Self::process_file_scan_task(
195                            task,
196                            batch_size,
197                            file_io,
198                            self.delete_file_loader.clone(),
199                            row_group_filtering_enabled,
200                            row_selection_enabled,
201                            metadata_size_hint,
202                        )
203                    })
204                    .map_err(|err| {
205                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
206                            .with_source(err)
207                    })
208                    .try_buffer_unordered(concurrency_limit_data_files)
209                    .try_flatten_unordered(concurrency_limit_data_files),
210            )
211        };
212
213        Ok(stream)
214    }
215
    /// Reads one data file described by `task` and returns a stream of
    /// `RecordBatch`es with projection resolved by Iceberg field id, positional
    /// and equality deletes applied, row-group/row-selection pruning performed,
    /// and schema transformations (type promotion, constant columns, metadata
    /// columns such as `_file`) applied by the RecordBatchTransformer.
    #[allow(clippy::too_many_arguments)]
    async fn process_file_scan_task(
        task: FileScanTask,
        batch_size: Option<usize>,
        file_io: FileIO,
        delete_file_loader: CachingDeleteFileLoader,
        row_group_filtering_enabled: bool,
        row_selection_enabled: bool,
        metadata_size_hint: Option<usize>,
    ) -> Result<ArrowRecordBatchStream> {
        // The page index is only needed when we will build RowSelections: either
        // from the predicate (row selection enabled) or from positional deletes.
        let should_load_page_index =
            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();

        // Kick off delete-file loading now; the result is awaited further down,
        // after the stream builder has been configured.
        let delete_filter_rx =
            delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));

        // Migrated tables lack field IDs, requiring us to inspect the schema to choose
        // between field-ID-based or position-based projection
        let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
            &task.data_file_path,
            file_io.clone(),
            should_load_page_index,
            None,
            metadata_size_hint,
            task.file_size_in_bytes,
        )
        .await?;

        // Check if Parquet file has embedded field IDs
        // Corresponds to Java's ParquetSchemaUtil.hasIds()
        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
        // Note: only the first top-level field is inspected here.
        let missing_field_ids = initial_stream_builder
            .schema()
            .fields()
            .iter()
            .next()
            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());

        // Three-branch schema resolution strategy matching Java's ReadConf constructor
        //
        // Per Iceberg spec Column Projection rules:
        // "Columns in Iceberg data files are selected by field id. The table schema's column
        //  names and order may change after a data file is written, and projection must be done
        //  using field ids."
        // https://iceberg.apache.org/spec/#column-projection
        //
        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
        // we must assign field IDs BEFORE reading data to enable correct projection.
        //
        // Java's ReadConf determines field ID strategy:
        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
        let mut record_batch_stream_builder = if missing_field_ids {
            // Parquet file lacks field IDs - must assign them before reading
            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
                // Branch 2: Apply name mapping to assign correct Iceberg field IDs
                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
                // to columns without field id"
                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
                apply_name_mapping_to_arrow_schema(
                    Arc::clone(initial_stream_builder.schema()),
                    name_mapping,
                )?
            } else {
                // Branch 3: No name mapping - use position-based fallback IDs
                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
                add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
            };

            let options = ArrowReaderOptions::new().with_schema(arrow_schema);

            // Re-open the file with the ID-annotated schema so projection below
            // can work by field id.
            Self::create_parquet_record_batch_stream_builder(
                &task.data_file_path,
                file_io.clone(),
                should_load_page_index,
                Some(options),
                metadata_size_hint,
                task.file_size_in_bytes,
            )
            .await?
        } else {
            // Branch 1: File has embedded field IDs - trust them
            initial_stream_builder
        };

        // Filter out metadata fields for Parquet projection (they don't exist in files)
        let project_field_ids_without_metadata: Vec<i32> = task
            .project_field_ids
            .iter()
            .filter(|&&id| !is_metadata_field(id))
            .copied()
            .collect();

        // Create projection mask based on field IDs
        // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
        // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
        // - If fallback IDs: position-based projection (missing_field_ids=true)
        let projection_mask = Self::get_arrow_projection_mask(
            &project_field_ids_without_metadata,
            &task.schema,
            record_batch_stream_builder.parquet_schema(),
            record_batch_stream_builder.schema(),
            missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
        )?;

        record_batch_stream_builder =
            record_batch_stream_builder.with_projection(projection_mask.clone());

        // RecordBatchTransformer performs any transformations required on the RecordBatches
        // that come back from the file, such as type promotion, default column insertion,
        // column re-ordering, partition constants, and virtual field addition (like _file)
        let mut record_batch_transformer_builder =
            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());

        // Add the _file metadata column if it's in the projected fields
        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
            let file_datum = Datum::string(task.data_file_path.clone());
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
        }

        // Partition constants let the transformer materialize identity-partition
        // columns that are not stored in the data file itself.
        if let (Some(partition_spec), Some(partition_data)) =
            (task.partition_spec.clone(), task.partition.clone())
        {
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
        }

        let mut record_batch_transformer = record_batch_transformer_builder.build();

        if let Some(batch_size) = batch_size {
            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
        }

        // NOTE(review): unwrap assumes the delete-loader channel's sender is never
        // dropped before sending; if loading is aborted this panics — TODO confirm.
        let delete_filter = delete_filter_rx.await.unwrap()?;
        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;

        // In addition to the optional predicate supplied in the `FileScanTask`,
        // we also have an optional predicate resulting from equality delete files.
        // If both are present, we logical-AND them together to form a single filter
        // predicate that we can pass to the `RecordBatchStreamBuilder`.
        let final_predicate = match (&task.predicate, delete_predicate) {
            (None, None) => None,
            (Some(predicate), None) => Some(predicate.clone()),
            (None, Some(ref predicate)) => Some(predicate.clone()),
            (Some(filter_predicate), Some(delete_predicate)) => {
                Some(filter_predicate.clone().and(delete_predicate))
            }
        };

        // There are three possible sources for potential lists of selected RowGroup indices,
        // and two for `RowSelection`s.
        // Selected RowGroup index lists can come from three sources:
        //   * When task.start and task.length specify a byte range (file splitting);
        //   * When there are equality delete files that are applicable;
        //   * When there is a scan predicate and row_group_filtering_enabled = true.
        // `RowSelection`s can be created in either or both of the following cases:
        //   * When there are positional delete files that are applicable;
        //   * When there is a scan predicate and row_selection_enabled = true
        // Note that row group filtering from predicates only happens when
        // there is a scan predicate AND row_group_filtering_enabled = true,
        // but we perform row selection filtering if there are applicable
        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
        // since the only implemented method of applying positional deletes is
        // by using a `RowSelection`.
        let mut selected_row_group_indices = None;
        let mut row_selection = None;

        // Filter row groups based on byte range from task.start and task.length.
        // If both start and length are 0, read the entire file (backwards compatibility).
        if task.start != 0 || task.length != 0 {
            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
                record_batch_stream_builder.metadata(),
                task.start,
                task.length,
            )?;
            selected_row_group_indices = Some(byte_range_filtered_row_groups);
        }

        if let Some(predicate) = final_predicate {
            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                record_batch_stream_builder.parquet_schema(),
                &predicate,
            )?;

            // Row-level filter: evaluated against decoded batches by the parquet reader.
            let row_filter = Self::get_row_filter(
                &predicate,
                record_batch_stream_builder.parquet_schema(),
                &iceberg_field_ids,
                &field_id_map,
            )?;
            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

            if row_group_filtering_enabled {
                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &field_id_map,
                    &task.schema,
                )?;

                // Merge predicate-based filtering with byte range filtering (if present)
                // by taking the intersection of both filters
                selected_row_group_indices = match selected_row_group_indices {
                    Some(byte_range_filtered) => {
                        // Keep only row groups that are in both filters
                        let intersection: Vec<usize> = byte_range_filtered
                            .into_iter()
                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
                            .collect();
                        Some(intersection)
                    }
                    None => Some(predicate_filtered_row_groups),
                };
            }

            if row_selection_enabled {
                row_selection = Some(Self::get_row_selection_for_filter_predicate(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &selected_row_group_indices,
                    &field_id_map,
                    &task.schema,
                )?);
            }
        }

        let positional_delete_indexes = delete_filter.get_delete_vector(&task);

        if let Some(positional_delete_indexes) = positional_delete_indexes {
            // Hold the lock only while converting the delete vector to a RowSelection.
            let delete_row_selection = {
                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();

                Self::build_deletes_row_selection(
                    record_batch_stream_builder.metadata().row_groups(),
                    &selected_row_group_indices,
                    &positional_delete_indexes,
                )
            }?;

            // merge the row selection from the delete files with the row selection
            // from the filter predicate, if there is one from the filter predicate
            row_selection = match row_selection {
                None => Some(delete_row_selection),
                Some(filter_row_selection) => {
                    Some(filter_row_selection.intersection(&delete_row_selection))
                }
            };
        }

        if let Some(row_selection) = row_selection {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_selection(row_selection);
        }

        if let Some(selected_row_group_indices) = selected_row_group_indices {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
        }

        // Build the batch stream and send all the RecordBatches that it generates
        // to the requester.
        let record_batch_stream =
            record_batch_stream_builder
                .build()?
                .map(move |batch| match batch {
                    Ok(batch) => {
                        // Process the record batch (type promotion, column reordering, virtual fields, etc.)
                        record_batch_transformer.process_record_batch(batch)
                    }
                    Err(err) => Err(err.into()),
                });

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }
492
493    pub(crate) async fn create_parquet_record_batch_stream_builder(
494        data_file_path: &str,
495        file_io: FileIO,
496        should_load_page_index: bool,
497        arrow_reader_options: Option<ArrowReaderOptions>,
498        metadata_size_hint: Option<usize>,
499        file_size_in_bytes: u64,
500    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
501        // Get the metadata for the Parquet file we need to read and build
502        // a reader for the data within
503        let parquet_file = file_io.new_input(data_file_path)?;
504        let parquet_reader = parquet_file.reader().await?;
505        let mut parquet_file_reader = ArrowFileReader::new(
506            FileMetadata {
507                size: file_size_in_bytes,
508            },
509            parquet_reader,
510        )
511        .with_preload_column_index(true)
512        .with_preload_offset_index(true)
513        .with_preload_page_index(should_load_page_index);
514
515        if let Some(hint) = metadata_size_hint {
516            parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint);
517        }
518
519        // Create the record batch stream builder, which wraps the parquet file reader
520        let options = arrow_reader_options.unwrap_or_default();
521        let record_batch_stream_builder =
522            ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
523        Ok(record_batch_stream_builder)
524    }
525
    /// computes a `RowSelection` from positional delete indices.
    ///
    /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
    /// as having been deleted by a positional delete, taking into account any row groups that have
    /// been skipped entirely by the filter predicate
    ///
    /// Positions in `positional_deletes` are absolute row indices within the data
    /// file. The algorithm walks `selected_row_groups` with a single forward
    /// index, so when present that list is expected in ascending order; row
    /// groups not listed produce no selectors at all (the reader will not read
    /// them), while selected row groups are covered by alternating select/skip runs.
    fn build_deletes_row_selection(
        row_group_metadata_list: &[RowGroupMetaData],
        selected_row_groups: &Option<Vec<usize>>,
        positional_deletes: &DeleteVector,
    ) -> Result<RowSelection> {
        // Accumulated select/skip runs, in file order.
        let mut results: Vec<RowSelector> = Vec::new();
        // Forward cursor into `selected_row_groups` (when row group selection is active).
        let mut selected_row_groups_idx = 0;
        // Absolute index of the first row of the current row group within the file.
        let mut current_row_group_base_idx: u64 = 0;
        // Iterator over deleted row positions, with the next position cached so we
        // can peek without consuming.
        let mut delete_vector_iter = positional_deletes.iter();
        let mut next_deleted_row_idx_opt = delete_vector_iter.next();

        for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
            let row_group_num_rows = row_group_metadata.num_rows() as u64;
            // Absolute index of the first row of the *next* row group; rows of the
            // current group occupy [current_row_group_base_idx, next_row_group_base_idx).
            let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;

            // if row group selection is enabled,
            if let Some(selected_row_groups) = selected_row_groups {
                // if we've consumed all the selected row groups, we're done
                if selected_row_groups_idx == selected_row_groups.len() {
                    break;
                }

                if idx == selected_row_groups[selected_row_groups_idx] {
                    // we're in a selected row group. Increment selected_row_groups_idx
                    // so that next time around the for loop we're looking for the next
                    // selected row group
                    selected_row_groups_idx += 1;
                } else {
                    // Advance iterator past all deletes in the skipped row group.
                    // advance_to() positions the iterator to the first delete >= next_row_group_base_idx.
                    // However, if our cached next_deleted_row_idx_opt is in the skipped range,
                    // we need to call next() to update the cache with the newly positioned value.
                    delete_vector_iter.advance_to(next_row_group_base_idx);
                    // Only update the cache if the cached value is stale (in the skipped range)
                    if let Some(cached_idx) = next_deleted_row_idx_opt
                        && cached_idx < next_row_group_base_idx
                    {
                        next_deleted_row_idx_opt = delete_vector_iter.next();
                    }

                    // still increment the current page base index but then skip to the next row group
                    // in the file
                    current_row_group_base_idx += row_group_num_rows;
                    continue;
                }
            }

            let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
                Some(next_deleted_row_idx) => {
                    // if the index of the next deleted row is beyond this row group, add a selection for
                    // the remainder of this row group and skip to the next row group
                    if next_deleted_row_idx >= next_row_group_base_idx {
                        results.push(RowSelector::select(row_group_num_rows as usize));
                        current_row_group_base_idx += row_group_num_rows;
                        continue;
                    }

                    next_deleted_row_idx
                }

                // If there are no more pos deletes, add a selector for the entirety of this row group.
                _ => {
                    results.push(RowSelector::select(row_group_num_rows as usize));
                    current_row_group_base_idx += row_group_num_rows;
                    continue;
                }
            };

            // At least one delete falls inside this row group: walk through it,
            // emitting a select run up to each delete and a skip run over each
            // consecutive stretch of deleted rows.
            let mut current_idx = current_row_group_base_idx;
            'chunks: while next_deleted_row_idx < next_row_group_base_idx {
                // `select` all rows that precede the next delete index
                if current_idx < next_deleted_row_idx {
                    let run_length = next_deleted_row_idx - current_idx;
                    results.push(RowSelector::select(run_length as usize));
                    current_idx += run_length;
                }

                // `skip` all consecutive deleted rows in the current row group
                let mut run_length = 0;
                while next_deleted_row_idx == current_idx
                    && next_deleted_row_idx < next_row_group_base_idx
                {
                    run_length += 1;
                    current_idx += 1;

                    next_deleted_row_idx_opt = delete_vector_iter.next();
                    next_deleted_row_idx = match next_deleted_row_idx_opt {
                        Some(next_deleted_row_idx) => next_deleted_row_idx,
                        _ => {
                            // We've processed the final positional delete.
                            // Conclude the skip and then break so that we select the remaining
                            // rows in the row group and move on to the next row group
                            results.push(RowSelector::skip(run_length));
                            break 'chunks;
                        }
                    };
                }
                if run_length > 0 {
                    results.push(RowSelector::skip(run_length));
                }
            }

            // Select whatever remains of the row group after the last delete.
            if current_idx < next_row_group_base_idx {
                results.push(RowSelector::select(
                    (next_row_group_base_idx - current_idx) as usize,
                ));
            }

            current_row_group_base_idx += row_group_num_rows;
        }

        Ok(results.into())
    }
644
645    fn build_field_id_set_and_map(
646        parquet_schema: &SchemaDescriptor,
647        predicate: &BoundPredicate,
648    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
649        // Collects all Iceberg field IDs referenced in the filter predicate
650        let mut collector = CollectFieldIdVisitor {
651            field_ids: HashSet::default(),
652        };
653        visit(&mut collector, predicate)?;
654
655        let iceberg_field_ids = collector.field_ids();
656
657        // Without embedded field IDs, we fall back to position-based mapping for compatibility
658        let field_id_map = match build_field_id_map(parquet_schema)? {
659            Some(map) => map,
660            None => build_fallback_field_id_map(parquet_schema),
661        };
662
663        Ok((iceberg_field_ids, field_id_map))
664    }
665
666    /// Recursively extract leaf field IDs because Parquet projection works at the leaf column level.
667    /// Nested types (struct/list/map) are flattened in Parquet's columnar format.
668    fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
669        match field.field_type.as_ref() {
670            Type::Primitive(_) => {
671                field_ids.push(field.id);
672            }
673            Type::Struct(struct_type) => {
674                for nested_field in struct_type.fields() {
675                    Self::include_leaf_field_id(nested_field, field_ids);
676                }
677            }
678            Type::List(list_type) => {
679                Self::include_leaf_field_id(&list_type.element_field, field_ids);
680            }
681            Type::Map(map_type) => {
682                Self::include_leaf_field_id(&map_type.key_field, field_ids);
683                Self::include_leaf_field_id(&map_type.value_field, field_ids);
684            }
685        }
686    }
687
688    fn get_arrow_projection_mask(
689        field_ids: &[i32],
690        iceberg_schema_of_task: &Schema,
691        parquet_schema: &SchemaDescriptor,
692        arrow_schema: &ArrowSchemaRef,
693        use_fallback: bool, // Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark)
694    ) -> Result<ProjectionMask> {
695        fn type_promotion_is_valid(
696            file_type: Option<&PrimitiveType>,
697            projected_type: Option<&PrimitiveType>,
698        ) -> bool {
699            match (file_type, projected_type) {
700                (Some(lhs), Some(rhs)) if lhs == rhs => true,
701                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
702                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
703                (
704                    Some(PrimitiveType::Decimal {
705                        precision: file_precision,
706                        scale: file_scale,
707                    }),
708                    Some(PrimitiveType::Decimal {
709                        precision: requested_precision,
710                        scale: requested_scale,
711                    }),
712                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
713                // Uuid will be store as Fixed(16) in parquet file, so the read back type will be Fixed(16).
714                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
715                _ => false,
716            }
717        }
718
719        if field_ids.is_empty() {
720            return Ok(ProjectionMask::all());
721        }
722
723        if use_fallback {
724            // Position-based projection necessary because file lacks embedded field IDs
725            Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
726        } else {
727            // Field-ID-based projection using embedded field IDs from Parquet metadata
728
729            // Parquet's columnar format requires leaf-level (not top-level struct/list/map) projection
730            let mut leaf_field_ids = vec![];
731            for field_id in field_ids {
732                let field = iceberg_schema_of_task.field_by_id(*field_id);
733                if let Some(field) = field {
734                    Self::include_leaf_field_id(field, &mut leaf_field_ids);
735                }
736            }
737
738            Self::get_arrow_projection_mask_with_field_ids(
739                &leaf_field_ids,
740                iceberg_schema_of_task,
741                parquet_schema,
742                arrow_schema,
743                type_promotion_is_valid,
744            )
745        }
746    }
747
    /// Standard projection using embedded field IDs from Parquet metadata.
    /// For iceberg-java compatibility with ParquetSchemaUtil.pruneColumns().
    ///
    /// Maps each requested leaf field ID to its leaf column index in the file,
    /// validating that the file's type can be promoted to the requested type.
    fn get_arrow_projection_mask_with_field_ids(
        leaf_field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
        type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
    ) -> Result<ProjectionMask> {
        let mut column_map = HashMap::new();
        let fields = arrow_schema.fields();

        // Pre-project only the fields that have been selected, possibly avoiding converting
        // some Arrow types that are not yet supported.
        // NOTE: the closure below also records EVERY leaf that carries a
        // parseable field ID into `projected_fields` (not only the selected
        // ones); the second pass further down relies on that map.
        let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
        let projected_arrow_schema = ArrowSchema::new_with_metadata(
            fields.filter_leaves(|_, f| {
                f.metadata()
                    .get(PARQUET_FIELD_ID_META_KEY)
                    .and_then(|field_id| i32::from_str(field_id).ok())
                    .is_some_and(|field_id| {
                        projected_fields.insert((*f).clone(), field_id);
                        leaf_field_ids.contains(&field_id)
                    })
            }),
            arrow_schema.metadata().clone(),
        );
        // Convert the pruned Arrow schema to an Iceberg schema so the file's
        // types can be compared against the task schema's types below.
        let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;

        // Second pass, used only for its (index, field) iteration (the returned
        // schema is discarded): build field-id → leaf-column-index entries for
        // every selected, type-compatible column.
        fields.filter_leaves(|idx, field| {
            let Some(field_id) = projected_fields.get(field).cloned() else {
                return false;
            };

            let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
            let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);

            // Skip columns absent from either schema (not selected above, or
            // unknown to the task schema).
            if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
                return false;
            }

            // Skip columns whose file type cannot be promoted to the requested
            // type (e.g. a Long file column read as Int).
            if !type_promotion_is_valid(
                parquet_iceberg_field
                    .unwrap()
                    .field_type
                    .as_primitive_type(),
                iceberg_field.unwrap().field_type.as_primitive_type(),
            ) {
                return false;
            }

            column_map.insert(field_id, idx);
            true
        });

        // Schema evolution: New columns may not exist in old Parquet files.
        // We only project existing columns; RecordBatchTransformer adds default/NULL values.
        let mut indices = vec![];
        for field_id in leaf_field_ids {
            if let Some(col_idx) = column_map.get(field_id) {
                indices.push(*col_idx);
            }
        }

        if indices.is_empty() {
            // Edge case: All requested columns are new (don't exist in file).
            // Project all columns so RecordBatchTransformer has a batch to transform.
            Ok(ProjectionMask::all())
        } else {
            Ok(ProjectionMask::leaves(parquet_schema, indices))
        }
    }
820
821    /// Fallback projection for Parquet files without field IDs.
822    /// Uses position-based matching: field ID N → column position N-1.
823    /// Projects entire top-level columns (including nested content) for iceberg-java compatibility.
824    fn get_arrow_projection_mask_fallback(
825        field_ids: &[i32],
826        parquet_schema: &SchemaDescriptor,
827    ) -> Result<ProjectionMask> {
828        // Position-based: field_id N → column N-1 (field IDs are 1-indexed)
829        let parquet_root_fields = parquet_schema.root_schema().get_fields();
830        let mut root_indices = vec![];
831
832        for field_id in field_ids.iter() {
833            let parquet_pos = (*field_id - 1) as usize;
834
835            if parquet_pos < parquet_root_fields.len() {
836                root_indices.push(parquet_pos);
837            }
838            // RecordBatchTransformer adds missing columns with NULL values
839        }
840
841        if root_indices.is_empty() {
842            Ok(ProjectionMask::all())
843        } else {
844            Ok(ProjectionMask::roots(parquet_schema, root_indices))
845        }
846    }
847
848    fn get_row_filter(
849        predicates: &BoundPredicate,
850        parquet_schema: &SchemaDescriptor,
851        iceberg_field_ids: &HashSet<i32>,
852        field_id_map: &HashMap<i32, usize>,
853    ) -> Result<RowFilter> {
854        // Collect Parquet column indices from field ids.
855        // If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
856        let mut column_indices = iceberg_field_ids
857            .iter()
858            .filter_map(|field_id| field_id_map.get(field_id).cloned())
859            .collect::<Vec<_>>();
860        column_indices.sort();
861
862        // The converter that converts `BoundPredicates` to `ArrowPredicates`
863        let mut converter = PredicateConverter {
864            parquet_schema,
865            column_map: field_id_map,
866            column_indices: &column_indices,
867        };
868
869        // After collecting required leaf column indices used in the predicate,
870        // creates the projection mask for the Arrow predicates.
871        let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
872        let predicate_func = visit(&mut converter, predicates)?;
873        let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
874        Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
875    }
876
877    fn get_selected_row_group_indices(
878        predicate: &BoundPredicate,
879        parquet_metadata: &Arc<ParquetMetaData>,
880        field_id_map: &HashMap<i32, usize>,
881        snapshot_schema: &Schema,
882    ) -> Result<Vec<usize>> {
883        let row_groups_metadata = parquet_metadata.row_groups();
884        let mut results = Vec::with_capacity(row_groups_metadata.len());
885
886        for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
887            if RowGroupMetricsEvaluator::eval(
888                predicate,
889                row_group_metadata,
890                field_id_map,
891                snapshot_schema,
892            )? {
893                results.push(idx);
894            }
895        }
896
897        Ok(results)
898    }
899
    /// Computes a `RowSelection` from the Parquet page index by evaluating the
    /// filter predicate against per-page statistics, restricted to the
    /// previously selected row groups (if any).
    ///
    /// # Errors
    /// Fails when the file metadata lacks a column index or an offset index,
    /// since page-level pruning is impossible without them.
    fn get_row_selection_for_filter_predicate(
        predicate: &BoundPredicate,
        parquet_metadata: &Arc<ParquetMetaData>,
        selected_row_groups: &Option<Vec<usize>>,
        field_id_map: &HashMap<i32, usize>,
        snapshot_schema: &Schema,
    ) -> Result<RowSelection> {
        let Some(column_index) = parquet_metadata.column_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain a column index",
            ));
        };

        let Some(offset_index) = parquet_metadata.offset_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain an offset index",
            ));
        };

        // If all row groups were filtered out, return an empty RowSelection (select no rows)
        if let Some(selected_row_groups) = selected_row_groups
            && selected_row_groups.is_empty()
        {
            return Ok(RowSelection::from(Vec::new()));
        }

        // Cursor into `selected_row_groups`, advanced as matches are found.
        let mut selected_row_groups_idx = 0;

        // Pairs each row group's column index and offset index with its metadata.
        let page_index = column_index
            .iter()
            .enumerate()
            .zip(offset_index)
            .zip(parquet_metadata.row_groups());

        let mut results = Vec::new();
        for (((idx, column_index), offset_index), row_group_metadata) in page_index {
            if let Some(selected_row_groups) = selected_row_groups {
                // skip row groups that aren't present in selected_row_groups
                // NOTE(review): this assumes `selected_row_groups` is sorted
                // ascending to match the iteration order here — confirm with
                // the producer of that list.
                if idx == selected_row_groups[selected_row_groups_idx] {
                    selected_row_groups_idx += 1;
                } else {
                    continue;
                }
            }

            // Evaluate the predicate against this row group's page statistics
            // to obtain a page-granular row selection.
            let selections_for_page = PageIndexEvaluator::eval(
                predicate,
                column_index,
                offset_index,
                row_group_metadata,
                field_id_map,
                snapshot_schema,
            )?;

            results.push(selections_for_page);

            // Stop early once every selected row group has been processed.
            if let Some(selected_row_groups) = selected_row_groups
                && selected_row_groups_idx == selected_row_groups.len()
            {
                break;
            }
        }

        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
    }
967
968    /// Filters row groups by byte range to support Iceberg's file splitting.
969    ///
970    /// Iceberg splits large files at row group boundaries, so we only read row groups
971    /// whose byte ranges overlap with [start, start+length).
972    fn filter_row_groups_by_byte_range(
973        parquet_metadata: &Arc<ParquetMetaData>,
974        start: u64,
975        length: u64,
976    ) -> Result<Vec<usize>> {
977        let row_groups = parquet_metadata.row_groups();
978        let mut selected = Vec::new();
979        let end = start + length;
980
981        // Row groups are stored sequentially after the 4-byte magic header.
982        let mut current_byte_offset = 4u64;
983
984        for (idx, row_group) in row_groups.iter().enumerate() {
985            let row_group_size = row_group.compressed_size() as u64;
986            let row_group_end = current_byte_offset + row_group_size;
987
988            if current_byte_offset < end && start < row_group_end {
989                selected.push(idx);
990            }
991
992            current_byte_offset = row_group_end;
993        }
994
995        Ok(selected)
996    }
997}
998
999/// Build the map of parquet field id to Parquet column index in the schema.
1000/// Returns None if the Parquet file doesn't have field IDs embedded (e.g., migrated tables).
1001fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
1002    let mut column_map = HashMap::new();
1003
1004    for (idx, field) in parquet_schema.columns().iter().enumerate() {
1005        let field_type = field.self_type();
1006        match field_type {
1007            ParquetType::PrimitiveType { basic_info, .. } => {
1008                if !basic_info.has_id() {
1009                    return Ok(None);
1010                }
1011                column_map.insert(basic_info.id(), idx);
1012            }
1013            ParquetType::GroupType { .. } => {
1014                return Err(Error::new(
1015                    ErrorKind::DataInvalid,
1016                    format!(
1017                        "Leave column in schema should be primitive type but got {field_type:?}"
1018                    ),
1019                ));
1020            }
1021        };
1022    }
1023
1024    Ok(Some(column_map))
1025}
1026
1027/// Build a fallback field ID map for Parquet files without embedded field IDs.
1028/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java migrations.
1029fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
1030    let mut column_map = HashMap::new();
1031
1032    // 1-indexed to match iceberg-java's convention
1033    for (idx, _field) in parquet_schema.columns().iter().enumerate() {
1034        let field_id = (idx + 1) as i32;
1035        column_map.insert(field_id, idx);
1036    }
1037
1038    column_map
1039}
1040
1041/// Apply name mapping to Arrow schema for Parquet files lacking field IDs.
1042///
1043/// Assigns Iceberg field IDs based on column names using the name mapping,
1044/// enabling correct projection on migrated files (e.g., from Hive/Spark via add_files).
1045///
1046/// Per Iceberg spec Column Projection rule #2:
1047/// "Use schema.name-mapping.default metadata to map field id to columns without field id"
1048/// https://iceberg.apache.org/spec/#column-projection
1049///
1050/// Corresponds to Java's ParquetSchemaUtil.applyNameMapping() and ApplyNameMapping visitor.
1051/// The key difference is Java operates on Parquet MessageType, while we operate on Arrow Schema.
1052///
1053/// # Arguments
1054/// * `arrow_schema` - Arrow schema from Parquet file (without field IDs)
1055/// * `name_mapping` - Name mapping from table metadata (TableProperties.DEFAULT_NAME_MAPPING)
1056///
1057/// # Returns
1058/// Arrow schema with field IDs assigned based on name mapping
1059fn apply_name_mapping_to_arrow_schema(
1060    arrow_schema: ArrowSchemaRef,
1061    name_mapping: &NameMapping,
1062) -> Result<Arc<ArrowSchema>> {
1063    debug_assert!(
1064        arrow_schema
1065            .fields()
1066            .iter()
1067            .next()
1068            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1069        "Schema already has field IDs - name mapping should not be applied"
1070    );
1071
1072    use arrow_schema::Field;
1073
1074    let fields_with_mapped_ids: Vec<_> = arrow_schema
1075        .fields()
1076        .iter()
1077        .map(|field| {
1078            // Look up this column name in name mapping to get the Iceberg field ID.
1079            // Corresponds to Java's ApplyNameMapping visitor which calls
1080            // nameMapping.find(currentPath()) and returns field.withId() if found.
1081            //
1082            // If the field isn't in the mapping, leave it WITHOUT assigning an ID
1083            // (matching Java's behavior of returning the field unchanged).
1084            // Later, during projection, fields without IDs are filtered out.
1085            let mapped_field_opt = name_mapping
1086                .fields()
1087                .iter()
1088                .find(|f| f.names().contains(&field.name().to_string()));
1089
1090            let mut metadata = field.metadata().clone();
1091
1092            if let Some(mapped_field) = mapped_field_opt
1093                && let Some(field_id) = mapped_field.field_id()
1094            {
1095                // Field found in mapping with a field_id → assign it
1096                metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1097            }
1098            // If field_id is None, leave the field without an ID (will be filtered by projection)
1099
1100            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1101                .with_metadata(metadata)
1102        })
1103        .collect();
1104
1105    Ok(Arc::new(ArrowSchema::new_with_metadata(
1106        fields_with_mapped_ids,
1107        arrow_schema.metadata().clone(),
1108    )))
1109}
1110
1111/// Add position-based fallback field IDs to Arrow schema for Parquet files lacking them.
1112/// Enables projection on migrated files (e.g., from Hive/Spark).
1113///
1114/// Why at schema level (not per-batch): Efficiency - avoids repeated schema modification.
1115/// Why only top-level: Nested projection uses leaf column indices, not parent struct IDs.
1116/// Why 1-indexed: Compatibility with iceberg-java's ParquetSchemaUtil.addFallbackIds().
1117fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
1118    debug_assert!(
1119        arrow_schema
1120            .fields()
1121            .iter()
1122            .next()
1123            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1124        "Schema already has field IDs"
1125    );
1126
1127    use arrow_schema::Field;
1128
1129    let fields_with_fallback_ids: Vec<_> = arrow_schema
1130        .fields()
1131        .iter()
1132        .enumerate()
1133        .map(|(pos, field)| {
1134            let mut metadata = field.metadata().clone();
1135            let field_id = (pos + 1) as i32; // 1-indexed for Java compatibility
1136            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1137
1138            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1139                .with_metadata(metadata)
1140        })
1141        .collect();
1142
1143    Arc::new(ArrowSchema::new_with_metadata(
1144        fields_with_fallback_ids,
1145        arrow_schema.metadata().clone(),
1146    ))
1147}
1148
/// A visitor to collect field ids from bound predicates.
///
/// Every leaf predicate records the ID of the field it references; logical
/// combinators contribute nothing themselves.
struct CollectFieldIdVisitor {
    // IDs of all Iceberg fields referenced anywhere in the visited predicate.
    field_ids: HashSet<i32>,
}
1153
impl CollectFieldIdVisitor {
    /// Consumes the visitor and returns the collected set of field IDs.
    fn field_ids(self) -> HashSet<i32> {
        self.field_ids
    }
}
1159
1160impl BoundPredicateVisitor for CollectFieldIdVisitor {
1161    type T = ();
1162
1163    fn always_true(&mut self) -> Result<()> {
1164        Ok(())
1165    }
1166
1167    fn always_false(&mut self) -> Result<()> {
1168        Ok(())
1169    }
1170
1171    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1172        Ok(())
1173    }
1174
1175    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1176        Ok(())
1177    }
1178
1179    fn not(&mut self, _inner: ()) -> Result<()> {
1180        Ok(())
1181    }
1182
1183    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1184        self.field_ids.insert(reference.field().id);
1185        Ok(())
1186    }
1187
1188    fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1189        self.field_ids.insert(reference.field().id);
1190        Ok(())
1191    }
1192
1193    fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1194        self.field_ids.insert(reference.field().id);
1195        Ok(())
1196    }
1197
1198    fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1199        self.field_ids.insert(reference.field().id);
1200        Ok(())
1201    }
1202
1203    fn less_than(
1204        &mut self,
1205        reference: &BoundReference,
1206        _literal: &Datum,
1207        _predicate: &BoundPredicate,
1208    ) -> Result<()> {
1209        self.field_ids.insert(reference.field().id);
1210        Ok(())
1211    }
1212
1213    fn less_than_or_eq(
1214        &mut self,
1215        reference: &BoundReference,
1216        _literal: &Datum,
1217        _predicate: &BoundPredicate,
1218    ) -> Result<()> {
1219        self.field_ids.insert(reference.field().id);
1220        Ok(())
1221    }
1222
1223    fn greater_than(
1224        &mut self,
1225        reference: &BoundReference,
1226        _literal: &Datum,
1227        _predicate: &BoundPredicate,
1228    ) -> Result<()> {
1229        self.field_ids.insert(reference.field().id);
1230        Ok(())
1231    }
1232
1233    fn greater_than_or_eq(
1234        &mut self,
1235        reference: &BoundReference,
1236        _literal: &Datum,
1237        _predicate: &BoundPredicate,
1238    ) -> Result<()> {
1239        self.field_ids.insert(reference.field().id);
1240        Ok(())
1241    }
1242
1243    fn eq(
1244        &mut self,
1245        reference: &BoundReference,
1246        _literal: &Datum,
1247        _predicate: &BoundPredicate,
1248    ) -> Result<()> {
1249        self.field_ids.insert(reference.field().id);
1250        Ok(())
1251    }
1252
1253    fn not_eq(
1254        &mut self,
1255        reference: &BoundReference,
1256        _literal: &Datum,
1257        _predicate: &BoundPredicate,
1258    ) -> Result<()> {
1259        self.field_ids.insert(reference.field().id);
1260        Ok(())
1261    }
1262
1263    fn starts_with(
1264        &mut self,
1265        reference: &BoundReference,
1266        _literal: &Datum,
1267        _predicate: &BoundPredicate,
1268    ) -> Result<()> {
1269        self.field_ids.insert(reference.field().id);
1270        Ok(())
1271    }
1272
1273    fn not_starts_with(
1274        &mut self,
1275        reference: &BoundReference,
1276        _literal: &Datum,
1277        _predicate: &BoundPredicate,
1278    ) -> Result<()> {
1279        self.field_ids.insert(reference.field().id);
1280        Ok(())
1281    }
1282
1283    fn r#in(
1284        &mut self,
1285        reference: &BoundReference,
1286        _literals: &FnvHashSet<Datum>,
1287        _predicate: &BoundPredicate,
1288    ) -> Result<()> {
1289        self.field_ids.insert(reference.field().id);
1290        Ok(())
1291    }
1292
1293    fn not_in(
1294        &mut self,
1295        reference: &BoundReference,
1296        _literals: &FnvHashSet<Datum>,
1297        _predicate: &BoundPredicate,
1298    ) -> Result<()> {
1299        self.field_ids.insert(reference.field().id);
1300        Ok(())
1301    }
1302}
1303
/// A visitor to convert Iceberg bound predicates to Arrow predicates.
struct PredicateConverter<'a> {
    /// The Parquet schema descriptor.
    pub parquet_schema: &'a SchemaDescriptor,
    /// The map between field id and leaf column index in Parquet schema.
    pub column_map: &'a HashMap<i32, usize>,
    /// The required column indices in Parquet schema for the predicates.
    /// Record batches handed to the generated predicates are projected to
    /// exactly these columns, in this order.
    pub column_indices: &'a Vec<usize>,
}
1313
1314impl PredicateConverter<'_> {
1315    /// When visiting a bound reference, we return index of the leaf column in the
1316    /// required column indices which is used to project the column in the record batch.
1317    /// Return None if the field id is not found in the column map, which is possible
1318    /// due to schema evolution.
1319    fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1320        // The leaf column's index in Parquet schema.
1321        if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1322            if self.parquet_schema.get_column_root(*column_idx).is_group() {
1323                return Err(Error::new(
1324                    ErrorKind::DataInvalid,
1325                    format!(
1326                        "Leave column `{}` in predicates isn't a root column in Parquet schema.",
1327                        reference.field().name
1328                    ),
1329                ));
1330            }
1331
1332            // The leaf column's index in the required column indices.
1333            let index = self
1334                .column_indices
1335                .iter()
1336                .position(|&idx| idx == *column_idx)
1337                .ok_or(Error::new(
1338                    ErrorKind::DataInvalid,
1339                    format!(
1340                "Leave column `{}` in predicates cannot be found in the required column indices.",
1341                reference.field().name
1342            ),
1343                ))?;
1344
1345            Ok(Some(index))
1346        } else {
1347            Ok(None)
1348        }
1349    }
1350
1351    /// Build an Arrow predicate that always returns true.
1352    fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1353        Ok(Box::new(|batch| {
1354            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1355        }))
1356    }
1357
1358    /// Build an Arrow predicate that always returns false.
1359    fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1360        Ok(Box::new(|batch| {
1361            Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1362        }))
1363    }
1364}
1365
1366/// Gets the leaf column from the record batch for the required column index. Only
1367/// supports top-level columns for now.
1368fn project_column(
1369    batch: &RecordBatch,
1370    column_idx: usize,
1371) -> std::result::Result<ArrayRef, ArrowError> {
1372    let column = batch.column(column_idx);
1373
1374    match column.data_type() {
1375        DataType::Struct(_) => Err(ArrowError::SchemaError(
1376            "Does not support struct column yet.".to_string(),
1377        )),
1378        _ => Ok(column.clone()),
1379    }
1380}
1381
/// Boxed predicate closure: maps a `RecordBatch` to a per-row boolean mask.
type PredicateResult =
    dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1384
1385impl BoundPredicateVisitor for PredicateConverter<'_> {
1386    type T = Box<PredicateResult>;
1387
    /// `AlwaysTrue` → a predicate that selects every row in the batch.
    fn always_true(&mut self) -> Result<Box<PredicateResult>> {
        self.build_always_true()
    }
1391
    /// `AlwaysFalse` → a predicate that selects no rows.
    fn always_false(&mut self) -> Result<Box<PredicateResult>> {
        self.build_always_false()
    }
1395
1396    fn and(
1397        &mut self,
1398        mut lhs: Box<PredicateResult>,
1399        mut rhs: Box<PredicateResult>,
1400    ) -> Result<Box<PredicateResult>> {
1401        Ok(Box::new(move |batch| {
1402            let left = lhs(batch.clone())?;
1403            let right = rhs(batch)?;
1404            and_kleene(&left, &right)
1405        }))
1406    }
1407
1408    fn or(
1409        &mut self,
1410        mut lhs: Box<PredicateResult>,
1411        mut rhs: Box<PredicateResult>,
1412    ) -> Result<Box<PredicateResult>> {
1413        Ok(Box::new(move |batch| {
1414            let left = lhs(batch.clone())?;
1415            let right = rhs(batch)?;
1416            or_kleene(&left, &right)
1417        }))
1418    }
1419
1420    fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1421        Ok(Box::new(move |batch| {
1422            let pred_ret = inner(batch)?;
1423            not(&pred_ret)
1424        }))
1425    }
1426
1427    fn is_null(
1428        &mut self,
1429        reference: &BoundReference,
1430        _predicate: &BoundPredicate,
1431    ) -> Result<Box<PredicateResult>> {
1432        if let Some(idx) = self.bound_reference(reference)? {
1433            Ok(Box::new(move |batch| {
1434                let column = project_column(&batch, idx)?;
1435                is_null(&column)
1436            }))
1437        } else {
1438            // A missing column, treating it as null.
1439            self.build_always_true()
1440        }
1441    }
1442
1443    fn not_null(
1444        &mut self,
1445        reference: &BoundReference,
1446        _predicate: &BoundPredicate,
1447    ) -> Result<Box<PredicateResult>> {
1448        if let Some(idx) = self.bound_reference(reference)? {
1449            Ok(Box::new(move |batch| {
1450                let column = project_column(&batch, idx)?;
1451                is_not_null(&column)
1452            }))
1453        } else {
1454            // A missing column, treating it as null.
1455            self.build_always_false()
1456        }
1457    }
1458
    fn is_nan(
        &mut self,
        reference: &BoundReference,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        // NOTE(review): no per-value NaN test is performed. When the column
        // exists this selects every row (over-selects instead of filtering) —
        // presumably the predicate is re-applied downstream; confirm before
        // relying on exact row-filter output for NaN predicates.
        if self.bound_reference(reference)?.is_some() {
            self.build_always_true()
        } else {
            // A missing column, treating it as null.
            self.build_always_false()
        }
    }
1471
    fn not_nan(
        &mut self,
        reference: &BoundReference,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        // NOTE(review): mirrors `is_nan` — no per-value NaN test. When the
        // column exists this selects no rows at all, which under-selects;
        // confirm a downstream re-application of the predicate compensates.
        if self.bound_reference(reference)?.is_some() {
            self.build_always_false()
        } else {
            // A missing column, treating it as null.
            self.build_always_true()
        }
    }
1484
1485    fn less_than(
1486        &mut self,
1487        reference: &BoundReference,
1488        literal: &Datum,
1489        _predicate: &BoundPredicate,
1490    ) -> Result<Box<PredicateResult>> {
1491        if let Some(idx) = self.bound_reference(reference)? {
1492            let literal = get_arrow_datum(literal)?;
1493
1494            Ok(Box::new(move |batch| {
1495                let left = project_column(&batch, idx)?;
1496                let literal = try_cast_literal(&literal, left.data_type())?;
1497                lt(&left, literal.as_ref())
1498            }))
1499        } else {
1500            // A missing column, treating it as null.
1501            self.build_always_true()
1502        }
1503    }
1504
1505    fn less_than_or_eq(
1506        &mut self,
1507        reference: &BoundReference,
1508        literal: &Datum,
1509        _predicate: &BoundPredicate,
1510    ) -> Result<Box<PredicateResult>> {
1511        if let Some(idx) = self.bound_reference(reference)? {
1512            let literal = get_arrow_datum(literal)?;
1513
1514            Ok(Box::new(move |batch| {
1515                let left = project_column(&batch, idx)?;
1516                let literal = try_cast_literal(&literal, left.data_type())?;
1517                lt_eq(&left, literal.as_ref())
1518            }))
1519        } else {
1520            // A missing column, treating it as null.
1521            self.build_always_true()
1522        }
1523    }
1524
1525    fn greater_than(
1526        &mut self,
1527        reference: &BoundReference,
1528        literal: &Datum,
1529        _predicate: &BoundPredicate,
1530    ) -> Result<Box<PredicateResult>> {
1531        if let Some(idx) = self.bound_reference(reference)? {
1532            let literal = get_arrow_datum(literal)?;
1533
1534            Ok(Box::new(move |batch| {
1535                let left = project_column(&batch, idx)?;
1536                let literal = try_cast_literal(&literal, left.data_type())?;
1537                gt(&left, literal.as_ref())
1538            }))
1539        } else {
1540            // A missing column, treating it as null.
1541            self.build_always_false()
1542        }
1543    }
1544
1545    fn greater_than_or_eq(
1546        &mut self,
1547        reference: &BoundReference,
1548        literal: &Datum,
1549        _predicate: &BoundPredicate,
1550    ) -> Result<Box<PredicateResult>> {
1551        if let Some(idx) = self.bound_reference(reference)? {
1552            let literal = get_arrow_datum(literal)?;
1553
1554            Ok(Box::new(move |batch| {
1555                let left = project_column(&batch, idx)?;
1556                let literal = try_cast_literal(&literal, left.data_type())?;
1557                gt_eq(&left, literal.as_ref())
1558            }))
1559        } else {
1560            // A missing column, treating it as null.
1561            self.build_always_false()
1562        }
1563    }
1564
1565    fn eq(
1566        &mut self,
1567        reference: &BoundReference,
1568        literal: &Datum,
1569        _predicate: &BoundPredicate,
1570    ) -> Result<Box<PredicateResult>> {
1571        if let Some(idx) = self.bound_reference(reference)? {
1572            let literal = get_arrow_datum(literal)?;
1573
1574            Ok(Box::new(move |batch| {
1575                let left = project_column(&batch, idx)?;
1576                let literal = try_cast_literal(&literal, left.data_type())?;
1577                eq(&left, literal.as_ref())
1578            }))
1579        } else {
1580            // A missing column, treating it as null.
1581            self.build_always_false()
1582        }
1583    }
1584
1585    fn not_eq(
1586        &mut self,
1587        reference: &BoundReference,
1588        literal: &Datum,
1589        _predicate: &BoundPredicate,
1590    ) -> Result<Box<PredicateResult>> {
1591        if let Some(idx) = self.bound_reference(reference)? {
1592            let literal = get_arrow_datum(literal)?;
1593
1594            Ok(Box::new(move |batch| {
1595                let left = project_column(&batch, idx)?;
1596                let literal = try_cast_literal(&literal, left.data_type())?;
1597                neq(&left, literal.as_ref())
1598            }))
1599        } else {
1600            // A missing column, treating it as null.
1601            self.build_always_false()
1602        }
1603    }
1604
1605    fn starts_with(
1606        &mut self,
1607        reference: &BoundReference,
1608        literal: &Datum,
1609        _predicate: &BoundPredicate,
1610    ) -> Result<Box<PredicateResult>> {
1611        if let Some(idx) = self.bound_reference(reference)? {
1612            let literal = get_arrow_datum(literal)?;
1613
1614            Ok(Box::new(move |batch| {
1615                let left = project_column(&batch, idx)?;
1616                let literal = try_cast_literal(&literal, left.data_type())?;
1617                starts_with(&left, literal.as_ref())
1618            }))
1619        } else {
1620            // A missing column, treating it as null.
1621            self.build_always_false()
1622        }
1623    }
1624
1625    fn not_starts_with(
1626        &mut self,
1627        reference: &BoundReference,
1628        literal: &Datum,
1629        _predicate: &BoundPredicate,
1630    ) -> Result<Box<PredicateResult>> {
1631        if let Some(idx) = self.bound_reference(reference)? {
1632            let literal = get_arrow_datum(literal)?;
1633
1634            Ok(Box::new(move |batch| {
1635                let left = project_column(&batch, idx)?;
1636                let literal = try_cast_literal(&literal, left.data_type())?;
1637                // update here if arrow ever adds a native not_starts_with
1638                not(&starts_with(&left, literal.as_ref())?)
1639            }))
1640        } else {
1641            // A missing column, treating it as null.
1642            self.build_always_true()
1643        }
1644    }
1645
1646    fn r#in(
1647        &mut self,
1648        reference: &BoundReference,
1649        literals: &FnvHashSet<Datum>,
1650        _predicate: &BoundPredicate,
1651    ) -> Result<Box<PredicateResult>> {
1652        if let Some(idx) = self.bound_reference(reference)? {
1653            let literals: Vec<_> = literals
1654                .iter()
1655                .map(|lit| get_arrow_datum(lit).unwrap())
1656                .collect();
1657
1658            Ok(Box::new(move |batch| {
1659                // update this if arrow ever adds a native is_in kernel
1660                let left = project_column(&batch, idx)?;
1661
1662                let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1663                for literal in &literals {
1664                    let literal = try_cast_literal(literal, left.data_type())?;
1665                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
1666                }
1667
1668                Ok(acc)
1669            }))
1670        } else {
1671            // A missing column, treating it as null.
1672            self.build_always_false()
1673        }
1674    }
1675
1676    fn not_in(
1677        &mut self,
1678        reference: &BoundReference,
1679        literals: &FnvHashSet<Datum>,
1680        _predicate: &BoundPredicate,
1681    ) -> Result<Box<PredicateResult>> {
1682        if let Some(idx) = self.bound_reference(reference)? {
1683            let literals: Vec<_> = literals
1684                .iter()
1685                .map(|lit| get_arrow_datum(lit).unwrap())
1686                .collect();
1687
1688            Ok(Box::new(move |batch| {
1689                // update this if arrow ever adds a native not_in kernel
1690                let left = project_column(&batch, idx)?;
1691                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1692                for literal in &literals {
1693                    let literal = try_cast_literal(literal, left.data_type())?;
1694                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
1695                }
1696
1697                Ok(acc)
1698            }))
1699        } else {
1700            // A missing column, treating it as null.
1701            self.build_always_true()
1702        }
1703    }
1704}
1705
/// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
pub struct ArrowFileReader {
    // Iceberg file metadata; its `size` is used when loading the Parquet footer.
    meta: FileMetadata,
    // Eagerly load the column index while reading metadata.
    preload_column_index: bool,
    // Eagerly load the offset index while reading metadata.
    preload_offset_index: bool,
    // Eagerly load the page index; applied before the column/offset policies
    // in `get_metadata`, since setting it updates both of the others.
    preload_page_index: bool,
    // Prefetch hint (bytes) forwarded to `ParquetMetaDataReader`.
    metadata_size_hint: Option<usize>,
    // Underlying file handle used to serve ranged byte reads.
    r: Box<dyn FileRead>,
}
1715
1716impl ArrowFileReader {
1717    /// Create a new ArrowFileReader
1718    pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
1719        Self {
1720            meta,
1721            preload_column_index: false,
1722            preload_offset_index: false,
1723            preload_page_index: false,
1724            metadata_size_hint: None,
1725            r,
1726        }
1727    }
1728
1729    /// Enable or disable preloading of the column index
1730    pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1731        self.preload_column_index = preload;
1732        self
1733    }
1734
1735    /// Enable or disable preloading of the offset index
1736    pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1737        self.preload_offset_index = preload;
1738        self
1739    }
1740
1741    /// Enable or disable preloading of the page index
1742    pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1743        self.preload_page_index = preload;
1744        self
1745    }
1746
1747    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
1748    ///
1749    /// This hint can help reduce the number of fetch requests. For more details see the
1750    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
1751    pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1752        self.metadata_size_hint = Some(hint);
1753        self
1754    }
1755}
1756
impl AsyncFileReader for ArrowFileReader {
    /// Serve a ranged byte read from the underlying `FileRead`, mapping any
    /// error into `ParquetError::External` as the parquet crate expects.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start..range.end)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    // TODO: currently we don't respect `ArrowReaderOptions` cause it don't expose any method to access the option field
    // we will fix it after `v55.1.0` is released in https://github.com/apache/arrow-rs/issues/7393
    /// Load and parse the Parquet footer metadata, honouring the configured
    /// prefetch hint and index-preloading policies.
    fn get_metadata(
        &mut self,
        _options: Option<&'_ ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new()
                .with_prefetch_hint(self.metadata_size_hint)
                // Set the page policy first because it updates both column and offset policies.
                .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
            // Copy the size out before `self` is mutably borrowed as the reader source.
            let size = self.meta.size;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}
1787
1788/// The Arrow type of an array that the Parquet reader reads may not match the exact Arrow type
1789/// that Iceberg uses for literals - but they are effectively the same logical type,
1790/// i.e. LargeUtf8 and Utf8 or Utf8View and Utf8 or Utf8View and LargeUtf8.
1791///
1792/// The Arrow compute kernels that we use must match the type exactly, so first cast the literal
1793/// into the type of the batch we read from Parquet before sending it to the compute kernel.
1794fn try_cast_literal(
1795    literal: &Arc<dyn ArrowDatum + Send + Sync>,
1796    column_type: &DataType,
1797) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1798    let literal_array = literal.get().0;
1799
1800    // No cast required
1801    if literal_array.data_type() == column_type {
1802        return Ok(Arc::clone(literal));
1803    }
1804
1805    let literal_array = cast(literal_array, column_type)?;
1806    Ok(Arc::new(Scalar::new(literal_array)))
1807}
1808
1809#[cfg(test)]
1810mod tests {
1811    use std::collections::{HashMap, HashSet};
1812    use std::fs::File;
1813    use std::sync::Arc;
1814
1815    use arrow_array::cast::AsArray;
1816    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1817    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1818    use futures::TryStreamExt;
1819    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1820    use parquet::arrow::{ArrowWriter, ProjectionMask};
1821    use parquet::basic::Compression;
1822    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1823    use parquet::file::properties::WriterProperties;
1824    use parquet::schema::parser::parse_message_type;
1825    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1826    use roaring::RoaringTreemap;
1827    use tempfile::TempDir;
1828
1829    use crate::ErrorKind;
1830    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1831    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1832    use crate::delete_vector::DeleteVector;
1833    use crate::expr::visitors::bound_predicate_visitor::visit;
1834    use crate::expr::{Bind, Predicate, Reference};
1835    use crate::io::FileIO;
1836    use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1837    use crate::spec::{
1838        DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1839    };
1840
1841    fn table_schema_simple() -> SchemaRef {
1842        Arc::new(
1843            Schema::builder()
1844                .with_schema_id(1)
1845                .with_identifier_field_ids(vec![2])
1846                .with_fields(vec![
1847                    NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1848                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1849                    NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1850                    NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1851                ])
1852                .build()
1853                .unwrap(),
1854        )
1855    }
1856
1857    #[test]
1858    fn test_collect_field_id() {
1859        let schema = table_schema_simple();
1860        let expr = Reference::new("qux").is_null();
1861        let bound_expr = expr.bind(schema, true).unwrap();
1862
1863        let mut visitor = CollectFieldIdVisitor {
1864            field_ids: HashSet::default(),
1865        };
1866        visit(&mut visitor, &bound_expr).unwrap();
1867
1868        let mut expected = HashSet::default();
1869        expected.insert(4_i32);
1870
1871        assert_eq!(visitor.field_ids, expected);
1872    }
1873
1874    #[test]
1875    fn test_collect_field_id_with_and() {
1876        let schema = table_schema_simple();
1877        let expr = Reference::new("qux")
1878            .is_null()
1879            .and(Reference::new("baz").is_null());
1880        let bound_expr = expr.bind(schema, true).unwrap();
1881
1882        let mut visitor = CollectFieldIdVisitor {
1883            field_ids: HashSet::default(),
1884        };
1885        visit(&mut visitor, &bound_expr).unwrap();
1886
1887        let mut expected = HashSet::default();
1888        expected.insert(4_i32);
1889        expected.insert(3);
1890
1891        assert_eq!(visitor.field_ids, expected);
1892    }
1893
1894    #[test]
1895    fn test_collect_field_id_with_or() {
1896        let schema = table_schema_simple();
1897        let expr = Reference::new("qux")
1898            .is_null()
1899            .or(Reference::new("baz").is_null());
1900        let bound_expr = expr.bind(schema, true).unwrap();
1901
1902        let mut visitor = CollectFieldIdVisitor {
1903            field_ids: HashSet::default(),
1904        };
1905        visit(&mut visitor, &bound_expr).unwrap();
1906
1907        let mut expected = HashSet::default();
1908        expected.insert(4_i32);
1909        expected.insert(3);
1910
1911        assert_eq!(visitor.field_ids, expected);
1912    }
1913
    // Verifies that get_arrow_projection_mask rejects unsupported Arrow types
    // (Duration, Decimal with precision > 38) when they are selected, and
    // succeeds once only supported fields are projected.
    #[test]
    fn test_arrow_projection_mask() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_identifier_field_ids(vec![1])
                .with_fields(vec![
                    NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
                    NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(
                        3,
                        "c3",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 38,
                            scale: 3,
                        }),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );
        // The Arrow schema deliberately disagrees with the Iceberg schema:
        // c2 and c3 carry types the projection cannot handle.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            // Type not supported
            Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
            ),
            // Precision is beyond the supported range
            Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "3".to_string(),
            )])),
        ]));

        let message_type = "
message schema {
  required binary c1 (STRING) = 1;
  optional int32 c2 (INTEGER(8,true)) = 2;
  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
}
    ";
        let parquet_type = parse_message_type(message_type).expect("should parse schema");
        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));

        // Try projecting the fields c2 and c3 with the unsupported data types
        let err = ArrowReader::get_arrow_projection_mask(
            &[1, 2, 3],
            &schema,
            &parquet_schema,
            &arrow_schema,
            false,
        )
        .unwrap_err();

        assert_eq!(err.kind(), ErrorKind::DataInvalid);
        assert_eq!(
            err.to_string(),
            "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
        );

        // Omitting field c2, we still get an error due to c3 being selected
        let err = ArrowReader::get_arrow_projection_mask(
            &[1, 3],
            &schema,
            &parquet_schema,
            &arrow_schema,
            false,
        )
        .unwrap_err();

        assert_eq!(err.kind(), ErrorKind::DataInvalid);
        assert_eq!(
            err.to_string(),
            "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
        );

        // Finally avoid selecting fields with unsupported data types
        let mask = ArrowReader::get_arrow_projection_mask(
            &[1],
            &schema,
            &parquet_schema,
            &arrow_schema,
            false,
        )
        .expect("Some ProjectionMask");
        // c1 is the first (index 0) leaf in the Parquet schema.
        assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
    }
2005
2006    #[tokio::test]
2007    async fn test_kleene_logic_or_behaviour() {
2008        // a IS NULL OR a = 'foo'
2009        let predicate = Reference::new("a")
2010            .is_null()
2011            .or(Reference::new("a").equal_to(Datum::string("foo")));
2012
2013        // Table data: [NULL, "foo", "bar"]
2014        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
2015
2016        // Expected: [NULL, "foo"].
2017        let expected = vec![None, Some("foo".to_string())];
2018
2019        let (file_io, schema, table_location, _temp_dir) =
2020            setup_kleene_logic(data_for_col_a, DataType::Utf8);
2021        let reader = ArrowReaderBuilder::new(file_io).build();
2022
2023        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
2024
2025        assert_eq!(result_data, expected);
2026    }
2027
2028    #[tokio::test]
2029    async fn test_kleene_logic_and_behaviour() {
2030        // a IS NOT NULL AND a != 'foo'
2031        let predicate = Reference::new("a")
2032            .is_not_null()
2033            .and(Reference::new("a").not_equal_to(Datum::string("foo")));
2034
2035        // Table data: [NULL, "foo", "bar"]
2036        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
2037
2038        // Expected: ["bar"].
2039        let expected = vec![Some("bar".to_string())];
2040
2041        let (file_io, schema, table_location, _temp_dir) =
2042            setup_kleene_logic(data_for_col_a, DataType::Utf8);
2043        let reader = ArrowReaderBuilder::new(file_io).build();
2044
2045        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
2046
2047        assert_eq!(result_data, expected);
2048    }
2049
    // Exercises try_cast_literal end-to-end: the file is written with a
    // LargeUtf8 column while every predicate literal is a plain Utf8 Datum,
    // so each comparison kernel only works if the literal is cast first.
    #[tokio::test]
    async fn test_predicate_cast_literal() {
        let predicates = vec![
            // a == 'foo'
            (Reference::new("a").equal_to(Datum::string("foo")), vec![
                Some("foo".to_string()),
            ]),
            // a != 'foo'
            (
                Reference::new("a").not_equal_to(Datum::string("foo")),
                vec![Some("bar".to_string())],
            ),
            // STARTS_WITH(a, 'f')
            (Reference::new("a").starts_with(Datum::string("f")), vec![
                Some("foo".to_string()),
            ]),
            // NOT STARTS_WITH(a, 'f')
            (
                Reference::new("a").not_starts_with(Datum::string("f")),
                vec![Some("bar".to_string())],
            ),
            // a < 'foo'
            (Reference::new("a").less_than(Datum::string("foo")), vec![
                Some("bar".to_string()),
            ]),
            // a <= 'foo'
            (
                Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
                vec![Some("foo".to_string()), Some("bar".to_string())],
            ),
            // a > 'bar'
            (
                Reference::new("a").greater_than(Datum::string("bar")),
                vec![Some("foo".to_string())],
            ),
            // a >= 'foo'
            (
                Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
                vec![Some("foo".to_string())],
            ),
            // a IN ('foo', 'baz')
            (
                Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
                vec![Some("foo".to_string())],
            ),
            // a NOT IN ('foo', 'baz')
            (
                Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
                vec![Some("bar".to_string())],
            ),
        ];

        // Table data: ["foo", "bar"]
        let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];

        let (file_io, schema, table_location, _temp_dir) =
            setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
        let reader = ArrowReaderBuilder::new(file_io).build();

        for (predicate, expected) in predicates {
            println!("testing predicate {predicate}");
            let result_data = test_perform_read(
                predicate.clone(),
                schema.clone(),
                table_location.clone(),
                reader.clone(),
            )
            .await;

            assert_eq!(result_data, expected, "predicate={predicate}");
        }
    }
2122
2123    async fn test_perform_read(
2124        predicate: Predicate,
2125        schema: SchemaRef,
2126        table_location: String,
2127        reader: ArrowReader,
2128    ) -> Vec<Option<String>> {
2129        let tasks = Box::pin(futures::stream::iter(
2130            vec![Ok(FileScanTask {
2131                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
2132                    .unwrap()
2133                    .len(),
2134                start: 0,
2135                length: 0,
2136                record_count: None,
2137                data_file_path: format!("{table_location}/1.parquet"),
2138                data_file_format: DataFileFormat::Parquet,
2139                schema: schema.clone(),
2140                project_field_ids: vec![1],
2141                predicate: Some(predicate.bind(schema, true).unwrap()),
2142                deletes: vec![],
2143                partition: None,
2144                partition_spec: None,
2145                name_mapping: None,
2146                case_sensitive: false,
2147            })]
2148            .into_iter(),
2149        )) as FileScanTaskStream;
2150
2151        let result = reader
2152            .read(tasks)
2153            .unwrap()
2154            .try_collect::<Vec<RecordBatch>>()
2155            .await
2156            .unwrap();
2157
2158        result[0].columns()[0]
2159            .as_string_opt::<i32>()
2160            .unwrap()
2161            .iter()
2162            .map(|v| v.map(ToOwned::to_owned))
2163            .collect::<Vec<_>>()
2164    }
2165
2166    fn setup_kleene_logic(
2167        data_for_col_a: Vec<Option<String>>,
2168        col_a_type: DataType,
2169    ) -> (FileIO, SchemaRef, String, TempDir) {
2170        let schema = Arc::new(
2171            Schema::builder()
2172                .with_schema_id(1)
2173                .with_fields(vec![
2174                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2175                ])
2176                .build()
2177                .unwrap(),
2178        );
2179
2180        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2181            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2182                PARQUET_FIELD_ID_META_KEY.to_string(),
2183                "1".to_string(),
2184            )])),
2185        ]));
2186
2187        let tmp_dir = TempDir::new().unwrap();
2188        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2189
2190        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2191
2192        let col = match col_a_type {
2193            DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2194            DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2195            _ => panic!("unexpected col_a_type"),
2196        };
2197
2198        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2199
2200        // Write the Parquet files
2201        let props = WriterProperties::builder()
2202            .set_compression(Compression::SNAPPY)
2203            .build();
2204
2205        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
2206        let mut writer =
2207            ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2208
2209        writer.write(&to_write).expect("Writing batch");
2210
2211        // writer must be closed to write footer
2212        writer.close().unwrap();
2213
2214        (file_io, schema, table_location, tmp_dir)
2215    }
2216
    // Exercises build_deletes_row_selection over five row groups
    // (sizes 1000/500/500/1000/500), both with a row-group subset selected
    // and with row-group selection disabled. The expected RowSelections are
    // hand-computed; the inline `(skip, select)` notes next to each delete
    // position track the selectors they produce.
    #[test]
    fn test_build_deletes_row_selection() {
        let schema_descr = get_test_schema_descr();

        let mut columns = vec![];
        for ptr in schema_descr.columns() {
            let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
            columns.push(column);
        }

        let row_groups_metadata = vec![
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
        ];

        let selected_row_groups = Some(vec![1, 3]);

        /* cases to cover:
           * {skip|select} {first|intermediate|last} {one row|multiple rows} in
             {first|intermediate|last} {skipped|selected} row group
           * row group selection disabled
        */

        let positional_deletes = RoaringTreemap::from_iter(&[
            1, // in skipped rg 0, should be ignored
            3, // run of three consecutive items in skipped rg0
            4, 5, 998, // two consecutive items at end of skipped rg0
            999, 1000, // solitary row at start of selected rg1 (1, 9)
            1010, // run of 3 rows in selected rg1
            1011, 1012, // (3, 485)
            1498, // run of two items at end of selected rg1
            1499, 1500, // run of two items at start of skipped rg2
            1501, 1600, // should ignore, in skipped rg2
            1999, // single row at end of skipped rg2
            2000, // run of two items at start of selected rg3
            2001, // (4, 98)
            2100, // single row in selected row group 3 (1, 99)
            2200, // run of 3 consecutive rows in selected row group 3
            2201, 2202, // (3, 796)
            2999, // single item at end of selected rg3 (1)
            3000, // single item at start of skipped rg4
        ]);

        let positional_deletes = DeleteVector::new(positional_deletes);

        // using selected row groups 1 and 3
        let result = ArrowReader::build_deletes_row_selection(
            &row_groups_metadata,
            &selected_row_groups,
            &positional_deletes,
        )
        .unwrap();

        // Selection is relative to the concatenation of the *selected* row
        // groups only (rg1 then rg3); deletes in skipped groups are ignored.
        let expected = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(9),
            RowSelector::skip(3),
            RowSelector::select(485),
            RowSelector::skip(4),
            RowSelector::select(98),
            RowSelector::skip(1),
            RowSelector::select(99),
            RowSelector::skip(3),
            RowSelector::select(796),
            RowSelector::skip(1),
        ]);

        assert_eq!(result, expected);

        // selecting all row groups
        let result = ArrowReader::build_deletes_row_selection(
            &row_groups_metadata,
            &None,
            &positional_deletes,
        )
        .unwrap();

        // With no row-group subset, the selection spans all 3500 rows and
        // every delete position contributes a skip.
        let expected = RowSelection::from(vec![
            RowSelector::select(1),
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(3),
            RowSelector::select(992),
            RowSelector::skip(3),
            RowSelector::select(9),
            RowSelector::skip(3),
            RowSelector::select(485),
            RowSelector::skip(4),
            RowSelector::select(98),
            RowSelector::skip(1),
            RowSelector::select(398),
            RowSelector::skip(3),
            RowSelector::select(98),
            RowSelector::skip(1),
            RowSelector::select(99),
            RowSelector::skip(3),
            RowSelector::select(796),
            RowSelector::skip(2),
            RowSelector::select(499),
        ]);

        assert_eq!(result, expected);
    }
2323
2324    fn build_test_row_group_meta(
2325        schema_descr: SchemaDescPtr,
2326        columns: Vec<ColumnChunkMetaData>,
2327        num_rows: i64,
2328        ordinal: i16,
2329    ) -> RowGroupMetaData {
2330        RowGroupMetaData::builder(schema_descr.clone())
2331            .set_num_rows(num_rows)
2332            .set_total_byte_size(2000)
2333            .set_column_metadata(columns)
2334            .set_ordinal(ordinal)
2335            .build()
2336            .unwrap()
2337    }
2338
2339    fn get_test_schema_descr() -> SchemaDescPtr {
2340        use parquet::schema::types::Type as SchemaType;
2341
2342        let schema = SchemaType::group_type_builder("schema")
2343            .with_fields(vec![
2344                Arc::new(
2345                    SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2346                        .build()
2347                        .unwrap(),
2348                ),
2349                Arc::new(
2350                    SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2351                        .build()
2352                        .unwrap(),
2353                ),
2354            ])
2355            .build()
2356            .unwrap();
2357
2358        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2359    }
2360
2361    /// Verifies that file splits respect byte ranges and only read specific row groups.
2362    #[tokio::test]
2363    async fn test_file_splits_respect_byte_ranges() {
2364        use arrow_array::Int32Array;
2365        use parquet::file::reader::{FileReader, SerializedFileReader};
2366
2367        let schema = Arc::new(
2368            Schema::builder()
2369                .with_schema_id(1)
2370                .with_fields(vec![
2371                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2372                ])
2373                .build()
2374                .unwrap(),
2375        );
2376
2377        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2378            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2379                PARQUET_FIELD_ID_META_KEY.to_string(),
2380                "1".to_string(),
2381            )])),
2382        ]));
2383
2384        let tmp_dir = TempDir::new().unwrap();
2385        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2386        let file_path = format!("{table_location}/multi_row_group.parquet");
2387
2388        // Force each batch into its own row group for testing byte range filtering.
2389        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2390            (0..100).collect::<Vec<i32>>(),
2391        ))])
2392        .unwrap();
2393        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2394            (100..200).collect::<Vec<i32>>(),
2395        ))])
2396        .unwrap();
2397        let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2398            (200..300).collect::<Vec<i32>>(),
2399        ))])
2400        .unwrap();
2401
2402        let props = WriterProperties::builder()
2403            .set_compression(Compression::SNAPPY)
2404            .set_max_row_group_size(100)
2405            .build();
2406
2407        let file = File::create(&file_path).unwrap();
2408        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2409        writer.write(&batch1).expect("Writing batch 1");
2410        writer.write(&batch2).expect("Writing batch 2");
2411        writer.write(&batch3).expect("Writing batch 3");
2412        writer.close().unwrap();
2413
2414        // Read the file metadata to get row group byte positions
2415        let file = File::open(&file_path).unwrap();
2416        let reader = SerializedFileReader::new(file).unwrap();
2417        let metadata = reader.metadata();
2418
2419        println!("File has {} row groups", metadata.num_row_groups());
2420        assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2421
2422        // Get byte positions for each row group
2423        let row_group_0 = metadata.row_group(0);
2424        let row_group_1 = metadata.row_group(1);
2425        let row_group_2 = metadata.row_group(2);
2426
2427        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
2428        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2429        let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
2430        let file_end = rg2_start + row_group_2.compressed_size() as u64;
2431
2432        println!(
2433            "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2434            row_group_0.num_rows(),
2435            rg0_start,
2436            row_group_0.compressed_size()
2437        );
2438        println!(
2439            "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2440            row_group_1.num_rows(),
2441            rg1_start,
2442            row_group_1.compressed_size()
2443        );
2444        println!(
2445            "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2446            row_group_2.num_rows(),
2447            rg2_start,
2448            row_group_2.compressed_size()
2449        );
2450
2451        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2452        let reader = ArrowReaderBuilder::new(file_io).build();
2453
2454        // Task 1: read only the first row group
2455        let task1 = FileScanTask {
2456            file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
2457            start: rg0_start,
2458            length: row_group_0.compressed_size() as u64,
2459            record_count: Some(100),
2460            data_file_path: file_path.clone(),
2461            data_file_format: DataFileFormat::Parquet,
2462            schema: schema.clone(),
2463            project_field_ids: vec![1],
2464            predicate: None,
2465            deletes: vec![],
2466            partition: None,
2467            partition_spec: None,
2468            name_mapping: None,
2469            case_sensitive: false,
2470        };
2471
2472        // Task 2: read the second and third row groups
2473        let task2 = FileScanTask {
2474            file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
2475            start: rg1_start,
2476            length: file_end - rg1_start,
2477            record_count: Some(200),
2478            data_file_path: file_path.clone(),
2479            data_file_format: DataFileFormat::Parquet,
2480            schema: schema.clone(),
2481            project_field_ids: vec![1],
2482            predicate: None,
2483            deletes: vec![],
2484            partition: None,
2485            partition_spec: None,
2486            name_mapping: None,
2487            case_sensitive: false,
2488        };
2489
2490        let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2491        let result1 = reader
2492            .clone()
2493            .read(tasks1)
2494            .unwrap()
2495            .try_collect::<Vec<RecordBatch>>()
2496            .await
2497            .unwrap();
2498
2499        let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2500        println!(
2501            "Task 1 (bytes {}-{}) returned {} rows",
2502            rg0_start,
2503            rg0_start + row_group_0.compressed_size() as u64,
2504            total_rows_task1
2505        );
2506
2507        let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2508        let result2 = reader
2509            .read(tasks2)
2510            .unwrap()
2511            .try_collect::<Vec<RecordBatch>>()
2512            .await
2513            .unwrap();
2514
2515        let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2516        println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2517
2518        assert_eq!(
2519            total_rows_task1, 100,
2520            "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2521        );
2522
2523        assert_eq!(
2524            total_rows_task2, 200,
2525            "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2526        );
2527
2528        // Verify the actual data values are correct (not just the row count)
2529        if total_rows_task1 > 0 {
2530            let first_batch = &result1[0];
2531            let id_col = first_batch
2532                .column(0)
2533                .as_primitive::<arrow_array::types::Int32Type>();
2534            let first_val = id_col.value(0);
2535            let last_val = id_col.value(id_col.len() - 1);
2536            println!("Task 1 data range: {first_val} to {last_val}");
2537
2538            assert_eq!(first_val, 0, "Task 1 should start with id=0");
2539            assert_eq!(last_val, 99, "Task 1 should end with id=99");
2540        }
2541
2542        if total_rows_task2 > 0 {
2543            let first_batch = &result2[0];
2544            let id_col = first_batch
2545                .column(0)
2546                .as_primitive::<arrow_array::types::Int32Type>();
2547            let first_val = id_col.value(0);
2548            println!("Task 2 first value: {first_val}");
2549
2550            assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2551        }
2552    }
2553
2554    /// Test schema evolution: reading old Parquet file (with only column 'a')
2555    /// using a newer table schema (with columns 'a' and 'b').
2556    /// This tests that:
2557    /// 1. get_arrow_projection_mask allows missing columns
2558    /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
2559    #[tokio::test]
2560    async fn test_schema_evolution_add_column() {
2561        use arrow_array::{Array, Int32Array};
2562
2563        // New table schema: columns 'a' and 'b' (b was added later, file only has 'a')
2564        let new_schema = Arc::new(
2565            Schema::builder()
2566                .with_schema_id(2)
2567                .with_fields(vec![
2568                    NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2569                    NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2570                ])
2571                .build()
2572                .unwrap(),
2573        );
2574
2575        // Create Arrow schema for old Parquet file (only has column 'a')
2576        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2577            Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2578                PARQUET_FIELD_ID_META_KEY.to_string(),
2579                "1".to_string(),
2580            )])),
2581        ]));
2582
2583        // Write old Parquet file with only column 'a'
2584        let tmp_dir = TempDir::new().unwrap();
2585        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2586        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2587
2588        let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2589        let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2590
2591        let props = WriterProperties::builder()
2592            .set_compression(Compression::SNAPPY)
2593            .build();
2594        let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
2595        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2596        writer.write(&to_write).expect("Writing batch");
2597        writer.close().unwrap();
2598
2599        // Read the old Parquet file using the NEW schema (with column 'b')
2600        let reader = ArrowReaderBuilder::new(file_io).build();
2601        let tasks = Box::pin(futures::stream::iter(
2602            vec![Ok(FileScanTask {
2603                file_size_in_bytes: std::fs::metadata(format!("{table_location}/old_file.parquet"))
2604                    .unwrap()
2605                    .len(),
2606                start: 0,
2607                length: 0,
2608                record_count: None,
2609                data_file_path: format!("{table_location}/old_file.parquet"),
2610                data_file_format: DataFileFormat::Parquet,
2611                schema: new_schema.clone(),
2612                project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
2613                predicate: None,
2614                deletes: vec![],
2615                partition: None,
2616                partition_spec: None,
2617                name_mapping: None,
2618                case_sensitive: false,
2619            })]
2620            .into_iter(),
2621        )) as FileScanTaskStream;
2622
2623        let result = reader
2624            .read(tasks)
2625            .unwrap()
2626            .try_collect::<Vec<RecordBatch>>()
2627            .await
2628            .unwrap();
2629
2630        // Verify we got the correct data
2631        assert_eq!(result.len(), 1);
2632        let batch = &result[0];
2633
2634        // Should have 2 columns now
2635        assert_eq!(batch.num_columns(), 2);
2636        assert_eq!(batch.num_rows(), 3);
2637
2638        // Column 'a' should have the original data
2639        let col_a = batch
2640            .column(0)
2641            .as_primitive::<arrow_array::types::Int32Type>();
2642        assert_eq!(col_a.values(), &[1, 2, 3]);
2643
2644        // Column 'b' should be all NULLs (it didn't exist in the old file)
2645        let col_b = batch
2646            .column(1)
2647            .as_primitive::<arrow_array::types::Int32Type>();
2648        assert_eq!(col_b.null_count(), 3);
2649        assert!(col_b.is_null(0));
2650        assert!(col_b.is_null(1));
2651        assert!(col_b.is_null(2));
2652    }
2653
    /// Test for bug where position deletes in later row groups are not applied correctly.
    ///
    /// When a file has multiple row groups and a position delete targets a row in a later
    /// row group, the `build_deletes_row_selection` function had a bug where it would
    /// fail to increment `current_row_group_base_idx` when skipping row groups.
    ///
    /// This test creates:
    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
    /// - A position delete file that deletes row 199 (last row in second row group)
    ///
    /// Expected behavior: Should return 199 rows (with id=200 deleted)
    /// Bug behavior: Returns 200 rows (delete is not applied)
    ///
    /// This bug was discovered while running Apache Spark + Apache Iceberg integration tests
    /// through DataFusion Comet. The following Iceberg Java tests failed due to this bug:
    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet`
    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet`
    #[tokio::test]
    async fn test_position_delete_across_multiple_row_groups() {
        use arrow_array::{Int32Array, Int64Array};
        use parquet::file::reader::{FileReader, SerializedFileReader};

        // Field IDs for the positional delete schema columns (file_path, pos).
        // NOTE(review): these look like Iceberg's reserved field IDs for
        // position-delete files — confirm against the Iceberg spec.
        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();

        // Create table schema with a single 'id' column
        let table_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Arrow schema mirroring the table schema, with parquet field-id metadata set
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
        ]));

        // Step 1: Create data file with 200 rows in 2 row groups
        // Row group 0: rows 0-99 (ids 1-100)
        // Row group 1: rows 100-199 (ids 101-200)
        let data_file_path = format!("{table_location}/data.parquet");

        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(1..=100),
        )])
        .unwrap();

        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(101..=200),
        )])
        .unwrap();

        // Force each batch into its own row group
        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_max_row_group_size(100)
            .build();

        let file = File::create(&data_file_path).unwrap();
        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
        writer.write(&batch1).expect("Writing batch 1");
        writer.write(&batch2).expect("Writing batch 2");
        writer.close().unwrap();

        // Verify we created 2 row groups (the bug only manifests with >1 row group)
        let verify_file = File::open(&data_file_path).unwrap();
        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
        assert_eq!(
            verify_reader.metadata().num_row_groups(),
            2,
            "Should have 2 row groups"
        );

        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
        let delete_file_path = format!("{table_location}/deletes.parquet");

        // Positional delete files have two columns: the target file path and the
        // 0-based row position to delete within that file.
        let delete_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
            )])),
            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
            )])),
        ]));

        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
            Arc::new(Int64Array::from_iter_values(vec![199i64])),
        ])
        .unwrap();

        let delete_props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let delete_file = File::create(&delete_file_path).unwrap();
        let mut delete_writer =
            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
        delete_writer.write(&delete_batch).unwrap();
        delete_writer.close().unwrap();

        // Step 3: Read the data file with the delete applied
        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
        let reader = ArrowReaderBuilder::new(file_io).build();

        let task = FileScanTask {
            file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
            // start=0 / length=0: no byte-range split — the whole file (both row
            // groups) is scanned, as the 200-row expectation below relies on.
            start: 0,
            length: 0,
            record_count: Some(200),
            data_file_path: data_file_path.clone(),
            data_file_format: DataFileFormat::Parquet,
            schema: table_schema.clone(),
            project_field_ids: vec![1],
            predicate: None,
            deletes: vec![FileScanTaskDeleteFile {
                file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
                file_path: delete_file_path,
                file_type: DataContentType::PositionDeletes,
                partition_spec_id: 0,
                // Positional delete: no equality ids involved
                equality_ids: None,
            }],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };

        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Step 4: Verify we got 199 rows (not 200)
        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();

        println!("Total rows read: {total_rows}");
        println!("Expected: 199 rows (deleted row 199 which had id=200)");

        // This assertion will FAIL before the fix and PASS after the fix
        assert_eq!(
            total_rows, 199,
            "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
             The bug causes position deletes in later row groups to be ignored."
        );

        // Verify the deleted row (id=200) is not present
        let all_ids: Vec<i32> = result
            .iter()
            .flat_map(|batch| {
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values()
                    .iter()
                    .copied()
            })
            .collect();

        assert!(
            !all_ids.contains(&200),
            "Row with id=200 should be deleted but was found in results"
        );

        // Verify we have all other ids (1-199), in order — i.e. only the targeted
        // row was removed and no others were dropped or reordered
        let expected_ids: Vec<i32> = (1..=199).collect();
        assert_eq!(
            all_ids, expected_ids,
            "Should have ids 1-199 but got different values"
        );
    }
2841
    /// Test for bug where position deletes are lost when skipping unselected row groups.
    ///
    /// This is a variant of `test_position_delete_across_multiple_row_groups` that exercises
    /// the row group selection code path (`selected_row_groups: Some([...])`).
    ///
    /// When a file has multiple row groups and only some are selected for reading,
    /// the `build_deletes_row_selection` function must correctly skip over deletes in
    /// unselected row groups WITHOUT consuming deletes that belong to selected row groups.
    ///
    /// This test creates:
    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
    /// - A position delete file that deletes row 199 (last row in second row group)
    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
    ///
    /// Expected behavior: Should return 99 rows (with row 199 deleted)
    /// Bug behavior: Returns 100 rows (delete is lost when skipping row group 0)
    ///
    /// The bug occurs when processing row group 0 (unselected):
    /// ```rust
    /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100
    /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199!
    /// ```
    ///
    /// The fix is to NOT call `next()` after `advance_to()` when skipping unselected row groups,
    /// because `advance_to()` already positions the iterator correctly without consuming elements.
    #[tokio::test]
    async fn test_position_delete_with_row_group_selection() {
        use arrow_array::{Int32Array, Int64Array};
        use parquet::file::reader::{FileReader, SerializedFileReader};

        // Field IDs for the positional delete schema columns (file_path, pos).
        // NOTE(review): these look like Iceberg's reserved field IDs for
        // position-delete files — confirm against the Iceberg spec.
        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();

        // Create table schema with a single 'id' column
        let table_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Arrow schema mirroring the table schema, with parquet field-id metadata set
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
        ]));

        // Step 1: Create data file with 200 rows in 2 row groups
        // Row group 0: rows 0-99 (ids 1-100)
        // Row group 1: rows 100-199 (ids 101-200)
        let data_file_path = format!("{table_location}/data.parquet");

        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(1..=100),
        )])
        .unwrap();

        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(101..=200),
        )])
        .unwrap();

        // Force each batch into its own row group
        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_max_row_group_size(100)
            .build();

        let file = File::create(&data_file_path).unwrap();
        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
        writer.write(&batch1).expect("Writing batch 1");
        writer.write(&batch2).expect("Writing batch 2");
        writer.close().unwrap();

        // Verify we created 2 row groups (the selection path needs >1 row group)
        let verify_file = File::open(&data_file_path).unwrap();
        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
        assert_eq!(
            verify_reader.metadata().num_row_groups(),
            2,
            "Should have 2 row groups"
        );

        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
        let delete_file_path = format!("{table_location}/deletes.parquet");

        // Positional delete files have two columns: the target file path and the
        // 0-based row position to delete within that file.
        let delete_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
            )])),
            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
            )])),
        ]));

        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
            Arc::new(Int64Array::from_iter_values(vec![199i64])),
        ])
        .unwrap();

        let delete_props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let delete_file = File::create(&delete_file_path).unwrap();
        let mut delete_writer =
            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
        delete_writer.write(&delete_batch).unwrap();
        delete_writer.close().unwrap();

        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
        // This exercises the row group selection code path where row group 0 is skipped
        let metadata_file = File::open(&data_file_path).unwrap();
        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
        let metadata = metadata_reader.metadata();

        let row_group_0 = metadata.row_group(0);
        let row_group_1 = metadata.row_group(1);

        // Row group 1's byte range is derived from row group 0's compressed size.
        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
        let rg1_length = row_group_1.compressed_size() as u64;

        println!(
            "Row group 0: starts at byte {}, {} bytes compressed",
            rg0_start,
            row_group_0.compressed_size()
        );
        println!(
            "Row group 1: starts at byte {}, {} bytes compressed",
            rg1_start,
            row_group_1.compressed_size()
        );

        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
        let reader = ArrowReaderBuilder::new(file_io).build();

        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
        let task = FileScanTask {
            file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
            start: rg1_start,
            length: rg1_length,
            record_count: Some(100), // Row group 1 has 100 rows
            data_file_path: data_file_path.clone(),
            data_file_format: DataFileFormat::Parquet,
            schema: table_schema.clone(),
            project_field_ids: vec![1],
            predicate: None,
            deletes: vec![FileScanTaskDeleteFile {
                file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
                file_path: delete_file_path,
                file_type: DataContentType::PositionDeletes,
                partition_spec_id: 0,
                // Positional delete: no equality ids involved
                equality_ids: None,
            }],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };

        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Step 4: Verify we got 99 rows (not 100)
        // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 99 rows
        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();

        println!("Total rows read from row group 1: {total_rows}");
        println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");

        // This assertion will FAIL before the fix and PASS after the fix
        assert_eq!(
            total_rows, 99,
            "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
             The bug causes position deletes to be lost when advance_to() is followed by next() \
             when skipping unselected row groups."
        );

        // Verify the deleted row (id=200) is not present
        let all_ids: Vec<i32> = result
            .iter()
            .flat_map(|batch| {
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values()
                    .iter()
                    .copied()
            })
            .collect();

        assert!(
            !all_ids.contains(&200),
            "Row with id=200 should be deleted but was found in results"
        );

        // Verify we have ids 101-199 (not 101-200) — only row group 1 was read,
        // and only its final row was removed
        let expected_ids: Vec<i32> = (101..=199).collect();
        assert_eq!(
            all_ids, expected_ids,
            "Should have ids 101-199 but got different values"
        );
    }
    /// Test for bug where stale cached delete causes infinite loop when skipping row groups.
    ///
    /// This test exposes the inverse scenario of `test_position_delete_with_row_group_selection`:
    /// - Position delete targets a row in the SKIPPED row group (not the selected one)
    /// - After calling advance_to(), the cached delete index is stale
    /// - Without updating the cache, the code enters an infinite loop
    ///
    /// This test creates:
    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
    /// - A position delete file that deletes row 0 (first row in SKIPPED row group 0)
    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
    ///
    /// The bug occurs when skipping row group 0:
    /// ```rust
    /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // Some(0)
    /// // ... skip to row group 1 ...
    /// delete_vector_iter.advance_to(100); // Iterator advances past delete at 0
    /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE!
    /// // When processing row group 1:
    /// //   current_idx = 100, next_deleted_row_idx = 0, next_row_group_base_idx = 200
    /// //   Loop condition: 0 < 200 (true)
    /// //   But: current_idx (100) > next_deleted_row_idx (0)
    /// //   And: current_idx (100) != next_deleted_row_idx (0)
    /// //   Neither branch executes -> INFINITE LOOP!
    /// ```
    ///
    /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect row group 1)
    /// Bug behavior: Infinite loop in build_deletes_row_selection
    #[tokio::test]
    async fn test_position_delete_in_skipped_row_group() {
        use arrow_array::{Int32Array, Int64Array};
        use parquet::file::reader::{FileReader, SerializedFileReader};

        // Field IDs for positional delete schema
        // (the large values match the reserved IDs Iceberg uses for the
        // positional-delete `file_path` and `pos` columns elsewhere in this file).
        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();

        // Create table schema with a single 'id' column
        let table_schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Matching Arrow schema; the field-id metadata key maps "id" to Iceberg field 1.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
        ]));

        // Step 1: Create data file with 200 rows in 2 row groups
        // Row group 0: rows 0-99 (ids 1-100)
        // Row group 1: rows 100-199 (ids 101-200)
        let data_file_path = format!("{table_location}/data.parquet");

        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(1..=100),
        )])
        .unwrap();

        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
            Int32Array::from_iter_values(101..=200),
        )])
        .unwrap();

        // Force each batch into its own row group
        // (max_row_group_size == batch size, so each write() closes a row group)
        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_max_row_group_size(100)
            .build();

        let file = File::create(&data_file_path).unwrap();
        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
        writer.write(&batch1).expect("Writing batch 1");
        writer.write(&batch2).expect("Writing batch 2");
        writer.close().unwrap();

        // Verify we created 2 row groups
        let verify_file = File::open(&data_file_path).unwrap();
        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
        assert_eq!(
            verify_reader.metadata().num_row_groups(),
            2,
            "Should have 2 row groups"
        );

        // Step 2: Create position delete file that deletes row 0 (id=1, first row in row group 0)
        let delete_file_path = format!("{table_location}/deletes.parquet");

        // Positional-delete schema: (file_path: Utf8, pos: Int64), tagged with the
        // reserved field IDs so the reader recognizes the delete file layout.
        let delete_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
            )])),
            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
            )])),
        ]));

        // Delete row at position 0 (0-indexed, so it's the first row: id=1)
        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
            Arc::new(Int64Array::from_iter_values(vec![0i64])),
        ])
        .unwrap();

        let delete_props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let delete_file = File::create(&delete_file_path).unwrap();
        let mut delete_writer =
            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
        delete_writer.write(&delete_batch).unwrap();
        delete_writer.close().unwrap();

        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
        // This exercises the row group selection code path where row group 0 is skipped
        let metadata_file = File::open(&data_file_path).unwrap();
        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
        let metadata = metadata_reader.metadata();

        let row_group_0 = metadata.row_group(0);
        let row_group_1 = metadata.row_group(1);

        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
        // NOTE(review): assumes row group 1 begins immediately after row group 0's
        // compressed bytes, i.e. row groups are laid out contiguously from offset 4.
        // This holds for files produced by the ArrowWriter above; confirm if the
        // writer configuration ever changes.
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
        let rg1_length = row_group_1.compressed_size() as u64;

        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
        let reader = ArrowReaderBuilder::new(file_io).build();

        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
        let task = FileScanTask {
            file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
            start: rg1_start,
            length: rg1_length,
            record_count: Some(100), // Row group 1 has 100 rows
            data_file_path: data_file_path.clone(),
            data_file_format: DataFileFormat::Parquet,
            schema: table_schema.clone(),
            project_field_ids: vec![1],
            predicate: None,
            deletes: vec![FileScanTaskDeleteFile {
                file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
                file_path: delete_file_path,
                file_type: DataContentType::PositionDeletes,
                partition_spec_id: 0,
                equality_ids: None,
            }],
            partition: None,
            partition_spec: None,
            name_mapping: None,
            case_sensitive: false,
        };

        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Step 4: Verify we got 100 rows (all of row group 1)
        // The delete at position 0 is in row group 0, which is skipped, so it doesn't affect us
        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();

        assert_eq!(
            total_rows, 100,
            "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
             If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
        );

        // Verify we have all ids from row group 1 (101-200)
        let all_ids: Vec<i32> = result
            .iter()
            .flat_map(|batch| {
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values()
                    .iter()
                    .copied()
            })
            .collect();

        let expected_ids: Vec<i32> = (101..=200).collect();
        assert_eq!(
            all_ids, expected_ids,
            "Should have ids 101-200 (all of row group 1)"
        );
    }
3265
3266    /// Test reading Parquet files without field ID metadata (e.g., migrated tables).
3267    /// This exercises the position-based fallback path.
3268    ///
3269    /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + pruneColumnsFallback()
3270    /// in /parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
3271    #[tokio::test]
3272    async fn test_read_parquet_file_without_field_ids() {
3273        let schema = Arc::new(
3274            Schema::builder()
3275                .with_schema_id(1)
3276                .with_fields(vec![
3277                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3278                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3279                ])
3280                .build()
3281                .unwrap(),
3282        );
3283
3284        // Parquet file from a migrated table - no field ID metadata
3285        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3286            Field::new("name", DataType::Utf8, false),
3287            Field::new("age", DataType::Int32, false),
3288        ]));
3289
3290        let tmp_dir = TempDir::new().unwrap();
3291        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3292        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3293
3294        let name_data = vec!["Alice", "Bob", "Charlie"];
3295        let age_data = vec![30, 25, 35];
3296
3297        use arrow_array::Int32Array;
3298        let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3299        let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3300
3301        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3302
3303        let props = WriterProperties::builder()
3304            .set_compression(Compression::SNAPPY)
3305            .build();
3306
3307        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3308        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3309
3310        writer.write(&to_write).expect("Writing batch");
3311        writer.close().unwrap();
3312
3313        let reader = ArrowReaderBuilder::new(file_io).build();
3314
3315        let tasks = Box::pin(futures::stream::iter(
3316            vec![Ok(FileScanTask {
3317                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3318                    .unwrap()
3319                    .len(),
3320                start: 0,
3321                length: 0,
3322                record_count: None,
3323                data_file_path: format!("{table_location}/1.parquet"),
3324                data_file_format: DataFileFormat::Parquet,
3325                schema: schema.clone(),
3326                project_field_ids: vec![1, 2],
3327                predicate: None,
3328                deletes: vec![],
3329                partition: None,
3330                partition_spec: None,
3331                name_mapping: None,
3332                case_sensitive: false,
3333            })]
3334            .into_iter(),
3335        )) as FileScanTaskStream;
3336
3337        let result = reader
3338            .read(tasks)
3339            .unwrap()
3340            .try_collect::<Vec<RecordBatch>>()
3341            .await
3342            .unwrap();
3343
3344        assert_eq!(result.len(), 1);
3345        let batch = &result[0];
3346        assert_eq!(batch.num_rows(), 3);
3347        assert_eq!(batch.num_columns(), 2);
3348
3349        // Verify position-based mapping: field_id 1 → position 0, field_id 2 → position 1
3350        let name_array = batch.column(0).as_string::<i32>();
3351        assert_eq!(name_array.value(0), "Alice");
3352        assert_eq!(name_array.value(1), "Bob");
3353        assert_eq!(name_array.value(2), "Charlie");
3354
3355        let age_array = batch
3356            .column(1)
3357            .as_primitive::<arrow_array::types::Int32Type>();
3358        assert_eq!(age_array.value(0), 30);
3359        assert_eq!(age_array.value(1), 25);
3360        assert_eq!(age_array.value(2), 35);
3361    }
3362
3363    /// Test reading Parquet files without field IDs with partial projection.
3364    /// Only a subset of columns are requested, verifying position-based fallback
3365    /// handles column selection correctly.
3366    #[tokio::test]
3367    async fn test_read_parquet_without_field_ids_partial_projection() {
3368        use arrow_array::Int32Array;
3369
3370        let schema = Arc::new(
3371            Schema::builder()
3372                .with_schema_id(1)
3373                .with_fields(vec![
3374                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3375                    NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3376                    NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3377                    NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3378                ])
3379                .build()
3380                .unwrap(),
3381        );
3382
3383        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3384            Field::new("col1", DataType::Utf8, false),
3385            Field::new("col2", DataType::Int32, false),
3386            Field::new("col3", DataType::Utf8, false),
3387            Field::new("col4", DataType::Int32, false),
3388        ]));
3389
3390        let tmp_dir = TempDir::new().unwrap();
3391        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3392        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3393
3394        let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3395        let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3396        let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3397        let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3398
3399        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3400            col1_data, col2_data, col3_data, col4_data,
3401        ])
3402        .unwrap();
3403
3404        let props = WriterProperties::builder()
3405            .set_compression(Compression::SNAPPY)
3406            .build();
3407
3408        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3409        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3410
3411        writer.write(&to_write).expect("Writing batch");
3412        writer.close().unwrap();
3413
3414        let reader = ArrowReaderBuilder::new(file_io).build();
3415
3416        let tasks = Box::pin(futures::stream::iter(
3417            vec![Ok(FileScanTask {
3418                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3419                    .unwrap()
3420                    .len(),
3421                start: 0,
3422                length: 0,
3423                record_count: None,
3424                data_file_path: format!("{table_location}/1.parquet"),
3425                data_file_format: DataFileFormat::Parquet,
3426                schema: schema.clone(),
3427                project_field_ids: vec![1, 3],
3428                predicate: None,
3429                deletes: vec![],
3430                partition: None,
3431                partition_spec: None,
3432                name_mapping: None,
3433                case_sensitive: false,
3434            })]
3435            .into_iter(),
3436        )) as FileScanTaskStream;
3437
3438        let result = reader
3439            .read(tasks)
3440            .unwrap()
3441            .try_collect::<Vec<RecordBatch>>()
3442            .await
3443            .unwrap();
3444
3445        assert_eq!(result.len(), 1);
3446        let batch = &result[0];
3447        assert_eq!(batch.num_rows(), 2);
3448        assert_eq!(batch.num_columns(), 2);
3449
3450        let col1_array = batch.column(0).as_string::<i32>();
3451        assert_eq!(col1_array.value(0), "a");
3452        assert_eq!(col1_array.value(1), "b");
3453
3454        let col3_array = batch.column(1).as_string::<i32>();
3455        assert_eq!(col3_array.value(0), "c");
3456        assert_eq!(col3_array.value(1), "d");
3457    }
3458
3459    /// Test reading Parquet files without field IDs with schema evolution.
3460    /// The Iceberg schema has more fields than the Parquet file, testing that
3461    /// missing columns are filled with NULLs.
3462    #[tokio::test]
3463    async fn test_read_parquet_without_field_ids_schema_evolution() {
3464        use arrow_array::{Array, Int32Array};
3465
3466        // Schema with field 3 added after the file was written
3467        let schema = Arc::new(
3468            Schema::builder()
3469                .with_schema_id(1)
3470                .with_fields(vec![
3471                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3472                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3473                    NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3474                ])
3475                .build()
3476                .unwrap(),
3477        );
3478
3479        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3480            Field::new("name", DataType::Utf8, false),
3481            Field::new("age", DataType::Int32, false),
3482        ]));
3483
3484        let tmp_dir = TempDir::new().unwrap();
3485        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3486        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3487
3488        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3489        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3490
3491        let to_write =
3492            RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3493
3494        let props = WriterProperties::builder()
3495            .set_compression(Compression::SNAPPY)
3496            .build();
3497
3498        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3499        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3500
3501        writer.write(&to_write).expect("Writing batch");
3502        writer.close().unwrap();
3503
3504        let reader = ArrowReaderBuilder::new(file_io).build();
3505
3506        let tasks = Box::pin(futures::stream::iter(
3507            vec![Ok(FileScanTask {
3508                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3509                    .unwrap()
3510                    .len(),
3511                start: 0,
3512                length: 0,
3513                record_count: None,
3514                data_file_path: format!("{table_location}/1.parquet"),
3515                data_file_format: DataFileFormat::Parquet,
3516                schema: schema.clone(),
3517                project_field_ids: vec![1, 2, 3],
3518                predicate: None,
3519                deletes: vec![],
3520                partition: None,
3521                partition_spec: None,
3522                name_mapping: None,
3523                case_sensitive: false,
3524            })]
3525            .into_iter(),
3526        )) as FileScanTaskStream;
3527
3528        let result = reader
3529            .read(tasks)
3530            .unwrap()
3531            .try_collect::<Vec<RecordBatch>>()
3532            .await
3533            .unwrap();
3534
3535        assert_eq!(result.len(), 1);
3536        let batch = &result[0];
3537        assert_eq!(batch.num_rows(), 2);
3538        assert_eq!(batch.num_columns(), 3);
3539
3540        let name_array = batch.column(0).as_string::<i32>();
3541        assert_eq!(name_array.value(0), "Alice");
3542        assert_eq!(name_array.value(1), "Bob");
3543
3544        let age_array = batch
3545            .column(1)
3546            .as_primitive::<arrow_array::types::Int32Type>();
3547        assert_eq!(age_array.value(0), 30);
3548        assert_eq!(age_array.value(1), 25);
3549
3550        // Verify missing column filled with NULLs
3551        let city_array = batch.column(2).as_string::<i32>();
3552        assert_eq!(city_array.null_count(), 2);
3553        assert!(city_array.is_null(0));
3554        assert!(city_array.is_null(1));
3555    }
3556
3557    /// Test reading Parquet files without field IDs that have multiple row groups.
3558    /// This ensures the position-based fallback works correctly across row group boundaries.
3559    #[tokio::test]
3560    async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3561        use arrow_array::Int32Array;
3562
3563        let schema = Arc::new(
3564            Schema::builder()
3565                .with_schema_id(1)
3566                .with_fields(vec![
3567                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3568                    NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3569                ])
3570                .build()
3571                .unwrap(),
3572        );
3573
3574        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3575            Field::new("name", DataType::Utf8, false),
3576            Field::new("value", DataType::Int32, false),
3577        ]));
3578
3579        let tmp_dir = TempDir::new().unwrap();
3580        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3581        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3582
3583        // Small row group size to create multiple row groups
3584        let props = WriterProperties::builder()
3585            .set_compression(Compression::SNAPPY)
3586            .set_write_batch_size(2)
3587            .set_max_row_group_size(2)
3588            .build();
3589
3590        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3591        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3592
3593        // Write 6 rows in 3 batches (will create 3 row groups)
3594        for batch_num in 0..3 {
3595            let name_data = Arc::new(StringArray::from(vec![
3596                format!("name_{}", batch_num * 2),
3597                format!("name_{}", batch_num * 2 + 1),
3598            ])) as ArrayRef;
3599            let value_data =
3600                Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3601
3602            let batch =
3603                RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3604            writer.write(&batch).expect("Writing batch");
3605        }
3606        writer.close().unwrap();
3607
3608        let reader = ArrowReaderBuilder::new(file_io).build();
3609
3610        let tasks = Box::pin(futures::stream::iter(
3611            vec![Ok(FileScanTask {
3612                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3613                    .unwrap()
3614                    .len(),
3615                start: 0,
3616                length: 0,
3617                record_count: None,
3618                data_file_path: format!("{table_location}/1.parquet"),
3619                data_file_format: DataFileFormat::Parquet,
3620                schema: schema.clone(),
3621                project_field_ids: vec![1, 2],
3622                predicate: None,
3623                deletes: vec![],
3624                partition: None,
3625                partition_spec: None,
3626                name_mapping: None,
3627                case_sensitive: false,
3628            })]
3629            .into_iter(),
3630        )) as FileScanTaskStream;
3631
3632        let result = reader
3633            .read(tasks)
3634            .unwrap()
3635            .try_collect::<Vec<RecordBatch>>()
3636            .await
3637            .unwrap();
3638
3639        assert!(!result.is_empty());
3640
3641        let mut all_names = Vec::new();
3642        let mut all_values = Vec::new();
3643
3644        for batch in &result {
3645            let name_array = batch.column(0).as_string::<i32>();
3646            let value_array = batch
3647                .column(1)
3648                .as_primitive::<arrow_array::types::Int32Type>();
3649
3650            for i in 0..batch.num_rows() {
3651                all_names.push(name_array.value(i).to_string());
3652                all_values.push(value_array.value(i));
3653            }
3654        }
3655
3656        assert_eq!(all_names.len(), 6);
3657        assert_eq!(all_values.len(), 6);
3658
3659        for i in 0..6 {
3660            assert_eq!(all_names[i], format!("name_{i}"));
3661            assert_eq!(all_values[i], i as i32);
3662        }
3663    }
3664
3665    /// Test reading Parquet files without field IDs with nested types (struct).
3666    /// Java's pruneColumnsFallback() projects entire top-level columns including nested content.
3667    /// This test verifies that a top-level struct field is projected correctly with all its nested fields.
3668    #[tokio::test]
3669    async fn test_read_parquet_without_field_ids_with_struct() {
3670        use arrow_array::{Int32Array, StructArray};
3671        use arrow_schema::Fields;
3672
3673        let schema = Arc::new(
3674            Schema::builder()
3675                .with_schema_id(1)
3676                .with_fields(vec![
3677                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3678                    NestedField::required(
3679                        2,
3680                        "person",
3681                        Type::Struct(crate::spec::StructType::new(vec![
3682                            NestedField::required(
3683                                3,
3684                                "name",
3685                                Type::Primitive(PrimitiveType::String),
3686                            )
3687                            .into(),
3688                            NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3689                                .into(),
3690                        ])),
3691                    )
3692                    .into(),
3693                ])
3694                .build()
3695                .unwrap(),
3696        );
3697
3698        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3699            Field::new("id", DataType::Int32, false),
3700            Field::new(
3701                "person",
3702                DataType::Struct(Fields::from(vec![
3703                    Field::new("name", DataType::Utf8, false),
3704                    Field::new("age", DataType::Int32, false),
3705                ])),
3706                false,
3707            ),
3708        ]));
3709
3710        let tmp_dir = TempDir::new().unwrap();
3711        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3712        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3713
3714        let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3715        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3716        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3717        let person_data = Arc::new(StructArray::from(vec![
3718            (
3719                Arc::new(Field::new("name", DataType::Utf8, false)),
3720                name_data,
3721            ),
3722            (
3723                Arc::new(Field::new("age", DataType::Int32, false)),
3724                age_data,
3725            ),
3726        ])) as ArrayRef;
3727
3728        let to_write =
3729            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3730
3731        let props = WriterProperties::builder()
3732            .set_compression(Compression::SNAPPY)
3733            .build();
3734
3735        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3736        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3737
3738        writer.write(&to_write).expect("Writing batch");
3739        writer.close().unwrap();
3740
3741        let reader = ArrowReaderBuilder::new(file_io).build();
3742
3743        let tasks = Box::pin(futures::stream::iter(
3744            vec![Ok(FileScanTask {
3745                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3746                    .unwrap()
3747                    .len(),
3748                start: 0,
3749                length: 0,
3750                record_count: None,
3751                data_file_path: format!("{table_location}/1.parquet"),
3752                data_file_format: DataFileFormat::Parquet,
3753                schema: schema.clone(),
3754                project_field_ids: vec![1, 2],
3755                predicate: None,
3756                deletes: vec![],
3757                partition: None,
3758                partition_spec: None,
3759                name_mapping: None,
3760                case_sensitive: false,
3761            })]
3762            .into_iter(),
3763        )) as FileScanTaskStream;
3764
3765        let result = reader
3766            .read(tasks)
3767            .unwrap()
3768            .try_collect::<Vec<RecordBatch>>()
3769            .await
3770            .unwrap();
3771
3772        assert_eq!(result.len(), 1);
3773        let batch = &result[0];
3774        assert_eq!(batch.num_rows(), 2);
3775        assert_eq!(batch.num_columns(), 2);
3776
3777        let id_array = batch
3778            .column(0)
3779            .as_primitive::<arrow_array::types::Int32Type>();
3780        assert_eq!(id_array.value(0), 1);
3781        assert_eq!(id_array.value(1), 2);
3782
3783        let person_array = batch.column(1).as_struct();
3784        assert_eq!(person_array.num_columns(), 2);
3785
3786        let name_array = person_array.column(0).as_string::<i32>();
3787        assert_eq!(name_array.value(0), "Alice");
3788        assert_eq!(name_array.value(1), "Bob");
3789
3790        let age_array = person_array
3791            .column(1)
3792            .as_primitive::<arrow_array::types::Int32Type>();
3793        assert_eq!(age_array.value(0), 30);
3794        assert_eq!(age_array.value(1), 25);
3795    }
3796
3797    /// Test reading Parquet files without field IDs with schema evolution - column added in the middle.
3798    /// When a new column is inserted between existing columns in the schema order,
3799    /// the fallback projection must correctly map field IDs to output positions.
3800    #[tokio::test]
3801    async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3802        use arrow_array::{Array, Int32Array};
3803
3804        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3805            Field::new("col0", DataType::Int32, true),
3806            Field::new("col1", DataType::Int32, true),
3807        ]));
3808
3809        // New column added between existing columns: col0 (id=1), newCol (id=5), col1 (id=2)
3810        let schema = Arc::new(
3811            Schema::builder()
3812                .with_schema_id(1)
3813                .with_fields(vec![
3814                    NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3815                    NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3816                    NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3817                ])
3818                .build()
3819                .unwrap(),
3820        );
3821
3822        let tmp_dir = TempDir::new().unwrap();
3823        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3824        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3825
3826        let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3827        let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3828
3829        let to_write =
3830            RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3831
3832        let props = WriterProperties::builder()
3833            .set_compression(Compression::SNAPPY)
3834            .build();
3835
3836        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3837        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3838        writer.write(&to_write).expect("Writing batch");
3839        writer.close().unwrap();
3840
3841        let reader = ArrowReaderBuilder::new(file_io).build();
3842
3843        let tasks = Box::pin(futures::stream::iter(
3844            vec![Ok(FileScanTask {
3845                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3846                    .unwrap()
3847                    .len(),
3848                start: 0,
3849                length: 0,
3850                record_count: None,
3851                data_file_path: format!("{table_location}/1.parquet"),
3852                data_file_format: DataFileFormat::Parquet,
3853                schema: schema.clone(),
3854                project_field_ids: vec![1, 5, 2],
3855                predicate: None,
3856                deletes: vec![],
3857                partition: None,
3858                partition_spec: None,
3859                name_mapping: None,
3860                case_sensitive: false,
3861            })]
3862            .into_iter(),
3863        )) as FileScanTaskStream;
3864
3865        let result = reader
3866            .read(tasks)
3867            .unwrap()
3868            .try_collect::<Vec<RecordBatch>>()
3869            .await
3870            .unwrap();
3871
3872        assert_eq!(result.len(), 1);
3873        let batch = &result[0];
3874        assert_eq!(batch.num_rows(), 2);
3875        assert_eq!(batch.num_columns(), 3);
3876
3877        let result_col0 = batch
3878            .column(0)
3879            .as_primitive::<arrow_array::types::Int32Type>();
3880        assert_eq!(result_col0.value(0), 1);
3881        assert_eq!(result_col0.value(1), 2);
3882
3883        // New column should be NULL (doesn't exist in old file)
3884        let result_newcol = batch
3885            .column(1)
3886            .as_primitive::<arrow_array::types::Int32Type>();
3887        assert_eq!(result_newcol.null_count(), 2);
3888        assert!(result_newcol.is_null(0));
3889        assert!(result_newcol.is_null(1));
3890
3891        let result_col1 = batch
3892            .column(2)
3893            .as_primitive::<arrow_array::types::Int32Type>();
3894        assert_eq!(result_col1.value(0), 10);
3895        assert_eq!(result_col1.value(1), 20);
3896    }
3897
3898    /// Test reading Parquet files without field IDs with a filter that eliminates all row groups.
3899    /// During development of field ID mapping, we saw a panic when row_selection_enabled=true and
3900    /// all row groups are filtered out.
3901    #[tokio::test]
3902    async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
3903        use arrow_array::{Float64Array, Int32Array};
3904
3905        // Schema with fields that will use fallback IDs 1, 2, 3
3906        let schema = Arc::new(
3907            Schema::builder()
3908                .with_schema_id(1)
3909                .with_fields(vec![
3910                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3911                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3912                    NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
3913                        .into(),
3914                ])
3915                .build()
3916                .unwrap(),
3917        );
3918
3919        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3920            Field::new("id", DataType::Int32, false),
3921            Field::new("name", DataType::Utf8, false),
3922            Field::new("value", DataType::Float64, false),
3923        ]));
3924
3925        let tmp_dir = TempDir::new().unwrap();
3926        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3927        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3928
3929        // Write data where all ids are >= 10
3930        let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
3931        let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3932        let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
3933
3934        let to_write =
3935            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
3936                .unwrap();
3937
3938        let props = WriterProperties::builder()
3939            .set_compression(Compression::SNAPPY)
3940            .build();
3941
3942        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3943        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3944        writer.write(&to_write).expect("Writing batch");
3945        writer.close().unwrap();
3946
3947        // Filter that eliminates all row groups: id < 5
3948        let predicate = Reference::new("id").less_than(Datum::int(5));
3949
3950        // Enable both row_group_filtering and row_selection - triggered the panic
3951        let reader = ArrowReaderBuilder::new(file_io)
3952            .with_row_group_filtering_enabled(true)
3953            .with_row_selection_enabled(true)
3954            .build();
3955
3956        let tasks = Box::pin(futures::stream::iter(
3957            vec![Ok(FileScanTask {
3958                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3959                    .unwrap()
3960                    .len(),
3961                start: 0,
3962                length: 0,
3963                record_count: None,
3964                data_file_path: format!("{table_location}/1.parquet"),
3965                data_file_format: DataFileFormat::Parquet,
3966                schema: schema.clone(),
3967                project_field_ids: vec![1, 2, 3],
3968                predicate: Some(predicate.bind(schema, true).unwrap()),
3969                deletes: vec![],
3970                partition: None,
3971                partition_spec: None,
3972                name_mapping: None,
3973                case_sensitive: false,
3974            })]
3975            .into_iter(),
3976        )) as FileScanTaskStream;
3977
3978        // Should no longer panic
3979        let result = reader
3980            .read(tasks)
3981            .unwrap()
3982            .try_collect::<Vec<RecordBatch>>()
3983            .await
3984            .unwrap();
3985
3986        // Should return empty results
3987        assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
3988    }
3989
3990    /// Test that concurrency=1 reads all files correctly and in deterministic order.
3991    /// This verifies the fast-path optimization for single concurrency.
3992    #[tokio::test]
3993    async fn test_read_with_concurrency_one() {
3994        use arrow_array::Int32Array;
3995
3996        let schema = Arc::new(
3997            Schema::builder()
3998                .with_schema_id(1)
3999                .with_fields(vec![
4000                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
4001                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
4002                        .into(),
4003                ])
4004                .build()
4005                .unwrap(),
4006        );
4007
4008        let arrow_schema = Arc::new(ArrowSchema::new(vec![
4009            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
4010                PARQUET_FIELD_ID_META_KEY.to_string(),
4011                "1".to_string(),
4012            )])),
4013            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
4014                PARQUET_FIELD_ID_META_KEY.to_string(),
4015                "2".to_string(),
4016            )])),
4017        ]));
4018
4019        let tmp_dir = TempDir::new().unwrap();
4020        let table_location = tmp_dir.path().to_str().unwrap().to_string();
4021        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
4022
4023        // Create 3 parquet files with different data
4024        let props = WriterProperties::builder()
4025            .set_compression(Compression::SNAPPY)
4026            .build();
4027
4028        for file_num in 0..3 {
4029            let id_data = Arc::new(Int32Array::from_iter_values(
4030                file_num * 10..(file_num + 1) * 10,
4031            )) as ArrayRef;
4032            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
4033
4034            let to_write =
4035                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
4036
4037            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
4038            let mut writer =
4039                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
4040            writer.write(&to_write).expect("Writing batch");
4041            writer.close().unwrap();
4042        }
4043
4044        // Read with concurrency=1 (fast-path)
4045        let reader = ArrowReaderBuilder::new(file_io)
4046            .with_data_file_concurrency_limit(1)
4047            .build();
4048
4049        // Create tasks in a specific order: file_0, file_1, file_2
4050        let tasks = vec![
4051            Ok(FileScanTask {
4052                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet"))
4053                    .unwrap()
4054                    .len(),
4055                start: 0,
4056                length: 0,
4057                record_count: None,
4058                data_file_path: format!("{table_location}/file_0.parquet"),
4059                data_file_format: DataFileFormat::Parquet,
4060                schema: schema.clone(),
4061                project_field_ids: vec![1, 2],
4062                predicate: None,
4063                deletes: vec![],
4064                partition: None,
4065                partition_spec: None,
4066                name_mapping: None,
4067                case_sensitive: false,
4068            }),
4069            Ok(FileScanTask {
4070                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
4071                    .unwrap()
4072                    .len(),
4073                start: 0,
4074                length: 0,
4075                record_count: None,
4076                data_file_path: format!("{table_location}/file_1.parquet"),
4077                data_file_format: DataFileFormat::Parquet,
4078                schema: schema.clone(),
4079                project_field_ids: vec![1, 2],
4080                predicate: None,
4081                deletes: vec![],
4082                partition: None,
4083                partition_spec: None,
4084                name_mapping: None,
4085                case_sensitive: false,
4086            }),
4087            Ok(FileScanTask {
4088                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
4089                    .unwrap()
4090                    .len(),
4091                start: 0,
4092                length: 0,
4093                record_count: None,
4094                data_file_path: format!("{table_location}/file_2.parquet"),
4095                data_file_format: DataFileFormat::Parquet,
4096                schema: schema.clone(),
4097                project_field_ids: vec![1, 2],
4098                predicate: None,
4099                deletes: vec![],
4100                partition: None,
4101                partition_spec: None,
4102                name_mapping: None,
4103                case_sensitive: false,
4104            }),
4105        ];
4106
4107        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
4108
4109        let result = reader
4110            .read(tasks_stream)
4111            .unwrap()
4112            .try_collect::<Vec<RecordBatch>>()
4113            .await
4114            .unwrap();
4115
4116        // Verify we got all 30 rows (10 from each file)
4117        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
4118        assert_eq!(total_rows, 30, "Should have 30 total rows");
4119
4120        // Collect all ids and file_nums to verify data
4121        let mut all_ids = Vec::new();
4122        let mut all_file_nums = Vec::new();
4123
4124        for batch in &result {
4125            let id_col = batch
4126                .column(0)
4127                .as_primitive::<arrow_array::types::Int32Type>();
4128            let file_num_col = batch
4129                .column(1)
4130                .as_primitive::<arrow_array::types::Int32Type>();
4131
4132            for i in 0..batch.num_rows() {
4133                all_ids.push(id_col.value(i));
4134                all_file_nums.push(file_num_col.value(i));
4135            }
4136        }
4137
4138        assert_eq!(all_ids.len(), 30);
4139        assert_eq!(all_file_nums.len(), 30);
4140
4141        // With concurrency=1 and sequential processing, files should be processed in order
4142        // file_0: ids 0-9, file_num=0
4143        // file_1: ids 10-19, file_num=1
4144        // file_2: ids 20-29, file_num=2
4145        for i in 0..10 {
4146            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
4147            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
4148        }
4149        for i in 10..20 {
4150            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
4151            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
4152        }
4153        for i in 20..30 {
4154            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
4155            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
4156        }
4157    }
4158
4159    /// Test bucket partitioning reads source column from data file (not partition metadata).
4160    ///
4161    /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
4162    /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable).
4163    ///
4164    /// # Iceberg Spec Requirements
4165    ///
4166    /// Per the Iceberg spec "Column Projection" section:
4167    /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
4168    ///
4169    /// This means:
4170    /// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata
4171    /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files
4172    /// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values
4173    ///
4174    /// Java's PartitionUtil.constantsMap() implements this via:
4175    /// ```java
4176    /// if (field.transform().isIdentity()) {
4177    ///     idToConstant.put(field.sourceId(), converted);
4178    /// }
4179    /// ```
4180    ///
4181    /// # What This Test Verifies
4182    ///
4183    /// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles
4184    /// bucket partitioning when FileScanTask provides partition_spec and partition_data:
4185    ///
4186    /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13]
4187    /// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1
4188    /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants
4189    /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
4190    /// - Values are NOT replaced with constant 1 from partition metadata
4191    ///
4192    /// # Why This Matters
4193    ///
4194    /// Without correct handling:
4195    /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
4196    /// - Query results would be incorrect (all rows would have id=1)
4197    /// - Bucket partitioning would be unusable for query optimization
4198    ///
4199    /// # References
4200    /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
4201    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
4202    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
4203    #[tokio::test]
4204    async fn test_bucket_partitioning_reads_source_column_from_file() {
4205        use arrow_array::Int32Array;
4206
4207        use crate::spec::{Literal, PartitionSpec, Struct, Transform};
4208
4209        // Iceberg schema with id and name columns
4210        let schema = Arc::new(
4211            Schema::builder()
4212                .with_schema_id(0)
4213                .with_fields(vec![
4214                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
4215                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
4216                ])
4217                .build()
4218                .unwrap(),
4219        );
4220
4221        // Partition spec: bucket(4, id)
4222        let partition_spec = Arc::new(
4223            PartitionSpec::builder(schema.clone())
4224                .with_spec_id(0)
4225                .add_partition_field("id", "id_bucket", Transform::Bucket(4))
4226                .unwrap()
4227                .build()
4228                .unwrap(),
4229        );
4230
4231        // Partition data: bucket value is 1
4232        let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
4233
4234        // Create Arrow schema with field IDs for Parquet file
4235        let arrow_schema = Arc::new(ArrowSchema::new(vec![
4236            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
4237                PARQUET_FIELD_ID_META_KEY.to_string(),
4238                "1".to_string(),
4239            )])),
4240            Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
4241                PARQUET_FIELD_ID_META_KEY.to_string(),
4242                "2".to_string(),
4243            )])),
4244        ]));
4245
4246        // Write Parquet file with data
4247        let tmp_dir = TempDir::new().unwrap();
4248        let table_location = tmp_dir.path().to_str().unwrap().to_string();
4249        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
4250
4251        let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
4252        let name_data =
4253            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
4254
4255        let to_write =
4256            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
4257
4258        let props = WriterProperties::builder()
4259            .set_compression(Compression::SNAPPY)
4260            .build();
4261        let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
4262        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
4263        writer.write(&to_write).expect("Writing batch");
4264        writer.close().unwrap();
4265
4266        // Read the Parquet file with partition spec and data
4267        let reader = ArrowReaderBuilder::new(file_io).build();
4268        let tasks = Box::pin(futures::stream::iter(
4269            vec![Ok(FileScanTask {
4270                file_size_in_bytes: std::fs::metadata(format!("{table_location}/data.parquet"))
4271                    .unwrap()
4272                    .len(),
4273                start: 0,
4274                length: 0,
4275                record_count: None,
4276                data_file_path: format!("{table_location}/data.parquet"),
4277                data_file_format: DataFileFormat::Parquet,
4278                schema: schema.clone(),
4279                project_field_ids: vec![1, 2],
4280                predicate: None,
4281                deletes: vec![],
4282                partition: Some(partition_data),
4283                partition_spec: Some(partition_spec),
4284                name_mapping: None,
4285                case_sensitive: false,
4286            })]
4287            .into_iter(),
4288        )) as FileScanTaskStream;
4289
4290        let result = reader
4291            .read(tasks)
4292            .unwrap()
4293            .try_collect::<Vec<RecordBatch>>()
4294            .await
4295            .unwrap();
4296
4297        // Verify we got the correct data
4298        assert_eq!(result.len(), 1);
4299        let batch = &result[0];
4300
4301        assert_eq!(batch.num_columns(), 2);
4302        assert_eq!(batch.num_rows(), 4);
4303
4304        // The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
4305        // NOT the constant partition value 1
4306        let id_col = batch
4307            .column(0)
4308            .as_primitive::<arrow_array::types::Int32Type>();
4309        assert_eq!(id_col.value(0), 1);
4310        assert_eq!(id_col.value(1), 5);
4311        assert_eq!(id_col.value(2), 9);
4312        assert_eq!(id_col.value(3), 13);
4313
4314        let name_col = batch.column(1).as_string::<i32>();
4315        assert_eq!(name_col.value(0), "Alice");
4316        assert_eq!(name_col.value(1), "Bob");
4317        assert_eq!(name_col.value(2), "Charlie");
4318        assert_eq!(name_col.value(3), "Dave");
4319    }
4320}