iceberg/arrow/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parquet file data reader

use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;

use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
use arrow_cast::cast::cast;
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{
    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
};
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

/// Builder to create ArrowReader
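///
/// A minimal usage sketch (hypothetical: assumes a `file_io` instance and a `tasks`
/// stream of `FileScanTask`s have already been obtained elsewhere, e.g. from a table scan):
///
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .with_row_group_filtering_enabled(true)
///     .with_row_selection_enabled(true)
///     .build();
/// let batch_stream = reader.read(tasks)?;
/// ```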
pub struct ArrowReaderBuilder {
    batch_size: Option<usize>,
    file_io: FileIO,
    concurrency_limit_data_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl ArrowReaderBuilder {
    /// Create a new ArrowReaderBuilder
    pub fn new(file_io: FileIO) -> Self {
        let num_cpus = available_parallelism().get();

        ArrowReaderBuilder {
            batch_size: None,
            file_io,
            concurrency_limit_data_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
        }
    }

    /// Sets the maximum number of in-flight data files that can be fetched at once
    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
        self.concurrency_limit_data_files = val;
        self
    }

    /// Sets the desired size of batches in the response
    /// to something other than the default
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = Some(batch_size);
        self
    }

    /// Determines whether to enable row group filtering.
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

    /// Determines whether to enable row selection.
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

    /// Build the ArrowReader.
    pub fn build(self) -> ArrowReader {
        ArrowReader {
            batch_size: self.batch_size,
            file_io: self.file_io.clone(),
            delete_file_loader: CachingDeleteFileLoader::new(
                self.file_io.clone(),
                self.concurrency_limit_data_files,
            ),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
        }
    }
}

/// Reads data from Parquet files
#[derive(Clone)]
pub struct ArrowReader {
    batch_size: Option<usize>,
    file_io: FileIO,
    delete_file_loader: CachingDeleteFileLoader,

    /// the maximum number of data files that can be fetched at the same time
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}

impl ArrowReader {
    /// Takes a stream of FileScanTasks and reads all the files.
    /// Returns a stream of Arrow RecordBatches containing the data from the files.
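    ///
    /// A hedged consumption sketch (hypothetical `reader` and `tasks` values; relies on
    /// `futures::TryStreamExt` for `try_next`):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let mut batches = reader.read(tasks)?;
    /// while let Some(batch) = batches.try_next().await? {
    ///     println!("read a batch with {} rows", batch.num_rows());
    /// }
    /// ```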
    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
        let file_io = self.file_io.clone();
        let batch_size = self.batch_size;
        let concurrency_limit_data_files = self.concurrency_limit_data_files;
        let row_group_filtering_enabled = self.row_group_filtering_enabled;
        let row_selection_enabled = self.row_selection_enabled;

        let stream = tasks
            .map_ok(move |task| {
                let file_io = file_io.clone();

                Self::process_file_scan_task(
                    task,
                    batch_size,
                    file_io,
                    self.delete_file_loader.clone(),
                    row_group_filtering_enabled,
                    row_selection_enabled,
                )
            })
            .map_err(|err| {
                Error::new(ErrorKind::Unexpected, "file scan task generation failed").with_source(err)
            })
            .try_buffer_unordered(concurrency_limit_data_files)
            .try_flatten_unordered(concurrency_limit_data_files);

        Ok(Box::pin(stream) as ArrowRecordBatchStream)
    }

    #[allow(clippy::too_many_arguments)]
    async fn process_file_scan_task(
        task: FileScanTask,
        batch_size: Option<usize>,
        file_io: FileIO,
        delete_file_loader: CachingDeleteFileLoader,
        row_group_filtering_enabled: bool,
        row_selection_enabled: bool,
    ) -> Result<ArrowRecordBatchStream> {
        let should_load_page_index =
            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();

        let delete_filter_rx =
            delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));

        // Migrated tables lack field IDs, requiring us to inspect the schema to choose
        // between field-ID-based and position-based projection.
        let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
            &task.data_file_path,
            file_io.clone(),
            should_load_page_index,
            None,
        )
        .await?;

        // Check if the Parquet file has embedded field IDs.
        // Corresponds to Java's ParquetSchemaUtil.hasIds()
        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
        let missing_field_ids = initial_stream_builder
            .schema()
            .fields()
            .iter()
            .next()
            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());

        // Three-branch schema resolution strategy matching Java's ReadConf constructor
        //
        // Per the Iceberg spec's Column Projection rules:
        // "Columns in Iceberg data files are selected by field id. The table schema's column
        //  names and order may change after a data file is written, and projection must be done
        //  using field ids."
        // https://iceberg.apache.org/spec/#column-projection
        //
        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
        // we must assign field IDs BEFORE reading data to enable correct projection.
        //
        // Java's ReadConf determines the field ID strategy:
        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
        let mut record_batch_stream_builder = if missing_field_ids {
            // Parquet file lacks field IDs - must assign them before reading
            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
                // Branch 2: Apply name mapping to assign correct Iceberg field IDs
                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
                // to columns without field id"
                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
                apply_name_mapping_to_arrow_schema(
                    Arc::clone(initial_stream_builder.schema()),
                    name_mapping,
                )?
            } else {
                // Branch 3: No name mapping - use position-based fallback IDs
                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
                add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
            };

            let options = ArrowReaderOptions::new().with_schema(arrow_schema);

            Self::create_parquet_record_batch_stream_builder(
                &task.data_file_path,
                file_io.clone(),
                should_load_page_index,
                Some(options),
            )
            .await?
        } else {
            // Branch 1: File has embedded field IDs - trust them
            initial_stream_builder
        };

        // Create the projection mask based on field IDs:
        // - If the file has embedded IDs: field-ID-based projection (missing_field_ids = false)
        // - If name mapping was applied: field-ID-based projection (missing_field_ids = true but IDs now match)
        // - If fallback IDs were assigned: position-based projection (missing_field_ids = true)
        let projection_mask = Self::get_arrow_projection_mask(
            &task.project_field_ids,
            &task.schema,
            record_batch_stream_builder.parquet_schema(),
            record_batch_stream_builder.schema(),
            missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
        )?;

        record_batch_stream_builder =
            record_batch_stream_builder.with_projection(projection_mask.clone());

        // RecordBatchTransformer performs any transformations required on the RecordBatches
        // that come back from the file, such as type promotion, default column insertion,
        // and column re-ordering.
        let mut record_batch_transformer_builder =
            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());

        if let (Some(partition_spec), Some(partition_data)) =
            (task.partition_spec.clone(), task.partition.clone())
        {
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_partition(partition_spec, partition_data);
        }

        let mut record_batch_transformer = record_batch_transformer_builder.build();

        if let Some(batch_size) = batch_size {
            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
        }

        let delete_filter = delete_filter_rx.await.unwrap()?;
        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;

        // In addition to the optional predicate supplied in the `FileScanTask`,
        // we also have an optional predicate resulting from equality delete files.
        // If both are present, we logically AND them together to form a single filter
        // predicate that we can pass to the `RecordBatchStreamBuilder`.
        let final_predicate = match (&task.predicate, delete_predicate) {
            (None, None) => None,
            (Some(predicate), None) => Some(predicate.clone()),
            (None, Some(ref predicate)) => Some(predicate.clone()),
            (Some(filter_predicate), Some(delete_predicate)) => {
                Some(filter_predicate.clone().and(delete_predicate))
            }
        };

        // Selected RowGroup index lists can come from three sources:
        //   * when task.start and task.length specify a byte range (file splitting);
        //   * when there are equality delete files that are applicable;
        //   * when there is a scan predicate and row_group_filtering_enabled = true.
        // `RowSelection`s can be created in either or both of the following cases:
        //   * when there are positional delete files that are applicable;
        //   * when there is a scan predicate and row_selection_enabled = true.
        // Note that row group filtering from predicates only happens when
        // there is a scan predicate AND row_group_filtering_enabled = true,
        // but we perform row selection filtering if there are applicable
        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
        // since the only implemented method of applying positional deletes is
        // by using a `RowSelection`.
        let mut selected_row_group_indices = None;
        let mut row_selection = None;

        // Filter row groups based on the byte range from task.start and task.length.
        // If both start and length are 0, read the entire file (backwards compatibility).
        if task.start != 0 || task.length != 0 {
            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
                record_batch_stream_builder.metadata(),
                task.start,
                task.length,
            )?;
            selected_row_group_indices = Some(byte_range_filtered_row_groups);
        }

        if let Some(predicate) = final_predicate {
            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                record_batch_stream_builder.parquet_schema(),
                &predicate,
            )?;

            let row_filter = Self::get_row_filter(
                &predicate,
                record_batch_stream_builder.parquet_schema(),
                &iceberg_field_ids,
                &field_id_map,
            )?;
            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

            if row_group_filtering_enabled {
                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &field_id_map,
                    &task.schema,
                )?;

                // Merge predicate-based filtering with byte range filtering (if present)
                // by taking the intersection of both filters
                selected_row_group_indices = match selected_row_group_indices {
                    Some(byte_range_filtered) => {
                        // Keep only row groups that are in both filters
                        let intersection: Vec<usize> = byte_range_filtered
                            .into_iter()
                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
                            .collect();
                        Some(intersection)
                    }
                    None => Some(predicate_filtered_row_groups),
                };
            }

            if row_selection_enabled {
                row_selection = Some(Self::get_row_selection_for_filter_predicate(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &selected_row_group_indices,
                    &field_id_map,
                    &task.schema,
                )?);
            }
        }

        let positional_delete_indexes = delete_filter.get_delete_vector(&task);

        if let Some(positional_delete_indexes) = positional_delete_indexes {
            let delete_row_selection = {
                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();

                Self::build_deletes_row_selection(
                    record_batch_stream_builder.metadata().row_groups(),
                    &selected_row_group_indices,
                    &positional_delete_indexes,
                )
            }?;

            // Merge the row selection from the delete files with the row selection
            // from the filter predicate, if there is one from the filter predicate.
            row_selection = match row_selection {
                None => Some(delete_row_selection),
                Some(filter_row_selection) => {
                    Some(filter_row_selection.intersection(&delete_row_selection))
                }
            };
        }

        if let Some(row_selection) = row_selection {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_selection(row_selection);
        }

        if let Some(selected_row_group_indices) = selected_row_group_indices {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
        }

        // Build the batch stream and send all the RecordBatches that it generates
        // to the requester.
        let record_batch_stream =
            record_batch_stream_builder
                .build()?
                .map(move |batch| match batch {
                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
                    Err(err) => Err(err.into()),
                });

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }

    pub(crate) async fn create_parquet_record_batch_stream_builder(
        data_file_path: &str,
        file_io: FileIO,
        should_load_page_index: bool,
        arrow_reader_options: Option<ArrowReaderOptions>,
    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
        // Get the metadata for the Parquet file we need to read and build
        // a reader for the data within
        let parquet_file = file_io.new_input(data_file_path)?;
        let (parquet_metadata, parquet_reader) =
            try_join!(parquet_file.metadata(), parquet_file.reader())?;
        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
            .with_preload_column_index(true)
            .with_preload_offset_index(true)
            .with_preload_page_index(should_load_page_index);

        // Create the record batch stream builder, which wraps the parquet file reader
        let options = arrow_reader_options.unwrap_or_default();
        let record_batch_stream_builder =
            ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
        Ok(record_batch_stream_builder)
    }

    /// Computes a `RowSelection` from positional delete indices.
    ///
    /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
    /// as having been deleted by a positional delete, taking into account any row groups that have
    /// been skipped entirely by the filter predicate.
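    ///
    /// Worked example (illustrative numbers, not taken from a real file): for a single
    /// 100-row row group with positional deletes at rows {3, 4, 10}, the resulting
    /// selection is
    ///
    /// ```text
    /// select(3), skip(2), select(5), skip(1), select(89)
    /// ```
    ///
    /// which keeps rows 0-2, 5-9, and 11-99.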
    fn build_deletes_row_selection(
        row_group_metadata_list: &[RowGroupMetaData],
        selected_row_groups: &Option<Vec<usize>>,
        positional_deletes: &DeleteVector,
    ) -> Result<RowSelection> {
        let mut results: Vec<RowSelector> = Vec::new();
        let mut selected_row_groups_idx = 0;
        let mut current_row_group_base_idx: u64 = 0;
        let mut delete_vector_iter = positional_deletes.iter();
        let mut next_deleted_row_idx_opt = delete_vector_iter.next();

        for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
            let row_group_num_rows = row_group_metadata.num_rows() as u64;
            let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;

            // if row group selection is enabled,
            if let Some(selected_row_groups) = selected_row_groups {
                // if we've consumed all the selected row groups, we're done
                if selected_row_groups_idx == selected_row_groups.len() {
                    break;
                }

                if idx == selected_row_groups[selected_row_groups_idx] {
                    // we're in a selected row group. Increment selected_row_groups_idx
                    // so that next time around the for loop we're looking for the next
                    // selected row group
                    selected_row_groups_idx += 1;
                } else {
                    // Advance the iterator past all deletes in the skipped row group.
                    // advance_to() positions the iterator at the first delete >= next_row_group_base_idx.
                    // However, if our cached next_deleted_row_idx_opt is in the skipped range,
                    // we need to call next() to update the cache with the newly positioned value.
                    delete_vector_iter.advance_to(next_row_group_base_idx);
                    // Only update the cache if the cached value is stale (in the skipped range)
                    if let Some(cached_idx) = next_deleted_row_idx_opt {
                        if cached_idx < next_row_group_base_idx {
                            next_deleted_row_idx_opt = delete_vector_iter.next();
                        }
                    }

                    // still increment the current row group base index but then skip to the next
                    // row group in the file
                    current_row_group_base_idx += row_group_num_rows;
                    continue;
                }
            }

            let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
                Some(next_deleted_row_idx) => {
                    // if the index of the next deleted row is beyond this row group, add a selection for
                    // the remainder of this row group and skip to the next row group
                    if next_deleted_row_idx >= next_row_group_base_idx {
                        results.push(RowSelector::select(row_group_num_rows as usize));
                        current_row_group_base_idx += row_group_num_rows;
                        continue;
                    }

                    next_deleted_row_idx
                }

                // If there are no more positional deletes, add a selector for the entirety of this row group.
                _ => {
                    results.push(RowSelector::select(row_group_num_rows as usize));
                    current_row_group_base_idx += row_group_num_rows;
                    continue;
                }
            };

            let mut current_idx = current_row_group_base_idx;
            'chunks: while next_deleted_row_idx < next_row_group_base_idx {
                // `select` all rows that precede the next delete index
                if current_idx < next_deleted_row_idx {
                    let run_length = next_deleted_row_idx - current_idx;
                    results.push(RowSelector::select(run_length as usize));
                    current_idx += run_length;
                }

                // `skip` all consecutive deleted rows in the current row group
                let mut run_length = 0;
                while next_deleted_row_idx == current_idx
                    && next_deleted_row_idx < next_row_group_base_idx
                {
                    run_length += 1;
                    current_idx += 1;

                    next_deleted_row_idx_opt = delete_vector_iter.next();
                    next_deleted_row_idx = match next_deleted_row_idx_opt {
                        Some(next_deleted_row_idx) => next_deleted_row_idx,
                        _ => {
                            // We've processed the final positional delete.
                            // Conclude the skip and then break so that we select the remaining
                            // rows in the row group and move on to the next row group
                            results.push(RowSelector::skip(run_length));
                            break 'chunks;
                        }
                    };
                }
                if run_length > 0 {
                    results.push(RowSelector::skip(run_length));
                }
            }

            if current_idx < next_row_group_base_idx {
                results.push(RowSelector::select(
                    (next_row_group_base_idx - current_idx) as usize,
                ));
            }

            current_row_group_base_idx += row_group_num_rows;
        }

        Ok(results.into())
    }

    fn build_field_id_set_and_map(
        parquet_schema: &SchemaDescriptor,
        predicate: &BoundPredicate,
    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
        // Collects all Iceberg field IDs referenced in the filter predicate
        let mut collector = CollectFieldIdVisitor {
            field_ids: HashSet::default(),
        };
        visit(&mut collector, predicate)?;

        let iceberg_field_ids = collector.field_ids();

        // Without embedded field IDs, we fall back to position-based mapping for compatibility
        let field_id_map = match build_field_id_map(parquet_schema)? {
            Some(map) => map,
            None => build_fallback_field_id_map(parquet_schema),
        };

        Ok((iceberg_field_ids, field_id_map))
    }

    /// Recursively extracts leaf field IDs, because Parquet projection works at the leaf column level.
    /// Nested types (struct/list/map) are flattened in Parquet's columnar format.
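    ///
    /// For example (hypothetical schema): selecting a struct field `location` with id 5 and
    /// primitive children `lat` (id 6) and `long` (id 7) contributes the leaf ids `[6, 7]`,
    /// not the parent id 5.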
    fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
        match field.field_type.as_ref() {
            Type::Primitive(_) => {
                field_ids.push(field.id);
            }
            Type::Struct(struct_type) => {
                for nested_field in struct_type.fields() {
                    Self::include_leaf_field_id(nested_field, field_ids);
                }
            }
            Type::List(list_type) => {
                Self::include_leaf_field_id(&list_type.element_field, field_ids);
            }
            Type::Map(map_type) => {
                Self::include_leaf_field_id(&map_type.key_field, field_ids);
                Self::include_leaf_field_id(&map_type.value_field, field_ids);
            }
        }
    }

    fn get_arrow_projection_mask(
        field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
        use_fallback: bool, // Whether the file lacks embedded field IDs (e.g., migrated from Hive/Spark)
    ) -> Result<ProjectionMask> {
        fn type_promotion_is_valid(
            file_type: Option<&PrimitiveType>,
            projected_type: Option<&PrimitiveType>,
        ) -> bool {
            match (file_type, projected_type) {
                (Some(lhs), Some(rhs)) if lhs == rhs => true,
                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
                (
                    Some(PrimitiveType::Decimal {
                        precision: file_precision,
                        scale: file_scale,
                    }),
                    Some(PrimitiveType::Decimal {
                        precision: requested_precision,
                        scale: requested_scale,
                    }),
                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
                // Uuid is stored as Fixed(16) in the Parquet file, so the read-back type will be Fixed(16).
                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
                _ => false,
            }
        }

        if field_ids.is_empty() {
            return Ok(ProjectionMask::all());
        }

        if use_fallback {
            // Position-based projection is necessary because the file lacks embedded field IDs
            Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
        } else {
            // Field-ID-based projection using embedded field IDs from Parquet metadata

            // Parquet's columnar format requires leaf-level (not top-level struct/list/map) projection
            let mut leaf_field_ids = vec![];
            for field_id in field_ids {
                let field = iceberg_schema_of_task.field_by_id(*field_id);
                if let Some(field) = field {
                    Self::include_leaf_field_id(field, &mut leaf_field_ids);
                }
            }

            Self::get_arrow_projection_mask_with_field_ids(
                &leaf_field_ids,
                iceberg_schema_of_task,
                parquet_schema,
                arrow_schema,
                type_promotion_is_valid,
            )
        }
    }

    /// Standard projection using embedded field IDs from Parquet metadata.
    /// Mirrors iceberg-java's ParquetSchemaUtil.pruneColumns() for compatibility.
    fn get_arrow_projection_mask_with_field_ids(
        leaf_field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
        type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
    ) -> Result<ProjectionMask> {
        let mut column_map = HashMap::new();
        let fields = arrow_schema.fields();

        // Pre-project only the fields that have been selected, possibly avoiding converting
        // some Arrow types that are not yet supported.
        let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
        let projected_arrow_schema = ArrowSchema::new_with_metadata(
            fields.filter_leaves(|_, f| {
                f.metadata()
                    .get(PARQUET_FIELD_ID_META_KEY)
                    .and_then(|field_id| i32::from_str(field_id).ok())
                    .is_some_and(|field_id| {
                        projected_fields.insert((*f).clone(), field_id);
                        leaf_field_ids.contains(&field_id)
                    })
            }),
            arrow_schema.metadata().clone(),
        );
        let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;

        fields.filter_leaves(|idx, field| {
            let Some(field_id) = projected_fields.get(field).cloned() else {
                return false;
            };

            let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
            let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);

            if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
                return false;
            }

            if !type_promotion_is_valid(
                parquet_iceberg_field
                    .unwrap()
                    .field_type
                    .as_primitive_type(),
                iceberg_field.unwrap().field_type.as_primitive_type(),
            ) {
                return false;
            }

            column_map.insert(field_id, idx);
            true
        });

        // Schema evolution: new columns may not exist in old Parquet files.
        // We only project existing columns; RecordBatchTransformer adds default/NULL values.
        let mut indices = vec![];
        for field_id in leaf_field_ids {
            if let Some(col_idx) = column_map.get(field_id) {
                indices.push(*col_idx);
            }
        }

        if indices.is_empty() {
            // Edge case: all requested columns are new (they don't exist in the file).
            // Project all columns so RecordBatchTransformer has a batch to transform.
            Ok(ProjectionMask::all())
        } else {
            Ok(ProjectionMask::leaves(parquet_schema, indices))
        }
    }

    /// Fallback projection for Parquet files without field IDs.
    /// Uses position-based matching: field ID N → column position N-1.
    /// Projects entire top-level columns (including nested content) for iceberg-java compatibility.
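    ///
    /// For example (hypothetical request): requesting field ids `[1, 3]` against a file with
    /// four top-level columns projects root columns at positions `[0, 2]`; a field id that maps
    /// past the last column is simply skipped and later back-filled with NULLs by
    /// `RecordBatchTransformer`.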
    fn get_arrow_projection_mask_fallback(
        field_ids: &[i32],
        parquet_schema: &SchemaDescriptor,
    ) -> Result<ProjectionMask> {
        // Position-based: field_id N → column N-1 (field IDs are 1-indexed)
        let parquet_root_fields = parquet_schema.root_schema().get_fields();
        let mut root_indices = vec![];

        for field_id in field_ids.iter() {
            let parquet_pos = (*field_id - 1) as usize;

            if parquet_pos < parquet_root_fields.len() {
                root_indices.push(parquet_pos);
            }
            // RecordBatchTransformer adds missing columns with NULL values
        }

        if root_indices.is_empty() {
            Ok(ProjectionMask::all())
        } else {
            Ok(ProjectionMask::roots(parquet_schema, root_indices))
        }
    }

    fn get_row_filter(
        predicates: &BoundPredicate,
        parquet_schema: &SchemaDescriptor,
        iceberg_field_ids: &HashSet<i32>,
        field_id_map: &HashMap<i32, usize>,
    ) -> Result<RowFilter> {
        // Collect Parquet column indices from field ids.
        // If a field id is not found in the Parquet schema, it is ignored due to schema evolution.
        let mut column_indices = iceberg_field_ids
            .iter()
            .filter_map(|field_id| field_id_map.get(field_id).cloned())
            .collect::<Vec<_>>();
        column_indices.sort();

        // The converter that converts `BoundPredicates` to `ArrowPredicates`
        let mut converter = PredicateConverter {
            parquet_schema,
            column_map: field_id_map,
            column_indices: &column_indices,
        };

        // After collecting the required leaf column indices used in the predicate,
        // create the projection mask for the Arrow predicates.
        let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
        let predicate_func = visit(&mut converter, predicates)?;
        let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
        Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
    }

    fn get_selected_row_group_indices(
        predicate: &BoundPredicate,
        parquet_metadata: &Arc<ParquetMetaData>,
        field_id_map: &HashMap<i32, usize>,
        snapshot_schema: &Schema,
    ) -> Result<Vec<usize>> {
        let row_groups_metadata = parquet_metadata.row_groups();
        let mut results = Vec::with_capacity(row_groups_metadata.len());

        for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
            if RowGroupMetricsEvaluator::eval(
                predicate,
                row_group_metadata,
                field_id_map,
                snapshot_schema,
            )? {
                results.push(idx);
            }
        }

        Ok(results)
    }

    fn get_row_selection_for_filter_predicate(
        predicate: &BoundPredicate,
        parquet_metadata: &Arc<ParquetMetaData>,
        selected_row_groups: &Option<Vec<usize>>,
        field_id_map: &HashMap<i32, usize>,
        snapshot_schema: &Schema,
    ) -> Result<RowSelection> {
        let Some(column_index) = parquet_metadata.column_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain a column index",
            ));
        };

        let Some(offset_index) = parquet_metadata.offset_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain an offset index",
            ));
        };

        // If all row groups were filtered out, return an empty RowSelection (select no rows)
        if let Some(selected_row_groups) = selected_row_groups {
            if selected_row_groups.is_empty() {
                return Ok(RowSelection::from(Vec::new()));
            }
        }

        let mut selected_row_groups_idx = 0;

        let page_index = column_index
            .iter()
            .enumerate()
            .zip(offset_index)
            .zip(parquet_metadata.row_groups());

        let mut results = Vec::new();
        for (((idx, column_index), offset_index), row_group_metadata) in page_index {
            if let Some(selected_row_groups) = selected_row_groups {
                // skip row groups that aren't present in selected_row_groups
                if idx == selected_row_groups[selected_row_groups_idx] {
                    selected_row_groups_idx += 1;
                } else {
                    continue;
                }
            }

            let selections_for_page = PageIndexEvaluator::eval(
                predicate,
                column_index,
                offset_index,
                row_group_metadata,
                field_id_map,
                snapshot_schema,
            )?;

            results.push(selections_for_page);

            if let Some(selected_row_groups) = selected_row_groups {
                if selected_row_groups_idx == selected_row_groups.len() {
                    break;
                }
            }
        }

        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
    }

    /// Filters row groups by byte range to support Iceberg's file splitting.
    ///
    /// Iceberg splits large files at row group boundaries, so we only read row groups
    /// whose byte ranges overlap with [start, start+length).
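    ///
    /// Worked example (hypothetical sizes): with two row groups of 100 compressed bytes each,
    /// row group 0 is treated as spanning bytes [4, 104) and row group 1 as [104, 204).
    /// A split with `start = 0, length = 104` selects only row group 0, while a split with
    /// `start = 104, length = 100` selects only row group 1.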
    fn filter_row_groups_by_byte_range(
        parquet_metadata: &Arc<ParquetMetaData>,
        start: u64,
        length: u64,
    ) -> Result<Vec<usize>> {
        let row_groups = parquet_metadata.row_groups();
        let mut selected = Vec::new();
        let end = start + length;

        // Row groups are stored sequentially after the 4-byte magic header.
        let mut current_byte_offset = 4u64;

        for (idx, row_group) in row_groups.iter().enumerate() {
            let row_group_size = row_group.compressed_size() as u64;
            let row_group_end = current_byte_offset + row_group_size;

            if current_byte_offset < end && start < row_group_end {
                selected.push(idx);
            }

            current_byte_offset = row_group_end;
        }

        Ok(selected)
    }
}

/// Build the map from Parquet field id to Parquet column index in the schema.
/// Returns None if the Parquet file doesn't have field IDs embedded (e.g., migrated tables).
fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
    let mut column_map = HashMap::new();

    for (idx, field) in parquet_schema.columns().iter().enumerate() {
        let field_type = field.self_type();
        match field_type {
            ParquetType::PrimitiveType { basic_info, .. } => {
                if !basic_info.has_id() {
                    return Ok(None);
                }
                column_map.insert(basic_info.id(), idx);
            }
            ParquetType::GroupType { .. } => {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Leaf column in schema should be a primitive type but got {field_type:?}"
                    ),
                ));
            }
        };
    }

    Ok(Some(column_map))
}

/// Build a fallback field ID map for Parquet files without embedded field IDs.
/// The map is position-based (1, 2, 3, ...) for compatibility with iceberg-java migrations.
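///
/// For example (hypothetical file): a file with three leaf columns yields the map
/// `{1: 0, 2: 1, 3: 2}`.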
fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
    let mut column_map = HashMap::new();

    // 1-indexed to match iceberg-java's convention
    for (idx, _field) in parquet_schema.columns().iter().enumerate() {
        let field_id = (idx + 1) as i32;
        column_map.insert(field_id, idx);
    }

    column_map
}

/// Apply name mapping to an Arrow schema for Parquet files lacking field IDs.
///
/// Assigns Iceberg field IDs based on column names using the name mapping,
/// enabling correct projection on migrated files (e.g., from Hive/Spark via add_files).
///
/// Per Iceberg spec Column Projection rule #2:
/// "Use schema.name-mapping.default metadata to map field id to columns without field id"
/// https://iceberg.apache.org/spec/#column-projection
///
/// Corresponds to Java's ParquetSchemaUtil.applyNameMapping() and the ApplyNameMapping visitor.
/// The key difference is that Java operates on a Parquet MessageType, while we operate on an Arrow Schema.
///
/// # Arguments
/// * `arrow_schema` - Arrow schema from the Parquet file (without field IDs)
/// * `name_mapping` - Name mapping from table metadata (TableProperties.DEFAULT_NAME_MAPPING)
///
/// # Returns
/// Arrow schema with field IDs assigned based on the name mapping
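///
/// # Example
///
/// A hedged sketch (hypothetical column names and mapping): given an Arrow schema with
/// columns `id` and `name`, and a name mapping of `[{field-id: 1, names: [id]},
/// {field-id: 2, names: [name]}]`, the returned schema carries `PARQUET_FIELD_ID_META_KEY`
/// metadata of `"1"` on `id` and `"2"` on `name`. A column absent from the mapping is
/// returned without an ID and is later dropped by projection.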
fn apply_name_mapping_to_arrow_schema(
    arrow_schema: ArrowSchemaRef,
    name_mapping: &NameMapping,
) -> Result<Arc<ArrowSchema>> {
    debug_assert!(
        arrow_schema
            .fields()
            .iter()
            .next()
            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
        "Schema already has field IDs - name mapping should not be applied"
    );

    use arrow_schema::Field;

    let fields_with_mapped_ids: Vec<_> = arrow_schema
        .fields()
        .iter()
        .map(|field| {
            // Look up this column name in the name mapping to get the Iceberg field ID.
            // Corresponds to Java's ApplyNameMapping visitor, which calls
            // nameMapping.find(currentPath()) and returns field.withId() if found.
            //
            // If the field isn't in the mapping, leave it WITHOUT assigning an ID
            // (matching Java's behavior of returning the field unchanged).
            // Later, during projection, fields without IDs are filtered out.
            let mapped_field_opt = name_mapping
                .fields()
                .iter()
                .find(|f| f.names().contains(&field.name().to_string()));

            let mut metadata = field.metadata().clone();

            if let Some(mapped_field) = mapped_field_opt {
                if let Some(field_id) = mapped_field.field_id() {
                    // Field found in mapping with a field_id → assign it
                    metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
                }
                // If field_id is None, leave the field without an ID (it will be filtered out by projection)
            }
            // If the field is not found in the mapping, leave it without an ID (it will be filtered out by projection)

            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    Ok(Arc::new(ArrowSchema::new_with_metadata(
        fields_with_mapped_ids,
        arrow_schema.metadata().clone(),
    )))
}

/// Add position-based fallback field IDs to an Arrow schema for Parquet files lacking them.
/// Enables projection on migrated files (e.g., from Hive/Spark).
///
/// Why at the schema level (not per-batch): efficiency - avoids repeated schema modification.
/// Why only top-level: nested projection uses leaf column indices, not parent struct IDs.
/// Why 1-indexed: compatibility with iceberg-java's ParquetSchemaUtil.addFallbackIds().
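///
/// For example (hypothetical file): top-level columns at positions 0, 1 and 2 receive
/// `PARQUET_FIELD_ID_META_KEY` metadata of `"1"`, `"2"` and `"3"` respectively.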
fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
    debug_assert!(
        arrow_schema
            .fields()
            .iter()
            .next()
            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
        "Schema already has field IDs"
    );

    use arrow_schema::Field;

    let fields_with_fallback_ids: Vec<_> = arrow_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(pos, field)| {
            let mut metadata = field.metadata().clone();
            let field_id = (pos + 1) as i32; // 1-indexed for Java compatibility
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());

            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    Arc::new(ArrowSchema::new_with_metadata(
        fields_with_fallback_ids,
        arrow_schema.metadata().clone(),
    ))
}

/// A visitor to collect field ids from bound predicates.
struct CollectFieldIdVisitor {
    field_ids: HashSet<i32>,
}

impl CollectFieldIdVisitor {
    fn field_ids(self) -> HashSet<i32> {
        self.field_ids
    }
}

impl BoundPredicateVisitor for CollectFieldIdVisitor {
    type T = ();

    fn always_true(&mut self) -> Result<()> {
        Ok(())
    }

    fn always_false(&mut self) -> Result<()> {
        Ok(())
    }

    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
        Ok(())
    }

    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
        Ok(())
    }

    fn not(&mut self, _inner: ()) -> Result<()> {
        Ok(())
    }

    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn less_than(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn less_than_or_eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn greater_than(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn greater_than_or_eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn starts_with(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_starts_with(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn r#in(
        &mut self,
        reference: &BoundReference,
        _literals: &FnvHashSet<Datum>,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_in(
        &mut self,
        reference: &BoundReference,
        _literals: &FnvHashSet<Datum>,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }
}

/// A visitor to convert Iceberg bound predicates to Arrow predicates.
struct PredicateConverter<'a> {
    /// The Parquet schema descriptor.
    pub parquet_schema: &'a SchemaDescriptor,
    /// The map between field id and leaf column index in the Parquet schema.
    pub column_map: &'a HashMap<i32, usize>,
    /// The required column indices in the Parquet schema for the predicates.
    pub column_indices: &'a Vec<usize>,
}

impl PredicateConverter<'_> {
    /// When visiting a bound reference, we return the index of the leaf column within the
    /// required column indices, which is used to project the column in the record batch.
    /// Returns None if the field id is not found in the column map, which is possible
    /// due to schema evolution.
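    ///
    /// Worked example (hypothetical maps): with `column_map = {1: 0, 3: 2}` and
    /// `column_indices = [0, 2]`, a reference to field id 3 resolves to leaf column 2,
    /// which sits at position 1 of the required indices, so `Ok(Some(1))` is returned;
    /// a reference to field id 5 returns `Ok(None)`.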
1243    fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1244        // The leaf column's index in Parquet schema.
1245        if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1246            if self.parquet_schema.get_column_root(*column_idx).is_group() {
1247                return Err(Error::new(
1248                    ErrorKind::DataInvalid,
1249                    format!(
1250                        "Leave column `{}` in predicates isn't a root column in Parquet schema.",
1251                        reference.field().name
1252                    ),
1253                ));
1254            }
1255
1256            // The leaf column's index in the required column indices.
1257            let index = self
1258                .column_indices
1259                .iter()
1260                .position(|&idx| idx == *column_idx)
1261                .ok_or(Error::new(
1262                    ErrorKind::DataInvalid,
1263                    format!(
1264                "Leave column `{}` in predicates cannot be found in the required column indices.",
1265                reference.field().name
1266            ),
1267                ))?;
1268
1269            Ok(Some(index))
1270        } else {
1271            Ok(None)
1272        }
1273    }
1274
1275    /// Build an Arrow predicate that always returns true.
1276    fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1277        Ok(Box::new(|batch| {
1278            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1279        }))
1280    }
1281
1282    /// Build an Arrow predicate that always returns false.
1283    fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1284        Ok(Box::new(|batch| {
1285            Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1286        }))
1287    }
1288}
1289
1290/// Gets the leaf column from the record batch for the required column index. Only
1291/// supports top-level columns for now.
1292fn project_column(
1293    batch: &RecordBatch,
1294    column_idx: usize,
1295) -> std::result::Result<ArrayRef, ArrowError> {
1296    let column = batch.column(column_idx);
1297
1298    match column.data_type() {
1299        DataType::Struct(_) => Err(ArrowError::SchemaError(
1300            "Does not support struct column yet.".to_string(),
1301        )),
1302        _ => Ok(column.clone()),
1303    }
1304}
1305
1306type PredicateResult =
1307    dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1308
1309impl BoundPredicateVisitor for PredicateConverter<'_> {
1310    type T = Box<PredicateResult>;
1311
1312    fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1313        self.build_always_true()
1314    }
1315
1316    fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1317        self.build_always_false()
1318    }
1319
1320    fn and(
1321        &mut self,
1322        mut lhs: Box<PredicateResult>,
1323        mut rhs: Box<PredicateResult>,
1324    ) -> Result<Box<PredicateResult>> {
1325        Ok(Box::new(move |batch| {
1326            let left = lhs(batch.clone())?;
1327            let right = rhs(batch)?;
1328            and_kleene(&left, &right)
1329        }))
1330    }
1331
1332    fn or(
1333        &mut self,
1334        mut lhs: Box<PredicateResult>,
1335        mut rhs: Box<PredicateResult>,
1336    ) -> Result<Box<PredicateResult>> {
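        // Combine with Kleene (three-valued) OR semantics, so e.g. `true OR NULL`
        // evaluates to true rather than NULL (see `test_kleene_logic_or_behaviour` below).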
1337        Ok(Box::new(move |batch| {
1338            let left = lhs(batch.clone())?;
1339            let right = rhs(batch)?;
1340            or_kleene(&left, &right)
1341        }))
1342    }
1343
1344    fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1345        Ok(Box::new(move |batch| {
1346            let pred_ret = inner(batch)?;
1347            not(&pred_ret)
1348        }))
1349    }
1350
1351    fn is_null(
1352        &mut self,
1353        reference: &BoundReference,
1354        _predicate: &BoundPredicate,
1355    ) -> Result<Box<PredicateResult>> {
1356        if let Some(idx) = self.bound_reference(reference)? {
1357            Ok(Box::new(move |batch| {
1358                let column = project_column(&batch, idx)?;
1359                is_null(&column)
1360            }))
1361        } else {
1362            // The column is missing; treat it as null.
1363            self.build_always_true()
1364        }
1365    }
1366
1367    fn not_null(
1368        &mut self,
1369        reference: &BoundReference,
1370        _predicate: &BoundPredicate,
1371    ) -> Result<Box<PredicateResult>> {
1372        if let Some(idx) = self.bound_reference(reference)? {
1373            Ok(Box::new(move |batch| {
1374                let column = project_column(&batch, idx)?;
1375                is_not_null(&column)
1376            }))
1377        } else {
1378            // The column is missing; treat it as null.
1379            self.build_always_false()
1380        }
1381    }
1382
1383    fn is_nan(
1384        &mut self,
1385        reference: &BoundReference,
1386        _predicate: &BoundPredicate,
1387    ) -> Result<Box<PredicateResult>> {
1388        if self.bound_reference(reference)?.is_some() {
1389            self.build_always_true()
1390        } else {
1391            // The column is missing; treat it as null.
1392            self.build_always_false()
1393        }
1394    }
1395
1396    fn not_nan(
1397        &mut self,
1398        reference: &BoundReference,
1399        _predicate: &BoundPredicate,
1400    ) -> Result<Box<PredicateResult>> {
1401        if self.bound_reference(reference)?.is_some() {
1402            self.build_always_false()
1403        } else {
1404            // The column is missing; treat it as null.
1405            self.build_always_true()
1406        }
1407    }
1408
1409    fn less_than(
1410        &mut self,
1411        reference: &BoundReference,
1412        literal: &Datum,
1413        _predicate: &BoundPredicate,
1414    ) -> Result<Box<PredicateResult>> {
1415        if let Some(idx) = self.bound_reference(reference)? {
1416            let literal = get_arrow_datum(literal)?;
1417
1418            Ok(Box::new(move |batch| {
1419                let left = project_column(&batch, idx)?;
1420                let literal = try_cast_literal(&literal, left.data_type())?;
1421                lt(&left, literal.as_ref())
1422            }))
1423        } else {
1424            // The column is missing; treat it as null.
1425            self.build_always_true()
1426        }
1427    }
1428
1429    fn less_than_or_eq(
1430        &mut self,
1431        reference: &BoundReference,
1432        literal: &Datum,
1433        _predicate: &BoundPredicate,
1434    ) -> Result<Box<PredicateResult>> {
1435        if let Some(idx) = self.bound_reference(reference)? {
1436            let literal = get_arrow_datum(literal)?;
1437
1438            Ok(Box::new(move |batch| {
1439                let left = project_column(&batch, idx)?;
1440                let literal = try_cast_literal(&literal, left.data_type())?;
1441                lt_eq(&left, literal.as_ref())
1442            }))
1443        } else {
1444            // The column is missing; treat it as null.
1445            self.build_always_true()
1446        }
1447    }
1448
1449    fn greater_than(
1450        &mut self,
1451        reference: &BoundReference,
1452        literal: &Datum,
1453        _predicate: &BoundPredicate,
1454    ) -> Result<Box<PredicateResult>> {
1455        if let Some(idx) = self.bound_reference(reference)? {
1456            let literal = get_arrow_datum(literal)?;
1457
1458            Ok(Box::new(move |batch| {
1459                let left = project_column(&batch, idx)?;
1460                let literal = try_cast_literal(&literal, left.data_type())?;
1461                gt(&left, literal.as_ref())
1462            }))
1463        } else {
1464            // The column is missing; treat it as null.
1465            self.build_always_false()
1466        }
1467    }
1468
1469    fn greater_than_or_eq(
1470        &mut self,
1471        reference: &BoundReference,
1472        literal: &Datum,
1473        _predicate: &BoundPredicate,
1474    ) -> Result<Box<PredicateResult>> {
1475        if let Some(idx) = self.bound_reference(reference)? {
1476            let literal = get_arrow_datum(literal)?;
1477
1478            Ok(Box::new(move |batch| {
1479                let left = project_column(&batch, idx)?;
1480                let literal = try_cast_literal(&literal, left.data_type())?;
1481                gt_eq(&left, literal.as_ref())
1482            }))
1483        } else {
1484            // The column is missing; treat it as null.
1485            self.build_always_false()
1486        }
1487    }
1488
1489    fn eq(
1490        &mut self,
1491        reference: &BoundReference,
1492        literal: &Datum,
1493        _predicate: &BoundPredicate,
1494    ) -> Result<Box<PredicateResult>> {
1495        if let Some(idx) = self.bound_reference(reference)? {
1496            let literal = get_arrow_datum(literal)?;
1497
1498            Ok(Box::new(move |batch| {
1499                let left = project_column(&batch, idx)?;
1500                let literal = try_cast_literal(&literal, left.data_type())?;
1501                eq(&left, literal.as_ref())
1502            }))
1503        } else {
1504            // The column is missing; treat it as null.
1505            self.build_always_false()
1506        }
1507    }
1508
1509    fn not_eq(
1510        &mut self,
1511        reference: &BoundReference,
1512        literal: &Datum,
1513        _predicate: &BoundPredicate,
1514    ) -> Result<Box<PredicateResult>> {
1515        if let Some(idx) = self.bound_reference(reference)? {
1516            let literal = get_arrow_datum(literal)?;
1517
1518            Ok(Box::new(move |batch| {
1519                let left = project_column(&batch, idx)?;
1520                let literal = try_cast_literal(&literal, left.data_type())?;
1521                neq(&left, literal.as_ref())
1522            }))
1523        } else {
1524            // The column is missing; treat it as null.
1525            self.build_always_false()
1526        }
1527    }
1528
1529    fn starts_with(
1530        &mut self,
1531        reference: &BoundReference,
1532        literal: &Datum,
1533        _predicate: &BoundPredicate,
1534    ) -> Result<Box<PredicateResult>> {
1535        if let Some(idx) = self.bound_reference(reference)? {
1536            let literal = get_arrow_datum(literal)?;
1537
1538            Ok(Box::new(move |batch| {
1539                let left = project_column(&batch, idx)?;
1540                let literal = try_cast_literal(&literal, left.data_type())?;
1541                starts_with(&left, literal.as_ref())
1542            }))
1543        } else {
1544            // The column is missing; treat it as null.
1545            self.build_always_false()
1546        }
1547    }
1548
1549    fn not_starts_with(
1550        &mut self,
1551        reference: &BoundReference,
1552        literal: &Datum,
1553        _predicate: &BoundPredicate,
1554    ) -> Result<Box<PredicateResult>> {
1555        if let Some(idx) = self.bound_reference(reference)? {
1556            let literal = get_arrow_datum(literal)?;
1557
1558            Ok(Box::new(move |batch| {
1559                let left = project_column(&batch, idx)?;
1560                let literal = try_cast_literal(&literal, left.data_type())?;
1561                // update here if arrow ever adds a native not_starts_with
1562                not(&starts_with(&left, literal.as_ref())?)
1563            }))
1564        } else {
1565            // The column is missing; treat it as null.
1566            self.build_always_true()
1567        }
1568    }
1569
1570    fn r#in(
1571        &mut self,
1572        reference: &BoundReference,
1573        literals: &FnvHashSet<Datum>,
1574        _predicate: &BoundPredicate,
1575    ) -> Result<Box<PredicateResult>> {
1576        if let Some(idx) = self.bound_reference(reference)? {
1577            let literals: Vec<_> = literals
1578                .iter()
1579                .map(|lit| get_arrow_datum(lit).unwrap())
1580                .collect();
1581
1582            Ok(Box::new(move |batch| {
1583                // update this if arrow ever adds a native is_in kernel
1584                let left = project_column(&batch, idx)?;
1585
1586                let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1587                for literal in &literals {
1588                    let literal = try_cast_literal(literal, left.data_type())?;
1589                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
1590                }
1591
1592                Ok(acc)
1593            }))
1594        } else {
1595            // The column is missing; treat it as null.
1596            self.build_always_false()
1597        }
1598    }
1599
1600    fn not_in(
1601        &mut self,
1602        reference: &BoundReference,
1603        literals: &FnvHashSet<Datum>,
1604        _predicate: &BoundPredicate,
1605    ) -> Result<Box<PredicateResult>> {
1606        if let Some(idx) = self.bound_reference(reference)? {
1607            let literals: Vec<_> = literals
1608                .iter()
1609                .map(|lit| get_arrow_datum(lit).unwrap())
1610                .collect();
1611
1612            Ok(Box::new(move |batch| {
1613                // update this if arrow ever adds a native not_in kernel
1614                let left = project_column(&batch, idx)?;
1615                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1616                for literal in &literals {
1617                    let literal = try_cast_literal(literal, left.data_type())?;
1618                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
1619                }
1620
1621                Ok(acc)
1622            }))
1623        } else {
1624            // The column is missing; treat it as null.
1625            self.build_always_true()
1626        }
1627    }
1628}
1629
1630/// ArrowFileReader is a wrapper around a `FileRead` that implements parquet's `AsyncFileReader`.
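///
/// A minimal usage sketch (hedged): the `FileIO`/`InputFile` calls below (`new_input`,
/// `metadata`, `reader`) mirror how this crate opens data files elsewhere, and the path and
/// preload/size-hint settings are purely illustrative. The block is not doctested.
///
/// ```ignore
/// // Hypothetical location of a Parquet data file.
/// let input = file_io.new_input("s3://bucket/warehouse/data/file.parquet")?;
/// let metadata = input.metadata().await?;
/// let file_read = input.reader().await?;
/// let parquet_file_reader = ArrowFileReader::new(metadata, file_read)
///     .with_metadata_size_hint(64 * 1024)
///     .with_preload_page_index(true);
/// let stream = ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
///     .await?
///     .build()?;
/// ```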
1631pub struct ArrowFileReader<R: FileRead> {
1632    meta: FileMetadata,
1633    preload_column_index: bool,
1634    preload_offset_index: bool,
1635    preload_page_index: bool,
1636    metadata_size_hint: Option<usize>,
1637    r: R,
1638}
1639
1640impl<R: FileRead> ArrowFileReader<R> {
1641    /// Create a new ArrowFileReader
1642    pub fn new(meta: FileMetadata, r: R) -> Self {
1643        Self {
1644            meta,
1645            preload_column_index: false,
1646            preload_offset_index: false,
1647            preload_page_index: false,
1648            metadata_size_hint: None,
1649            r,
1650        }
1651    }
1652
1653    /// Enable or disable preloading of the column index
1654    pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1655        self.preload_column_index = preload;
1656        self
1657    }
1658
1659    /// Enable or disable preloading of the offset index
1660    pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1661        self.preload_offset_index = preload;
1662        self
1663    }
1664
1665    /// Enable or disable preloading of the page index
1666    pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1667        self.preload_page_index = preload;
1668        self
1669    }
1670
1671    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
1672    ///
1673    /// This hint can help reduce the number of fetch requests. For more details see the
1674    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
1675    pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1676        self.metadata_size_hint = Some(hint);
1677        self
1678    }
1679}
1680
1681impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
1682    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
1683        Box::pin(
1684            self.r
1685                .read(range.start..range.end)
1686                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
1687        )
1688    }
1689
1690    // TODO: we currently don't respect `ArrowReaderOptions` because it doesn't expose any method to access the options field;
1691    // we will fix this once `v55.1.0` is released, see https://github.com/apache/arrow-rs/issues/7393
1692    fn get_metadata(
1693        &mut self,
1694        _options: Option<&'_ ArrowReaderOptions>,
1695    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
1696        async move {
1697            let reader = ParquetMetaDataReader::new()
1698                .with_prefetch_hint(self.metadata_size_hint)
1699                // Set the page policy first because it updates both column and offset policies.
1700                .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
1701                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
1702                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
1703            let size = self.meta.size;
1704            let meta = reader.load_and_finish(self, size).await?;
1705
1706            Ok(Arc::new(meta))
1707        }
1708        .boxed()
1709    }
1710}
1711
1712/// The Arrow type of an array produced by the Parquet reader may not exactly match the Arrow
1713/// type that Iceberg uses for literals, even though the two are effectively the same logical
1714/// type, e.g. LargeUtf8 vs. Utf8, Utf8View vs. Utf8, or Utf8View vs. LargeUtf8.
1715///
1716/// The Arrow compute kernels that we use require the types to match exactly, so we first cast
1717/// the literal to the type of the batch read from Parquet before passing it to the kernel.
1718fn try_cast_literal(
1719    literal: &Arc<dyn ArrowDatum + Send + Sync>,
1720    column_type: &DataType,
1721) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1722    let literal_array = literal.get().0;
1723
1724    // No cast required
1725    if literal_array.data_type() == column_type {
1726        return Ok(Arc::clone(literal));
1727    }
1728
1729    let literal_array = cast(literal_array, column_type)?;
1730    Ok(Arc::new(Scalar::new(literal_array)))
1731}
1732
1733#[cfg(test)]
1734mod tests {
1735    use std::collections::{HashMap, HashSet};
1736    use std::fs::File;
1737    use std::sync::Arc;
1738
1739    use arrow_array::cast::AsArray;
1740    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1741    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1742    use futures::TryStreamExt;
1743    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1744    use parquet::arrow::{ArrowWriter, ProjectionMask};
1745    use parquet::basic::Compression;
1746    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1747    use parquet::file::properties::WriterProperties;
1748    use parquet::schema::parser::parse_message_type;
1749    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1750    use roaring::RoaringTreemap;
1751    use tempfile::TempDir;
1752
1753    use crate::ErrorKind;
1754    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1755    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1756    use crate::delete_vector::DeleteVector;
1757    use crate::expr::visitors::bound_predicate_visitor::visit;
1758    use crate::expr::{Bind, Predicate, Reference};
1759    use crate::io::FileIO;
1760    use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1761    use crate::spec::{
1762        DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1763    };
1764
1765    fn table_schema_simple() -> SchemaRef {
1766        Arc::new(
1767            Schema::builder()
1768                .with_schema_id(1)
1769                .with_identifier_field_ids(vec![2])
1770                .with_fields(vec![
1771                    NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1772                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1773                    NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1774                    NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1775                ])
1776                .build()
1777                .unwrap(),
1778        )
1779    }
1780
1781    #[test]
1782    fn test_collect_field_id() {
1783        let schema = table_schema_simple();
1784        let expr = Reference::new("qux").is_null();
1785        let bound_expr = expr.bind(schema, true).unwrap();
1786
1787        let mut visitor = CollectFieldIdVisitor {
1788            field_ids: HashSet::default(),
1789        };
1790        visit(&mut visitor, &bound_expr).unwrap();
1791
1792        let mut expected = HashSet::default();
1793        expected.insert(4_i32);
1794
1795        assert_eq!(visitor.field_ids, expected);
1796    }
1797
1798    #[test]
1799    fn test_collect_field_id_with_and() {
1800        let schema = table_schema_simple();
1801        let expr = Reference::new("qux")
1802            .is_null()
1803            .and(Reference::new("baz").is_null());
1804        let bound_expr = expr.bind(schema, true).unwrap();
1805
1806        let mut visitor = CollectFieldIdVisitor {
1807            field_ids: HashSet::default(),
1808        };
1809        visit(&mut visitor, &bound_expr).unwrap();
1810
1811        let mut expected = HashSet::default();
1812        expected.insert(4_i32);
1813        expected.insert(3);
1814
1815        assert_eq!(visitor.field_ids, expected);
1816    }
1817
1818    #[test]
1819    fn test_collect_field_id_with_or() {
1820        let schema = table_schema_simple();
1821        let expr = Reference::new("qux")
1822            .is_null()
1823            .or(Reference::new("baz").is_null());
1824        let bound_expr = expr.bind(schema, true).unwrap();
1825
1826        let mut visitor = CollectFieldIdVisitor {
1827            field_ids: HashSet::default(),
1828        };
1829        visit(&mut visitor, &bound_expr).unwrap();
1830
1831        let mut expected = HashSet::default();
1832        expected.insert(4_i32);
1833        expected.insert(3);
1834
1835        assert_eq!(visitor.field_ids, expected);
1836    }
1837
1838    #[test]
1839    fn test_arrow_projection_mask() {
1840        let schema = Arc::new(
1841            Schema::builder()
1842                .with_schema_id(1)
1843                .with_identifier_field_ids(vec![1])
1844                .with_fields(vec![
1845                    NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
1846                    NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
1847                    NestedField::optional(
1848                        3,
1849                        "c3",
1850                        Type::Primitive(PrimitiveType::Decimal {
1851                            precision: 38,
1852                            scale: 3,
1853                        }),
1854                    )
1855                    .into(),
1856                ])
1857                .build()
1858                .unwrap(),
1859        );
1860        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1861            Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
1862                PARQUET_FIELD_ID_META_KEY.to_string(),
1863                "1".to_string(),
1864            )])),
1865            // Type not supported
1866            Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
1867                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1868            ),
1869            // Precision is beyond the supported range
1870            Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
1871                PARQUET_FIELD_ID_META_KEY.to_string(),
1872                "3".to_string(),
1873            )])),
1874        ]));
1875
1876        let message_type = "
1877message schema {
1878  required binary c1 (STRING) = 1;
1879  optional int32 c2 (INTEGER(8,true)) = 2;
1880  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
1881}
1882    ";
1883        let parquet_type = parse_message_type(message_type).expect("should parse schema");
1884        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
1885
1886        // Try projecting the fields c2 and c3 with the unsupported data types
1887        let err = ArrowReader::get_arrow_projection_mask(
1888            &[1, 2, 3],
1889            &schema,
1890            &parquet_schema,
1891            &arrow_schema,
1892            false,
1893        )
1894        .unwrap_err();
1895
1896        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1897        assert_eq!(
1898            err.to_string(),
1899            "DataInvalid => Unsupported Arrow data type: Duration(Microsecond)".to_string()
1900        );
1901
1902        // Omitting field c2, we still get an error due to c3 being selected
1903        let err = ArrowReader::get_arrow_projection_mask(
1904            &[1, 3],
1905            &schema,
1906            &parquet_schema,
1907            &arrow_schema,
1908            false,
1909        )
1910        .unwrap_err();
1911
1912        assert_eq!(err.kind(), ErrorKind::DataInvalid);
1913        assert_eq!(
1914            err.to_string(),
1915            "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
1916        );
1917
1918        // Finally avoid selecting fields with unsupported data types
1919        let mask = ArrowReader::get_arrow_projection_mask(
1920            &[1],
1921            &schema,
1922            &parquet_schema,
1923            &arrow_schema,
1924            false,
1925        )
1926        .expect("Some ProjectionMask");
1927        assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
1928    }
1929
1930    #[tokio::test]
1931    async fn test_kleene_logic_or_behaviour() {
1932        // a IS NULL OR a = 'foo'
1933        let predicate = Reference::new("a")
1934            .is_null()
1935            .or(Reference::new("a").equal_to(Datum::string("foo")));
1936
1937        // Table data: [NULL, "foo", "bar"]
1938        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1939
1940        // Expected: [NULL, "foo"].
1941        let expected = vec![None, Some("foo".to_string())];
1942
1943        let (file_io, schema, table_location, _temp_dir) =
1944            setup_kleene_logic(data_for_col_a, DataType::Utf8);
1945        let reader = ArrowReaderBuilder::new(file_io).build();
1946
1947        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1948
1949        assert_eq!(result_data, expected);
1950    }
1951
1952    #[tokio::test]
1953    async fn test_kleene_logic_and_behaviour() {
1954        // a IS NOT NULL AND a != 'foo'
1955        let predicate = Reference::new("a")
1956            .is_not_null()
1957            .and(Reference::new("a").not_equal_to(Datum::string("foo")));
1958
1959        // Table data: [NULL, "foo", "bar"]
1960        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1961
1962        // Expected: ["bar"].
1963        let expected = vec![Some("bar".to_string())];
1964
1965        let (file_io, schema, table_location, _temp_dir) =
1966            setup_kleene_logic(data_for_col_a, DataType::Utf8);
1967        let reader = ArrowReaderBuilder::new(file_io).build();
1968
1969        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1970
1971        assert_eq!(result_data, expected);
1972    }
1973
1974    #[tokio::test]
1975    async fn test_predicate_cast_literal() {
1976        let predicates = vec![
1977            // a == 'foo'
1978            (Reference::new("a").equal_to(Datum::string("foo")), vec![
1979                Some("foo".to_string()),
1980            ]),
1981            // a != 'foo'
1982            (
1983                Reference::new("a").not_equal_to(Datum::string("foo")),
1984                vec![Some("bar".to_string())],
1985            ),
1986            // STARTS_WITH(a, 'f')
1987            (Reference::new("a").starts_with(Datum::string("f")), vec![
1988                Some("foo".to_string()),
1989            ]),
1990            // NOT STARTS_WITH(a, 'f')
1991            (
1992                Reference::new("a").not_starts_with(Datum::string("f")),
1993                vec![Some("bar".to_string())],
1994            ),
1995            // a < 'foo'
1996            (Reference::new("a").less_than(Datum::string("foo")), vec![
1997                Some("bar".to_string()),
1998            ]),
1999            // a <= 'foo'
2000            (
2001                Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
2002                vec![Some("foo".to_string()), Some("bar".to_string())],
2003            ),
2004            // a > 'bar'
2005            (
2006                Reference::new("a").greater_than(Datum::string("bar")),
2007                vec![Some("foo".to_string())],
2008            ),
2009            // a >= 'foo'
2010            (
2011                Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
2012                vec![Some("foo".to_string())],
2013            ),
2014            // a IN ('foo', 'baz')
2015            (
2016                Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
2017                vec![Some("foo".to_string())],
2018            ),
2019            // a NOT IN ('foo', 'baz')
2020            (
2021                Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
2022                vec![Some("bar".to_string())],
2023            ),
2024        ];
2025
2026        // Table data: ["foo", "bar"]
2027        let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
2028
2029        let (file_io, schema, table_location, _temp_dir) =
2030            setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
2031        let reader = ArrowReaderBuilder::new(file_io).build();
2032
2033        for (predicate, expected) in predicates {
2034            println!("testing predicate {predicate}");
2035            let result_data = test_perform_read(
2036                predicate.clone(),
2037                schema.clone(),
2038                table_location.clone(),
2039                reader.clone(),
2040            )
2041            .await;
2042
2043            assert_eq!(result_data, expected, "predicate={predicate}");
2044        }
2045    }
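
    /// Illustrative sketch (added alongside the original tests): exercises `try_cast_literal`
    /// directly, showing a `Utf8` literal being cast to match a `LargeUtf8` column type before
    /// it would be handed to an Arrow compute kernel. The literal value "foo" is arbitrary.
    #[test]
    fn test_try_cast_literal_utf8_to_large_utf8() {
        use arrow_array::{Datum as ArrowDatum, Scalar};

        use crate::arrow::reader::try_cast_literal;

        let literal: Arc<dyn ArrowDatum + Send + Sync> =
            Arc::new(Scalar::new(StringArray::from(vec!["foo"])));

        // Matching types: the literal is returned as-is, no cast performed.
        let same = try_cast_literal(&literal, &DataType::Utf8).unwrap();
        assert_eq!(same.get().0.data_type(), &DataType::Utf8);

        // Mismatched but compatible types: the literal is cast to the column's Arrow type.
        let casted = try_cast_literal(&literal, &DataType::LargeUtf8).unwrap();
        assert_eq!(casted.get().0.data_type(), &DataType::LargeUtf8);
    }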
2046
2047    async fn test_perform_read(
2048        predicate: Predicate,
2049        schema: SchemaRef,
2050        table_location: String,
2051        reader: ArrowReader,
2052    ) -> Vec<Option<String>> {
2053        let tasks = Box::pin(futures::stream::iter(
2054            vec![Ok(FileScanTask {
2055                start: 0,
2056                length: 0,
2057                record_count: None,
2058                data_file_path: format!("{table_location}/1.parquet"),
2059                data_file_format: DataFileFormat::Parquet,
2060                schema: schema.clone(),
2061                project_field_ids: vec![1],
2062                predicate: Some(predicate.bind(schema, true).unwrap()),
2063                deletes: vec![],
2064                partition: None,
2065                partition_spec: None,
2066                name_mapping: None,
2067            })]
2068            .into_iter(),
2069        )) as FileScanTaskStream;
2070
2071        let result = reader
2072            .read(tasks)
2073            .unwrap()
2074            .try_collect::<Vec<RecordBatch>>()
2075            .await
2076            .unwrap();
2077
2078        result[0].columns()[0]
2079            .as_string_opt::<i32>()
2080            .unwrap()
2081            .iter()
2082            .map(|v| v.map(ToOwned::to_owned))
2083            .collect::<Vec<_>>()
2084    }
2085
2086    fn setup_kleene_logic(
2087        data_for_col_a: Vec<Option<String>>,
2088        col_a_type: DataType,
2089    ) -> (FileIO, SchemaRef, String, TempDir) {
2090        let schema = Arc::new(
2091            Schema::builder()
2092                .with_schema_id(1)
2093                .with_fields(vec![
2094                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2095                ])
2096                .build()
2097                .unwrap(),
2098        );
2099
2100        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2101            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2102                PARQUET_FIELD_ID_META_KEY.to_string(),
2103                "1".to_string(),
2104            )])),
2105        ]));
2106
2107        let tmp_dir = TempDir::new().unwrap();
2108        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2109
2110        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2111
2112        let col = match col_a_type {
2113            DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2114            DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2115            _ => panic!("unexpected col_a_type"),
2116        };
2117
2118        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2119
2120        // Write the Parquet files
2121        let props = WriterProperties::builder()
2122            .set_compression(Compression::SNAPPY)
2123            .build();
2124
2125        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
2126        let mut writer =
2127            ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2128
2129        writer.write(&to_write).expect("Writing batch");
2130
2131        // writer must be closed to write footer
2132        writer.close().unwrap();
2133
2134        (file_io, schema, table_location, tmp_dir)
2135    }
2136
2137    #[test]
2138    fn test_build_deletes_row_selection() {
2139        let schema_descr = get_test_schema_descr();
2140
2141        let mut columns = vec![];
2142        for ptr in schema_descr.columns() {
2143            let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
2144            columns.push(column);
2145        }
2146
2147        let row_groups_metadata = vec![
2148            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
2149            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
2150            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
2151            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
2152            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
2153        ];
2154
2155        let selected_row_groups = Some(vec![1, 3]);
2156
2157        /* cases to cover:
2158           * {skip|select} {first|intermediate|last} {one row|multiple rows} in
2159             {first|intermediate|last} {skipped|selected} row group
2160           * row group selection disabled
2161        */
2162
2163        let positional_deletes = RoaringTreemap::from_iter(&[
2164            1, // in skipped rg 0, should be ignored
2165            3, // run of three consecutive items in skipped rg0
2166            4, 5, 998, // two consecutive items at end of skipped rg0
2167            999, 1000, // solitary row at start of selected rg1 (1, 9)
2168            1010, // run of 3 rows in selected rg1
2169            1011, 1012, // (3, 485)
2170            1498, // run of two items at end of selected rg1
2171            1499, 1500, // run of two items at start of skipped rg2
2172            1501, 1600, // should ignore, in skipped rg2
2173            1999, // single row at end of skipped rg2
2174            2000, // run of two items at start of selected rg3
2175            2001, // (4, 98)
2176            2100, // single row in selected row group 3 (1, 99)
2177            2200, // run of 3 consecutive rows in selected row group 3
2178            2201, 2202, // (3, 796)
2179            2999, // single item at end of selected rg3 (1)
2180            3000, // single item at start of skipped rg4
2181        ]);
2182
2183        let positional_deletes = DeleteVector::new(positional_deletes);
2184
2185        // using selected row groups 1 and 3
2186        let result = ArrowReader::build_deletes_row_selection(
2187            &row_groups_metadata,
2188            &selected_row_groups,
2189            &positional_deletes,
2190        )
2191        .unwrap();
2192
2193        let expected = RowSelection::from(vec![
2194            RowSelector::skip(1),
2195            RowSelector::select(9),
2196            RowSelector::skip(3),
2197            RowSelector::select(485),
2198            RowSelector::skip(4),
2199            RowSelector::select(98),
2200            RowSelector::skip(1),
2201            RowSelector::select(99),
2202            RowSelector::skip(3),
2203            RowSelector::select(796),
2204            RowSelector::skip(1),
2205        ]);
2206
2207        assert_eq!(result, expected);
2208
2209        // selecting all row groups
2210        let result = ArrowReader::build_deletes_row_selection(
2211            &row_groups_metadata,
2212            &None,
2213            &positional_deletes,
2214        )
2215        .unwrap();
2216
2217        let expected = RowSelection::from(vec![
2218            RowSelector::select(1),
2219            RowSelector::skip(1),
2220            RowSelector::select(1),
2221            RowSelector::skip(3),
2222            RowSelector::select(992),
2223            RowSelector::skip(3),
2224            RowSelector::select(9),
2225            RowSelector::skip(3),
2226            RowSelector::select(485),
2227            RowSelector::skip(4),
2228            RowSelector::select(98),
2229            RowSelector::skip(1),
2230            RowSelector::select(398),
2231            RowSelector::skip(3),
2232            RowSelector::select(98),
2233            RowSelector::skip(1),
2234            RowSelector::select(99),
2235            RowSelector::skip(3),
2236            RowSelector::select(796),
2237            RowSelector::skip(2),
2238            RowSelector::select(499),
2239        ]);
2240
2241        assert_eq!(result, expected);
2242    }
2243
2244    fn build_test_row_group_meta(
2245        schema_descr: SchemaDescPtr,
2246        columns: Vec<ColumnChunkMetaData>,
2247        num_rows: i64,
2248        ordinal: i16,
2249    ) -> RowGroupMetaData {
2250        RowGroupMetaData::builder(schema_descr.clone())
2251            .set_num_rows(num_rows)
2252            .set_total_byte_size(2000)
2253            .set_column_metadata(columns)
2254            .set_ordinal(ordinal)
2255            .build()
2256            .unwrap()
2257    }
2258
2259    fn get_test_schema_descr() -> SchemaDescPtr {
2260        use parquet::schema::types::Type as SchemaType;
2261
2262        let schema = SchemaType::group_type_builder("schema")
2263            .with_fields(vec![
2264                Arc::new(
2265                    SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2266                        .build()
2267                        .unwrap(),
2268                ),
2269                Arc::new(
2270                    SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2271                        .build()
2272                        .unwrap(),
2273                ),
2274            ])
2275            .build()
2276            .unwrap();
2277
2278        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2279    }
2280
2281    /// Verifies that file splits respect byte ranges and only read specific row groups.
2282    #[tokio::test]
2283    async fn test_file_splits_respect_byte_ranges() {
2284        use arrow_array::Int32Array;
2285        use parquet::file::reader::{FileReader, SerializedFileReader};
2286
2287        let schema = Arc::new(
2288            Schema::builder()
2289                .with_schema_id(1)
2290                .with_fields(vec![
2291                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2292                ])
2293                .build()
2294                .unwrap(),
2295        );
2296
2297        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2298            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2299                PARQUET_FIELD_ID_META_KEY.to_string(),
2300                "1".to_string(),
2301            )])),
2302        ]));
2303
2304        let tmp_dir = TempDir::new().unwrap();
2305        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2306        let file_path = format!("{}/multi_row_group.parquet", &table_location);
2307
2308        // Force each batch into its own row group for testing byte range filtering.
2309        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2310            (0..100).collect::<Vec<i32>>(),
2311        ))])
2312        .unwrap();
2313        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2314            (100..200).collect::<Vec<i32>>(),
2315        ))])
2316        .unwrap();
2317        let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2318            (200..300).collect::<Vec<i32>>(),
2319        ))])
2320        .unwrap();
2321
2322        let props = WriterProperties::builder()
2323            .set_compression(Compression::SNAPPY)
2324            .set_max_row_group_size(100)
2325            .build();
2326
2327        let file = File::create(&file_path).unwrap();
2328        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2329        writer.write(&batch1).expect("Writing batch 1");
2330        writer.write(&batch2).expect("Writing batch 2");
2331        writer.write(&batch3).expect("Writing batch 3");
2332        writer.close().unwrap();
2333
2334        // Read the file metadata to get row group byte positions
2335        let file = File::open(&file_path).unwrap();
2336        let reader = SerializedFileReader::new(file).unwrap();
2337        let metadata = reader.metadata();
2338
2339        println!("File has {} row groups", metadata.num_row_groups());
2340        assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2341
2342        // Get byte positions for each row group
2343        let row_group_0 = metadata.row_group(0);
2344        let row_group_1 = metadata.row_group(1);
2345        let row_group_2 = metadata.row_group(2);
2346
2347        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
2348        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2349        let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
2350        let file_end = rg2_start + row_group_2.compressed_size() as u64;
2351
2352        println!(
2353            "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2354            row_group_0.num_rows(),
2355            rg0_start,
2356            row_group_0.compressed_size()
2357        );
2358        println!(
2359            "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2360            row_group_1.num_rows(),
2361            rg1_start,
2362            row_group_1.compressed_size()
2363        );
2364        println!(
2365            "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2366            row_group_2.num_rows(),
2367            rg2_start,
2368            row_group_2.compressed_size()
2369        );
2370
2371        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2372        let reader = ArrowReaderBuilder::new(file_io).build();
2373
2374        // Task 1: read only the first row group
2375        let task1 = FileScanTask {
2376            start: rg0_start,
2377            length: row_group_0.compressed_size() as u64,
2378            record_count: Some(100),
2379            data_file_path: file_path.clone(),
2380            data_file_format: DataFileFormat::Parquet,
2381            schema: schema.clone(),
2382            project_field_ids: vec![1],
2383            predicate: None,
2384            deletes: vec![],
2385            partition: None,
2386            partition_spec: None,
2387            name_mapping: None,
2388        };
2389
2390        // Task 2: read the second and third row groups
2391        let task2 = FileScanTask {
2392            start: rg1_start,
2393            length: file_end - rg1_start,
2394            record_count: Some(200),
2395            data_file_path: file_path.clone(),
2396            data_file_format: DataFileFormat::Parquet,
2397            schema: schema.clone(),
2398            project_field_ids: vec![1],
2399            predicate: None,
2400            deletes: vec![],
2401            partition: None,
2402            partition_spec: None,
2403            name_mapping: None,
2404        };
2405
2406        let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2407        let result1 = reader
2408            .clone()
2409            .read(tasks1)
2410            .unwrap()
2411            .try_collect::<Vec<RecordBatch>>()
2412            .await
2413            .unwrap();
2414
2415        let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2416        println!(
2417            "Task 1 (bytes {}-{}) returned {} rows",
2418            rg0_start,
2419            rg0_start + row_group_0.compressed_size() as u64,
2420            total_rows_task1
2421        );
2422
2423        let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2424        let result2 = reader
2425            .read(tasks2)
2426            .unwrap()
2427            .try_collect::<Vec<RecordBatch>>()
2428            .await
2429            .unwrap();
2430
2431        let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2432        println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2433
2434        assert_eq!(
2435            total_rows_task1, 100,
2436            "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2437        );
2438
2439        assert_eq!(
2440            total_rows_task2, 200,
2441            "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2442        );
2443
2444        // Verify the actual data values are correct (not just the row count)
2445        if total_rows_task1 > 0 {
2446            let first_batch = &result1[0];
2447            let id_col = first_batch
2448                .column(0)
2449                .as_primitive::<arrow_array::types::Int32Type>();
2450            let first_val = id_col.value(0);
2451            let last_val = id_col.value(id_col.len() - 1);
2452            println!("Task 1 data range: {first_val} to {last_val}");
2453
2454            assert_eq!(first_val, 0, "Task 1 should start with id=0");
2455            assert_eq!(last_val, 99, "Task 1 should end with id=99");
2456        }
2457
2458        if total_rows_task2 > 0 {
2459            let first_batch = &result2[0];
2460            let id_col = first_batch
2461                .column(0)
2462                .as_primitive::<arrow_array::types::Int32Type>();
2463            let first_val = id_col.value(0);
2464            println!("Task 2 first value: {first_val}");
2465
2466            assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2467        }
2468    }
2469
2470    /// Test schema evolution: reading old Parquet file (with only column 'a')
2471    /// using a newer table schema (with columns 'a' and 'b').
2472    /// This tests that:
2473    /// 1. get_arrow_projection_mask allows missing columns
2474    /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
2475    #[tokio::test]
2476    async fn test_schema_evolution_add_column() {
2477        use arrow_array::{Array, Int32Array};
2478
2479        // New table schema: columns 'a' and 'b' (b was added later, file only has 'a')
2480        let new_schema = Arc::new(
2481            Schema::builder()
2482                .with_schema_id(2)
2483                .with_fields(vec![
2484                    NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2485                    NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2486                ])
2487                .build()
2488                .unwrap(),
2489        );
2490
2491        // Create Arrow schema for old Parquet file (only has column 'a')
2492        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2493            Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2494                PARQUET_FIELD_ID_META_KEY.to_string(),
2495                "1".to_string(),
2496            )])),
2497        ]));
2498
2499        // Write old Parquet file with only column 'a'
2500        let tmp_dir = TempDir::new().unwrap();
2501        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2502        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2503
2504        let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2505        let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2506
2507        let props = WriterProperties::builder()
2508            .set_compression(Compression::SNAPPY)
2509            .build();
2510        let file = File::create(format!("{}/old_file.parquet", &table_location)).unwrap();
2511        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2512        writer.write(&to_write).expect("Writing batch");
2513        writer.close().unwrap();
2514
2515        // Read the old Parquet file using the NEW schema (with column 'b')
2516        let reader = ArrowReaderBuilder::new(file_io).build();
2517        let tasks = Box::pin(futures::stream::iter(
2518            vec![Ok(FileScanTask {
2519                start: 0,
2520                length: 0,
2521                record_count: None,
2522                data_file_path: format!("{table_location}/old_file.parquet"),
2523                data_file_format: DataFileFormat::Parquet,
2524                schema: new_schema.clone(),
2525                project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
2526                predicate: None,
2527                deletes: vec![],
2528                partition: None,
2529                partition_spec: None,
2530                name_mapping: None,
2531            })]
2532            .into_iter(),
2533        )) as FileScanTaskStream;
2534
2535        let result = reader
2536            .read(tasks)
2537            .unwrap()
2538            .try_collect::<Vec<RecordBatch>>()
2539            .await
2540            .unwrap();
2541
2542        // Verify we got the correct data
2543        assert_eq!(result.len(), 1);
2544        let batch = &result[0];
2545
2546        // Should have 2 columns now
2547        assert_eq!(batch.num_columns(), 2);
2548        assert_eq!(batch.num_rows(), 3);
2549
2550        // Column 'a' should have the original data
2551        let col_a = batch
2552            .column(0)
2553            .as_primitive::<arrow_array::types::Int32Type>();
2554        assert_eq!(col_a.values(), &[1, 2, 3]);
2555
2556        // Column 'b' should be all NULLs (it didn't exist in the old file)
2557        let col_b = batch
2558            .column(1)
2559            .as_primitive::<arrow_array::types::Int32Type>();
2560        assert_eq!(col_b.null_count(), 3);
2561        assert!(col_b.is_null(0));
2562        assert!(col_b.is_null(1));
2563        assert!(col_b.is_null(2));
2564    }
2565
2566    /// Test for bug where position deletes in later row groups are not applied correctly.
2567    ///
2568    /// When a file has multiple row groups and a position delete targets a row in a later
2569    /// row group, the `build_deletes_row_selection` function had a bug where it would
2570    /// fail to increment `current_row_group_base_idx` when skipping row groups.
2571    ///
2572    /// This test creates:
2573    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2574    /// - A position delete file that deletes row 199 (last row in second row group)
2575    ///
2576    /// Expected behavior: Should return 199 rows (with id=200 deleted)
2577    /// Bug behavior: Returns 200 rows (delete is not applied)
2578    ///
2579    /// This bug was discovered while running Apache Spark + Apache Iceberg integration tests
2580    /// through DataFusion Comet. The following Iceberg Java tests failed due to this bug:
2581    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadDelete::testDeleteWithMultipleRowGroupsParquet`
2582    /// - `org.apache.iceberg.spark.extensions.TestMergeOnReadUpdate::testUpdateWithMultipleRowGroupsParquet`
2583    #[tokio::test]
2584    async fn test_position_delete_across_multiple_row_groups() {
2585        use arrow_array::{Int32Array, Int64Array};
2586        use parquet::file::reader::{FileReader, SerializedFileReader};
2587
2588        // Field IDs for positional delete schema
2589        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2590        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2591
2592        let tmp_dir = TempDir::new().unwrap();
2593        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2594
2595        // Create table schema with a single 'id' column
2596        let table_schema = Arc::new(
2597            Schema::builder()
2598                .with_schema_id(1)
2599                .with_fields(vec![
2600                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2601                ])
2602                .build()
2603                .unwrap(),
2604        );
2605
2606        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2607            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2608                PARQUET_FIELD_ID_META_KEY.to_string(),
2609                "1".to_string(),
2610            )])),
2611        ]));
2612
2613        // Step 1: Create data file with 200 rows in 2 row groups
2614        // Row group 0: rows 0-99 (ids 1-100)
2615        // Row group 1: rows 100-199 (ids 101-200)
2616        let data_file_path = format!("{}/data.parquet", &table_location);
2617
2618        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2619            Int32Array::from_iter_values(1..=100),
2620        )])
2621        .unwrap();
2622
2623        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2624            Int32Array::from_iter_values(101..=200),
2625        )])
2626        .unwrap();
2627
2628        // Force each batch into its own row group
2629        let props = WriterProperties::builder()
2630            .set_compression(Compression::SNAPPY)
2631            .set_max_row_group_size(100)
2632            .build();
2633
2634        let file = File::create(&data_file_path).unwrap();
2635        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2636        writer.write(&batch1).expect("Writing batch 1");
2637        writer.write(&batch2).expect("Writing batch 2");
2638        writer.close().unwrap();
2639
2640        // Verify we created 2 row groups
2641        let verify_file = File::open(&data_file_path).unwrap();
2642        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2643        assert_eq!(
2644            verify_reader.metadata().num_row_groups(),
2645            2,
2646            "Should have 2 row groups"
2647        );
2648
2649        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
2650        let delete_file_path = format!("{}/deletes.parquet", &table_location);
2651
2652        let delete_schema = Arc::new(ArrowSchema::new(vec![
2653            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2654                PARQUET_FIELD_ID_META_KEY.to_string(),
2655                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2656            )])),
2657            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2658                PARQUET_FIELD_ID_META_KEY.to_string(),
2659                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2660            )])),
2661        ]));
2662
2663        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
2664        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2665            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2666            Arc::new(Int64Array::from_iter_values(vec![199i64])),
2667        ])
2668        .unwrap();
2669
2670        let delete_props = WriterProperties::builder()
2671            .set_compression(Compression::SNAPPY)
2672            .build();
2673
2674        let delete_file = File::create(&delete_file_path).unwrap();
2675        let mut delete_writer =
2676            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2677        delete_writer.write(&delete_batch).unwrap();
2678        delete_writer.close().unwrap();
2679
2680        // Step 3: Read the data file with the delete applied
2681        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2682        let reader = ArrowReaderBuilder::new(file_io).build();
2683
2684        let task = FileScanTask {
2685            start: 0,
2686            length: 0,
2687            record_count: Some(200),
2688            data_file_path: data_file_path.clone(),
2689            data_file_format: DataFileFormat::Parquet,
2690            schema: table_schema.clone(),
2691            project_field_ids: vec![1],
2692            predicate: None,
2693            deletes: vec![FileScanTaskDeleteFile {
2694                file_path: delete_file_path,
2695                file_type: DataContentType::PositionDeletes,
2696                partition_spec_id: 0,
2697                equality_ids: None,
2698            }],
2699            partition: None,
2700            partition_spec: None,
2701            name_mapping: None,
2702        };
2703
2704        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2705        let result = reader
2706            .read(tasks)
2707            .unwrap()
2708            .try_collect::<Vec<RecordBatch>>()
2709            .await
2710            .unwrap();
2711
2712        // Step 4: Verify we got 199 rows (not 200)
2713        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2714
2715        println!("Total rows read: {}", total_rows);
2716        println!("Expected: 199 rows (deleted row 199 which had id=200)");
2717
2718        // This assertion will FAIL before the fix and PASS after the fix
2719        assert_eq!(
2720            total_rows, 199,
2721            "Expected 199 rows after deleting row 199, but got {} rows. \
2722             The bug causes position deletes in later row groups to be ignored.",
2723            total_rows
2724        );
2725
2726        // Verify the deleted row (id=200) is not present
2727        let all_ids: Vec<i32> = result
2728            .iter()
2729            .flat_map(|batch| {
2730                batch
2731                    .column(0)
2732                    .as_primitive::<arrow_array::types::Int32Type>()
2733                    .values()
2734                    .iter()
2735                    .copied()
2736            })
2737            .collect();
2738
2739        assert!(
2740            !all_ids.contains(&200),
2741            "Row with id=200 should be deleted but was found in results"
2742        );
2743
2744        // Verify we have all other ids (1-199)
2745        let expected_ids: Vec<i32> = (1..=199).collect();
2746        assert_eq!(
2747            all_ids, expected_ids,
2748            "Should have ids 1-199 but got different values"
2749        );
2750    }
2751
2752    /// Test for bug where position deletes are lost when skipping unselected row groups.
2753    ///
2754    /// This is a variant of `test_position_delete_across_multiple_row_groups` that exercises
2755    /// the row group selection code path (`selected_row_groups: Some([...])`).
2756    ///
2757    /// When a file has multiple row groups and only some are selected for reading,
2758    /// the `build_deletes_row_selection` function must correctly skip over deletes in
2759    /// unselected row groups WITHOUT consuming deletes that belong to selected row groups.
2760    ///
2761    /// This test creates:
2762    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2763    /// - A position delete file that deletes row 199 (last row in second row group)
2764    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
2765    ///
2766    /// Expected behavior: Should return 99 rows (with row 199 deleted)
2767    /// Bug behavior: Returns 100 rows (delete is lost when skipping row group 0)
2768    ///
2769    /// The bug occurs when processing row group 0 (unselected):
2770    /// ```rust
2771    /// delete_vector_iter.advance_to(next_row_group_base_idx); // Position at first delete >= 100
2772    /// next_deleted_row_idx_opt = delete_vector_iter.next(); // BUG: Consumes delete at 199!
2773    /// ```
2774    ///
2775    /// The fix is to NOT call `next()` after `advance_to()` when skipping unselected row groups,
2776    /// because `advance_to()` already positions the iterator correctly without consuming elements.
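    ///
    /// A minimal sketch of the intended control flow when skipping an unselected row
    /// group (names follow the snippet above; how the cached index is refreshed
    /// afterwards is exercised by the next test):
    /// ```rust
    /// // Skip the whole unselected row group in one step.
    /// delete_vector_iter.advance_to(next_row_group_base_idx);
    /// // Do NOT consume an element here: `advance_to()` already leaves the iterator
    /// // positioned at the first delete >= next_row_group_base_idx, and that delete
    /// // may belong to a row group that IS selected.
    /// ```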
2777    #[tokio::test]
2778    async fn test_position_delete_with_row_group_selection() {
2779        use arrow_array::{Int32Array, Int64Array};
2780        use parquet::file::reader::{FileReader, SerializedFileReader};
2781
2782        // Field IDs for positional delete schema
2783        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2784        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
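        // (These are the field IDs the Iceberg spec reserves for the `file_path` and
        // `pos` columns of position delete files.)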
2785
2786        let tmp_dir = TempDir::new().unwrap();
2787        let table_location = tmp_dir.path().to_str().unwrap().to_string();
2788
2789        // Create table schema with a single 'id' column
2790        let table_schema = Arc::new(
2791            Schema::builder()
2792                .with_schema_id(1)
2793                .with_fields(vec![
2794                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2795                ])
2796                .build()
2797                .unwrap(),
2798        );
2799
2800        let arrow_schema = Arc::new(ArrowSchema::new(vec![
2801            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2802                PARQUET_FIELD_ID_META_KEY.to_string(),
2803                "1".to_string(),
2804            )])),
2805        ]));
2806
2807        // Step 1: Create data file with 200 rows in 2 row groups
2808        // Row group 0: rows 0-99 (ids 1-100)
2809        // Row group 1: rows 100-199 (ids 101-200)
2810        let data_file_path = format!("{}/data.parquet", &table_location);
2811
2812        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2813            Int32Array::from_iter_values(1..=100),
2814        )])
2815        .unwrap();
2816
2817        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2818            Int32Array::from_iter_values(101..=200),
2819        )])
2820        .unwrap();
2821
2822        // Force each batch into its own row group
2823        let props = WriterProperties::builder()
2824            .set_compression(Compression::SNAPPY)
2825            .set_max_row_group_size(100)
2826            .build();
2827
2828        let file = File::create(&data_file_path).unwrap();
2829        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2830        writer.write(&batch1).expect("Writing batch 1");
2831        writer.write(&batch2).expect("Writing batch 2");
2832        writer.close().unwrap();
2833
2834        // Verify we created 2 row groups
2835        let verify_file = File::open(&data_file_path).unwrap();
2836        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2837        assert_eq!(
2838            verify_reader.metadata().num_row_groups(),
2839            2,
2840            "Should have 2 row groups"
2841        );
2842
2843        // Step 2: Create position delete file that deletes row 199 (id=200, last row in row group 1)
2844        let delete_file_path = format!("{}/deletes.parquet", &table_location);
2845
2846        let delete_schema = Arc::new(ArrowSchema::new(vec![
2847            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2848                PARQUET_FIELD_ID_META_KEY.to_string(),
2849                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2850            )])),
2851            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2852                PARQUET_FIELD_ID_META_KEY.to_string(),
2853                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2854            )])),
2855        ]));
2856
2857        // Delete row at position 199 (0-indexed, so it's the last row: id=200)
2858        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2859            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2860            Arc::new(Int64Array::from_iter_values(vec![199i64])),
2861        ])
2862        .unwrap();
2863
2864        let delete_props = WriterProperties::builder()
2865            .set_compression(Compression::SNAPPY)
2866            .build();
2867
2868        let delete_file = File::create(&delete_file_path).unwrap();
2869        let mut delete_writer =
2870            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2871        delete_writer.write(&delete_batch).unwrap();
2872        delete_writer.close().unwrap();
2873
2874        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
2875        // This exercises the row group selection code path where row group 0 is skipped
2876        let metadata_file = File::open(&data_file_path).unwrap();
2877        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
2878        let metadata = metadata_reader.metadata();
2879
2880        let row_group_0 = metadata.row_group(0);
2881        let row_group_1 = metadata.row_group(1);
2882
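        // These offsets assume the row-group data is laid out contiguously right after
        // the 4-byte "PAR1" magic, so row group 1 begins where row group 0's compressed
        // bytes end; the resulting [start, start + length) range is what limits the read
        // to row group 1 via byte-range filtering.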
2883        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
2884        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2885        let rg1_length = row_group_1.compressed_size() as u64;
2886
2887        println!(
2888            "Row group 0: starts at byte {}, {} bytes compressed",
2889            rg0_start,
2890            row_group_0.compressed_size()
2891        );
2892        println!(
2893            "Row group 1: starts at byte {}, {} bytes compressed",
2894            rg1_start,
2895            row_group_1.compressed_size()
2896        );
2897
2898        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2899        let reader = ArrowReaderBuilder::new(file_io).build();
2900
2901        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
2902        let task = FileScanTask {
2903            start: rg1_start,
2904            length: rg1_length,
2905            record_count: Some(100), // Row group 1 has 100 rows
2906            data_file_path: data_file_path.clone(),
2907            data_file_format: DataFileFormat::Parquet,
2908            schema: table_schema.clone(),
2909            project_field_ids: vec![1],
2910            predicate: None,
2911            deletes: vec![FileScanTaskDeleteFile {
2912                file_path: delete_file_path,
2913                file_type: DataContentType::PositionDeletes,
2914                partition_spec_id: 0,
2915                equality_ids: None,
2916            }],
2917            partition: None,
2918            partition_spec: None,
2919            name_mapping: None,
2920        };
2921
2922        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2923        let result = reader
2924            .read(tasks)
2925            .unwrap()
2926            .try_collect::<Vec<RecordBatch>>()
2927            .await
2928            .unwrap();
2929
2930        // Step 4: Verify we got 99 rows (not 100)
2931        // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 99 rows
2932        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2933
2934        println!("Total rows read from row group 1: {}", total_rows);
2935        println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
2936
2937        // This assertion will FAIL before the fix and PASS after the fix
2938        assert_eq!(
2939            total_rows, 99,
2940            "Expected 99 rows from row group 1 after deleting position 199, but got {} rows. \
2941             The bug causes position deletes to be lost when advance_to() is followed by next() \
2942             while skipping unselected row groups.",
2943            total_rows
2944        );
2945
2946        // Verify the deleted row (id=200) is not present
2947        let all_ids: Vec<i32> = result
2948            .iter()
2949            .flat_map(|batch| {
2950                batch
2951                    .column(0)
2952                    .as_primitive::<arrow_array::types::Int32Type>()
2953                    .values()
2954                    .iter()
2955                    .copied()
2956            })
2957            .collect();
2958
2959        assert!(
2960            !all_ids.contains(&200),
2961            "Row with id=200 should be deleted but was found in results"
2962        );
2963
2964        // Verify we have ids 101-199 (not 101-200)
2965        let expected_ids: Vec<i32> = (101..=199).collect();
2966        assert_eq!(
2967            all_ids, expected_ids,
2968            "Should have ids 101-199 but got different values"
2969        );
2970    }
2971    /// Test for bug where stale cached delete causes infinite loop when skipping row groups.
2972    ///
2973    /// This test exposes the inverse scenario of `test_position_delete_with_row_group_selection`:
2974    /// - Position delete targets a row in the SKIPPED row group (not the selected one)
2975    /// - After calling advance_to(), the cached delete index is stale
2976    /// - Without updating the cache, the code enters an infinite loop
2977    ///
2978    /// This test creates:
2979    /// - A data file with 200 rows split into 2 row groups (0-99, 100-199)
2980    /// - A position delete file that deletes row 0 (first row in SKIPPED row group 0)
2981    /// - Row group selection that reads ONLY row group 1 (rows 100-199)
2982    ///
2983    /// The bug occurs when skipping row group 0:
2984    /// ```rust
2985    /// let mut next_deleted_row_idx_opt = delete_vector_iter.next(); // Some(0)
2986    /// // ... skip to row group 1 ...
2987    /// delete_vector_iter.advance_to(100); // Iterator advances past delete at 0
2988    /// // BUG: next_deleted_row_idx_opt is still Some(0) - STALE!
2989    /// // When processing row group 1:
2990    /// //   current_idx = 100, next_deleted_row_idx = 0, next_row_group_base_idx = 200
2991    /// //   Loop condition: 0 < 200 (true)
2992    /// //   But: current_idx (100) > next_deleted_row_idx (0)
2993    /// //   And: current_idx (100) != next_deleted_row_idx (0)
2994    /// //   Neither branch executes -> INFINITE LOOP!
2995    /// ```
2996    ///
2997    /// Expected behavior: Should return 100 rows (delete at 0 doesn't affect row group 1)
2998    /// Bug behavior: Infinite loop in build_deletes_row_selection
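    ///
    /// The invariant this test pins down, as a hedged sketch (names from the snippet above):
    /// ```rust
    /// delete_vector_iter.advance_to(next_row_group_base_idx);
    /// // Any "next deleted row" value cached *before* this call is now stale and must be
    /// // refreshed from the iterator; otherwise the selection loop compares against a
    /// // position that lies in the skipped row group and can make no progress.
    /// ```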
2999    #[tokio::test]
3000    async fn test_position_delete_in_skipped_row_group() {
3001        use arrow_array::{Int32Array, Int64Array};
3002        use parquet::file::reader::{FileReader, SerializedFileReader};
3003
3004        // Field IDs for positional delete schema
3005        const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3006        const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3007
3008        let tmp_dir = TempDir::new().unwrap();
3009        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3010
3011        // Create table schema with a single 'id' column
3012        let table_schema = Arc::new(
3013            Schema::builder()
3014                .with_schema_id(1)
3015                .with_fields(vec![
3016                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3017                ])
3018                .build()
3019                .unwrap(),
3020        );
3021
3022        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3023            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3024                PARQUET_FIELD_ID_META_KEY.to_string(),
3025                "1".to_string(),
3026            )])),
3027        ]));
3028
3029        // Step 1: Create data file with 200 rows in 2 row groups
3030        // Row group 0: rows 0-99 (ids 1-100)
3031        // Row group 1: rows 100-199 (ids 101-200)
3032        let data_file_path = format!("{}/data.parquet", &table_location);
3033
3034        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3035            Int32Array::from_iter_values(1..=100),
3036        )])
3037        .unwrap();
3038
3039        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3040            Int32Array::from_iter_values(101..=200),
3041        )])
3042        .unwrap();
3043
3044        // Force each batch into its own row group
3045        let props = WriterProperties::builder()
3046            .set_compression(Compression::SNAPPY)
3047            .set_max_row_group_size(100)
3048            .build();
3049
3050        let file = File::create(&data_file_path).unwrap();
3051        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3052        writer.write(&batch1).expect("Writing batch 1");
3053        writer.write(&batch2).expect("Writing batch 2");
3054        writer.close().unwrap();
3055
3056        // Verify we created 2 row groups
3057        let verify_file = File::open(&data_file_path).unwrap();
3058        let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3059        assert_eq!(
3060            verify_reader.metadata().num_row_groups(),
3061            2,
3062            "Should have 2 row groups"
3063        );
3064
3065        // Step 2: Create position delete file that deletes row 0 (id=1, first row in row group 0)
3066        let delete_file_path = format!("{}/deletes.parquet", &table_location);
3067
3068        let delete_schema = Arc::new(ArrowSchema::new(vec![
3069            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3070                PARQUET_FIELD_ID_META_KEY.to_string(),
3071                FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3072            )])),
3073            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3074                PARQUET_FIELD_ID_META_KEY.to_string(),
3075                FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3076            )])),
3077        ]));
3078
3079        // Delete row at position 0 (0-indexed, so it's the first row: id=1)
3080        let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3081            Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3082            Arc::new(Int64Array::from_iter_values(vec![0i64])),
3083        ])
3084        .unwrap();
3085
3086        let delete_props = WriterProperties::builder()
3087            .set_compression(Compression::SNAPPY)
3088            .build();
3089
3090        let delete_file = File::create(&delete_file_path).unwrap();
3091        let mut delete_writer =
3092            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3093        delete_writer.write(&delete_batch).unwrap();
3094        delete_writer.close().unwrap();
3095
3096        // Step 3: Get byte ranges to read ONLY row group 1 (rows 100-199)
3097        // This exercises the row group selection code path where row group 0 is skipped
3098        let metadata_file = File::open(&data_file_path).unwrap();
3099        let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3100        let metadata = metadata_reader.metadata();
3101
3102        let row_group_0 = metadata.row_group(0);
3103        let row_group_1 = metadata.row_group(1);
3104
3105        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
3106        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
3107        let rg1_length = row_group_1.compressed_size() as u64;
3108
3109        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3110        let reader = ArrowReaderBuilder::new(file_io).build();
3111
3112        // Create FileScanTask that reads ONLY row group 1 via byte range filtering
3113        let task = FileScanTask {
3114            start: rg1_start,
3115            length: rg1_length,
3116            record_count: Some(100), // Row group 1 has 100 rows
3117            data_file_path: data_file_path.clone(),
3118            data_file_format: DataFileFormat::Parquet,
3119            schema: table_schema.clone(),
3120            project_field_ids: vec![1],
3121            predicate: None,
3122            deletes: vec![FileScanTaskDeleteFile {
3123                file_path: delete_file_path,
3124                file_type: DataContentType::PositionDeletes,
3125                partition_spec_id: 0,
3126                equality_ids: None,
3127            }],
3128            partition: None,
3129            partition_spec: None,
3130            name_mapping: None,
3131        };
3132
3133        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3134        let result = reader
3135            .read(tasks)
3136            .unwrap()
3137            .try_collect::<Vec<RecordBatch>>()
3138            .await
3139            .unwrap();
3140
3141        // Step 4: Verify we got 100 rows (all of row group 1)
3142        // The delete at position 0 is in row group 0, which is skipped, so it doesn't affect us
3143        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3144
3145        assert_eq!(
3146            total_rows, 100,
3147            "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
3148             If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
3149        );
3150
3151        // Verify we have all ids from row group 1 (101-200)
3152        let all_ids: Vec<i32> = result
3153            .iter()
3154            .flat_map(|batch| {
3155                batch
3156                    .column(0)
3157                    .as_primitive::<arrow_array::types::Int32Type>()
3158                    .values()
3159                    .iter()
3160                    .copied()
3161            })
3162            .collect();
3163
3164        let expected_ids: Vec<i32> = (101..=200).collect();
3165        assert_eq!(
3166            all_ids, expected_ids,
3167            "Should have ids 101-200 (all of row group 1)"
3168        );
3169    }
3170
3171    /// Test reading Parquet files without field ID metadata (e.g., migrated tables).
3172    /// This exercises the position-based fallback path.
3173    ///
3174    /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + pruneColumnsFallback()
3175    /// in /parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
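    ///
    /// The fallback assignment this test relies on, sketched under the assumption that
    /// IDs are handed out 1-based in file column order (mirroring the Java helpers above):
    /// ```rust
    /// // file column 0 ("name") -> fallback field id 1
    /// // file column 1 ("age")  -> fallback field id 2
    /// let expected_fallback = [("name", 1), ("age", 2)];
    /// ```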
3176    #[tokio::test]
3177    async fn test_read_parquet_file_without_field_ids() {
3178        let schema = Arc::new(
3179            Schema::builder()
3180                .with_schema_id(1)
3181                .with_fields(vec![
3182                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3183                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3184                ])
3185                .build()
3186                .unwrap(),
3187        );
3188
3189        // Parquet file from a migrated table - no field ID metadata
3190        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3191            Field::new("name", DataType::Utf8, false),
3192            Field::new("age", DataType::Int32, false),
3193        ]));
3194
3195        let tmp_dir = TempDir::new().unwrap();
3196        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3197        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3198
3199        let name_data = vec!["Alice", "Bob", "Charlie"];
3200        let age_data = vec![30, 25, 35];
3201
3202        use arrow_array::Int32Array;
3203        let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3204        let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3205
3206        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3207
3208        let props = WriterProperties::builder()
3209            .set_compression(Compression::SNAPPY)
3210            .build();
3211
3212        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3213        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3214
3215        writer.write(&to_write).expect("Writing batch");
3216        writer.close().unwrap();
3217
3218        let reader = ArrowReaderBuilder::new(file_io).build();
3219
3220        let tasks = Box::pin(futures::stream::iter(
3221            vec![Ok(FileScanTask {
3222                start: 0,
3223                length: 0,
3224                record_count: None,
3225                data_file_path: format!("{}/1.parquet", table_location),
3226                data_file_format: DataFileFormat::Parquet,
3227                schema: schema.clone(),
3228                project_field_ids: vec![1, 2],
3229                predicate: None,
3230                deletes: vec![],
3231                partition: None,
3232                partition_spec: None,
3233                name_mapping: None,
3234            })]
3235            .into_iter(),
3236        )) as FileScanTaskStream;
3237
3238        let result = reader
3239            .read(tasks)
3240            .unwrap()
3241            .try_collect::<Vec<RecordBatch>>()
3242            .await
3243            .unwrap();
3244
3245        assert_eq!(result.len(), 1);
3246        let batch = &result[0];
3247        assert_eq!(batch.num_rows(), 3);
3248        assert_eq!(batch.num_columns(), 2);
3249
3250        // Verify position-based mapping: field_id 1 → position 0, field_id 2 → position 1
3251        let name_array = batch.column(0).as_string::<i32>();
3252        assert_eq!(name_array.value(0), "Alice");
3253        assert_eq!(name_array.value(1), "Bob");
3254        assert_eq!(name_array.value(2), "Charlie");
3255
3256        let age_array = batch
3257            .column(1)
3258            .as_primitive::<arrow_array::types::Int32Type>();
3259        assert_eq!(age_array.value(0), 30);
3260        assert_eq!(age_array.value(1), 25);
3261        assert_eq!(age_array.value(2), 35);
3262    }
3263
3264    /// Test reading Parquet files without field IDs with partial projection.
3265    /// Only a subset of the columns is requested, verifying that the position-based
3266    /// fallback handles column selection correctly.
3267    #[tokio::test]
3268    async fn test_read_parquet_without_field_ids_partial_projection() {
3269        use arrow_array::Int32Array;
3270
3271        let schema = Arc::new(
3272            Schema::builder()
3273                .with_schema_id(1)
3274                .with_fields(vec![
3275                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3276                    NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3277                    NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3278                    NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3279                ])
3280                .build()
3281                .unwrap(),
3282        );
3283
3284        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3285            Field::new("col1", DataType::Utf8, false),
3286            Field::new("col2", DataType::Int32, false),
3287            Field::new("col3", DataType::Utf8, false),
3288            Field::new("col4", DataType::Int32, false),
3289        ]));
3290
3291        let tmp_dir = TempDir::new().unwrap();
3292        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3293        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3294
3295        let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3296        let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3297        let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3298        let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3299
3300        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3301            col1_data, col2_data, col3_data, col4_data,
3302        ])
3303        .unwrap();
3304
3305        let props = WriterProperties::builder()
3306            .set_compression(Compression::SNAPPY)
3307            .build();
3308
3309        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3310        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3311
3312        writer.write(&to_write).expect("Writing batch");
3313        writer.close().unwrap();
3314
3315        let reader = ArrowReaderBuilder::new(file_io).build();
3316
3317        let tasks = Box::pin(futures::stream::iter(
3318            vec![Ok(FileScanTask {
3319                start: 0,
3320                length: 0,
3321                record_count: None,
3322                data_file_path: format!("{}/1.parquet", table_location),
3323                data_file_format: DataFileFormat::Parquet,
3324                schema: schema.clone(),
3325                project_field_ids: vec![1, 3],
3326                predicate: None,
3327                deletes: vec![],
3328                partition: None,
3329                partition_spec: None,
3330                name_mapping: None,
3331            })]
3332            .into_iter(),
3333        )) as FileScanTaskStream;
3334
3335        let result = reader
3336            .read(tasks)
3337            .unwrap()
3338            .try_collect::<Vec<RecordBatch>>()
3339            .await
3340            .unwrap();
3341
3342        assert_eq!(result.len(), 1);
3343        let batch = &result[0];
3344        assert_eq!(batch.num_rows(), 2);
3345        assert_eq!(batch.num_columns(), 2);
3346
3347        let col1_array = batch.column(0).as_string::<i32>();
3348        assert_eq!(col1_array.value(0), "a");
3349        assert_eq!(col1_array.value(1), "b");
3350
3351        let col3_array = batch.column(1).as_string::<i32>();
3352        assert_eq!(col3_array.value(0), "c");
3353        assert_eq!(col3_array.value(1), "d");
3354    }
3355
3356    /// Test reading Parquet files without field IDs with schema evolution.
3357    /// The Iceberg schema has more fields than the Parquet file, testing that
3358    /// missing columns are filled with NULLs.
3359    #[tokio::test]
3360    async fn test_read_parquet_without_field_ids_schema_evolution() {
3361        use arrow_array::{Array, Int32Array};
3362
3363        // Schema with field 3 added after the file was written
3364        let schema = Arc::new(
3365            Schema::builder()
3366                .with_schema_id(1)
3367                .with_fields(vec![
3368                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3369                    NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3370                    NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3371                ])
3372                .build()
3373                .unwrap(),
3374        );
3375
3376        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3377            Field::new("name", DataType::Utf8, false),
3378            Field::new("age", DataType::Int32, false),
3379        ]));
3380
3381        let tmp_dir = TempDir::new().unwrap();
3382        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3383        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3384
3385        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3386        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3387
3388        let to_write =
3389            RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3390
3391        let props = WriterProperties::builder()
3392            .set_compression(Compression::SNAPPY)
3393            .build();
3394
3395        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3396        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3397
3398        writer.write(&to_write).expect("Writing batch");
3399        writer.close().unwrap();
3400
3401        let reader = ArrowReaderBuilder::new(file_io).build();
3402
3403        let tasks = Box::pin(futures::stream::iter(
3404            vec![Ok(FileScanTask {
3405                start: 0,
3406                length: 0,
3407                record_count: None,
3408                data_file_path: format!("{}/1.parquet", table_location),
3409                data_file_format: DataFileFormat::Parquet,
3410                schema: schema.clone(),
3411                project_field_ids: vec![1, 2, 3],
3412                predicate: None,
3413                deletes: vec![],
3414                partition: None,
3415                partition_spec: None,
3416                name_mapping: None,
3417            })]
3418            .into_iter(),
3419        )) as FileScanTaskStream;
3420
3421        let result = reader
3422            .read(tasks)
3423            .unwrap()
3424            .try_collect::<Vec<RecordBatch>>()
3425            .await
3426            .unwrap();
3427
3428        assert_eq!(result.len(), 1);
3429        let batch = &result[0];
3430        assert_eq!(batch.num_rows(), 2);
3431        assert_eq!(batch.num_columns(), 3);
3432
3433        let name_array = batch.column(0).as_string::<i32>();
3434        assert_eq!(name_array.value(0), "Alice");
3435        assert_eq!(name_array.value(1), "Bob");
3436
3437        let age_array = batch
3438            .column(1)
3439            .as_primitive::<arrow_array::types::Int32Type>();
3440        assert_eq!(age_array.value(0), 30);
3441        assert_eq!(age_array.value(1), 25);
3442
3443        // Verify missing column filled with NULLs
3444        let city_array = batch.column(2).as_string::<i32>();
3445        assert_eq!(city_array.null_count(), 2);
3446        assert!(city_array.is_null(0));
3447        assert!(city_array.is_null(1));
3448    }
3449
3450    /// Test reading Parquet files that lack field IDs and span multiple row groups.
3451    /// This ensures the position-based fallback works correctly across row group boundaries.
3452    #[tokio::test]
3453    async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3454        use arrow_array::Int32Array;
3455
3456        let schema = Arc::new(
3457            Schema::builder()
3458                .with_schema_id(1)
3459                .with_fields(vec![
3460                    NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3461                    NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3462                ])
3463                .build()
3464                .unwrap(),
3465        );
3466
3467        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3468            Field::new("name", DataType::Utf8, false),
3469            Field::new("value", DataType::Int32, false),
3470        ]));
3471
3472        let tmp_dir = TempDir::new().unwrap();
3473        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3474        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3475
3476        // Small row group size to create multiple row groups
3477        let props = WriterProperties::builder()
3478            .set_compression(Compression::SNAPPY)
3479            .set_write_batch_size(2)
3480            .set_max_row_group_size(2)
3481            .build();
3482
3483        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3484        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3485
3486        // Write 6 rows in 3 batches (will create 3 row groups)
3487        for batch_num in 0..3 {
3488            let name_data = Arc::new(StringArray::from(vec![
3489                format!("name_{}", batch_num * 2),
3490                format!("name_{}", batch_num * 2 + 1),
3491            ])) as ArrayRef;
3492            let value_data =
3493                Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3494
3495            let batch =
3496                RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3497            writer.write(&batch).expect("Writing batch");
3498        }
3499        writer.close().unwrap();
3500
3501        let reader = ArrowReaderBuilder::new(file_io).build();
3502
3503        let tasks = Box::pin(futures::stream::iter(
3504            vec![Ok(FileScanTask {
3505                start: 0,
3506                length: 0,
3507                record_count: None,
3508                data_file_path: format!("{}/1.parquet", table_location),
3509                data_file_format: DataFileFormat::Parquet,
3510                schema: schema.clone(),
3511                project_field_ids: vec![1, 2],
3512                predicate: None,
3513                deletes: vec![],
3514                partition: None,
3515                partition_spec: None,
3516                name_mapping: None,
3517            })]
3518            .into_iter(),
3519        )) as FileScanTaskStream;
3520
3521        let result = reader
3522            .read(tasks)
3523            .unwrap()
3524            .try_collect::<Vec<RecordBatch>>()
3525            .await
3526            .unwrap();
3527
3528        assert!(!result.is_empty());
3529
3530        let mut all_names = Vec::new();
3531        let mut all_values = Vec::new();
3532
3533        for batch in &result {
3534            let name_array = batch.column(0).as_string::<i32>();
3535            let value_array = batch
3536                .column(1)
3537                .as_primitive::<arrow_array::types::Int32Type>();
3538
3539            for i in 0..batch.num_rows() {
3540                all_names.push(name_array.value(i).to_string());
3541                all_values.push(value_array.value(i));
3542            }
3543        }
3544
3545        assert_eq!(all_names.len(), 6);
3546        assert_eq!(all_values.len(), 6);
3547
3548        for i in 0..6 {
3549            assert_eq!(all_names[i], format!("name_{}", i));
3550            assert_eq!(all_values[i], i as i32);
3551        }
3552    }
3553
3554    /// Test reading Parquet files without field IDs with nested types (struct).
3555    /// Java's pruneColumnsFallback() projects entire top-level columns including nested content.
3556    /// This test verifies that a top-level struct field is projected correctly with all its nested fields.
3557    #[tokio::test]
3558    async fn test_read_parquet_without_field_ids_with_struct() {
3559        use arrow_array::{Int32Array, StructArray};
3560        use arrow_schema::Fields;
3561
3562        let schema = Arc::new(
3563            Schema::builder()
3564                .with_schema_id(1)
3565                .with_fields(vec![
3566                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3567                    NestedField::required(
3568                        2,
3569                        "person",
3570                        Type::Struct(crate::spec::StructType::new(vec![
3571                            NestedField::required(
3572                                3,
3573                                "name",
3574                                Type::Primitive(PrimitiveType::String),
3575                            )
3576                            .into(),
3577                            NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3578                                .into(),
3579                        ])),
3580                    )
3581                    .into(),
3582                ])
3583                .build()
3584                .unwrap(),
3585        );
3586
3587        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3588            Field::new("id", DataType::Int32, false),
3589            Field::new(
3590                "person",
3591                DataType::Struct(Fields::from(vec![
3592                    Field::new("name", DataType::Utf8, false),
3593                    Field::new("age", DataType::Int32, false),
3594                ])),
3595                false,
3596            ),
3597        ]));
3598
3599        let tmp_dir = TempDir::new().unwrap();
3600        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3601        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3602
3603        let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3604        let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3605        let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3606        let person_data = Arc::new(StructArray::from(vec![
3607            (
3608                Arc::new(Field::new("name", DataType::Utf8, false)),
3609                name_data,
3610            ),
3611            (
3612                Arc::new(Field::new("age", DataType::Int32, false)),
3613                age_data,
3614            ),
3615        ])) as ArrayRef;
3616
3617        let to_write =
3618            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3619
3620        let props = WriterProperties::builder()
3621            .set_compression(Compression::SNAPPY)
3622            .build();
3623
3624        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3625        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3626
3627        writer.write(&to_write).expect("Writing batch");
3628        writer.close().unwrap();
3629
3630        let reader = ArrowReaderBuilder::new(file_io).build();
3631
3632        let tasks = Box::pin(futures::stream::iter(
3633            vec![Ok(FileScanTask {
3634                start: 0,
3635                length: 0,
3636                record_count: None,
3637                data_file_path: format!("{}/1.parquet", table_location),
3638                data_file_format: DataFileFormat::Parquet,
3639                schema: schema.clone(),
3640                project_field_ids: vec![1, 2],
3641                predicate: None,
3642                deletes: vec![],
3643                partition: None,
3644                partition_spec: None,
3645                name_mapping: None,
3646            })]
3647            .into_iter(),
3648        )) as FileScanTaskStream;
3649
3650        let result = reader
3651            .read(tasks)
3652            .unwrap()
3653            .try_collect::<Vec<RecordBatch>>()
3654            .await
3655            .unwrap();
3656
3657        assert_eq!(result.len(), 1);
3658        let batch = &result[0];
3659        assert_eq!(batch.num_rows(), 2);
3660        assert_eq!(batch.num_columns(), 2);
3661
3662        let id_array = batch
3663            .column(0)
3664            .as_primitive::<arrow_array::types::Int32Type>();
3665        assert_eq!(id_array.value(0), 1);
3666        assert_eq!(id_array.value(1), 2);
3667
3668        let person_array = batch.column(1).as_struct();
3669        assert_eq!(person_array.num_columns(), 2);
3670
3671        let name_array = person_array.column(0).as_string::<i32>();
3672        assert_eq!(name_array.value(0), "Alice");
3673        assert_eq!(name_array.value(1), "Bob");
3674
3675        let age_array = person_array
3676            .column(1)
3677            .as_primitive::<arrow_array::types::Int32Type>();
3678        assert_eq!(age_array.value(0), 30);
3679        assert_eq!(age_array.value(1), 25);
3680    }
3681
3682    /// Test reading Parquet files without field IDs with schema evolution - column added in the middle.
3683    /// When a new column is inserted between existing columns in the schema order,
3684    /// the fallback projection must correctly map field IDs to output positions.
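    ///
    /// A worked example of the mapping this test asserts (fallback IDs assumed to be
    /// assigned 1-based in file column order):
    /// ```rust
    /// // file:      col0 -> fallback id 1, col1 -> fallback id 2
    /// // projected: [1, 5, 2]
    /// //   -> output[0] = file col0, output[1] = NULL (id 5 absent from the file),
    /// //      output[2] = file col1
    /// ```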
3685    #[tokio::test]
3686    async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3687        use arrow_array::{Array, Int32Array};
3688
3689        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3690            Field::new("col0", DataType::Int32, true),
3691            Field::new("col1", DataType::Int32, true),
3692        ]));
3693
3694        // New column added between existing columns: col0 (id=1), newCol (id=5), col1 (id=2)
3695        let schema = Arc::new(
3696            Schema::builder()
3697                .with_schema_id(1)
3698                .with_fields(vec![
3699                    NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3700                    NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3701                    NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3702                ])
3703                .build()
3704                .unwrap(),
3705        );
3706
3707        let tmp_dir = TempDir::new().unwrap();
3708        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3709        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3710
3711        let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3712        let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3713
3714        let to_write =
3715            RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3716
3717        let props = WriterProperties::builder()
3718            .set_compression(Compression::SNAPPY)
3719            .build();
3720
3721        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3722        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3723        writer.write(&to_write).expect("Writing batch");
3724        writer.close().unwrap();
3725
3726        let reader = ArrowReaderBuilder::new(file_io).build();
3727
3728        let tasks = Box::pin(futures::stream::iter(
3729            vec![Ok(FileScanTask {
3730                start: 0,
3731                length: 0,
3732                record_count: None,
3733                data_file_path: format!("{}/1.parquet", table_location),
3734                data_file_format: DataFileFormat::Parquet,
3735                schema: schema.clone(),
3736                project_field_ids: vec![1, 5, 2],
3737                predicate: None,
3738                deletes: vec![],
3739                partition: None,
3740                partition_spec: None,
3741                name_mapping: None,
3742            })]
3743            .into_iter(),
3744        )) as FileScanTaskStream;
3745
3746        let result = reader
3747            .read(tasks)
3748            .unwrap()
3749            .try_collect::<Vec<RecordBatch>>()
3750            .await
3751            .unwrap();
3752
3753        assert_eq!(result.len(), 1);
3754        let batch = &result[0];
3755        assert_eq!(batch.num_rows(), 2);
3756        assert_eq!(batch.num_columns(), 3);
3757
3758        let result_col0 = batch
3759            .column(0)
3760            .as_primitive::<arrow_array::types::Int32Type>();
3761        assert_eq!(result_col0.value(0), 1);
3762        assert_eq!(result_col0.value(1), 2);
3763
3764        // New column should be NULL (doesn't exist in old file)
3765        let result_newcol = batch
3766            .column(1)
3767            .as_primitive::<arrow_array::types::Int32Type>();
3768        assert_eq!(result_newcol.null_count(), 2);
3769        assert!(result_newcol.is_null(0));
3770        assert!(result_newcol.is_null(1));
3771
3772        let result_col1 = batch
3773            .column(2)
3774            .as_primitive::<arrow_array::types::Int32Type>();
3775        assert_eq!(result_col1.value(0), 10);
3776        assert_eq!(result_col1.value(1), 20);
3777    }
3778
3779    /// Test reading Parquet files without field IDs with a filter that eliminates all row groups.
3780    /// During development of field ID mapping, we saw a panic when row_selection_enabled=true and
3781    /// all row groups are filtered out.
3782    #[tokio::test]
3783    async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
3784        use arrow_array::{Float64Array, Int32Array};
3785
3786        // Schema with fields that will use fallback IDs 1, 2, 3
3787        let schema = Arc::new(
3788            Schema::builder()
3789                .with_schema_id(1)
3790                .with_fields(vec![
3791                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3792                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3793                    NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
3794                        .into(),
3795                ])
3796                .build()
3797                .unwrap(),
3798        );
3799
3800        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3801            Field::new("id", DataType::Int32, false),
3802            Field::new("name", DataType::Utf8, false),
3803            Field::new("value", DataType::Float64, false),
3804        ]));
3805
3806        let tmp_dir = TempDir::new().unwrap();
3807        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3808        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3809
3810        // Write data where all ids are >= 10
3811        let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
3812        let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3813        let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
3814
3815        let to_write =
3816            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
3817                .unwrap();
3818
3819        let props = WriterProperties::builder()
3820            .set_compression(Compression::SNAPPY)
3821            .build();
3822
3823        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3824        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3825        writer.write(&to_write).expect("Writing batch");
3826        writer.close().unwrap();
3827
3828        // Filter that eliminates all row groups: id < 5
3829        let predicate = Reference::new("id").less_than(Datum::int(5));
3830
3831        // Enable both row group filtering and row selection; this combination previously triggered the panic
3832        let reader = ArrowReaderBuilder::new(file_io)
3833            .with_row_group_filtering_enabled(true)
3834            .with_row_selection_enabled(true)
3835            .build();
3836
3837        let tasks = Box::pin(futures::stream::iter(
3838            vec![Ok(FileScanTask {
3839                start: 0,
3840                length: 0,
3841                record_count: None,
3842                data_file_path: format!("{}/1.parquet", table_location),
3843                data_file_format: DataFileFormat::Parquet,
3844                schema: schema.clone(),
3845                project_field_ids: vec![1, 2, 3],
3846                predicate: Some(predicate.bind(schema, true).unwrap()),
3847                deletes: vec![],
3848                partition: None,
3849                partition_spec: None,
3850                name_mapping: None,
3851            })]
3852            .into_iter(),
3853        )) as FileScanTaskStream;
3854
3855        // Should no longer panic
3856        let result = reader
3857            .read(tasks)
3858            .unwrap()
3859            .try_collect::<Vec<RecordBatch>>()
3860            .await
3861            .unwrap();
3862
3863        // Should return empty results
3864        assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
3865    }
3866
3867    /// Test bucket partitioning reads source column from data file (not partition metadata).
3868    ///
3869    /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
3870    /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable).
3871    ///
3872    /// # Iceberg Spec Requirements
3873    ///
3874    /// Per the Iceberg spec "Column Projection" section:
3875    /// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
3876    ///
3877    /// This means:
3878    /// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata
3879    /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files
3880    /// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values
3881    ///
3882    /// Java's PartitionUtil.constantsMap() implements this via:
3883    /// ```java
3884    /// if (field.transform().isIdentity()) {
3885    ///     idToConstant.put(field.sourceId(), converted);
3886    /// }
3887    /// ```
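    ///
    /// A hedged Rust sketch of the same rule (illustrative names only, not the actual
    /// iceberg-rust identifiers):
    /// ```rust
    /// for pf in partition_spec.fields() {
    ///     if pf.transform == Transform::Identity {
    ///         // Identity-transformed fields contribute constants for projection...
    ///         id_to_constant.insert(pf.source_id, partition_value_for(pf));
    ///     }
    ///     // ...while bucket/truncate/etc. leave the source column to be read
    ///     // from the data file.
    /// }
    /// ```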
3888    ///
3889    /// # What This Test Verifies
3890    ///
3891    /// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles
3892    /// bucket partitioning when FileScanTask provides partition_spec and partition_data:
3893    /// - RecordBatchTransformer.constants_map() excludes the bucket-partitioned source field from constants
3894    /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13]
3895    /// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1
3896    /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants
3897    /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
3898    /// - Values are NOT replaced with constant 1 from partition metadata
3899    ///
3900    /// # Why This Matters
3901    ///
3902    /// Without correct handling:
3903    /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
3904    /// - Query results would be incorrect (all rows would have id=1)
3905    /// - Bucket partitioning would be unusable for query optimization
3906    ///
3907    /// # References
3908    /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
3909    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
3910    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
3911    #[tokio::test]
3912    async fn test_bucket_partitioning_reads_source_column_from_file() {
3913        use arrow_array::Int32Array;
3914
3915        use crate::spec::{Literal, PartitionSpec, Struct, Transform};
3916
3917        // Iceberg schema with id and name columns
3918        let schema = Arc::new(
3919            Schema::builder()
3920                .with_schema_id(0)
3921                .with_fields(vec![
3922                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3923                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3924                ])
3925                .build()
3926                .unwrap(),
3927        );
3928
3929        // Partition spec: bucket(4, id)
3930        let partition_spec = Arc::new(
3931            PartitionSpec::builder(schema.clone())
3932                .with_spec_id(0)
3933                .add_partition_field("id", "id_bucket", Transform::Bucket(4))
3934                .unwrap()
3935                .build()
3936                .unwrap(),
3937        );
3938
3939        // Partition data: bucket value is 1
3940        let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
3941
3942        // Create Arrow schema with field IDs for Parquet file
3943        let arrow_schema = Arc::new(ArrowSchema::new(vec![
3944            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3945                PARQUET_FIELD_ID_META_KEY.to_string(),
3946                "1".to_string(),
3947            )])),
3948            Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
3949                PARQUET_FIELD_ID_META_KEY.to_string(),
3950                "2".to_string(),
3951            )])),
3952        ]));
3953
3954        // Write Parquet file with data
3955        let tmp_dir = TempDir::new().unwrap();
3956        let table_location = tmp_dir.path().to_str().unwrap().to_string();
3957        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3958
3959        let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
3960        let name_data =
3961            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
3962
3963        let to_write =
3964            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
3965
3966        let props = WriterProperties::builder()
3967            .set_compression(Compression::SNAPPY)
3968            .build();
3969        let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
3970        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3971        writer.write(&to_write).expect("Writing batch");
3972        writer.close().unwrap();
3973
3974        // Read the Parquet file with partition spec and data
3975        let reader = ArrowReaderBuilder::new(file_io).build();
3976        let tasks = Box::pin(futures::stream::iter(
3977            vec![Ok(FileScanTask {
3978                start: 0,
3979                length: 0,
3980                record_count: None,
3981                data_file_path: format!("{}/data.parquet", table_location),
3982                data_file_format: DataFileFormat::Parquet,
3983                schema: schema.clone(),
3984                project_field_ids: vec![1, 2],
3985                predicate: None,
3986                deletes: vec![],
3987                partition: Some(partition_data),
3988                partition_spec: Some(partition_spec),
3989                name_mapping: None,
3990            })]
3991            .into_iter(),
3992        )) as FileScanTaskStream;
3993
3994        let result = reader
3995            .read(tasks)
3996            .unwrap()
3997            .try_collect::<Vec<RecordBatch>>()
3998            .await
3999            .unwrap();
4000
4001        // Verify we got the correct data
4002        assert_eq!(result.len(), 1);
4003        let batch = &result[0];
4004
4005        assert_eq!(batch.num_columns(), 2);
4006        assert_eq!(batch.num_rows(), 4);
4007
4008        // The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
4009        // NOT the constant partition value 1
4010        let id_col = batch
4011            .column(0)
4012            .as_primitive::<arrow_array::types::Int32Type>();
4013        assert_eq!(id_col.value(0), 1);
4014        assert_eq!(id_col.value(1), 5);
4015        assert_eq!(id_col.value(2), 9);
4016        assert_eq!(id_col.value(3), 13);
4017
4018        let name_col = batch.column(1).as_string::<i32>();
4019        assert_eq!(name_col.value(0), "Alice");
4020        assert_eq!(name_col.value(1), "Bob");
4021        assert_eq!(name_col.value(2), "Charlie");
4022        assert_eq!(name_col.value(3), "Dave");
4023    }
4024}