iceberg/arrow/reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet file data reader
19
20use std::collections::{HashMap, HashSet};
21use std::ops::Range;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
26use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
27use arrow_cast::cast::cast;
28use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
29use arrow_schema::{
30    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
31};
32use arrow_string::like::starts_with;
33use bytes::Bytes;
34use fnv::FnvHashSet;
35use futures::future::BoxFuture;
36use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
37use parquet::arrow::arrow_reader::{
38    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
39};
40use parquet::arrow::async_reader::AsyncFileReader;
41use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
42use parquet::file::metadata::{
43    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
44};
45use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
46
47use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
48use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
49use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
50use crate::delete_vector::DeleteVector;
51use crate::error::Result;
52use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
53use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
54use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
55use crate::expr::{BoundPredicate, BoundReference};
56use crate::io::{FileIO, FileMetadata, FileRead};
57use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
58use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
59use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
60use crate::utils::available_parallelism;
61use crate::{Error, ErrorKind};
62
63/// Builder to create ArrowReader
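///
/// A minimal usage sketch (not an exhaustive reference); it assumes a `FileIO` instance and a
/// `FileScanTaskStream` of tasks are already available from a table scan:
///
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .with_row_group_filtering_enabled(true)
///     .build();
/// let batches = reader.read(tasks)?;
/// ```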
64pub struct ArrowReaderBuilder {
65    batch_size: Option<usize>,
66    file_io: FileIO,
67    concurrency_limit_data_files: usize,
68    row_group_filtering_enabled: bool,
69    row_selection_enabled: bool,
70}
71
72impl ArrowReaderBuilder {
73    /// Create a new ArrowReaderBuilder
74    pub fn new(file_io: FileIO) -> Self {
75        let num_cpus = available_parallelism().get();
76
77        ArrowReaderBuilder {
78            batch_size: None,
79            file_io,
80            concurrency_limit_data_files: num_cpus,
81            row_group_filtering_enabled: true,
82            row_selection_enabled: false,
83        }
84    }
85
86    /// Sets the maximum number of in-flight data files being fetched concurrently
87    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
88        self.concurrency_limit_data_files = val;
89        self
90    }
91
92    /// Sets the desired size of batches in the response
93    /// to something other than the default
94    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
95        self.batch_size = Some(batch_size);
96        self
97    }
98
99    /// Determines whether to enable row group filtering.
100    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
101        self.row_group_filtering_enabled = row_group_filtering_enabled;
102        self
103    }
104
105    /// Determines whether to enable row selection.
106    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
107        self.row_selection_enabled = row_selection_enabled;
108        self
109    }
110
111    /// Build the ArrowReader.
112    pub fn build(self) -> ArrowReader {
113        ArrowReader {
114            batch_size: self.batch_size,
115            file_io: self.file_io.clone(),
116            delete_file_loader: CachingDeleteFileLoader::new(
117                self.file_io.clone(),
118                self.concurrency_limit_data_files,
119            ),
120            concurrency_limit_data_files: self.concurrency_limit_data_files,
121            row_group_filtering_enabled: self.row_group_filtering_enabled,
122            row_selection_enabled: self.row_selection_enabled,
123        }
124    }
125}
126
127/// Reads data from Parquet files
128#[derive(Clone)]
129pub struct ArrowReader {
130    batch_size: Option<usize>,
131    file_io: FileIO,
132    delete_file_loader: CachingDeleteFileLoader,
133
134    /// the maximum number of data files that can be fetched at the same time
135    concurrency_limit_data_files: usize,
136
137    row_group_filtering_enabled: bool,
138    row_selection_enabled: bool,
139}
140
141impl ArrowReader {
142    /// Takes a stream of FileScanTasks and reads all of the files.
143    /// Returns a stream of Arrow RecordBatches containing the data from the files.
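    ///
    /// A sketch of consuming the returned stream (assumes `futures::TryStreamExt` is in scope
    /// and that `tasks` comes from a table scan):
    ///
    /// ```ignore
    /// let mut batches = reader.read(tasks)?;
    /// while let Some(batch) = batches.try_next().await? {
    ///     println!("read {} rows", batch.num_rows());
    /// }
    /// ```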
144    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
145        let file_io = self.file_io.clone();
146        let batch_size = self.batch_size;
147        let concurrency_limit_data_files = self.concurrency_limit_data_files;
148        let row_group_filtering_enabled = self.row_group_filtering_enabled;
149        let row_selection_enabled = self.row_selection_enabled;
150
151        let stream = tasks
152            .map_ok(move |task| {
153                let file_io = file_io.clone();
154
155                Self::process_file_scan_task(
156                    task,
157                    batch_size,
158                    file_io,
159                    self.delete_file_loader.clone(),
160                    row_group_filtering_enabled,
161                    row_selection_enabled,
162                )
163            })
164            .map_err(|err| {
165                Error::new(ErrorKind::Unexpected, "failed to generate file scan task").with_source(err)
166            })
167            .try_buffer_unordered(concurrency_limit_data_files)
168            .try_flatten_unordered(concurrency_limit_data_files);
169
170        Ok(Box::pin(stream) as ArrowRecordBatchStream)
171    }
172
173    #[allow(clippy::too_many_arguments)]
174    async fn process_file_scan_task(
175        task: FileScanTask,
176        batch_size: Option<usize>,
177        file_io: FileIO,
178        delete_file_loader: CachingDeleteFileLoader,
179        row_group_filtering_enabled: bool,
180        row_selection_enabled: bool,
181    ) -> Result<ArrowRecordBatchStream> {
182        let should_load_page_index =
183            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
184
185        let delete_filter_rx =
186            delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
187
188        // Migrated tables lack field IDs, requiring us to inspect the schema to choose
189        // between field-ID-based and position-based projection.
190        let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
191            &task.data_file_path,
192            file_io.clone(),
193            should_load_page_index,
194            None,
195        )
196        .await?;
197
198        // Check if Parquet file has embedded field IDs
199        // Corresponds to Java's ParquetSchemaUtil.hasIds()
200        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
201        let missing_field_ids = initial_stream_builder
202            .schema()
203            .fields()
204            .iter()
205            .next()
206            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
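        // Illustrative sketch (not executed here): a file written with field IDs carries the ID
        // in each Arrow field's metadata under PARQUET_FIELD_ID_META_KEY, e.g. roughly:
        //
        //     Field::new("id", DataType::Int64, false).with_metadata(HashMap::from([(
        //         PARQUET_FIELD_ID_META_KEY.to_string(),
        //         "1".to_string(),
        //     )]))
        //
        // Migrated files lack this metadata entry, which is what the check above detects.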
207
208        // Three-branch schema resolution strategy matching Java's ReadConf constructor
209        //
210        // Per Iceberg spec Column Projection rules:
211        // "Columns in Iceberg data files are selected by field id. The table schema's column
212        //  names and order may change after a data file is written, and projection must be done
213        //  using field ids."
214        // https://iceberg.apache.org/spec/#column-projection
215        //
216        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
217        // we must assign field IDs BEFORE reading data to enable correct projection.
218        //
219        // Java's ReadConf determines field ID strategy:
220        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
221        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
222        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
223        let mut record_batch_stream_builder = if missing_field_ids {
224            // Parquet file lacks field IDs - must assign them before reading
225            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
226                // Branch 2: Apply name mapping to assign correct Iceberg field IDs
227                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
228                // to columns without field id"
229                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
230                apply_name_mapping_to_arrow_schema(
231                    Arc::clone(initial_stream_builder.schema()),
232                    name_mapping,
233                )?
234            } else {
235                // Branch 3: No name mapping - use position-based fallback IDs
236                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
237                add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
238            };
239
240            let options = ArrowReaderOptions::new().with_schema(arrow_schema);
241
242            Self::create_parquet_record_batch_stream_builder(
243                &task.data_file_path,
244                file_io.clone(),
245                should_load_page_index,
246                Some(options),
247            )
248            .await?
249        } else {
250            // Branch 1: File has embedded field IDs - trust them
251            initial_stream_builder
252        };
253
254        // Filter out metadata fields for Parquet projection (they don't exist in files)
255        let project_field_ids_without_metadata: Vec<i32> = task
256            .project_field_ids
257            .iter()
258            .filter(|&&id| !is_metadata_field(id))
259            .copied()
260            .collect();
261
262        // Create projection mask based on field IDs
263        // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
264        // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
265        // - If fallback IDs: position-based projection (missing_field_ids=true)
266        let projection_mask = Self::get_arrow_projection_mask(
267            &project_field_ids_without_metadata,
268            &task.schema,
269            record_batch_stream_builder.parquet_schema(),
270            record_batch_stream_builder.schema(),
271            missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
272        )?;
273
274        record_batch_stream_builder =
275            record_batch_stream_builder.with_projection(projection_mask.clone());
276
277        // RecordBatchTransformer performs any transformations required on the RecordBatches
278        // that come back from the file, such as type promotion, default column insertion,
279        // column re-ordering, partition constants, and virtual field addition (like _file)
280        let mut record_batch_transformer_builder =
281            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
282
283        // Add the _file metadata column if it's in the projected fields
284        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
285            let file_datum = Datum::string(task.data_file_path.clone());
286            record_batch_transformer_builder =
287                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
288        }
289
290        if let (Some(partition_spec), Some(partition_data)) =
291            (task.partition_spec.clone(), task.partition.clone())
292        {
293            record_batch_transformer_builder =
294                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
295        }
296
297        let mut record_batch_transformer = record_batch_transformer_builder.build();
298
299        if let Some(batch_size) = batch_size {
300            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
301        }
302
303        let delete_filter = delete_filter_rx.await.unwrap()?;
304        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
305
306        // In addition to the optional predicate supplied in the `FileScanTask`,
307        // we also have an optional predicate resulting from equality delete files.
308        // If both are present, we logical-AND them together to form a single filter
309        // predicate that we can pass to the `RecordBatchStreamBuilder`.
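        // For example (hypothetical predicates): a scan filter of `a >= 5` combined with an
        // equality-delete-derived predicate of `NOT (id = 7)` yields the single filter
        // `a >= 5 AND NOT (id = 7)`.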
310        let final_predicate = match (&task.predicate, delete_predicate) {
311            (None, None) => None,
312            (Some(predicate), None) => Some(predicate.clone()),
313            (None, Some(ref predicate)) => Some(predicate.clone()),
314            (Some(filter_predicate), Some(delete_predicate)) => {
315                Some(filter_predicate.clone().and(delete_predicate))
316            }
317        };
318
319        // There are three possible sources of selected RowGroup index lists,
320        // and two of `RowSelection`s.
321        // Selected RowGroup index lists can come from:
322        //   * When task.start and task.length specify a byte range (file splitting);
323        //   * When there are equality delete files that are applicable;
324        //   * When there is a scan predicate and row_group_filtering_enabled = true.
325        // `RowSelection`s can be created in either or both of the following cases:
326        //   * When there are positional delete files that are applicable;
327        //   * When there is a scan predicate and row_selection_enabled = true.
328        // Note that row group filtering from predicates only happens when
329        // there is a scan predicate AND row_group_filtering_enabled = true,
330        // whereas a `RowSelection` is built whenever there are applicable
331        // positional delete files OR (there is a predicate AND row_selection_enabled),
332        // since the only implemented method of applying positional deletes is
333        // by using a `RowSelection`.
334        let mut selected_row_group_indices = None;
335        let mut row_selection = None;
336
337        // Filter row groups based on byte range from task.start and task.length.
338        // If both start and length are 0, read the entire file (backwards compatibility).
339        if task.start != 0 || task.length != 0 {
340            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
341                record_batch_stream_builder.metadata(),
342                task.start,
343                task.length,
344            )?;
345            selected_row_group_indices = Some(byte_range_filtered_row_groups);
346        }
347
348        if let Some(predicate) = final_predicate {
349            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
350                record_batch_stream_builder.parquet_schema(),
351                &predicate,
352            )?;
353
354            let row_filter = Self::get_row_filter(
355                &predicate,
356                record_batch_stream_builder.parquet_schema(),
357                &iceberg_field_ids,
358                &field_id_map,
359            )?;
360            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
361
362            if row_group_filtering_enabled {
363                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
364                    &predicate,
365                    record_batch_stream_builder.metadata(),
366                    &field_id_map,
367                    &task.schema,
368                )?;
369
370                // Merge predicate-based filtering with byte range filtering (if present)
371                // by taking the intersection of both filters
372                selected_row_group_indices = match selected_row_group_indices {
373                    Some(byte_range_filtered) => {
374                        // Keep only row groups that are in both filters
375                        let intersection: Vec<usize> = byte_range_filtered
376                            .into_iter()
377                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
378                            .collect();
379                        Some(intersection)
380                    }
381                    None => Some(predicate_filtered_row_groups),
382                };
383            }
384
385            if row_selection_enabled {
386                row_selection = Some(Self::get_row_selection_for_filter_predicate(
387                    &predicate,
388                    record_batch_stream_builder.metadata(),
389                    &selected_row_group_indices,
390                    &field_id_map,
391                    &task.schema,
392                )?);
393            }
394        }
395
396        let positional_delete_indexes = delete_filter.get_delete_vector(&task);
397
398        if let Some(positional_delete_indexes) = positional_delete_indexes {
399            let delete_row_selection = {
400                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
401
402                Self::build_deletes_row_selection(
403                    record_batch_stream_builder.metadata().row_groups(),
404                    &selected_row_group_indices,
405                    &positional_delete_indexes,
406                )
407            }?;
408
409            // merge the row selection from the delete files with the row selection
410            // from the filter predicate, if there is one from the filter predicate
411            row_selection = match row_selection {
412                None => Some(delete_row_selection),
413                Some(filter_row_selection) => {
414                    Some(filter_row_selection.intersection(&delete_row_selection))
415                }
416            };
417        }
418
419        if let Some(row_selection) = row_selection {
420            record_batch_stream_builder =
421                record_batch_stream_builder.with_row_selection(row_selection);
422        }
423
424        if let Some(selected_row_group_indices) = selected_row_group_indices {
425            record_batch_stream_builder =
426                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
427        }
428
429        // Build the batch stream and send all the RecordBatches that it generates
430        // to the requester.
431        let record_batch_stream =
432            record_batch_stream_builder
433                .build()?
434                .map(move |batch| match batch {
435                    Ok(batch) => {
436                        // Process the record batch (type promotion, column reordering, virtual fields, etc.)
437                        record_batch_transformer.process_record_batch(batch)
438                    }
439                    Err(err) => Err(err.into()),
440                });
441
442        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
443    }
444
445    pub(crate) async fn create_parquet_record_batch_stream_builder(
446        data_file_path: &str,
447        file_io: FileIO,
448        should_load_page_index: bool,
449        arrow_reader_options: Option<ArrowReaderOptions>,
450    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
451        // Get the metadata for the Parquet file we need to read and build
452        // a reader for the data within
453        let parquet_file = file_io.new_input(data_file_path)?;
454        let (parquet_metadata, parquet_reader) =
455            try_join!(parquet_file.metadata(), parquet_file.reader())?;
456        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
457            .with_preload_column_index(true)
458            .with_preload_offset_index(true)
459            .with_preload_page_index(should_load_page_index);
460
461        // Create the record batch stream builder, which wraps the parquet file reader
462        let options = arrow_reader_options.unwrap_or_default();
463        let record_batch_stream_builder =
464            ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
465        Ok(record_batch_stream_builder)
466    }
467
468    /// Computes a `RowSelection` from positional delete indices.
469    ///
470    /// Using the Parquet row group metadata, we build a `RowSelection` that rejects rows that are
471    /// indicated as having been deleted by a positional delete, taking into account any row groups
472    /// that have been skipped entirely by the filter predicate.
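    ///
    /// Worked example (illustrative numbers only): for a single selected row group of 10 rows
    /// with positional deletes at indices {2, 3, 7}, the resulting selection is equivalent to:
    ///
    /// ```ignore
    /// vec![
    ///     RowSelector::select(2), // rows 0-1
    ///     RowSelector::skip(2),   // rows 2-3 are deleted
    ///     RowSelector::select(3), // rows 4-6
    ///     RowSelector::skip(1),   // row 7 is deleted
    ///     RowSelector::select(2), // rows 8-9
    /// ]
    /// ```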
473    fn build_deletes_row_selection(
474        row_group_metadata_list: &[RowGroupMetaData],
475        selected_row_groups: &Option<Vec<usize>>,
476        positional_deletes: &DeleteVector,
477    ) -> Result<RowSelection> {
478        let mut results: Vec<RowSelector> = Vec::new();
479        let mut selected_row_groups_idx = 0;
480        let mut current_row_group_base_idx: u64 = 0;
481        let mut delete_vector_iter = positional_deletes.iter();
482        let mut next_deleted_row_idx_opt = delete_vector_iter.next();
483
484        for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
485            let row_group_num_rows = row_group_metadata.num_rows() as u64;
486            let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
487
488            // if row group selection is enabled,
489            if let Some(selected_row_groups) = selected_row_groups {
490                // if we've consumed all the selected row groups, we're done
491                if selected_row_groups_idx == selected_row_groups.len() {
492                    break;
493                }
494
495                if idx == selected_row_groups[selected_row_groups_idx] {
496                    // we're in a selected row group. Increment selected_row_groups_idx
497                    // so that next time around the for loop we're looking for the next
498                    // selected row group
499                    selected_row_groups_idx += 1;
500                } else {
501                    // Advance iterator past all deletes in the skipped row group.
502                    // advance_to() positions the iterator to the first delete >= next_row_group_base_idx.
503                    // However, if our cached next_deleted_row_idx_opt is in the skipped range,
504                    // we need to call next() to update the cache with the newly positioned value.
505                    delete_vector_iter.advance_to(next_row_group_base_idx);
506                    // Only update the cache if the cached value is stale (in the skipped range)
507                    if let Some(cached_idx) = next_deleted_row_idx_opt
508                        && cached_idx < next_row_group_base_idx
509                    {
510                        next_deleted_row_idx_opt = delete_vector_iter.next();
511                    }
512
513                    // still increment the current row group base index but then skip to the next row group
514                    // in the file
515                    current_row_group_base_idx += row_group_num_rows;
516                    continue;
517                }
518            }
519
520            let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
521                Some(next_deleted_row_idx) => {
522                    // if the index of the next deleted row is beyond this row group, add a selection for
523                    // the remainder of this row group and skip to the next row group
524                    if next_deleted_row_idx >= next_row_group_base_idx {
525                        results.push(RowSelector::select(row_group_num_rows as usize));
526                        current_row_group_base_idx += row_group_num_rows;
527                        continue;
528                    }
529
530                    next_deleted_row_idx
531                }
532
533                // If there are no more positional deletes, add a selector for the entirety of this row group.
534                _ => {
535                    results.push(RowSelector::select(row_group_num_rows as usize));
536                    current_row_group_base_idx += row_group_num_rows;
537                    continue;
538                }
539            };
540
541            let mut current_idx = current_row_group_base_idx;
542            'chunks: while next_deleted_row_idx < next_row_group_base_idx {
543                // `select` all rows that precede the next delete index
544                if current_idx < next_deleted_row_idx {
545                    let run_length = next_deleted_row_idx - current_idx;
546                    results.push(RowSelector::select(run_length as usize));
547                    current_idx += run_length;
548                }
549
550                // `skip` all consecutive deleted rows in the current row group
551                let mut run_length = 0;
552                while next_deleted_row_idx == current_idx
553                    && next_deleted_row_idx < next_row_group_base_idx
554                {
555                    run_length += 1;
556                    current_idx += 1;
557
558                    next_deleted_row_idx_opt = delete_vector_iter.next();
559                    next_deleted_row_idx = match next_deleted_row_idx_opt {
560                        Some(next_deleted_row_idx) => next_deleted_row_idx,
561                        _ => {
562                            // We've processed the final positional delete.
563                            // Conclude the skip and then break so that we select the remaining
564                            // rows in the row group and move on to the next row group
565                            results.push(RowSelector::skip(run_length));
566                            break 'chunks;
567                        }
568                    };
569                }
570                if run_length > 0 {
571                    results.push(RowSelector::skip(run_length));
572                }
573            }
574
575            if current_idx < next_row_group_base_idx {
576                results.push(RowSelector::select(
577                    (next_row_group_base_idx - current_idx) as usize,
578                ));
579            }
580
581            current_row_group_base_idx += row_group_num_rows;
582        }
583
584        Ok(results.into())
585    }
586
587    fn build_field_id_set_and_map(
588        parquet_schema: &SchemaDescriptor,
589        predicate: &BoundPredicate,
590    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
591        // Collects all Iceberg field IDs referenced in the filter predicate
592        let mut collector = CollectFieldIdVisitor {
593            field_ids: HashSet::default(),
594        };
595        visit(&mut collector, predicate)?;
596
597        let iceberg_field_ids = collector.field_ids();
598
599        // Without embedded field IDs, we fall back to position-based mapping for compatibility
600        let field_id_map = match build_field_id_map(parquet_schema)? {
601            Some(map) => map,
602            None => build_fallback_field_id_map(parquet_schema),
603        };
604
605        Ok((iceberg_field_ids, field_id_map))
606    }
607
608    /// Recursively extract leaf field IDs because Parquet projection works at the leaf column level.
609    /// Nested types (struct/list/map) are flattened in Parquet's columnar format.
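    ///
    /// For example (hypothetical schema): selecting a struct field `location` (id 3) whose
    /// children are `lat` (id 4) and `long` (id 5) yields the leaf ids `[4, 5]`, since only
    /// the leaves exist as Parquet columns.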
610    fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
611        match field.field_type.as_ref() {
612            Type::Primitive(_) => {
613                field_ids.push(field.id);
614            }
615            Type::Struct(struct_type) => {
616                for nested_field in struct_type.fields() {
617                    Self::include_leaf_field_id(nested_field, field_ids);
618                }
619            }
620            Type::List(list_type) => {
621                Self::include_leaf_field_id(&list_type.element_field, field_ids);
622            }
623            Type::Map(map_type) => {
624                Self::include_leaf_field_id(&map_type.key_field, field_ids);
625                Self::include_leaf_field_id(&map_type.value_field, field_ids);
626            }
627        }
628    }
629
630    fn get_arrow_projection_mask(
631        field_ids: &[i32],
632        iceberg_schema_of_task: &Schema,
633        parquet_schema: &SchemaDescriptor,
634        arrow_schema: &ArrowSchemaRef,
635        use_fallback: bool, // Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark)
636    ) -> Result<ProjectionMask> {
637        fn type_promotion_is_valid(
638            file_type: Option<&PrimitiveType>,
639            projected_type: Option<&PrimitiveType>,
640        ) -> bool {
641            match (file_type, projected_type) {
642                (Some(lhs), Some(rhs)) if lhs == rhs => true,
643                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
644                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
645                (
646                    Some(PrimitiveType::Decimal {
647                        precision: file_precision,
648                        scale: file_scale,
649                    }),
650                    Some(PrimitiveType::Decimal {
651                        precision: requested_precision,
652                        scale: requested_scale,
653                    }),
654                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
655                // Uuid is stored as Fixed(16) in Parquet files, so the read-back type will be Fixed(16).
656                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
657                _ => false,
658            }
659        }
660
661        if field_ids.is_empty() {
662            return Ok(ProjectionMask::all());
663        }
664
665        if use_fallback {
666            // Position-based projection necessary because file lacks embedded field IDs
667            Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
668        } else {
669            // Field-ID-based projection using embedded field IDs from Parquet metadata
670
671            // Parquet's columnar format requires leaf-level (not top-level struct/list/map) projection
672            let mut leaf_field_ids = vec![];
673            for field_id in field_ids {
674                let field = iceberg_schema_of_task.field_by_id(*field_id);
675                if let Some(field) = field {
676                    Self::include_leaf_field_id(field, &mut leaf_field_ids);
677                }
678            }
679
680            Self::get_arrow_projection_mask_with_field_ids(
681                &leaf_field_ids,
682                iceberg_schema_of_task,
683                parquet_schema,
684                arrow_schema,
685                type_promotion_is_valid,
686            )
687        }
688    }
689
690    /// Standard projection using embedded field IDs from Parquet metadata.
691    /// Mirrors iceberg-java's ParquetSchemaUtil.pruneColumns() for compatibility.
692    fn get_arrow_projection_mask_with_field_ids(
693        leaf_field_ids: &[i32],
694        iceberg_schema_of_task: &Schema,
695        parquet_schema: &SchemaDescriptor,
696        arrow_schema: &ArrowSchemaRef,
697        type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
698    ) -> Result<ProjectionMask> {
699        let mut column_map = HashMap::new();
700        let fields = arrow_schema.fields();
701
702        // Pre-project only the fields that have been selected, possibly avoiding converting
703        // some Arrow types that are not yet supported.
704        let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
705        let projected_arrow_schema = ArrowSchema::new_with_metadata(
706            fields.filter_leaves(|_, f| {
707                f.metadata()
708                    .get(PARQUET_FIELD_ID_META_KEY)
709                    .and_then(|field_id| i32::from_str(field_id).ok())
710                    .is_some_and(|field_id| {
711                        projected_fields.insert((*f).clone(), field_id);
712                        leaf_field_ids.contains(&field_id)
713                    })
714            }),
715            arrow_schema.metadata().clone(),
716        );
717        let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
718
719        fields.filter_leaves(|idx, field| {
720            let Some(field_id) = projected_fields.get(field).cloned() else {
721                return false;
722            };
723
724            let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
725            let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
726
727            if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
728                return false;
729            }
730
731            if !type_promotion_is_valid(
732                parquet_iceberg_field
733                    .unwrap()
734                    .field_type
735                    .as_primitive_type(),
736                iceberg_field.unwrap().field_type.as_primitive_type(),
737            ) {
738                return false;
739            }
740
741            column_map.insert(field_id, idx);
742            true
743        });
744
745        // Schema evolution: New columns may not exist in old Parquet files.
746        // We only project existing columns; RecordBatchTransformer adds default/NULL values.
747        let mut indices = vec![];
748        for field_id in leaf_field_ids {
749            if let Some(col_idx) = column_map.get(field_id) {
750                indices.push(*col_idx);
751            }
752        }
753
754        if indices.is_empty() {
755            // Edge case: All requested columns are new (don't exist in file).
756            // Project all columns so RecordBatchTransformer has a batch to transform.
757            Ok(ProjectionMask::all())
758        } else {
759            Ok(ProjectionMask::leaves(parquet_schema, indices))
760        }
761    }
762
763    /// Fallback projection for Parquet files without field IDs.
764    /// Uses position-based matching: field ID N → column position N-1.
765    /// Projects entire top-level columns (including nested content) for iceberg-java compatibility.
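    ///
    /// For example (hypothetical ids): requesting field ids `[1, 3]` against a file with four
    /// top-level columns selects root column positions `[0, 2]`; an id that points past the
    /// last column is dropped here and later filled with NULLs by the RecordBatchTransformer.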
766    fn get_arrow_projection_mask_fallback(
767        field_ids: &[i32],
768        parquet_schema: &SchemaDescriptor,
769    ) -> Result<ProjectionMask> {
770        // Position-based: field_id N → column N-1 (field IDs are 1-indexed)
771        let parquet_root_fields = parquet_schema.root_schema().get_fields();
772        let mut root_indices = vec![];
773
774        for field_id in field_ids.iter() {
775            let parquet_pos = (*field_id - 1) as usize;
776
777            if parquet_pos < parquet_root_fields.len() {
778                root_indices.push(parquet_pos);
779            }
780            // RecordBatchTransformer adds missing columns with NULL values
781        }
782
783        if root_indices.is_empty() {
784            Ok(ProjectionMask::all())
785        } else {
786            Ok(ProjectionMask::roots(parquet_schema, root_indices))
787        }
788    }
789
790    fn get_row_filter(
791        predicates: &BoundPredicate,
792        parquet_schema: &SchemaDescriptor,
793        iceberg_field_ids: &HashSet<i32>,
794        field_id_map: &HashMap<i32, usize>,
795    ) -> Result<RowFilter> {
796        // Collect Parquet column indices from field ids.
797        // If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
798        let mut column_indices = iceberg_field_ids
799            .iter()
800            .filter_map(|field_id| field_id_map.get(field_id).cloned())
801            .collect::<Vec<_>>();
802        column_indices.sort();
803
804        // The converter that converts `BoundPredicates` to `ArrowPredicates`
805        let mut converter = PredicateConverter {
806            parquet_schema,
807            column_map: field_id_map,
808            column_indices: &column_indices,
809        };
810
811        // After collecting required leaf column indices used in the predicate,
812        // create the projection mask for the Arrow predicates.
813        let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
814        let predicate_func = visit(&mut converter, predicates)?;
815        let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
816        Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
817    }
818
819    fn get_selected_row_group_indices(
820        predicate: &BoundPredicate,
821        parquet_metadata: &Arc<ParquetMetaData>,
822        field_id_map: &HashMap<i32, usize>,
823        snapshot_schema: &Schema,
824    ) -> Result<Vec<usize>> {
825        let row_groups_metadata = parquet_metadata.row_groups();
826        let mut results = Vec::with_capacity(row_groups_metadata.len());
827
828        for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
829            if RowGroupMetricsEvaluator::eval(
830                predicate,
831                row_group_metadata,
832                field_id_map,
833                snapshot_schema,
834            )? {
835                results.push(idx);
836            }
837        }
838
839        Ok(results)
840    }
841
842    fn get_row_selection_for_filter_predicate(
843        predicate: &BoundPredicate,
844        parquet_metadata: &Arc<ParquetMetaData>,
845        selected_row_groups: &Option<Vec<usize>>,
846        field_id_map: &HashMap<i32, usize>,
847        snapshot_schema: &Schema,
848    ) -> Result<RowSelection> {
849        let Some(column_index) = parquet_metadata.column_index() else {
850            return Err(Error::new(
851                ErrorKind::Unexpected,
852                "Parquet file metadata does not contain a column index",
853            ));
854        };
855
856        let Some(offset_index) = parquet_metadata.offset_index() else {
857            return Err(Error::new(
858                ErrorKind::Unexpected,
859                "Parquet file metadata does not contain an offset index",
860            ));
861        };
862
863        // If all row groups were filtered out, return an empty RowSelection (select no rows)
864        if let Some(selected_row_groups) = selected_row_groups
865            && selected_row_groups.is_empty()
866        {
867            return Ok(RowSelection::from(Vec::new()));
868        }
869
870        let mut selected_row_groups_idx = 0;
871
872        let page_index = column_index
873            .iter()
874            .enumerate()
875            .zip(offset_index)
876            .zip(parquet_metadata.row_groups());
877
878        let mut results = Vec::new();
879        for (((idx, column_index), offset_index), row_group_metadata) in page_index {
880            if let Some(selected_row_groups) = selected_row_groups {
881                // skip row groups that aren't present in selected_row_groups
882                if idx == selected_row_groups[selected_row_groups_idx] {
883                    selected_row_groups_idx += 1;
884                } else {
885                    continue;
886                }
887            }
888
889            let selections_for_page = PageIndexEvaluator::eval(
890                predicate,
891                column_index,
892                offset_index,
893                row_group_metadata,
894                field_id_map,
895                snapshot_schema,
896            )?;
897
898            results.push(selections_for_page);
899
900            if let Some(selected_row_groups) = selected_row_groups
901                && selected_row_groups_idx == selected_row_groups.len()
902            {
903                break;
904            }
905        }
906
907        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
908    }
909
910    /// Filters row groups by byte range to support Iceberg's file splitting.
911    ///
912    /// Iceberg splits large files at row group boundaries, so we only read row groups
913    /// whose byte ranges overlap with [start, start+length).
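    ///
    /// Worked example (illustrative sizes): with two row groups of 100 compressed bytes each,
    /// row group 0 spans bytes [4, 104) and row group 1 spans [104, 204); a task with
    /// `start = 104` and `length = 100` therefore selects only row group 1.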
914    fn filter_row_groups_by_byte_range(
915        parquet_metadata: &Arc<ParquetMetaData>,
916        start: u64,
917        length: u64,
918    ) -> Result<Vec<usize>> {
919        let row_groups = parquet_metadata.row_groups();
920        let mut selected = Vec::new();
921        let end = start + length;
922
923        // Row groups are stored sequentially after the 4-byte magic header.
924        let mut current_byte_offset = 4u64;
925
926        for (idx, row_group) in row_groups.iter().enumerate() {
927            let row_group_size = row_group.compressed_size() as u64;
928            let row_group_end = current_byte_offset + row_group_size;
929
930            if current_byte_offset < end && start < row_group_end {
931                selected.push(idx);
932            }
933
934            current_byte_offset = row_group_end;
935        }
936
937        Ok(selected)
938    }
939}
940
941/// Build the map of Parquet field ID to leaf column index in the schema.
942/// Returns None if the Parquet file doesn't have field IDs embedded (e.g., migrated tables).
943fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
944    let mut column_map = HashMap::new();
945
946    for (idx, field) in parquet_schema.columns().iter().enumerate() {
947        let field_type = field.self_type();
948        match field_type {
949            ParquetType::PrimitiveType { basic_info, .. } => {
950                if !basic_info.has_id() {
951                    return Ok(None);
952                }
953                column_map.insert(basic_info.id(), idx);
954            }
955            ParquetType::GroupType { .. } => {
956                return Err(Error::new(
957                    ErrorKind::DataInvalid,
958                    format!(
959                        "Leave column in schema should be primitive type but got {field_type:?}"
960                    ),
961                ));
962            }
963        };
964    }
965
966    Ok(Some(column_map))
967}
968
969/// Build a fallback field ID map for Parquet files without embedded field IDs.
970/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java migrations.
971fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
972    let mut column_map = HashMap::new();
973
974    // 1-indexed to match iceberg-java's convention
975    for (idx, _field) in parquet_schema.columns().iter().enumerate() {
976        let field_id = (idx + 1) as i32;
977        column_map.insert(field_id, idx);
978    }
979
980    column_map
981}
982
983/// Apply name mapping to Arrow schema for Parquet files lacking field IDs.
984///
985/// Assigns Iceberg field IDs based on column names using the name mapping,
986/// enabling correct projection on migrated files (e.g., from Hive/Spark via add_files).
987///
988/// Per Iceberg spec Column Projection rule #2:
989/// "Use schema.name-mapping.default metadata to map field id to columns without field id"
990/// https://iceberg.apache.org/spec/#column-projection
991///
992/// Corresponds to Java's ParquetSchemaUtil.applyNameMapping() and ApplyNameMapping visitor.
993/// The key difference is Java operates on Parquet MessageType, while we operate on Arrow Schema.
994///
995/// # Arguments
996/// * `arrow_schema` - Arrow schema from Parquet file (without field IDs)
997/// * `name_mapping` - Name mapping from table metadata (TableProperties.DEFAULT_NAME_MAPPING)
998///
999/// # Returns
1000/// Arrow schema with field IDs assigned based on name mapping
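///
/// Illustrative example (hypothetical mapping): given a name mapping of
/// `[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["data"]}]`, the Arrow columns
/// `id` and `data` receive field IDs 1 and 2 in their metadata, while a column absent from the
/// mapping is left without an ID and is later dropped by projection.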
1001fn apply_name_mapping_to_arrow_schema(
1002    arrow_schema: ArrowSchemaRef,
1003    name_mapping: &NameMapping,
1004) -> Result<Arc<ArrowSchema>> {
1005    debug_assert!(
1006        arrow_schema
1007            .fields()
1008            .iter()
1009            .next()
1010            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1011        "Schema already has field IDs - name mapping should not be applied"
1012    );
1013
1014    use arrow_schema::Field;
1015
1016    let fields_with_mapped_ids: Vec<_> = arrow_schema
1017        .fields()
1018        .iter()
1019        .map(|field| {
1020            // Look up this column name in name mapping to get the Iceberg field ID.
1021            // Corresponds to Java's ApplyNameMapping visitor which calls
1022            // nameMapping.find(currentPath()) and returns field.withId() if found.
1023            //
1024            // If the field isn't in the mapping, leave it WITHOUT assigning an ID
1025            // (matching Java's behavior of returning the field unchanged).
1026            // Later, during projection, fields without IDs are filtered out.
1027            let mapped_field_opt = name_mapping
1028                .fields()
1029                .iter()
1030                .find(|f| f.names().contains(&field.name().to_string()));
1031
1032            let mut metadata = field.metadata().clone();
1033
1034            if let Some(mapped_field) = mapped_field_opt
1035                && let Some(field_id) = mapped_field.field_id()
1036            {
1037                // Field found in mapping with a field_id → assign it
1038                metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1039            }
1040            // If field_id is None, leave the field without an ID (will be filtered by projection)
1041
1042            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1043                .with_metadata(metadata)
1044        })
1045        .collect();
1046
1047    Ok(Arc::new(ArrowSchema::new_with_metadata(
1048        fields_with_mapped_ids,
1049        arrow_schema.metadata().clone(),
1050    )))
1051}
1052
1053/// Add position-based fallback field IDs to Arrow schema for Parquet files lacking them.
1054/// Enables projection on migrated files (e.g., from Hive/Spark).
1055///
1056/// Why at schema level (not per-batch): Efficiency - avoids repeated schema modification.
1057/// Why only top-level: Nested projection uses leaf column indices, not parent struct IDs.
1058/// Why 1-indexed: Compatibility with iceberg-java's ParquetSchemaUtil.addFallbackIds().
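///
/// For example (hypothetical columns): a file with top-level columns `name`, `age`, `city`
/// gets field IDs 1, 2, 3 respectively; for a flat schema this matches the position-based IDs
/// that `build_fallback_field_id_map` assigns on the Parquet side.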
1059fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
1060    debug_assert!(
1061        arrow_schema
1062            .fields()
1063            .iter()
1064            .next()
1065            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1066        "Schema already has field IDs"
1067    );
1068
1069    use arrow_schema::Field;
1070
1071    let fields_with_fallback_ids: Vec<_> = arrow_schema
1072        .fields()
1073        .iter()
1074        .enumerate()
1075        .map(|(pos, field)| {
1076            let mut metadata = field.metadata().clone();
1077            let field_id = (pos + 1) as i32; // 1-indexed for Java compatibility
1078            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1079
1080            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1081                .with_metadata(metadata)
1082        })
1083        .collect();
1084
1085    Arc::new(ArrowSchema::new_with_metadata(
1086        fields_with_fallback_ids,
1087        arrow_schema.metadata().clone(),
1088    ))
1089}
1090
1091/// A visitor to collect field ids from bound predicates.
1092struct CollectFieldIdVisitor {
1093    field_ids: HashSet<i32>,
1094}
1095
1096impl CollectFieldIdVisitor {
1097    fn field_ids(self) -> HashSet<i32> {
1098        self.field_ids
1099    }
1100}
1101
1102impl BoundPredicateVisitor for CollectFieldIdVisitor {
1103    type T = ();
1104
1105    fn always_true(&mut self) -> Result<()> {
1106        Ok(())
1107    }
1108
1109    fn always_false(&mut self) -> Result<()> {
1110        Ok(())
1111    }
1112
1113    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1114        Ok(())
1115    }
1116
1117    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1118        Ok(())
1119    }
1120
1121    fn not(&mut self, _inner: ()) -> Result<()> {
1122        Ok(())
1123    }
1124
1125    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1126        self.field_ids.insert(reference.field().id);
1127        Ok(())
1128    }
1129
1130    fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1131        self.field_ids.insert(reference.field().id);
1132        Ok(())
1133    }
1134
1135    fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1136        self.field_ids.insert(reference.field().id);
1137        Ok(())
1138    }
1139
1140    fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1141        self.field_ids.insert(reference.field().id);
1142        Ok(())
1143    }
1144
1145    fn less_than(
1146        &mut self,
1147        reference: &BoundReference,
1148        _literal: &Datum,
1149        _predicate: &BoundPredicate,
1150    ) -> Result<()> {
1151        self.field_ids.insert(reference.field().id);
1152        Ok(())
1153    }
1154
1155    fn less_than_or_eq(
1156        &mut self,
1157        reference: &BoundReference,
1158        _literal: &Datum,
1159        _predicate: &BoundPredicate,
1160    ) -> Result<()> {
1161        self.field_ids.insert(reference.field().id);
1162        Ok(())
1163    }
1164
1165    fn greater_than(
1166        &mut self,
1167        reference: &BoundReference,
1168        _literal: &Datum,
1169        _predicate: &BoundPredicate,
1170    ) -> Result<()> {
1171        self.field_ids.insert(reference.field().id);
1172        Ok(())
1173    }
1174
1175    fn greater_than_or_eq(
1176        &mut self,
1177        reference: &BoundReference,
1178        _literal: &Datum,
1179        _predicate: &BoundPredicate,
1180    ) -> Result<()> {
1181        self.field_ids.insert(reference.field().id);
1182        Ok(())
1183    }
1184
1185    fn eq(
1186        &mut self,
1187        reference: &BoundReference,
1188        _literal: &Datum,
1189        _predicate: &BoundPredicate,
1190    ) -> Result<()> {
1191        self.field_ids.insert(reference.field().id);
1192        Ok(())
1193    }
1194
1195    fn not_eq(
1196        &mut self,
1197        reference: &BoundReference,
1198        _literal: &Datum,
1199        _predicate: &BoundPredicate,
1200    ) -> Result<()> {
1201        self.field_ids.insert(reference.field().id);
1202        Ok(())
1203    }
1204
1205    fn starts_with(
1206        &mut self,
1207        reference: &BoundReference,
1208        _literal: &Datum,
1209        _predicate: &BoundPredicate,
1210    ) -> Result<()> {
1211        self.field_ids.insert(reference.field().id);
1212        Ok(())
1213    }
1214
1215    fn not_starts_with(
1216        &mut self,
1217        reference: &BoundReference,
1218        _literal: &Datum,
1219        _predicate: &BoundPredicate,
1220    ) -> Result<()> {
1221        self.field_ids.insert(reference.field().id);
1222        Ok(())
1223    }
1224
1225    fn r#in(
1226        &mut self,
1227        reference: &BoundReference,
1228        _literals: &FnvHashSet<Datum>,
1229        _predicate: &BoundPredicate,
1230    ) -> Result<()> {
1231        self.field_ids.insert(reference.field().id);
1232        Ok(())
1233    }
1234
1235    fn not_in(
1236        &mut self,
1237        reference: &BoundReference,
1238        _literals: &FnvHashSet<Datum>,
1239        _predicate: &BoundPredicate,
1240    ) -> Result<()> {
1241        self.field_ids.insert(reference.field().id);
1242        Ok(())
1243    }
1244}
1245
1246/// A visitor to convert Iceberg bound predicates to Arrow predicates.
1247struct PredicateConverter<'a> {
1248    /// The Parquet schema descriptor.
1249    pub parquet_schema: &'a SchemaDescriptor,
1250    /// The map between field id and leaf column index in Parquet schema.
1251    pub column_map: &'a HashMap<i32, usize>,
1252    /// The required column indices in Parquet schema for the predicates.
1253    pub column_indices: &'a Vec<usize>,
1254}
1255
1256impl PredicateConverter<'_> {
1257    /// When visiting a bound reference, we return the index of the leaf column within the
1258    /// required column indices, which is used to project the column in the record batch.
1259    /// Returns None if the field id is not found in the column map, which is possible
1260    /// due to schema evolution.
1261    fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1262        // The leaf column's index in Parquet schema.
1263        if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1264            if self.parquet_schema.get_column_root(*column_idx).is_group() {
1265                return Err(Error::new(
1266                    ErrorKind::DataInvalid,
1267                    format!(
1268                        "Leave column `{}` in predicates isn't a root column in Parquet schema.",
1269                        reference.field().name
1270                    ),
1271                ));
1272            }
1273
1274            // The leaf column's index in the required column indices.
1275            let index = self
1276                .column_indices
1277                .iter()
1278                .position(|&idx| idx == *column_idx)
1279                .ok_or(Error::new(
1280                    ErrorKind::DataInvalid,
1281                    format!(
1282                "Leave column `{}` in predicates cannot be found in the required column indices.",
1283                reference.field().name
1284            ),
1285                ))?;
1286
1287            Ok(Some(index))
1288        } else {
1289            Ok(None)
1290        }
1291    }
1292
1293    /// Build an Arrow predicate that always returns true.
1294    fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1295        Ok(Box::new(|batch| {
1296            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1297        }))
1298    }
1299
1300    /// Build an Arrow predicate that always returns false.
1301    fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1302        Ok(Box::new(|batch| {
1303            Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1304        }))
1305    }
1306}
1307
1308/// Gets the leaf column from the record batch for the required column index. Only
1309/// supports top-level columns for now.
1310fn project_column(
1311    batch: &RecordBatch,
1312    column_idx: usize,
1313) -> std::result::Result<ArrayRef, ArrowError> {
1314    let column = batch.column(column_idx);
1315
1316    match column.data_type() {
1317        DataType::Struct(_) => Err(ArrowError::SchemaError(
1318            "Does not support struct column yet.".to_string(),
1319        )),
1320        _ => Ok(column.clone()),
1321    }
1322}
1323
1324type PredicateResult =
1325    dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1326
1327impl BoundPredicateVisitor for PredicateConverter<'_> {
1328    type T = Box<PredicateResult>;
1329
1330    fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1331        self.build_always_true()
1332    }
1333
1334    fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1335        self.build_always_false()
1336    }
1337
1338    fn and(
1339        &mut self,
1340        mut lhs: Box<PredicateResult>,
1341        mut rhs: Box<PredicateResult>,
1342    ) -> Result<Box<PredicateResult>> {
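        // Combine with Kleene (three-valued) AND so NULLs propagate as SQL expects:
        // NULL AND false => false, NULL AND true => NULL.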
1343        Ok(Box::new(move |batch| {
1344            let left = lhs(batch.clone())?;
1345            let right = rhs(batch)?;
1346            and_kleene(&left, &right)
1347        }))
1348    }
1349
1350    fn or(
1351        &mut self,
1352        mut lhs: Box<PredicateResult>,
1353        mut rhs: Box<PredicateResult>,
1354    ) -> Result<Box<PredicateResult>> {
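        // Combine with Kleene (three-valued) OR so NULLs propagate as SQL expects:
        // NULL OR true => true, NULL OR false => NULL.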
1355        Ok(Box::new(move |batch| {
1356            let left = lhs(batch.clone())?;
1357            let right = rhs(batch)?;
1358            or_kleene(&left, &right)
1359        }))
1360    }
1361
1362    fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1363        Ok(Box::new(move |batch| {
1364            let pred_ret = inner(batch)?;
1365            not(&pred_ret)
1366        }))
1367    }
1368
1369    fn is_null(
1370        &mut self,
1371        reference: &BoundReference,
1372        _predicate: &BoundPredicate,
1373    ) -> Result<Box<PredicateResult>> {
1374        if let Some(idx) = self.bound_reference(reference)? {
1375            Ok(Box::new(move |batch| {
1376                let column = project_column(&batch, idx)?;
1377                is_null(&column)
1378            }))
1379        } else {
1380            // A missing column, treating it as null.
1381            self.build_always_true()
1382        }
1383    }
1384
1385    fn not_null(
1386        &mut self,
1387        reference: &BoundReference,
1388        _predicate: &BoundPredicate,
1389    ) -> Result<Box<PredicateResult>> {
1390        if let Some(idx) = self.bound_reference(reference)? {
1391            Ok(Box::new(move |batch| {
1392                let column = project_column(&batch, idx)?;
1393                is_not_null(&column)
1394            }))
1395        } else {
1396            // A missing column, treating it as null.
1397            self.build_always_false()
1398        }
1399    }
1400
1401    fn is_nan(
1402        &mut self,
1403        reference: &BoundReference,
1404        _predicate: &BoundPredicate,
1405    ) -> Result<Box<PredicateResult>> {
1406        if self.bound_reference(reference)?.is_some() {
1407            self.build_always_true()
1408        } else {
1409            // A missing column, treating it as null.
1410            self.build_always_false()
1411        }
1412    }
1413
1414    fn not_nan(
1415        &mut self,
1416        reference: &BoundReference,
1417        _predicate: &BoundPredicate,
1418    ) -> Result<Box<PredicateResult>> {
1419        if self.bound_reference(reference)?.is_some() {
1420            self.build_always_false()
1421        } else {
1422            // A missing column, treating it as null.
1423            self.build_always_true()
1424        }
1425    }
1426
1427    fn less_than(
1428        &mut self,
1429        reference: &BoundReference,
1430        literal: &Datum,
1431        _predicate: &BoundPredicate,
1432    ) -> Result<Box<PredicateResult>> {
1433        if let Some(idx) = self.bound_reference(reference)? {
1434            let literal = get_arrow_datum(literal)?;
1435
1436            Ok(Box::new(move |batch| {
1437                let left = project_column(&batch, idx)?;
1438                let literal = try_cast_literal(&literal, left.data_type())?;
1439                lt(&left, literal.as_ref())
1440            }))
1441        } else {
1442            // A missing column, treating it as null.
1443            self.build_always_true()
1444        }
1445    }
1446
1447    fn less_than_or_eq(
1448        &mut self,
1449        reference: &BoundReference,
1450        literal: &Datum,
1451        _predicate: &BoundPredicate,
1452    ) -> Result<Box<PredicateResult>> {
1453        if let Some(idx) = self.bound_reference(reference)? {
1454            let literal = get_arrow_datum(literal)?;
1455
1456            Ok(Box::new(move |batch| {
1457                let left = project_column(&batch, idx)?;
1458                let literal = try_cast_literal(&literal, left.data_type())?;
1459                lt_eq(&left, literal.as_ref())
1460            }))
1461        } else {
1462            // A missing column, treating it as null.
1463            self.build_always_true()
1464        }
1465    }
1466
1467    fn greater_than(
1468        &mut self,
1469        reference: &BoundReference,
1470        literal: &Datum,
1471        _predicate: &BoundPredicate,
1472    ) -> Result<Box<PredicateResult>> {
1473        if let Some(idx) = self.bound_reference(reference)? {
1474            let literal = get_arrow_datum(literal)?;
1475
1476            Ok(Box::new(move |batch| {
1477                let left = project_column(&batch, idx)?;
1478                let literal = try_cast_literal(&literal, left.data_type())?;
1479                gt(&left, literal.as_ref())
1480            }))
1481        } else {
1482            // A missing column, treating it as null.
1483            self.build_always_false()
1484        }
1485    }
1486
1487    fn greater_than_or_eq(
1488        &mut self,
1489        reference: &BoundReference,
1490        literal: &Datum,
1491        _predicate: &BoundPredicate,
1492    ) -> Result<Box<PredicateResult>> {
1493        if let Some(idx) = self.bound_reference(reference)? {
1494            let literal = get_arrow_datum(literal)?;
1495
1496            Ok(Box::new(move |batch| {
1497                let left = project_column(&batch, idx)?;
1498                let literal = try_cast_literal(&literal, left.data_type())?;
1499                gt_eq(&left, literal.as_ref())
1500            }))
1501        } else {
1502            // A missing column, treating it as null.
1503            self.build_always_false()
1504        }
1505    }
1506
1507    fn eq(
1508        &mut self,
1509        reference: &BoundReference,
1510        literal: &Datum,
1511        _predicate: &BoundPredicate,
1512    ) -> Result<Box<PredicateResult>> {
1513        if let Some(idx) = self.bound_reference(reference)? {
1514            let literal = get_arrow_datum(literal)?;
1515
1516            Ok(Box::new(move |batch| {
1517                let left = project_column(&batch, idx)?;
1518                let literal = try_cast_literal(&literal, left.data_type())?;
1519                eq(&left, literal.as_ref())
1520            }))
1521        } else {
1522            // A missing column, treating it as null.
1523            self.build_always_false()
1524        }
1525    }
1526
1527    fn not_eq(
1528        &mut self,
1529        reference: &BoundReference,
1530        literal: &Datum,
1531        _predicate: &BoundPredicate,
1532    ) -> Result<Box<PredicateResult>> {
1533        if let Some(idx) = self.bound_reference(reference)? {
1534            let literal = get_arrow_datum(literal)?;
1535
1536            Ok(Box::new(move |batch| {
1537                let left = project_column(&batch, idx)?;
1538                let literal = try_cast_literal(&literal, left.data_type())?;
1539                neq(&left, literal.as_ref())
1540            }))
1541        } else {
1542            // A missing column, treating it as null.
1543            self.build_always_false()
1544        }
1545    }
1546
1547    fn starts_with(
1548        &mut self,
1549        reference: &BoundReference,
1550        literal: &Datum,
1551        _predicate: &BoundPredicate,
1552    ) -> Result<Box<PredicateResult>> {
1553        if let Some(idx) = self.bound_reference(reference)? {
1554            let literal = get_arrow_datum(literal)?;
1555
1556            Ok(Box::new(move |batch| {
1557                let left = project_column(&batch, idx)?;
1558                let literal = try_cast_literal(&literal, left.data_type())?;
1559                starts_with(&left, literal.as_ref())
1560            }))
1561        } else {
1562            // A missing column, treating it as null.
1563            self.build_always_false()
1564        }
1565    }
1566
1567    fn not_starts_with(
1568        &mut self,
1569        reference: &BoundReference,
1570        literal: &Datum,
1571        _predicate: &BoundPredicate,
1572    ) -> Result<Box<PredicateResult>> {
1573        if let Some(idx) = self.bound_reference(reference)? {
1574            let literal = get_arrow_datum(literal)?;
1575
1576            Ok(Box::new(move |batch| {
1577                let left = project_column(&batch, idx)?;
1578                let literal = try_cast_literal(&literal, left.data_type())?;
1579                // update here if arrow ever adds a native not_starts_with
1580                not(&starts_with(&left, literal.as_ref())?)
1581            }))
1582        } else {
1583            // A missing column, treating it as null.
1584            self.build_always_true()
1585        }
1586    }
1587
1588    fn r#in(
1589        &mut self,
1590        reference: &BoundReference,
1591        literals: &FnvHashSet<Datum>,
1592        _predicate: &BoundPredicate,
1593    ) -> Result<Box<PredicateResult>> {
1594        if let Some(idx) = self.bound_reference(reference)? {
1595            let literals: Vec<_> = literals
1596                .iter()
1597                .map(|lit| get_arrow_datum(lit).unwrap())
1598                .collect();
1599
1600            Ok(Box::new(move |batch| {
1601                // update this if arrow ever adds a native is_in kernel
1602                let left = project_column(&batch, idx)?;
1603
1604                let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1605                for literal in &literals {
1606                    let literal = try_cast_literal(literal, left.data_type())?;
1607                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
1608                }
1609
1610                Ok(acc)
1611            }))
1612        } else {
1613            // A missing column, treating it as null.
1614            self.build_always_false()
1615        }
1616    }
1617
1618    fn not_in(
1619        &mut self,
1620        reference: &BoundReference,
1621        literals: &FnvHashSet<Datum>,
1622        _predicate: &BoundPredicate,
1623    ) -> Result<Box<PredicateResult>> {
1624        if let Some(idx) = self.bound_reference(reference)? {
1625            let literals: Vec<_> = literals
1626                .iter()
1627                .map(|lit| get_arrow_datum(lit).unwrap())
1628                .collect();
1629
1630            Ok(Box::new(move |batch| {
1631                // update this if arrow ever adds a native not_in kernel
1632                let left = project_column(&batch, idx)?;
1633                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1634                for literal in &literals {
1635                    let literal = try_cast_literal(literal, left.data_type())?;
1636                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
1637                }
1638
1639                Ok(acc)
1640            }))
1641        } else {
1642            // A missing column, treating it as null.
1643            self.build_always_true()
1644        }
1645    }
1646}
1647
1648/// ArrowFileReader is a wrapper around a FileRead that implements parquet's AsyncFileReader.
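///
/// A minimal usage sketch (`meta` is the file's `FileMetadata` and `file_read` is any
/// `FileRead` implementation, both obtained elsewhere, e.g. via `FileIO`; the batch size
/// and prefetch hint below are arbitrary illustrative values):
///
/// ```ignore
/// let reader = ArrowFileReader::new(meta, file_read)
///     .with_preload_column_index(true)
///     .with_preload_offset_index(true)
///     .with_metadata_size_hint(64 * 1024);
///
/// // Hand the reader to parquet's async stream builder to read record batches.
/// let stream = ParquetRecordBatchStreamBuilder::new(reader)
///     .await?
///     .with_batch_size(1024)
///     .build()?;
/// ```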
1649pub struct ArrowFileReader<R: FileRead> {
1650    meta: FileMetadata,
1651    preload_column_index: bool,
1652    preload_offset_index: bool,
1653    preload_page_index: bool,
1654    metadata_size_hint: Option<usize>,
1655    r: R,
1656}
1657
1658impl<R: FileRead> ArrowFileReader<R> {
1659    /// Create a new ArrowFileReader
1660    pub fn new(meta: FileMetadata, r: R) -> Self {
1661        Self {
1662            meta,
1663            preload_column_index: false,
1664            preload_offset_index: false,
1665            preload_page_index: false,
1666            metadata_size_hint: None,
1667            r,
1668        }
1669    }
1670
1671    /// Enable or disable preloading of the column index
1672    pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1673        self.preload_column_index = preload;
1674        self
1675    }
1676
1677    /// Enable or disable preloading of the offset index
1678    pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1679        self.preload_offset_index = preload;
1680        self
1681    }
1682
1683    /// Enable or disable preloading of the page index
1684    pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1685        self.preload_page_index = preload;
1686        self
1687    }
1688
1689    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
1690    ///
1691    /// This hint can help reduce the number of fetch requests. For more details see the
1692    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
1693    pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1694        self.metadata_size_hint = Some(hint);
1695        self
1696    }
1697}
1698
1699impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
1700    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
1701        Box::pin(
1702            self.r
1703                .read(range.start..range.end)
1704                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
1705        )
1706    }
1707
1708    // TODO: currently we don't respect `ArrowReaderOptions` because it doesn't expose any method to access the options field;
1709    // this will be fixed after `v55.1.0` is released, see https://github.com/apache/arrow-rs/issues/7393
1710    fn get_metadata(
1711        &mut self,
1712        _options: Option<&'_ ArrowReaderOptions>,
1713    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
1714        async move {
1715            let reader = ParquetMetaDataReader::new()
1716                .with_prefetch_hint(self.metadata_size_hint)
1717                // Set the page policy first because it updates both column and offset policies.
1718                .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
1719                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
1720                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
1721            let size = self.meta.size;
1722            let meta = reader.load_and_finish(self, size).await?;
1723
1724            Ok(Arc::new(meta))
1725        }
1726        .boxed()
1727    }
1728}
1729
1730/// The Arrow type of an array that the Parquet reader produces may not exactly match the Arrow
1731/// type that Iceberg uses for literals, even though they are effectively the same logical type,
1732/// e.g. LargeUtf8 vs. Utf8, Utf8View vs. Utf8, or Utf8View vs. LargeUtf8.
1733///
1734/// The Arrow compute kernels that we use must match the type exactly, so first cast the literal
1735/// into the type of the batch we read from Parquet before sending it to the compute kernel.
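///
/// A sketch of the underlying cast (illustrative only; the column and literal values are
/// hypothetical, and the example drives the arrow kernels directly rather than this helper):
///
/// ```
/// use arrow_array::{BooleanArray, LargeStringArray, Scalar, StringArray};
/// use arrow_cast::cast::cast;
/// use arrow_ord::cmp::eq;
/// use arrow_schema::DataType;
///
/// // Column read from Parquet as LargeUtf8; the Iceberg literal arrives as Utf8.
/// let column = LargeStringArray::from(vec!["foo", "bar"]);
/// let literal = Scalar::new(StringArray::from(vec!["foo"]));
///
/// // Cast the literal's backing array to the column's type before comparing.
/// let (literal_array, _is_scalar) = literal.get();
/// let casted = Scalar::new(cast(literal_array, &DataType::LargeUtf8).unwrap());
/// assert_eq!(eq(&column, &casted).unwrap(), BooleanArray::from(vec![true, false]));
/// ```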
1736fn try_cast_literal(
1737    literal: &Arc<dyn ArrowDatum + Send + Sync>,
1738    column_type: &DataType,
1739) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1740    let literal_array = literal.get().0;
1741
1742    // No cast required
1743    if literal_array.data_type() == column_type {
1744        return Ok(Arc::clone(literal));
1745    }
1746
1747    let literal_array = cast(literal_array, column_type)?;
1748    Ok(Arc::new(Scalar::new(literal_array)))
1749}
1750
1751#[cfg(test)]
1752mod tests {
1753    use std::collections::{HashMap, HashSet};
1754    use std::fs::File;
1755    use std::sync::Arc;
1756
1757    use arrow_array::cast::AsArray;
1758    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1759    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1760    use futures::TryStreamExt;
1761    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1762    use parquet::arrow::{ArrowWriter, ProjectionMask};
1763    use parquet::basic::Compression;
1764    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1765    use parquet::file::properties::WriterProperties;
1766    use parquet::schema::parser::parse_message_type;
1767    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1768    use roaring::RoaringTreemap;
1769    use tempfile::TempDir;
1770
1771    use crate::ErrorKind;
1772    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1773    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1774    use crate::delete_vector::DeleteVector;
1775    use crate::expr::visitors::bound_predicate_visitor::visit;
1776    use crate::expr::{Bind, Predicate, Reference};
1777    use crate::io::FileIO;
1778    use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1779    use crate::spec::{
1780        DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1781    };
1782
1783    fn table_schema_simple() -> SchemaRef {
1784        Arc::new(
1785            Schema::builder()
1786                .with_schema_id(1)
1787                .with_identifier_field_ids(vec![2])
1788                .with_fields(vec![
1789                    NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1790                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1791                    NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1792                    NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1793                ])
1794                .build()
1795                .unwrap(),
1796        )
1797    }
1798
1799    #[test]
1800    fn test_collect_field_id() {
1801        let schema = table_schema_simple();
1802        let expr = Reference::new("qux").is_null();
1803        let bound_expr = expr.bind(schema, true).unwrap();
1804
1805        let mut visitor = CollectFieldIdVisitor {
1806            field_ids: HashSet::default(),
1807        };
1808        visit(&mut visitor, &bound_expr).unwrap();
1809
1810        let mut expected = HashSet::default();
1811        expected.insert(4_i32);
1812
1813        assert_eq!(visitor.field_ids, expected);
1814    }
1815
1816    #[test]
1817    fn test_collect_field_id_with_and() {
1818        let schema = table_schema_simple();
1819        let expr = Reference::new("qux")
1820            .is_null()
1821            .and(Reference::new("baz").is_null());
1822        let bound_expr = expr.bind(schema, true).unwrap();
1823
1824        let mut visitor = CollectFieldIdVisitor {
1825            field_ids: HashSet::default(),
1826        };
1827        visit(&mut visitor, &bound_expr).unwrap();
1828
1829        let mut expected = HashSet::default();
1830        expected.insert(4_i32);
1831        expected.insert(3);
1832
1833        assert_eq!(visitor.field_ids, expected);
1834    }
1835
1836    #[test]
1837    fn test_collect_field_id_with_or() {
1838        let schema = table_schema_simple();
1839        let expr = Reference::new("qux")
1840            .is_null()
1841            .or(Reference::new("baz").is_null());
1842        let bound_expr = expr.bind(schema, true).unwrap();
1843
1844        let mut visitor = CollectFieldIdVisitor {
1845            field_ids: HashSet::default(),
1846        };
1847        visit(&mut visitor, &bound_expr).unwrap();
1848
1849        let mut expected = HashSet::default();
1850        expected.insert(4_i32);
1851        expected.insert(3);
1852
1853        assert_eq!(visitor.field_ids, expected);
1854    }
1855
1856    #[test]
1857    fn test_arrow_projection_mask() {
1858        let schema = Arc::new(
1859            Schema::builder()
1860                .with_schema_id(1)
1861                .with_identifier_field_ids(vec![1])
1862                .with_fields(vec![
1863                    NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
1864                    NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
1865                    NestedField::optional(
1866                        3,
1867                        "c3",
1868                        Type::Primitive(PrimitiveType::Decimal {
1869                            precision: 38,
1870                            scale: 3,
1871                        }),
1872                    )
1873                    .into(),
1874                ])
1875                .build()
1876                .unwrap(),
1877        );
1878        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1879            Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
1880                PARQUET_FIELD_ID_META_KEY.to_string(),
1881                "1".to_string(),
1882            )])),
1883            // Type not supported
1884            Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
1885                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1886            ),
1887            // Precision is beyond the supported range
1888            Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
1889                PARQUET_FIELD_ID_META_KEY.to_string(),
1890                "3".to_string(),
1891            )])),
1892        ]));
1893
1894        let message_type = "
1895message schema {
1896  required binary c1 (STRING) = 1;
1897  optional int32 c2 (INTEGER(8,true)) = 2;
1898  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
1899}
1900    ";
1901        let parquet_type = parse_message_type(message_type).expect("should parse schema");
1902        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
1903
1904        // Try projecting the fields c2 and c3 with the unsupported data types
1905        let err = ArrowReader::get_arrow_projection_mask(
1906            &[1, 2, 3],
1907            &schema,
1908            &parquet_schema,
1909            &arrow_schema,
1910            false,
1911        )
1912        .unwrap_err();
1913
1914        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1915        assert_eq!(
1916            err.to_string(),
1917            "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
1918        );
1919
1920        // Omitting field c2, we still get an error due to c3 being selected
1921        let err = ArrowReader::get_arrow_projection_mask(
1922            &[1, 3],
1923            &schema,
1924            &parquet_schema,
1925            &arrow_schema,
1926            false,
1927        )
1928        .unwrap_err();
1929
1930        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1931        assert_eq!(
1932            err.to_string(),
1933            "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
1934        );
1935
1936        // Finally avoid selecting fields with unsupported data types
1937        let mask = ArrowReader::get_arrow_projection_mask(
1938            &[1],
1939            &schema,
1940            &parquet_schema,
1941            &arrow_schema,
1942            false,
1943        )
1944        .expect("Some ProjectionMask");
1945        assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
1946    }
1947
1948    #[tokio::test]
1949    async fn test_kleene_logic_or_behaviour() {
1950        // a IS NULL OR a = 'foo'
1951        let predicate = Reference::new("a")
1952            .is_null()
1953            .or(Reference::new("a").equal_to(Datum::string("foo")));
1954
1955        // Table data: [NULL, "foo", "bar"]
1956        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1957
1958        // Expected: [NULL, "foo"].
1959        let expected = vec![None, Some("foo".to_string())];
1960
1961        let (file_io, schema, table_location, _temp_dir) =
1962            setup_kleene_logic(data_for_col_a, DataType::Utf8);
1963        let reader = ArrowReaderBuilder::new(file_io).build();
1964
1965        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1966
1967        assert_eq!(result_data, expected);
1968    }
1969
1970    #[tokio::test]
1971    async fn test_kleene_logic_and_behaviour() {
1972        // a IS NOT NULL AND a != 'foo'
1973        let predicate = Reference::new("a")
1974            .is_not_null()
1975            .and(Reference::new("a").not_equal_to(Datum::string("foo")));
1976
1977        // Table data: [NULL, "foo", "bar"]
1978        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1979
1980        // Expected: ["bar"].
1981        let expected = vec![Some("bar".to_string())];
1982
1983        let (file_io, schema, table_location, _temp_dir) =
1984            setup_kleene_logic(data_for_col_a, DataType::Utf8);
1985        let reader = ArrowReaderBuilder::new(file_io).build();
1986
1987        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1988
1989        assert_eq!(result_data, expected);
1990    }
1991
1992    #[tokio::test]
1993    async fn test_predicate_cast_literal() {
1994        let predicates = vec![
1995            // a == 'foo'
1996            (Reference::new("a").equal_to(Datum::string("foo")), vec![
1997                Some("foo".to_string()),
1998            ]),
1999            // a != 'foo'
2000            (
2001                Reference::new("a").not_equal_to(Datum::string("foo")),
2002                vec![Some("bar".to_string())],
2003            ),
2004            // STARTS_WITH(a, 'f')
2005            (Reference::new("a").starts_with(Datum::string("f")), vec![
2006                Some("foo".to_string()),
2007            ]),
2008            // NOT STARTS_WITH(a, 'f')
2009            (
2010                Reference::new("a").not_starts_with(Datum::string("f")),
2011                vec![Some("bar".to_string())],
2012            ),
2013            // a < 'foo'
2014            (Reference::new("a").less_than(Datum::string("foo")), vec![
2015                Some("bar".to_string()),
2016            ]),
2017            // a <= 'foo'
2018            (
2019                Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
2020                vec![Some("foo".to_string()), Some("bar".to_string())],
2021            ),
2022            // a > 'bar'
2023            (
2024                Reference::new("a").greater_than(Datum::string("bar")),
2025                vec![Some("foo".to_string())],
2026            ),
2027            // a >= 'foo'
2028            (
2029                Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
2030                vec![Some("foo".to_string())],
2031            ),
2032            // a IN ('foo', 'baz')
2033            (
2034                Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
2035                vec![Some("foo".to_string())],
2036            ),
2037            // a NOT IN ('foo', 'baz')
2038            (
2039                Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
2040                vec![Some("bar".to_string())],
2041            ),
2042        ];
2043
2044        // Table data: ["foo", "bar"]
2045        let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
2046
2047        let (file_io, schema, table_location, _temp_dir) =
2048            setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
2049        let reader = ArrowReaderBuilder::new(file_io).build();
2050
2051        for (predicate, expected) in predicates {
2052            println!("testing predicate {predicate}");
2053            let result_data = test_perform_read(
2054                predicate.clone(),
2055                schema.clone(),
2056                table_location.clone(),
2057                reader.clone(),
2058            )
2059            .await;
2060
2061            assert_eq!(result_data, expected, "predicate={predicate}");
2062        }
2063    }
2064
2065    async fn test_perform_read(
2066        predicate: Predicate,
2067        schema: SchemaRef,
2068        table_location: String,
2069        reader: ArrowReader,
2070    ) -> Vec<Option<String>> {
2071        let tasks = Box::pin(futures::stream::iter(
2072            vec![Ok(FileScanTask {
2073                start: 0,
2074                length: 0,
2075                record_count: None,
2076                data_file_path: format!("{table_location}/1.parquet"),
2077                data_file_format: DataFileFormat::Parquet,
2078                schema: schema.clone(),
2079                project_field_ids: vec![1],
2080                predicate: Some(predicate.bind(schema, true).unwrap()),
2081                deletes: vec![],
2082                partition: None,
2083                partition_spec: None,
2084                name_mapping: None,
2085                case_sensitive: false,
2086            })]
2087            .into_iter(),
2088        )) as FileScanTaskStream;
2089
2090        let result = reader
2091            .read(tasks)
2092            .unwrap()
2093            .try_collect::<Vec<RecordBatch>>()
2094            .await
2095            .unwrap();
2096
2097        result[0].columns()[0]
2098            .as_string_opt::<i32>()
2099            .unwrap()
2100            .iter()
2101            .map(|v| v.map(ToOwned::to_owned))
2102            .collect::<Vec<_>>()
2103    }
2104
2105    fn setup_kleene_logic(
2106        data_for_col_a: Vec<Option<String>>,
2107        col_a_type: DataType,
2108    ) -> (FileIO, SchemaRef, String, TempDir) {
2109        let schema = Arc::new(
2110            Schema::builder()
2111                .with_schema_id(1)
2112                .with_fields(vec![
2113                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2114                ])
2115                .build()
2116                .unwrap(),
2117        );
2118
2119        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2120            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2121                PARQUET_FIELD_ID_META_KEY.to_string(),
2122                "1".to_string(),
2123            )])),
2124        ]));
2125
2126        let tmp_dir = TempDir::new().unwrap();
2127        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2128
2129        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2130
2131        let col = match col_a_type {
2132            DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2133            DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2134            _ => panic!("unexpected col_a_type"),
2135        };
2136
2137        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2138
2139        // Write the Parquet file
2140        let props = WriterProperties::builder()
2141            .set_compression(Compression::SNAPPY)
2142            .build();
2143
2144        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
2145        let mut writer =
2146            ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2147
2148        writer.write(&to_write).expect("Writing batch");
2149
2150        // writer must be closed to write footer
2151        writer.close().unwrap();
2152
2153        (file_io, schema, table_location, tmp_dir)
2154    }
2155
2156    #[test]
2157    fn test_build_deletes_row_selection() {
2158        let schema_descr = get_test_schema_descr();
2159
2160        let mut columns = vec![];
2161        for ptr in schema_descr.columns() {
2162            let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
2163            columns.push(column);
2164        }
2165
2166        let row_groups_metadata = vec![
2167            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
2168            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
2169            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
2170            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
2171            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
2172        ];
2173
2174        let selected_row_groups = Some(vec![1, 3]);
2175
2176        /* cases to cover:
2177           * {skip|select} {first|intermediate|last} {one row|multiple rows} in
2178             {first|intermediate|last} {skipped|selected} row group
2179           * row group selection disabled
2180        */
2181
2182        let positional_deletes = RoaringTreemap::from_iter(&[
2183            1, // in skipped rg 0, should be ignored
2184            3, // run of three consecutive items in skipped rg0
2185            4, 5, 998, // two consecutive items at end of skipped rg0
2186            999, 1000, // solitary row at start of selected rg1 (1, 9)
2187            1010, // run of 3 rows in selected rg1
2188            1011, 1012, // (3, 485)
2189            1498, // run of two items at end of selected rg1
2190            1499, 1500, // run of two items at start of skipped rg2
2191            1501, 1600, // should ignore, in skipped rg2
2192            1999, // single row at end of skipped rg2
2193            2000, // run of two items at start of selected rg3
2194            2001, // (4, 98)
2195            2100, // single row in selected row group 3 (1, 99)
2196            2200, // run of 3 consecutive rows in selected row group 3
2197            2201, 2202, // (3, 796)
2198            2999, // single item at end of selected rg3 (1)
2199            3000, // single item at start of skipped rg4
2200        ]);
2201
2202        let positional_deletes = DeleteVector::new(positional_deletes);
2203
2204        // using selected row groups 1 and 3
2205        let result = ArrowReader::build_deletes_row_selection(
2206            &row_groups_metadata,
2207            &selected_row_groups,
2208            &positional_deletes,
2209        )
2210        .unwrap();
2211
2212        let expected = RowSelection::from(vec![
2213            RowSelector::skip(1),
2214            RowSelector::select(9),
2215            RowSelector::skip(3),
2216            RowSelector::select(485),
2217            RowSelector::skip(4),
2218            RowSelector::select(98),
2219            RowSelector::skip(1),
2220            RowSelector::select(99),
2221            RowSelector::skip(3),
2222            RowSelector::select(796),
2223            RowSelector::skip(1),
2224        ]);
2225
2226        assert_eq!(result, expected);
2227
2228        // selecting all row groups
2229        let result = ArrowReader::build_deletes_row_selection(
2230            &row_groups_metadata,
2231            &None,
2232            &positional_deletes,
2233        )
2234        .unwrap();
2235
2236        let expected = RowSelection::from(vec![
2237            RowSelector::select(1),
2238            RowSelector::skip(1),
2239            RowSelector::select(1),
2240            RowSelector::skip(3),
2241            RowSelector::select(992),
2242            RowSelector::skip(3),
2243            RowSelector::select(9),
2244            RowSelector::skip(3),
2245            RowSelector::select(485),
2246            RowSelector::skip(4),
2247            RowSelector::select(98),
2248            RowSelector::skip(1),
2249            RowSelector::select(398),
2250            RowSelector::skip(3),
2251            RowSelector::select(98),
2252            RowSelector::skip(1),
2253            RowSelector::select(99),
2254            RowSelector::skip(3),
2255            RowSelector::select(796),
2256            RowSelector::skip(2),
2257            RowSelector::select(499),
2258        ]);
2259
2260        assert_eq!(result, expected);
2261    }
2262
2263    fn build_test_row_group_meta(
2264        schema_descr: SchemaDescPtr,
2265        columns: Vec<ColumnChunkMetaData>,
2266        num_rows: i64,
2267        ordinal: i16,
2268    ) -> RowGroupMetaData {
2269        RowGroupMetaData::builder(schema_descr.clone())
2270            .set_num_rows(num_rows)
2271            .set_total_byte_size(2000)
2272            .set_column_metadata(columns)
2273            .set_ordinal(ordinal)
2274            .build()
2275            .unwrap()
2276    }
2277
2278    fn get_test_schema_descr() -> SchemaDescPtr {
2279        use parquet::schema::types::Type as SchemaType;
2280
2281        let schema = SchemaType::group_type_builder("schema")
2282            .with_fields(vec![
2283                Arc::new(
2284                    SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2285                        .build()
2286                        .unwrap(),
2287                ),
2288                Arc::new(
2289                    SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2290                        .build()
2291                        .unwrap(),
2292                ),
2293            ])
2294            .build()
2295            .unwrap();
2296
2297        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2298    }
2299
2300    /// Verifies that file splits respect byte ranges and only read specific row groups.
2301    #[tokio::test]
2302    async fn test_file_splits_respect_byte_ranges() {
2303        use arrow_array::Int32Array;
2304        use parquet::file::reader::{FileReader, SerializedFileReader};
2305
2306        let schema = Arc::new(
2307            Schema::builder()
2308                .with_schema_id(1)
2309                .with_fields(vec![
2310                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2311                ])
2312                .build()
2313                .unwrap(),
2314        );
2315
2316        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2317            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2318                PARQUET_FIELD_ID_META_KEY.to_string(),
2319                "1".to_string(),
2320            )])),
2321        ]));
2322
2323        let tmp_dir = TempDir::new().unwrap();
2324        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2325        let file_path = format!("{table_location}/multi_row_group.parquet");
2326
2327        // Force each batch into its own row group for testing byte range filtering.
2328        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2329            (0..100).collect::<Vec<i32>>(),
2330        ))])
2331        .unwrap();
2332        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2333            (100..200).collect::<Vec<i32>>(),
2334        ))])
2335        .unwrap();
2336        let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2337            (200..300).collect::<Vec<i32>>(),
2338        ))])
2339        .unwrap();
2340
2341        let props = WriterProperties::builder()
2342            .set_compression(Compression::SNAPPY)
2343            .set_max_row_group_size(100)
2344            .build();
2345
2346        let file = File::create(&file_path).unwrap();
2347        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2348        writer.write(&batch1).expect("Writing batch 1");
2349        writer.write(&batch2).expect("Writing batch 2");
2350        writer.write(&batch3).expect("Writing batch 3");
2351        writer.close().unwrap();
2352
2353        // Read the file metadata to get row group byte positions
2354        let file = File::open(&file_path).unwrap();
2355        let reader = SerializedFileReader::new(file).unwrap();
2356        let metadata = reader.metadata();
2357
2358        println!("File has {} row groups", metadata.num_row_groups());
2359        assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2360
2361        // Get byte positions for each row group
2362        let row_group_0 = metadata.row_group(0);
2363        let row_group_1 = metadata.row_group(1);
2364        let row_group_2 = metadata.row_group(2);
2365
2366        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
2367        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2368        let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
2369        let file_end = rg2_start + row_group_2.compressed_size() as u64;
2370
2371        println!(
2372            "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2373            row_group_0.num_rows(),
2374            rg0_start,
2375            row_group_0.compressed_size()
2376        );
2377        println!(
2378            "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2379            row_group_1.num_rows(),
2380            rg1_start,
2381            row_group_1.compressed_size()
2382        );
2383        println!(
2384            "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2385            row_group_2.num_rows(),
2386            rg2_start,
2387            row_group_2.compressed_size()
2388        );
2389
2390        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2391        let reader = ArrowReaderBuilder::new(file_io).build();
2392
2393        // Task 1: read only the first row group
2394        let task1 = FileScanTask {
2395            start: rg0_start,
2396            length: row_group_0.compressed_size() as u64,
2397            record_count: Some(100),
2398            data_file_path: file_path.clone(),
2399            data_file_format: DataFileFormat::Parquet,
2400            schema: schema.clone(),
2401            project_field_ids: vec![1],
2402            predicate: None,
2403            deletes: vec![],
2404            partition: None,
2405            partition_spec: None,
2406            name_mapping: None,
2407            case_sensitive: false,
2408        };
2409
2410        // Task 2: read the second and third row groups
2411        let task2 = FileScanTask {
2412            start: rg1_start,
2413            length: file_end - rg1_start,
2414            record_count: Some(200),
2415            data_file_path: file_path.clone(),
2416            data_file_format: DataFileFormat::Parquet,
2417            schema: schema.clone(),
2418            project_field_ids: vec![1],
2419            predicate: None,
2420            deletes: vec![],
2421            partition: None,
2422            partition_spec: None,
2423            name_mapping: None,
2424            case_sensitive: false,
2425        };
2426
2427        let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2428        let result1 = reader
2429            .clone()
2430            .read(tasks1)
2431            .unwrap()
2432            .try_collect::<Vec<RecordBatch>>()
2433            .await
2434            .unwrap();
2435
2436        let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2437        println!(
2438            "Task 1 (bytes {}-{}) returned {} rows",
2439            rg0_start,
2440            rg0_start + row_group_0.compressed_size() as u64,
2441            total_rows_task1
2442        );
2443
2444        let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2445        let result2 = reader
2446            .read(tasks2)
2447            .unwrap()
2448            .try_collect::<Vec<RecordBatch>>()
2449            .await
2450            .unwrap();
2451
2452        let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2453        println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2454
2455        assert_eq!(
2456            total_rows_task1, 100,
2457            "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2458        );
2459
2460        assert_eq!(
2461            total_rows_task2, 200,
2462            "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2463        );
2464
2465        // Verify the actual data values are correct (not just the row count)
2466        if total_rows_task1 > 0 {
2467            let first_batch = &result1[0];
2468            let id_col = first_batch
2469                .column(0)
2470                .as_primitive::<arrow_array::types::Int32Type>();
2471            let first_val = id_col.value(0);
2472            let last_val = id_col.value(id_col.len() - 1);
2473            println!("Task 1 data range: {first_val} to {last_val}");
2474
2475            assert_eq!(first_val, 0, "Task 1 should start with id=0");
2476            assert_eq!(last_val, 99, "Task 1 should end with id=99");
2477        }
2478
2479        if total_rows_task2 > 0 {
2480            let first_batch = &result2[0];
2481            let id_col = first_batch
2482                .column(0)
2483                .as_primitive::<arrow_array::types::Int32Type>();
2484            let first_val = id_col.value(0);
2485            println!("Task 2 first value: {first_val}");
2486
2487            assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2488        }
2489    }
2490
2491    /// Test schema evolution: reading an old Parquet file (with only column 'a')
2492    /// using a newer table schema (with columns 'a' and 'b').
2493    /// This tests that:
2494    /// 1. get_arrow_projection_mask allows missing columns
2495    /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
2496    #[tokio::test]
2497    async fn test_schema_evolution_add_column() {
2498        use arrow_array::{Array, Int32Array};
2499
2500        // New table schema: columns 'a' and 'b' (b was added later, file only has 'a')
2501        let new_schema = Arc::new(
2502            Schema::builder()
2503                .with_schema_id(2)
2504                .with_fields(vec![
2505                    NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2506                    NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2507                ])
2508                .build()
2509                .unwrap(),
2510        );
2511
2512        // Create Arrow schema for old Parquet file (only has column 'a')
2513        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2514            Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2515                PARQUET_FIELD_ID_META_KEY.to_string(),
2516                "1".to_string(),
2517            )])),
2518        ]));
2519
2520        // Write old Parquet file with only column 'a'
2521        let tmp_dir = TempDir::new().unwrap();
2522        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2523        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2524
2525        let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2526        let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2527
2528        let props = WriterProperties::builder()
2529            .set_compression(Compression::SNAPPY)
2530            .build();
2531        let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
2532        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2533        writer.write(&to_write).expect("Writing batch");
2534        writer.close().unwrap();
2535
2536        // Read the old Parquet file using the NEW schema (with column 'b')
2537        let reader = ArrowReaderBuilder::new(file_io).build();
2538        let tasks = Box::pin(futures::stream::iter(
2539            vec![Ok(FileScanTask {
2540                start: 0,
2541                length: 0,
2542                record_count: None,
2543                data_file_path: format!("{table_location}/old_file.parquet"),
2544                data_file_format: DataFileFormat::Parquet,
2545                schema: new_schema.clone(),
2546                project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
2547                predicate: None,
2548                deletes: vec![],
2549                partition: None,
2550                partition_spec: None,
2551                name_mapping: None,
2552                case_sensitive: false,
2553            })]
2554            .into_iter(),
2555        )) as FileScanTaskStream;
2556
2557        let result = reader
2558            .read(tasks)
2559            .unwrap()
2560            .try_collect::<Vec<RecordBatch>>()
2561            .await
2562            .unwrap();
2563
2564        // Verify we got the correct data
2565        assert_eq!(result.len(), 1);
2566        let batch = &result[0];
2567
2568        // Should have 2 columns now
2569        assert_eq!(batch.num_columns(), 2);
2570        assert_eq!(batch.num_rows(), 3);
2571
2572        // Column 'a' should have the original data
2573        let col_a = batch
2574            .column(0)
2575            .as_primitive::<arrow_array::types::Int32Type>();
2576        assert_eq!(col_a.values(), &[1, 2, 3]);
2577
2578        // Column 'b' should be all NULLs (it didn't exist in the old file)
2579        let col_b = batch
2580            .column(1)
2581            .as_primitive::<arrow_array::types::Int32Type>();
2582        assert_eq!(col_b.null_count(), 3);
2583        assert!(col_b.is_null(0));
2584        assert!(col_b.is_null(1));
2585        assert!(col_b.is_null(2));
2586    }
2587
2588    /// Test for bug where position deletes in later row groups are not applied correctly.
2589    ///
2590    /// When a file has multiple row groups and a position delete targets a row in a later
2591    /// row group, the `build_deletes_row_selection` function had a bug where it would
2592    /// fail to increment `current_row_group_base_idx` when skipping row groups.
2593    ///
2594    /// This test creates:
2595    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2596    /// - A position delete file that deletes row 199 (last row in second row group)
2597    ///
2598    /// Expected behavior: Should return 199 rows (with id=200 deleted)
2599    /// Bug behavior: Returns 200 rows (delete is not applied)
2600    ///
2601    /// This bug was discovered while running Apache Spark + Apache Iceberg integration tests
2602    /// through DataFusion Comet. The following Iceberg Java tests failed due to this bug:
2603    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet`
2604    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet`
2605    #[tokio::test]
2606    async fn test_position_delete_across_multiple_row_groups() {
2607        use arrow_array::{Int32Array, Int64Array};
2608        use parquet::file::reader::{FileReader, SerializedFileReader};
2609
2610        // Field IDs for positional delete schema
2611        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2612        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2613
2614        let tmp_dir = TempDir::new().unwrap();
2615        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2616
2617        // Create table schema with a single 'id' column
2618        let table_schema = Arc::new(
2619            Schema::builder()
2620                .with_schema_id(1)
2621                .with_fields(vec![
2622                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2623                ])
2624                .build()
2625                .unwrap(),
2626        );
2627
2628        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2629            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2630                PARQUET_FIELD_ID_META_KEY.to_string(),
2631                "1".to_string(),
2632            )])),
2633        ]));
2634
2635        // Step 1: Create data file with 200 rows in 2 row groups
2636        // Row group 0: rows 0-99 (ids 1-100)
2637        // Row group 1: rows 100-199 (ids 101-200)
2638        let data_file_path = format!("{table_location}/data.parquet");
2639
2640        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2641            Int32Array::from_iter_values(1..=100),
2642        )])
2643        .unwrap();
2644
2645        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2646            Int32Array::from_iter_values(101..=200),
2647        )])
2648        .unwrap();
2649
2650        // Force each batch into its own row group
2651        let props = WriterProperties::builder()
2652            .set_compression(Compression::SNAPPY)
2653            .set_max_row_group_size(100)
2654            .build();
2655
2656        let file = File::create(&data_file_path).unwrap();
2657        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2658        writer.write(&batch1).expect("Writing batch 1");
2659        writer.write(&batch2).expect("Writing batch 2");
2660        writer.close().unwrap();
2661
2662        // Verify we created 2 row groups
2663        let verify_file = File::open(&data_file_path).unwrap();
2664        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2665        assert_eq!(
2666            verify_reader.metadata().num_row_groups(),
2667            2,
2668            "Should have 2 row groups"
2669        );
2670
2671        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
2672        let delete_file_path = format!("{table_location}/deletes.parquet");
2673
2674        let delete_schema = Arc::new(ArrowSchema::new(vec![
2675            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2676                PARQUET_FIELD_ID_META_KEY.to_string(),
2677                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2678            )])),
2679            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2680                PARQUET_FIELD_ID_META_KEY.to_string(),
2681                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2682            )])),
2683        ]));
2684
2685        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
2686        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2687            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2688            Arc::new(Int64Array::from_iter_values(vec![199i64])),
2689        ])
2690        .unwrap();
2691
2692        let delete_props = WriterProperties::builder()
2693            .set_compression(Compression::SNAPPY)
2694            .build();
2695
2696        let delete_file = File::create(&delete_file_path).unwrap();
2697        let mut delete_writer =
2698            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2699        delete_writer.write(&delete_batch).unwrap();
2700        delete_writer.close().unwrap();
2701
2702        // Step 3: Read the data file with the delete applied
2703        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2704        let reader = ArrowReaderBuilder::new(file_io).build();
2705
2706        let task = FileScanTask {
2707            start: 0,
2708            length: 0,
2709            record_count: Some(200),
2710            data_file_path: data_file_path.clone(),
2711            data_file_format: DataFileFormat::Parquet,
2712            schema: table_schema.clone(),
2713            project_field_ids: vec![1],
2714            predicate: None,
2715            deletes: vec![FileScanTaskDeleteFile {
2716                file_path: delete_file_path,
2717                file_type: DataContentType::PositionDeletes,
2718                partition_spec_id: 0,
2719                equality_ids: None,
2720            }],
2721            partition: None,
2722            partition_spec: None,
2723            name_mapping: None,
2724            case_sensitive: false,
2725        };
2726
2727        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2728        let result = reader
2729            .read(tasks)
2730            .unwrap()
2731            .try_collect::<Vec<RecordBatch>>()
2732            .await
2733            .unwrap();
2734
2735        // Step 4: Verify we got 199 rows (not 200)
2736        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2737
2738        println!("Total rows read: {total_rows}");
2739        println!("Expected: 199 rows (deleted row 199 which had id=200)");
2740
2741        // This assertion will FAIL before the fix and PASS after the fix
2742        assert_eq!(
2743            total_rows, 199,
2744            "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
2745             The bug causes position deletes in later row groups to be ignored."
2746        );
2747
2748        // Verify the deleted row (id=200) is not present
2749        let all_ids: Vec<i32> = result
2750            .iter()
2751            .flat_map(|batch| {
2752                batch
2753                    .column(0)
2754                    .as_primitive::<arrow_array::types::Int32Type>()
2755                    .values()
2756                    .iter()
2757                    .copied()
2758            })
2759            .collect();
2760
2761        assert!(
2762            !all_ids.contains(&200),
2763            "Row with id=200 should be deleted but was found in results"
2764        );
2765
2766        // Verify we have all other ids (1-199)
2767        let expected_ids: Vec<i32> = (1..=199).collect();
2768        assert_eq!(
2769            all_ids, expected_ids,
2770            "Should have ids 1-199 but got different values"
2771        );
2772    }
2773
2774    /// Test for bug where position deletes are lost when skipping unselected row groups.
2775    ///
2776    /// This is a variant of `test_position_delete_across_multiple_row_groups` that exercises
2777    /// the row group selection code path (`selected_row_groups: Some([...])`).
2778    ///
2779    /// When a file has multiple row groups and only some are selected for reading,
2780    /// the `build_deletes_row_selection` function must correctly skip over deletes in
2781    /// unselected row groups WITHOUT consuming deletes that belong to selected row groups.
2782    ///
2783    /// This test creates:
2784    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2785    /// - A position delete file that deletes row 199 (last row in second row group)
2786    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
2787    ///
2788    /// Expected behavior: Should return 99 rows (with row 199 deleted)
2789    /// Bug behavior: Returns 100 rows (delete is lost when skipping row group 0)
2790    ///
2791    /// The bug occurs when processing row group 0 (unselected):
2792    /// ```rust
2793    /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100
2794    /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199!
2795    /// ```
2796    ///
2797    /// The fix is to NOT call `next()` after `advance_to()` when skipping unselected row groups,
2798    /// because `advance_to()` already positions the iterator correctly without consuming elements.
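        ///
        /// Sketched in the terms of the snippet above, the corrected skip is simply:
        /// ```rust
        /// delete_vector_iter.advance_to(next_row_group_base_idx);
        /// // No `next()` here: the element at or beyond the new base index stays in
        /// // the iterator, so the delete at position 199 is still available when row
        /// // group 1 is processed.
        /// ```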
2799    #[tokio::test]
2800    async fn test_position_delete_with_row_group_selection() {
2801        use arrow_array::{Int32Array, Int64Array};
2802        use parquet::file::reader::{FileReader, SerializedFileReader};
2803
2804        // Field IDs for positional delete schema
2805        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2806        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2807
2808        let tmp_dir = TempDir::new().unwrap();
2809        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2810
2811        // Create table schema with a single 'id' column
2812        let table_schema = Arc::new(
2813            Schema::builder()
2814                .with_schema_id(1)
2815                .with_fields(vec![
2816                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2817                ])
2818                .build()
2819                .unwrap(),
2820        );
2821
2822        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2823            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2824                PARQUET_FIELD_ID_META_KEY.to_string(),
2825                "1".to_string(),
2826            )])),
2827        ]));
2828
2829        // Step 1: Create data file with 200 rows in 2 row groups
2830        // Row group 0: rows 0-99 (ids 1-100)
2831        // Row group 1: rows 100-199 (ids 101-200)
2832        let data_file_path = format!("{table_location}/data.parquet");
2833
2834        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2835            Int32Array::from_iter_values(1..=100),
2836        )])
2837        .unwrap();
2838
2839        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2840            Int32Array::from_iter_values(101..=200),
2841        )])
2842        .unwrap();
2843
2844        // Force each batch into its own row group
2845        let props = WriterProperties::builder()
2846            .set_compression(Compression::SNAPPY)
2847            .set_max_row_group_size(100)
2848            .build();
2849
2850        let file = File::create(&data_file_path).unwrap();
2851        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2852        writer.write(&batch1).expect("Writing batch 1");
2853        writer.write(&batch2).expect("Writing batch 2");
2854        writer.close().unwrap();
2855
2856        // Verify we created 2 row groups
2857        let verify_file = File::open(&data_file_path).unwrap();
2858        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2859        assert_eq!(
2860            verify_reader.metadata().num_row_groups(),
2861            2,
2862            "Should have 2 row groups"
2863        );
2864
2865        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
2866        let delete_file_path = format!("{table_location}/deletes.parquet");
2867
2868        let delete_schema = Arc::new(ArrowSchema::new(vec![
2869            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2870                PARQUET_FIELD_ID_META_KEY.to_string(),
2871                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2872            )])),
2873            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2874                PARQUET_FIELD_ID_META_KEY.to_string(),
2875                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2876            )])),
2877        ]));
2878
2879        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
2880        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2881            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2882            Arc::new(Int64Array::from_iter_values(vec![199i64])),
2883        ])
2884        .unwrap();
2885
2886        let delete_props = WriterProperties::builder()
2887            .set_compression(Compression::SNAPPY)
2888            .build();
2889
2890        let delete_file = File::create(&delete_file_path).unwrap();
2891        let mut delete_writer =
2892            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2893        delete_writer.write(&delete_batch).unwrap();
2894        delete_writer.close().unwrap();
2895
2896        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
2897        // This exercises the row group selection code path where row group 0 is skipped
2898        let metadata_file = File::open(&data_file_path).unwrap();
2899        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
2900        let metadata = metadata_reader.metadata();
2901
2902        let row_group_0 = metadata.row_group(0);
2903        let row_group_1 = metadata.row_group(1);
2904
2905        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
2906        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2907        let rg1_length = row_group_1.compressed_size() as u64;
2908
2909        println!(
2910            "Row group 0: starts at byte {}, {} bytes compressed",
2911            rg0_start,
2912            row_group_0.compressed_size()
2913        );
2914        println!(
2915            "Row group 1: starts at byte {}, {} bytes compressed",
2916            rg1_start,
2917            row_group_1.compressed_size()
2918        );
2919
2920        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2921        let reader = ArrowReaderBuilder::new(file_io).build();
2922
2923        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
2924        let task = FileScanTask {
2925            start: rg1_start,
2926            length: rg1_length,
2927            record_count: Some(100), // Row group 1 has 100 rows
2928            data_file_path: data_file_path.clone(),
2929            data_file_format: DataFileFormat::Parquet,
2930            schema: table_schema.clone(),
2931            project_field_ids: vec![1],
2932            predicate: None,
2933            deletes: vec![FileScanTaskDeleteFile {
2934                file_path: delete_file_path,
2935                file_type: DataContentType::PositionDeletes,
2936                partition_spec_id: 0,
2937                equality_ids: None,
2938            }],
2939            partition: None,
2940            partition_spec: None,
2941            name_mapping: None,
2942            case_sensitive: false,
2943        };
2944
2945        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2946        let result = reader
2947            .read(tasks)
2948            .unwrap()
2949            .try_collect::<Vec<RecordBatch>>()
2950            .await
2951            .unwrap();
2952
2953        // Step 4: Verify we got 99 rows (not 100)
2954        // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 99 rows
2955        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2956
2957        println!("Total rows read from row group 1: {total_rows}");
2958        println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
2959
2960        // This assertion will FAIL before the fix and PASS after the fix
2961        assert_eq!(
2962            total_rows, 99,
2963            "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
2964             The bug causes position deletes to be lost when advance_to() is followed by next() \
2965             when skipping unselected row groups."
2966        );
2967
2968        // Verify the deleted row (id=200) is not present
2969        let all_ids: Vec<i32> = result
2970            .iter()
2971            .flat_map(|batch| {
2972                batch
2973                    .column(0)
2974                    .as_primitive::<arrow_array::types::Int32Type>()
2975                    .values()
2976                    .iter()
2977                    .copied()
2978            })
2979            .collect();
2980
2981        assert!(
2982            !all_ids.contains(&200),
2983            "Row with id=200 should be deleted but was found in results"
2984        );
2985
2986        // Verify we have ids 101-199 (not 101-200)
2987        let expected_ids: Vec<i32> = (101..=199).collect();
2988        assert_eq!(
2989            all_ids, expected_ids,
2990            "Should have ids 101-199 but got different values"
2991        );
2992    }

2993    /// Test for bug where stale cached delete causes infinite loop when skipping row groups.
2994    ///
2995    /// This test exposes the inverse scenario of `test_position_delete_with_row_group_selection`:
2996    /// - Position delete targets a row in the SKIPPED row group (not the selected one)
2997    /// - After calling advance_to(), the cached delete index is stale
2998    /// - Without updating the cache, the code enters an infinite loop
2999    ///
3000    /// This test creates:
3001    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
3002    /// - A position delete file that deletes row 0 (first row in SKIPPED row group 0)
3003    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
3004    ///
3005    /// The bug occurs when skipping row group 0:
3006    /// ```rust
3007    /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // Some(0)
3008    /// // ... skip to row group 1 ...
3009    /// delete_vector_iter.advance_to(100); // Iterator advances past delete at 0
3010    /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE!
3011    /// // When processing row group 1:
3012    /// //   current_idx = 100, next_deleted_row_idx = 0, next_row_group_base_idx = 200
3013    /// //   Loop condition: 0 < 200 (true)
3014    /// //   But: current_idx (100) > next_deleted_row_idx (0)
3015    /// //   And: current_idx (100) != next_deleted_row_idx (0)
3016    /// //   Neither branch executes -> INFINITE LOOP!
3017    /// ```
3018    ///
3019    /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect row group 1)
3020    /// Bug behavior: Infinite loop in build_deletes_row_selection
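        ///
        /// A hedged sketch of one way to reconcile this with the previous test's fix
        /// (which forbids an unconditional `next()` after `advance_to()`): refresh the
        /// cache only when it points before the row group about to be processed. Names
        /// and structure are illustrative, not the actual implementation:
        /// ```rust
        /// delete_vector_iter.advance_to(next_row_group_base_idx);
        /// // A cached delete from the skipped row group is stale and must be replaced;
        /// // a cached delete at or beyond the base index is kept, so it cannot be lost.
        /// if next_deleted_row_idx_opt.is_some_and(|idx| idx < next_row_group_base_idx) {
        ///     next_deleted_row_idx_opt = delete_vector_iter.next();
        /// }
        /// ```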
3021    #[tokio::test]
3022    async fn test_position_delete_in_skipped_row_group() {
3023        use arrow_array::{Int32Array, Int64Array};
3024        use parquet::file::reader::{FileReader, SerializedFileReader};
3025
3026        // Field IDs for positional delete schema
3027        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3028        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3029
3030        let tmp_dir = TempDir::new().unwrap();
3031        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3032
3033        // Create table schema with a single 'id' column
3034        let table_schema = Arc::new(
3035            Schema::builder()
3036                .with_schema_id(1)
3037                .with_fields(vec![
3038                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3039                ])
3040                .build()
3041                .unwrap(),
3042        );
3043
3044        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3045            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3046                PARQUET_FIELD_ID_META_KEY.to_string(),
3047                "1".to_string(),
3048            )])),
3049        ]));
3050
3051        // Step 1: Create data file with 200 rows in 2 row groups
3052        // Row group 0: rows 0-99 (ids 1-100)
3053        // Row group 1: rows 100-199 (ids 101-200)
3054        let data_file_path = format!("{table_location}/data.parquet");
3055
3056        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3057            Int32Array::from_iter_values(1..=100),
3058        )])
3059        .unwrap();
3060
3061        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3062            Int32Array::from_iter_values(101..=200),
3063        )])
3064        .unwrap();
3065
3066        // Force each batch into its own row group
3067        let props = WriterProperties::builder()
3068            .set_compression(Compression::SNAPPY)
3069            .set_max_row_group_size(100)
3070            .build();
3071
3072        let file = File::create(&data_file_path).unwrap();
3073        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3074        writer.write(&batch1).expect("Writing batch 1");
3075        writer.write(&batch2).expect("Writing batch 2");
3076        writer.close().unwrap();
3077
3078        // Verify we created 2 row groups
3079        let verify_file = File::open(&data_file_path).unwrap();
3080        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3081        assert_eq!(
3082            verify_reader.metadata().num_row_groups(),
3083            2,
3084            "Should have 2 row groups"
3085        );
3086
3087        // Step 2: Create position delete file that deletes row 0 (id=1, first row in row group 0)
3088        let delete_file_path = format!("{table_location}/deletes.parquet");
3089
3090        let delete_schema = Arc::new(ArrowSchema::new(vec![
3091            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3092                PARQUET_FIELD_ID_META_KEY.to_string(),
3093                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3094            )])),
3095            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3096                PARQUET_FIELD_ID_META_KEY.to_string(),
3097                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3098            )])),
3099        ]));
3100
3101        // Delete row at position 0 (0-indexed, so it's the first row: id=1)
3102        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3103            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3104            Arc::new(Int64Array::from_iter_values(vec![0i64])),
3105        ])
3106        .unwrap();
3107
3108        let delete_props = WriterProperties::builder()
3109            .set_compression(Compression::SNAPPY)
3110            .build();
3111
3112        let delete_file = File::create(&delete_file_path).unwrap();
3113        let mut delete_writer =
3114            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3115        delete_writer.write(&delete_batch).unwrap();
3116        delete_writer.close().unwrap();
3117
3118        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
3119        // This exercises the row group selection code path where row group 0 is skipped
3120        let metadata_file = File::open(&data_file_path).unwrap();
3121        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3122        let metadata = metadata_reader.metadata();
3123
3124        let row_group_0 = metadata.row_group(0);
3125        let row_group_1 = metadata.row_group(1);
3126
3127        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
3128        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
3129        let rg1_length = row_group_1.compressed_size() as u64;
3130
3131        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3132        let reader = ArrowReaderBuilder::new(file_io).build();
3133
3134        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
3135        let task = FileScanTask {
3136            start: rg1_start,
3137            length: rg1_length,
3138            record_count: Some(100), // Row group 1 has 100 rows
3139            data_file_path: data_file_path.clone(),
3140            data_file_format: DataFileFormat::Parquet,
3141            schema: table_schema.clone(),
3142            project_field_ids: vec![1],
3143            predicate: None,
3144            deletes: vec![FileScanTaskDeleteFile {
3145                file_path: delete_file_path,
3146                file_type: DataContentType::PositionDeletes,
3147                partition_spec_id: 0,
3148                equality_ids: None,
3149            }],
3150            partition: None,
3151            partition_spec: None,
3152            name_mapping: None,
3153            case_sensitive: false,
3154        };
3155
3156        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3157        let result = reader
3158            .read(tasks)
3159            .unwrap()
3160            .try_collect::<Vec<RecordBatch>>()
3161            .await
3162            .unwrap();
3163
3164        // Step 4: Verify we got 100 rows (all of row group 1)
3165        // The delete at position 0 is in row group 0, which is skipped, so it doesn't affect us
3166        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3167
3168        assert_eq!(
3169            total_rows, 100,
3170            "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
3171             If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
3172        );
3173
3174        // Verify we have all ids from row group 1 (101-200)
3175        let all_ids: Vec<i32> = result
3176            .iter()
3177            .flat_map(|batch| {
3178                batch
3179                    .column(0)
3180                    .as_primitive::<arrow_array::types::Int32Type>()
3181                    .values()
3182                    .iter()
3183                    .copied()
3184            })
3185            .collect();
3186
3187        let expected_ids: Vec<i32> = (101..=200).collect();
3188        assert_eq!(
3189            all_ids, expected_ids,
3190            "Should have ids 101-200 (all of row group 1)"
3191        );
3192    }
3193
3194    /// Test reading Parquet files without field ID metadata (e.g., migrated tables).
3195    /// This exercises the position-based fallback path.
3196    ///
3197    /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + pruneColumnsFallback()
3198    /// in /parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
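        ///
        /// For the two-column file written below, the fallback mapping can be stated
        /// inline. The closure is purely illustrative; the reader derives this mapping
        /// internally and exposes no such helper:
        /// ```rust
        /// // Fallback field IDs are 1-based column positions, so resolving a
        /// // projected field ID is just "subtract one".
        /// let column_index_for = |field_id: usize| field_id - 1;
        /// assert_eq!(column_index_for(1), 0); // "name"
        /// assert_eq!(column_index_for(2), 1); // "age"
        /// ```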
3199    #[tokio::test]
3200    async fn test_read_parquet_file_without_field_ids() {
3201        let schema = Arc::new(
3202            Schema::builder()
3203                .with_schema_id(1)
3204                .with_fields(vec![
3205                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3206                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3207                ])
3208                .build()
3209                .unwrap(),
3210        );
3211
3212        // Parquet file from a migrated table - no field ID metadata
3213        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3214            Field::new("name", DataType::Utf8, false),
3215            Field::new("age", DataType::Int32, false),
3216        ]));
3217
3218        let tmp_dir = TempDir::new().unwrap();
3219        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3220        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3221
3222        let name_data = vec!["Alice", "Bob", "Charlie"];
3223        let age_data = vec![30, 25, 35];
3224
3225        use arrow_array::Int32Array;
3226        let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3227        let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3228
3229        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3230
3231        let props = WriterProperties::builder()
3232            .set_compression(Compression::SNAPPY)
3233            .build();
3234
3235        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3236        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3237
3238        writer.write(&to_write).expect("Writing batch");
3239        writer.close().unwrap();
3240
3241        let reader = ArrowReaderBuilder::new(file_io).build();
3242
3243        let tasks = Box::pin(futures::stream::iter(
3244            vec![Ok(FileScanTask {
3245                start: 0,
3246                length: 0,
3247                record_count: None,
3248                data_file_path: format!("{table_location}/1.parquet"),
3249                data_file_format: DataFileFormat::Parquet,
3250                schema: schema.clone(),
3251                project_field_ids: vec![1, 2],
3252                predicate: None,
3253                deletes: vec![],
3254                partition: None,
3255                partition_spec: None,
3256                name_mapping: None,
3257                case_sensitive: false,
3258            })]
3259            .into_iter(),
3260        )) as FileScanTaskStream;
3261
3262        let result = reader
3263            .read(tasks)
3264            .unwrap()
3265            .try_collect::<Vec<RecordBatch>>()
3266            .await
3267            .unwrap();
3268
3269        assert_eq!(result.len(), 1);
3270        let batch = &result[0];
3271        assert_eq!(batch.num_rows(), 3);
3272        assert_eq!(batch.num_columns(), 2);
3273
3274        // Verify position-based mapping: field_id 1 → position 0, field_id 2 → position 1
3275        let name_array = batch.column(0).as_string::<i32>();
3276        assert_eq!(name_array.value(0), "Alice");
3277        assert_eq!(name_array.value(1), "Bob");
3278        assert_eq!(name_array.value(2), "Charlie");
3279
3280        let age_array = batch
3281            .column(1)
3282            .as_primitive::<arrow_array::types::Int32Type>();
3283        assert_eq!(age_array.value(0), 30);
3284        assert_eq!(age_array.value(1), 25);
3285        assert_eq!(age_array.value(2), 35);
3286    }
3287
3288    /// Test reading Parquet files without field IDs with partial projection.
3289    /// Only a subset of columns are requested, verifying position-based fallback
3290    /// handles column selection correctly.
3291    #[tokio::test]
3292    async fn test_read_parquet_without_field_ids_partial_projection() {
3293        use arrow_array::Int32Array;
3294
3295        let schema = Arc::new(
3296            Schema::builder()
3297                .with_schema_id(1)
3298                .with_fields(vec![
3299                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3300                    NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3301                    NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3302                    NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3303                ])
3304                .build()
3305                .unwrap(),
3306        );
3307
3308        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3309            Field::new("col1", DataType::Utf8, false),
3310            Field::new("col2", DataType::Int32, false),
3311            Field::new("col3", DataType::Utf8, false),
3312            Field::new("col4", DataType::Int32, false),
3313        ]));
3314
3315        let tmp_dir = TempDir::new().unwrap();
3316        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3317        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3318
3319        let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3320        let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3321        let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3322        let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3323
3324        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3325            col1_data, col2_data, col3_data, col4_data,
3326        ])
3327        .unwrap();
3328
3329        let props = WriterProperties::builder()
3330            .set_compression(Compression::SNAPPY)
3331            .build();
3332
3333        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3334        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3335
3336        writer.write(&to_write).expect("Writing batch");
3337        writer.close().unwrap();
3338
3339        let reader = ArrowReaderBuilder::new(file_io).build();
3340
3341        let tasks = Box::pin(futures::stream::iter(
3342            vec![Ok(FileScanTask {
3343                start: 0,
3344                length: 0,
3345                record_count: None,
3346                data_file_path: format!("{table_location}/1.parquet"),
3347                data_file_format: DataFileFormat::Parquet,
3348                schema: schema.clone(),
3349                project_field_ids: vec![1, 3],
3350                predicate: None,
3351                deletes: vec![],
3352                partition: None,
3353                partition_spec: None,
3354                name_mapping: None,
3355                case_sensitive: false,
3356            })]
3357            .into_iter(),
3358        )) as FileScanTaskStream;
3359
3360        let result = reader
3361            .read(tasks)
3362            .unwrap()
3363            .try_collect::<Vec<RecordBatch>>()
3364            .await
3365            .unwrap();
3366
3367        assert_eq!(result.len(), 1);
3368        let batch = &result[0];
3369        assert_eq!(batch.num_rows(), 2);
3370        assert_eq!(batch.num_columns(), 2);
3371
3372        let col1_array = batch.column(0).as_string::<i32>();
3373        assert_eq!(col1_array.value(0), "a");
3374        assert_eq!(col1_array.value(1), "b");
3375
3376        let col3_array = batch.column(1).as_string::<i32>();
3377        assert_eq!(col3_array.value(0), "c");
3378        assert_eq!(col3_array.value(1), "d");
3379    }
3380
3381    /// Test reading Parquet files without field IDs with schema evolution.
3382    /// The Iceberg schema has more fields than the Parquet file, testing that
3383    /// missing columns are filled with NULLs.
3384    #[tokio::test]
3385    async fn test_read_parquet_without_field_ids_schema_evolution() {
3386        use arrow_array::{Array, Int32Array};
3387
3388        // Schema with field 3 added after the file was written
3389        let schema = Arc::new(
3390            Schema::builder()
3391                .with_schema_id(1)
3392                .with_fields(vec![
3393                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3394                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3395                    NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3396                ])
3397                .build()
3398                .unwrap(),
3399        );
3400
3401        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3402            Field::new("name", DataType::Utf8, false),
3403            Field::new("age", DataType::Int32, false),
3404        ]));
3405
3406        let tmp_dir = TempDir::new().unwrap();
3407        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3408        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3409
3410        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3411        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3412
3413        let to_write =
3414            RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3415
3416        let props = WriterProperties::builder()
3417            .set_compression(Compression::SNAPPY)
3418            .build();
3419
3420        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3421        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3422
3423        writer.write(&to_write).expect("Writing batch");
3424        writer.close().unwrap();
3425
3426        let reader = ArrowReaderBuilder::new(file_io).build();
3427
3428        let tasks = Box::pin(futures::stream::iter(
3429            vec![Ok(FileScanTask {
3430                start: 0,
3431                length: 0,
3432                record_count: None,
3433                data_file_path: format!("{table_location}/1.parquet"),
3434                data_file_format: DataFileFormat::Parquet,
3435                schema: schema.clone(),
3436                project_field_ids: vec![1, 2, 3],
3437                predicate: None,
3438                deletes: vec![],
3439                partition: None,
3440                partition_spec: None,
3441                name_mapping: None,
3442                case_sensitive: false,
3443            })]
3444            .into_iter(),
3445        )) as FileScanTaskStream;
3446
3447        let result = reader
3448            .read(tasks)
3449            .unwrap()
3450            .try_collect::<Vec<RecordBatch>>()
3451            .await
3452            .unwrap();
3453
3454        assert_eq!(result.len(), 1);
3455        let batch = &result[0];
3456        assert_eq!(batch.num_rows(), 2);
3457        assert_eq!(batch.num_columns(), 3);
3458
3459        let name_array = batch.column(0).as_string::<i32>();
3460        assert_eq!(name_array.value(0), "Alice");
3461        assert_eq!(name_array.value(1), "Bob");
3462
3463        let age_array = batch
3464            .column(1)
3465            .as_primitive::<arrow_array::types::Int32Type>();
3466        assert_eq!(age_array.value(0), 30);
3467        assert_eq!(age_array.value(1), 25);
3468
3469        // Verify missing column filled with NULLs
3470        let city_array = batch.column(2).as_string::<i32>();
3471        assert_eq!(city_array.null_count(), 2);
3472        assert!(city_array.is_null(0));
3473        assert!(city_array.is_null(1));
3474    }
3475
3476    /// Test reading Parquet files without field IDs that have multiple row groups.
3477    /// This ensures the position-based fallback works correctly across row group boundaries.
3478    #[tokio::test]
3479    async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3480        use arrow_array::Int32Array;
3481
3482        let schema = Arc::new(
3483            Schema::builder()
3484                .with_schema_id(1)
3485                .with_fields(vec![
3486                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3487                    NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3488                ])
3489                .build()
3490                .unwrap(),
3491        );
3492
3493        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3494            Field::new("name", DataType::Utf8, false),
3495            Field::new("value", DataType::Int32, false),
3496        ]));
3497
3498        let tmp_dir = TempDir::new().unwrap();
3499        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3500        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3501
3502        // Small row group size to create multiple row groups
3503        let props = WriterProperties::builder()
3504            .set_compression(Compression::SNAPPY)
3505            .set_write_batch_size(2)
3506            .set_max_row_group_size(2)
3507            .build();
3508
3509        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3510        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3511
3512        // Write 6 rows in 3 batches (will create 3 row groups)
3513        for batch_num in 0..3 {
3514            let name_data = Arc::new(StringArray::from(vec![
3515                format!("name_{}", batch_num * 2),
3516                format!("name_{}", batch_num * 2 + 1),
3517            ])) as ArrayRef;
3518            let value_data =
3519                Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3520
3521            let batch =
3522                RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3523            writer.write(&batch).expect("Writing batch");
3524        }
3525        writer.close().unwrap();
3526
3527        let reader = ArrowReaderBuilder::new(file_io).build();
3528
3529        let tasks = Box::pin(futures::stream::iter(
3530            vec![Ok(FileScanTask {
3531                start: 0,
3532                length: 0,
3533                record_count: None,
3534                data_file_path: format!("{table_location}/1.parquet"),
3535                data_file_format: DataFileFormat::Parquet,
3536                schema: schema.clone(),
3537                project_field_ids: vec![1, 2],
3538                predicate: None,
3539                deletes: vec![],
3540                partition: None,
3541                partition_spec: None,
3542                name_mapping: None,
3543                case_sensitive: false,
3544            })]
3545            .into_iter(),
3546        )) as FileScanTaskStream;
3547
3548        let result = reader
3549            .read(tasks)
3550            .unwrap()
3551            .try_collect::<Vec<RecordBatch>>()
3552            .await
3553            .unwrap();
3554
3555        assert!(!result.is_empty());
3556
3557        let mut all_names = Vec::new();
3558        let mut all_values = Vec::new();
3559
3560        for batch in &result {
3561            let name_array = batch.column(0).as_string::<i32>();
3562            let value_array = batch
3563                .column(1)
3564                .as_primitive::<arrow_array::types::Int32Type>();
3565
3566            for i in 0..batch.num_rows() {
3567                all_names.push(name_array.value(i).to_string());
3568                all_values.push(value_array.value(i));
3569            }
3570        }
3571
3572        assert_eq!(all_names.len(), 6);
3573        assert_eq!(all_values.len(), 6);
3574
3575        for i in 0..6 {
3576            assert_eq!(all_names[i], format!("name_{i}"));
3577            assert_eq!(all_values[i], i as i32);
3578        }
3579    }
3580
3581    /// Test reading Parquet files without field IDs with nested types (struct).
3582    /// Java's pruneColumnsFallback() projects entire top-level columns including nested content.
3583    /// This test verifies that a top-level struct field is projected correctly with all its nested fields.
3584    #[tokio::test]
3585    async fn test_read_parquet_without_field_ids_with_struct() {
3586        use arrow_array::{Int32Array, StructArray};
3587        use arrow_schema::Fields;
3588
3589        let schema = Arc::new(
3590            Schema::builder()
3591                .with_schema_id(1)
3592                .with_fields(vec![
3593                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3594                    NestedField::required(
3595                        2,
3596                        "person",
3597                        Type::Struct(crate::spec::StructType::new(vec![
3598                            NestedField::required(
3599                                3,
3600                                "name",
3601                                Type::Primitive(PrimitiveType::String),
3602                            )
3603                            .into(),
3604                            NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3605                                .into(),
3606                        ])),
3607                    )
3608                    .into(),
3609                ])
3610                .build()
3611                .unwrap(),
3612        );
3613
3614        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3615            Field::new("id", DataType::Int32, false),
3616            Field::new(
3617                "person",
3618                DataType::Struct(Fields::from(vec![
3619                    Field::new("name", DataType::Utf8, false),
3620                    Field::new("age", DataType::Int32, false),
3621                ])),
3622                false,
3623            ),
3624        ]));
3625
3626        let tmp_dir = TempDir::new().unwrap();
3627        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3628        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3629
3630        let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3631        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3632        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3633        let person_data = Arc::new(StructArray::from(vec![
3634            (
3635                Arc::new(Field::new("name", DataType::Utf8, false)),
3636                name_data,
3637            ),
3638            (
3639                Arc::new(Field::new("age", DataType::Int32, false)),
3640                age_data,
3641            ),
3642        ])) as ArrayRef;
3643
3644        let to_write =
3645            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3646
3647        let props = WriterProperties::builder()
3648            .set_compression(Compression::SNAPPY)
3649            .build();
3650
3651        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3652        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3653
3654        writer.write(&to_write).expect("Writing batch");
3655        writer.close().unwrap();
3656
3657        let reader = ArrowReaderBuilder::new(file_io).build();
3658
3659        let tasks = Box::pin(futures::stream::iter(
3660            vec![Ok(FileScanTask {
3661                start: 0,
3662                length: 0,
3663                record_count: None,
3664                data_file_path: format!("{table_location}/1.parquet"),
3665                data_file_format: DataFileFormat::Parquet,
3666                schema: schema.clone(),
3667                project_field_ids: vec![1, 2],
3668                predicate: None,
3669                deletes: vec![],
3670                partition: None,
3671                partition_spec: None,
3672                name_mapping: None,
3673                case_sensitive: false,
3674            })]
3675            .into_iter(),
3676        )) as FileScanTaskStream;
3677
3678        let result = reader
3679            .read(tasks)
3680            .unwrap()
3681            .try_collect::<Vec<RecordBatch>>()
3682            .await
3683            .unwrap();
3684
3685        assert_eq!(result.len(), 1);
3686        let batch = &result[0];
3687        assert_eq!(batch.num_rows(), 2);
3688        assert_eq!(batch.num_columns(), 2);
3689
3690        let id_array = batch
3691            .column(0)
3692            .as_primitive::<arrow_array::types::Int32Type>();
3693        assert_eq!(id_array.value(0), 1);
3694        assert_eq!(id_array.value(1), 2);
3695
3696        let person_array = batch.column(1).as_struct();
3697        assert_eq!(person_array.num_columns(), 2);
3698
3699        let name_array = person_array.column(0).as_string::<i32>();
3700        assert_eq!(name_array.value(0), "Alice");
3701        assert_eq!(name_array.value(1), "Bob");
3702
3703        let age_array = person_array
3704            .column(1)
3705            .as_primitive::<arrow_array::types::Int32Type>();
3706        assert_eq!(age_array.value(0), 30);
3707        assert_eq!(age_array.value(1), 25);
3708    }
3709
3710    /// Test reading Parquet files without field IDs with schema evolution - column added in the middle.
3711    /// When a new column is inserted between existing columns in the schema order,
3712    /// the fallback projection must correctly map field IDs to output positions.
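        ///
        /// Concretely for this test (illustrative only, not the reader's API): the old
        /// file's fallback IDs are `[1, 2]` by position, the projection asks for
        /// `[1, 5, 2]`, and the unmatched ID 5 must come back as a NULL column:
        /// ```rust
        /// let file_fallback_ids = [1, 2]; // positions 0 and 1 in the old file
        /// let projected: Vec<Option<usize>> = [1, 5, 2]
        ///     .iter()
        ///     .map(|id| file_fallback_ids.iter().position(|f| f == id))
        ///     .collect();
        /// assert_eq!(projected, vec![Some(0), None, Some(1)]); // None => NULL column
        /// ```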
3713    #[tokio::test]
3714    async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3715        use arrow_array::{Array, Int32Array};
3716
3717        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3718            Field::new("col0", DataType::Int32, true),
3719            Field::new("col1", DataType::Int32, true),
3720        ]));
3721
3722        // New column added between existing columns: col0 (id=1), newCol (id=5), col1 (id=2)
3723        let schema = Arc::new(
3724            Schema::builder()
3725                .with_schema_id(1)
3726                .with_fields(vec![
3727                    NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3728                    NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3729                    NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3730                ])
3731                .build()
3732                .unwrap(),
3733        );
3734
3735        let tmp_dir = TempDir::new().unwrap();
3736        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3737        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3738
3739        let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3740        let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3741
3742        let to_write =
3743            RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3744
3745        let props = WriterProperties::builder()
3746            .set_compression(Compression::SNAPPY)
3747            .build();
3748
3749        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3750        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3751        writer.write(&to_write).expect("Writing batch");
3752        writer.close().unwrap();
3753
3754        let reader = ArrowReaderBuilder::new(file_io).build();
3755
3756        let tasks = Box::pin(futures::stream::iter(
3757            vec![Ok(FileScanTask {
3758                start: 0,
3759                length: 0,
3760                record_count: None,
3761                data_file_path: format!("{table_location}/1.parquet"),
3762                data_file_format: DataFileFormat::Parquet,
3763                schema: schema.clone(),
3764                project_field_ids: vec![1, 5, 2],
3765                predicate: None,
3766                deletes: vec![],
3767                partition: None,
3768                partition_spec: None,
3769                name_mapping: None,
3770                case_sensitive: false,
3771            })]
3772            .into_iter(),
3773        )) as FileScanTaskStream;
3774
3775        let result = reader
3776            .read(tasks)
3777            .unwrap()
3778            .try_collect::<Vec<RecordBatch>>()
3779            .await
3780            .unwrap();
3781
3782        assert_eq!(result.len(), 1);
3783        let batch = &result[0];
3784        assert_eq!(batch.num_rows(), 2);
3785        assert_eq!(batch.num_columns(), 3);
3786
3787        let result_col0 = batch
3788            .column(0)
3789            .as_primitive::<arrow_array::types::Int32Type>();
3790        assert_eq!(result_col0.value(0), 1);
3791        assert_eq!(result_col0.value(1), 2);
3792
3793        // New column should be NULL (doesn't exist in old file)
3794        let result_newcol = batch
3795            .column(1)
3796            .as_primitive::<arrow_array::types::Int32Type>();
3797        assert_eq!(result_newcol.null_count(), 2);
3798        assert!(result_newcol.is_null(0));
3799        assert!(result_newcol.is_null(1));
3800
3801        let result_col1 = batch
3802            .column(2)
3803            .as_primitive::<arrow_array::types::Int32Type>();
3804        assert_eq!(result_col1.value(0), 10);
3805        assert_eq!(result_col1.value(1), 20);
3806    }
3807
3808    /// Test reading Parquet files without field IDs with a filter that eliminates all row groups.
3809    /// During development of field ID mapping, we saw a panic when row_selection_enabled=true and
3810    /// all row groups are filtered out.
3811    #[tokio::test]
3812    async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
3813        use arrow_array::{Float64Array, Int32Array};
3814
3815        // Schema with fields that will use fallback IDs 1, 2, 3
3816        let schema = Arc::new(
3817            Schema::builder()
3818                .with_schema_id(1)
3819                .with_fields(vec![
3820                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3821                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3822                    NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
3823                        .into(),
3824                ])
3825                .build()
3826                .unwrap(),
3827        );
3828
3829        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3830            Field::new("id", DataType::Int32, false),
3831            Field::new("name", DataType::Utf8, false),
3832            Field::new("value", DataType::Float64, false),
3833        ]));
3834
3835        let tmp_dir = TempDir::new().unwrap();
3836        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3837        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3838
3839        // Write data where all ids are >= 10
3840        let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
3841        let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3842        let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
3843
3844        let to_write =
3845            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
3846                .unwrap();
3847
3848        let props = WriterProperties::builder()
3849            .set_compression(Compression::SNAPPY)
3850            .build();
3851
3852        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3853        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3854        writer.write(&to_write).expect("Writing batch");
3855        writer.close().unwrap();
3856
3857        // Filter that eliminates all row groups: id < 5
3858        let predicate = Reference::new("id").less_than(Datum::int(5));
3859
3860        // Enable both row group filtering and row selection; this combination previously triggered the panic
3861        let reader = ArrowReaderBuilder::new(file_io)
3862            .with_row_group_filtering_enabled(true)
3863            .with_row_selection_enabled(true)
3864            .build();
3865
3866        let tasks = Box::pin(futures::stream::iter(
3867            vec![Ok(FileScanTask {
3868                start: 0,
3869                length: 0,
3870                record_count: None,
3871                data_file_path: format!("{table_location}/1.parquet"),
3872                data_file_format: DataFileFormat::Parquet,
3873                schema: schema.clone(),
3874                project_field_ids: vec![1, 2, 3],
3875                predicate: Some(predicate.bind(schema, true).unwrap()),
3876                deletes: vec![],
3877                partition: None,
3878                partition_spec: None,
3879                name_mapping: None,
3880                case_sensitive: false,
3881            })]
3882            .into_iter(),
3883        )) as FileScanTaskStream;
3884
3885        // Should no longer panic
3886        let result = reader
3887            .read(tasks)
3888            .unwrap()
3889            .try_collect::<Vec<RecordBatch>>()
3890            .await
3891            .unwrap();
3892
3893        // Should return empty results
3894        assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
3895    }
3896
3897    /// Test bucket partitioning reads source column from data file (not partition metadata).
3898    ///
3899    /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
3900    /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable).
3901    ///
3902    /// # Iceberg Spec Requirements
3903    ///
3904    /// Per the Iceberg spec "Column Projection" section:
3905    /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
3906    ///
3907    /// This means:
3908    /// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata
3909    /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files
3910    /// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values
3911    ///
3912    /// Java's PartitionUtil.constantsMap() implements this via:
3913    /// ```java
3914    /// if (field.transform().isIdentity()) {
3915    ///     idToConstant.put(field.sourceId(), converted);
3916    /// }
3917    /// ```
3918    ///
3919    /// # What This Test Verifies
3920    ///
3921    /// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles
3922    /// bucket partitioning when FileScanTask provides partition_spec and partition_data:
3923    ///
3924    /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13]
3925    /// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1
3926    /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants
3927    /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
3928    /// - Values are NOT replaced with constant 1 from partition metadata
3929    ///
3930    /// # Why This Matters
3931    ///
3932    /// Without correct handling:
3933    /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
3934    /// - Query results would be incorrect (all rows would have id=1)
3935    /// - Bucket partitioning would be unusable for query optimization
3936    ///
3937    /// # References
3938    /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
3939    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
3940    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
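        ///
        /// A hedged Rust rendering of the same rule; field and accessor names are
        /// assumptions for illustration, not this crate's exact API:
        /// ```rust
        /// // Only identity-transformed partition fields feed the constants map, so
        /// // bucket(4, id) contributes nothing and `id` is read from the data file.
        /// for (field, value) in spec_fields.iter().zip(partition_values.iter()) {
        ///     if field.transform == Transform::Identity {
        ///         id_to_constant.insert(field.source_id, value.clone());
        ///     }
        /// }
        /// ```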
3941    #[tokio::test]
3942    async fn test_bucket_partitioning_reads_source_column_from_file() {
3943        use arrow_array::Int32Array;
3944
3945        use crate::spec::{Literal, PartitionSpec, Struct, Transform};
3946
3947        // Iceberg schema with id and name columns
3948        let schema = Arc::new(
3949            Schema::builder()
3950                .with_schema_id(0)
3951                .with_fields(vec![
3952                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3953                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3954                ])
3955                .build()
3956                .unwrap(),
3957        );
3958
3959        // Partition spec: bucket(4, id)
3960        let partition_spec = Arc::new(
3961            PartitionSpec::builder(schema.clone())
3962                .with_spec_id(0)
3963                .add_partition_field("id", "id_bucket", Transform::Bucket(4))
3964                .unwrap()
3965                .build()
3966                .unwrap(),
3967        );
3968
3969        // Partition data: bucket value is 1
3970        let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
3971
3972        // Create Arrow schema with field IDs for Parquet file
3973        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3974            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3975                PARQUET_FIELD_ID_META_KEY.to_string(),
3976                "1".to_string(),
3977            )])),
3978            Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
3979                PARQUET_FIELD_ID_META_KEY.to_string(),
3980                "2".to_string(),
3981            )])),
3982        ]));
3983
3984        // Write Parquet file with data
3985        let tmp_dir = TempDir::new().unwrap();
3986        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3987        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3988
3989        let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
3990        let name_data =
3991            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
3992
3993        let to_write =
3994            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
3995
3996        let props = WriterProperties::builder()
3997            .set_compression(Compression::SNAPPY)
3998            .build();
3999        let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
4000        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
4001        writer.write(&to_write).expect("Writing batch");
4002        writer.close().unwrap();
4003
4004        // Read the Parquet file with partition spec and data
4005        let reader = ArrowReaderBuilder::new(file_io).build();
4006        let tasks = Box::pin(futures::stream::iter(
4007            vec![Ok(FileScanTask {
4008                start: 0,
4009                length: 0,
4010                record_count: None,
4011                data_file_path: format!("{table_location}/data.parquet"),
4012                data_file_format: DataFileFormat::Parquet,
4013                schema: schema.clone(),
4014                project_field_ids: vec![1, 2],
4015                predicate: None,
4016                deletes: vec![],
4017                partition: Some(partition_data),
4018                partition_spec: Some(partition_spec),
4019                name_mapping: None,
4020                case_sensitive: false,
4021            })]
4022            .into_iter(),
4023        )) as FileScanTaskStream;
4024
4025        let result = reader
4026            .read(tasks)
4027            .unwrap()
4028            .try_collect::<Vec<RecordBatch>>()
4029            .await
4030            .unwrap();
4031
4032        // Verify we got the correct data
4033        assert_eq!(result.len(), 1);
4034        let batch = &result[0];
4035
4036        assert_eq!(batch.num_columns(), 2);
4037        assert_eq!(batch.num_rows(), 4);
4038
4039        // The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
4040        // NOT the constant partition value 1
4041        let id_col = batch
4042            .column(0)
4043            .as_primitive::<arrow_array::types::Int32Type>();
4044        assert_eq!(id_col.value(0), 1);
4045        assert_eq!(id_col.value(1), 5);
4046        assert_eq!(id_col.value(2), 9);
4047        assert_eq!(id_col.value(3), 13);
4048
4049        let name_col = batch.column(1).as_string::<i32>();
4050        assert_eq!(name_col.value(0), "Alice");
4051        assert_eq!(name_col.value(1), "Bob");
4052        assert_eq!(name_col.value(2), "Charlie");
4053        assert_eq!(name_col.value(3), "Dave");
4054    }
4055}