iceberg/expr/visitors/
page_index_evaluator.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Evaluates predicates against a Parquet Page Index
19
20use std::collections::HashMap;
21
22use fnv::FnvHashSet;
23use ordered_float::OrderedFloat;
24use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
25use parquet::file::metadata::RowGroupMetaData;
26use parquet::file::page_index::column_index::ColumnIndexMetaData;
27use parquet::file::page_index::offset_index::OffsetIndexMetaData;
28
29use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
30use crate::expr::{BoundPredicate, BoundReference};
31use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema};
32use crate::{Error, ErrorKind, Result};
33
34type OffsetIndex = Vec<OffsetIndexMetaData>;
35
36const IN_PREDICATE_LIMIT: usize = 200;
37
/// Decides how a predicate is resolved when the referenced field id has no
/// entry in the field-id-to-parquet-column map.
enum MissingColBehavior {
    /// Every row of the row group is skipped.
    CantMatch,
    /// Every row of the row group is selected.
    MightMatch,
}
42
/// Coarse classification of how many values on a page are null.
enum PageNullCount {
    /// The null count equals the page's row count.
    AllNull,
    /// The null count is zero.
    NoneNull,
    /// The null count is non-zero and differs from the row count.
    SomeNull,
    /// No null count was available for the page.
    Unknown,
}

impl PageNullCount {
    /// Classifies a page from its row count and its optional null count.
    ///
    /// The "all null" comparison is made before the "no nulls" one, so a page
    /// with zero rows and zero nulls is reported as `AllNull`.
    fn from_row_and_null_counts(num_rows: usize, null_count: Option<i64>) -> Self {
        let Some(nulls) = null_count else {
            return PageNullCount::Unknown;
        };

        if nulls as usize == num_rows {
            PageNullCount::AllNull
        } else if nulls == 0 {
            PageNullCount::NoneNull
        } else {
            PageNullCount::SomeNull
        }
    }
}
60
61pub(crate) struct PageIndexEvaluator<'a> {
62    column_index: &'a [ColumnIndexMetaData],
63    offset_index: &'a OffsetIndex,
64    row_group_metadata: &'a RowGroupMetaData,
65    iceberg_field_id_to_parquet_column_index: &'a HashMap<i32, usize>,
66    snapshot_schema: &'a Schema,
67    row_count_cache: HashMap<usize, Vec<usize>>,
68}
69
70impl<'a> PageIndexEvaluator<'a> {
71    pub(crate) fn new(
72        column_index: &'a [ColumnIndexMetaData],
73        offset_index: &'a OffsetIndex,
74        row_group_metadata: &'a RowGroupMetaData,
75        field_id_map: &'a HashMap<i32, usize>,
76        snapshot_schema: &'a Schema,
77    ) -> Self {
78        Self {
79            column_index,
80            offset_index,
81            row_group_metadata,
82            iceberg_field_id_to_parquet_column_index: field_id_map,
83            snapshot_schema,
84            row_count_cache: HashMap::new(),
85        }
86    }
87
88    /// Evaluate this `PageIndexEvaluator`'s filter predicate against a
89    /// specific page's column index entry in a parquet file's page index.
90    /// [`ArrowReader`] uses the resulting [`RowSelection`] to reject
91    /// pages within a parquet file's row group that cannot contain rows
92    /// matching the filter predicate.
93    pub(crate) fn eval(
94        filter: &'a BoundPredicate,
95        column_index: &'a [ColumnIndexMetaData],
96        offset_index: &'a OffsetIndex,
97        row_group_metadata: &'a RowGroupMetaData,
98        field_id_map: &'a HashMap<i32, usize>,
99        snapshot_schema: &'a Schema,
100    ) -> Result<Vec<RowSelector>> {
101        if row_group_metadata.num_rows() == 0 {
102            return Ok(vec![]);
103        }
104
105        let mut evaluator = Self::new(
106            column_index,
107            offset_index,
108            row_group_metadata,
109            field_id_map,
110            snapshot_schema,
111        );
112
113        Ok(visit(&mut evaluator, filter)?.iter().copied().collect())
114    }
115
116    fn select_all_rows(&self) -> Result<RowSelection> {
117        Ok(vec![RowSelector::select(
118            self.row_group_metadata.num_rows() as usize
119        )]
120        .into())
121    }
122
123    fn skip_all_rows(&self) -> Result<RowSelection> {
124        Ok(vec![RowSelector::skip(
125            self.row_group_metadata.num_rows() as usize
126        )]
127        .into())
128    }
129
130    fn calc_row_selection<F>(
131        &mut self,
132        field_id: i32,
133        predicate: F,
134        missing_col_behavior: MissingColBehavior,
135    ) -> Result<RowSelection>
136    where
137        F: Fn(Option<Datum>, Option<Datum>, PageNullCount) -> Result<bool>,
138    {
139        let Some(&parquet_column_index) =
140            self.iceberg_field_id_to_parquet_column_index.get(&field_id)
141        else {
142            // if the snapshot's column is not present in the row group,
143            // exit early
144            return match missing_col_behavior {
145                MissingColBehavior::CantMatch => self.skip_all_rows(),
146                MissingColBehavior::MightMatch => self.select_all_rows(),
147            };
148        };
149
150        let Some(field) = self.snapshot_schema.field_by_id(field_id) else {
151            return Err(Error::new(
152                ErrorKind::Unexpected,
153                format!("Field with id {field_id} missing from snapshot schema"),
154            ));
155        };
156
157        let Some(field_type) = field.field_type.as_primitive_type() else {
158            return Err(Error::new(
159                ErrorKind::Unexpected,
160                format!("Field with id {field_id} not convertible to primitive type"),
161            ));
162        };
163
164        let Some(column_index) = self.column_index.get(parquet_column_index) else {
165            // This should not happen, but we fail soft anyway so that the scan is still
166            // successful, just a bit slower
167            return self.select_all_rows();
168        };
169
170        let row_counts = {
171            // Caches row count calculations for columns that appear multiple times in
172            // the predicate
173            match self.row_count_cache.get(&parquet_column_index) {
174                Some(count) => count.clone(),
175                None => {
176                    let Some(offset_index) = self.offset_index.get(parquet_column_index) else {
177                        // if we have a column index, we should always have an offset index.
178                        return Err(Error::new(
179                            ErrorKind::Unexpected,
180                            format!("Missing offset index for field id {field_id}"),
181                        ));
182                    };
183
184                    let count = self.calc_row_counts(offset_index);
185                    self.row_count_cache
186                        .insert(parquet_column_index, count.clone());
187
188                    count
189                }
190            }
191        };
192
193        let Some(page_filter) = Self::apply_predicate_to_column_index(
194            predicate,
195            field_type,
196            column_index,
197            &row_counts,
198        )?
199        else {
200            return self.select_all_rows();
201        };
202
203        let row_selectors: Vec<_> = row_counts
204            .iter()
205            .zip(page_filter.iter())
206            .map(|(&row_count, &is_selected)| {
207                if is_selected {
208                    RowSelector::select(row_count)
209                } else {
210                    RowSelector::skip(row_count)
211                }
212            })
213            .collect();
214
215        Ok(row_selectors.into())
216    }
217
218    /// Returns a list of row counts per page
219    fn calc_row_counts(&self, offset_index: &OffsetIndexMetaData) -> Vec<usize> {
220        let mut remaining_rows = self.row_group_metadata.num_rows() as usize;
221        let mut row_counts = Vec::with_capacity(self.offset_index.len());
222
223        let page_locations = offset_index.page_locations();
224        for (idx, page_location) in page_locations.iter().enumerate() {
225            let row_count = if idx < page_locations.len() - 1 {
226                let row_count = (page_locations[idx + 1].first_row_index
227                    - page_location.first_row_index) as usize;
228                remaining_rows -= row_count;
229                row_count
230            } else {
231                remaining_rows
232            };
233
234            row_counts.push(row_count);
235        }
236
237        row_counts
238    }
239
240    fn apply_predicate_to_column_index<F>(
241        predicate: F,
242        field_type: &PrimitiveType,
243        column_index: &ColumnIndexMetaData,
244        row_counts: &[usize],
245    ) -> Result<Option<Vec<bool>>>
246    where
247        F: Fn(Option<Datum>, Option<Datum>, PageNullCount) -> Result<bool>,
248    {
249        let result: Result<Vec<bool>> = match column_index {
250            ColumnIndexMetaData::NONE => {
251                return Ok(None);
252            }
253            ColumnIndexMetaData::BOOLEAN(idx) => idx
254                .min_values_iter()
255                .zip(idx.max_values_iter())
256                .enumerate()
257                .zip(row_counts.iter())
258                .map(|((i, (min, max)), &row_count)| {
259                    predicate(
260                        min.map(|&val| {
261                            Datum::new(field_type.clone(), PrimitiveLiteral::Boolean(val))
262                        }),
263                        max.map(|&val| {
264                            Datum::new(field_type.clone(), PrimitiveLiteral::Boolean(val))
265                        }),
266                        PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
267                    )
268                })
269                .collect(),
270            ColumnIndexMetaData::INT32(idx) => idx
271                .min_values_iter()
272                .zip(idx.max_values_iter())
273                .enumerate()
274                .zip(row_counts.iter())
275                .map(|((i, (min, max)), &row_count)| {
276                    predicate(
277                        min.map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))),
278                        max.map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))),
279                        PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
280                    )
281                })
282                .collect(),
283            ColumnIndexMetaData::INT64(idx) => idx
284                .min_values_iter()
285                .zip(idx.max_values_iter())
286                .enumerate()
287                .zip(row_counts.iter())
288                .map(|((i, (min, max)), &row_count)| {
289                    predicate(
290                        min.map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))),
291                        max.map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))),
292                        PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
293                    )
294                })
295                .collect(),
296            ColumnIndexMetaData::FLOAT(idx) => idx
297                .min_values_iter()
298                .zip(idx.max_values_iter())
299                .enumerate()
300                .zip(row_counts.iter())
301                .map(|((i, (min, max)), &row_count)| {
302                    predicate(
303                        min.map(|&val| {
304                            Datum::new(
305                                field_type.clone(),
306                                PrimitiveLiteral::Float(OrderedFloat::from(val)),
307                            )
308                        }),
309                        max.map(|&val| {
310                            Datum::new(
311                                field_type.clone(),
312                                PrimitiveLiteral::Float(OrderedFloat::from(val)),
313                            )
314                        }),
315                        PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
316                    )
317                })
318                .collect(),
319            ColumnIndexMetaData::DOUBLE(idx) => idx
320                .min_values_iter()
321                .zip(idx.max_values_iter())
322                .enumerate()
323                .zip(row_counts.iter())
324                .map(|((i, (min, max)), &row_count)| {
325                    predicate(
326                        min.map(|&val| {
327                            Datum::new(
328                                field_type.clone(),
329                                PrimitiveLiteral::Double(OrderedFloat::from(val)),
330                            )
331                        }),
332                        max.map(|&val| {
333                            Datum::new(
334                                field_type.clone(),
335                                PrimitiveLiteral::Double(OrderedFloat::from(val)),
336                            )
337                        }),
338                        PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
339                    )
340                })
341                .collect(),
342            ColumnIndexMetaData::BYTE_ARRAY(idx) => idx
343                .min_values_iter()
344                .zip(idx.max_values_iter())
345                .enumerate()
346                .zip(row_counts.iter())
347                .map(|((i, (min, max)), &row_count)| {
348                    predicate(
349                        min.map(|val| {
350                            Datum::new(
351                                field_type.clone(),
352                                PrimitiveLiteral::String(String::from_utf8(val.to_vec()).unwrap()),
353                            )
354                        }),
355                        max.map(|val| {
356                            Datum::new(
357                                field_type.clone(),
358                                PrimitiveLiteral::String(String::from_utf8(val.to_vec()).unwrap()),
359                            )
360                        }),
361                        PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
362                    )
363                })
364                .collect(),
365            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(_) => {
366                return Err(Error::new(
367                    ErrorKind::FeatureUnsupported,
368                    "unsupported 'FIXED_LEN_BYTE_ARRAY' index type in column_index",
369                ));
370            }
371            ColumnIndexMetaData::INT96(_) => {
372                return Err(Error::new(
373                    ErrorKind::FeatureUnsupported,
374                    "unsupported 'INT96' index type in column_index",
375                ));
376            }
377        };
378
379        Ok(Some(result?))
380    }
381
382    fn visit_inequality(
383        &mut self,
384        reference: &BoundReference,
385        datum: &Datum,
386        cmp_fn: fn(&Datum, &Datum) -> bool,
387        use_lower_bound: bool,
388    ) -> Result<RowSelection> {
389        let field_id = reference.field().id;
390
391        self.calc_row_selection(
392            field_id,
393            |min, max, null_count| {
394                if matches!(null_count, PageNullCount::AllNull) {
395                    return Ok(false);
396                }
397
398                if datum.is_nan() {
399                    // NaN indicates unreliable bounds.
400                    return Ok(true);
401                }
402
403                let bound = if use_lower_bound { min } else { max };
404
405                if let Some(bound) = bound {
406                    if cmp_fn(&bound, datum) {
407                        return Ok(true);
408                    }
409
410                    return Ok(false);
411                }
412
413                Ok(true)
414            },
415            MissingColBehavior::MightMatch,
416        )
417    }
418}
419
420impl BoundPredicateVisitor for PageIndexEvaluator<'_> {
421    type T = RowSelection;
422
423    fn always_true(&mut self) -> Result<RowSelection> {
424        self.select_all_rows()
425    }
426
427    fn always_false(&mut self) -> Result<RowSelection> {
428        self.skip_all_rows()
429    }
430
431    fn and(&mut self, lhs: RowSelection, rhs: RowSelection) -> Result<RowSelection> {
432        Ok(lhs.intersection(&rhs))
433    }
434
435    fn or(&mut self, lhs: RowSelection, rhs: RowSelection) -> Result<RowSelection> {
436        Ok(lhs.union(&rhs))
437    }
438
439    fn not(&mut self, _: RowSelection) -> Result<RowSelection> {
440        Err(Error::new(
441            ErrorKind::Unexpected,
442            "NOT unsupported at this point. NOT-rewrite should be performed first",
443        ))
444    }
445
446    fn is_null(
447        &mut self,
448        reference: &BoundReference,
449        _predicate: &BoundPredicate,
450    ) -> Result<RowSelection> {
451        let field_id = reference.field().id;
452
453        self.calc_row_selection(
454            field_id,
455            |_max, _min, null_count| Ok(!matches!(null_count, PageNullCount::NoneNull)),
456            MissingColBehavior::MightMatch,
457        )
458    }
459
460    fn not_null(
461        &mut self,
462        reference: &BoundReference,
463        _predicate: &BoundPredicate,
464    ) -> Result<RowSelection> {
465        let field_id = reference.field().id;
466
467        self.calc_row_selection(
468            field_id,
469            |_max, _min, null_count| Ok(!matches!(null_count, PageNullCount::AllNull)),
470            MissingColBehavior::CantMatch,
471        )
472    }
473
474    fn is_nan(
475        &mut self,
476        reference: &BoundReference,
477        _predicate: &BoundPredicate,
478    ) -> Result<RowSelection> {
479        // NaN counts not present in ColumnChunkMetadata Statistics.
480        // Only float columns can be NaN.
481        if reference.field().field_type.is_floating_type() {
482            self.select_all_rows()
483        } else {
484            self.skip_all_rows()
485        }
486    }
487
488    fn not_nan(
489        &mut self,
490        _reference: &BoundReference,
491        _predicate: &BoundPredicate,
492    ) -> Result<RowSelection> {
493        // NaN counts not present in ColumnChunkMetadata Statistics
494        self.select_all_rows()
495    }
496
497    fn less_than(
498        &mut self,
499        reference: &BoundReference,
500        datum: &Datum,
501        _predicate: &BoundPredicate,
502    ) -> Result<RowSelection> {
503        self.visit_inequality(reference, datum, PartialOrd::lt, true)
504    }
505
506    fn less_than_or_eq(
507        &mut self,
508        reference: &BoundReference,
509        datum: &Datum,
510        _predicate: &BoundPredicate,
511    ) -> Result<RowSelection> {
512        self.visit_inequality(reference, datum, PartialOrd::le, true)
513    }
514
515    fn greater_than(
516        &mut self,
517        reference: &BoundReference,
518        datum: &Datum,
519        _predicate: &BoundPredicate,
520    ) -> Result<RowSelection> {
521        self.visit_inequality(reference, datum, PartialOrd::gt, false)
522    }
523
524    fn greater_than_or_eq(
525        &mut self,
526        reference: &BoundReference,
527        datum: &Datum,
528        _predicate: &BoundPredicate,
529    ) -> Result<RowSelection> {
530        self.visit_inequality(reference, datum, PartialOrd::ge, false)
531    }
532
533    fn eq(
534        &mut self,
535        reference: &BoundReference,
536        datum: &Datum,
537        _predicate: &BoundPredicate,
538    ) -> Result<RowSelection> {
539        let field_id = reference.field().id;
540
541        self.calc_row_selection(
542            field_id,
543            |min, max, nulls| {
544                if matches!(nulls, PageNullCount::AllNull) {
545                    return Ok(false);
546                }
547
548                if let Some(min) = min
549                    && min.gt(datum)
550                {
551                    return Ok(false);
552                }
553
554                if let Some(max) = max
555                    && max.lt(datum)
556                {
557                    return Ok(false);
558                }
559
560                Ok(true)
561            },
562            MissingColBehavior::CantMatch,
563        )
564    }
565
566    fn not_eq(
567        &mut self,
568        _reference: &BoundReference,
569        _datum: &Datum,
570        _predicate: &BoundPredicate,
571    ) -> Result<RowSelection> {
572        // Because the bounds are not necessarily a min or max value,
573        // this cannot be answered using them. notEq(col, X) with (X, Y)
574        // doesn't guarantee that X is a value in col.
575        self.select_all_rows()
576    }
577
578    fn starts_with(
579        &mut self,
580        reference: &BoundReference,
581        datum: &Datum,
582        _predicate: &BoundPredicate,
583    ) -> Result<RowSelection> {
584        let field_id = reference.field().id;
585
586        let PrimitiveLiteral::String(datum) = datum.literal() else {
587            return Err(Error::new(
588                ErrorKind::Unexpected,
589                "Cannot use StartsWith operator on non-string values",
590            ));
591        };
592
593        self.calc_row_selection(
594            field_id,
595            |min, max, nulls| {
596                if matches!(nulls, PageNullCount::AllNull) {
597                    return Ok(false);
598                }
599
600                if let Some(lower_bound) = min {
601                    let PrimitiveLiteral::String(lower_bound) = lower_bound.literal() else {
602                        return Err(Error::new(
603                            ErrorKind::Unexpected,
604                            "Cannot use StartsWith operator on non-string lower_bound value",
605                        ));
606                    };
607
608                    let prefix_length = lower_bound.chars().count().min(datum.chars().count());
609
610                    // truncate lower bound so that its length
611                    // is not greater than the length of prefix
612                    let truncated_lower_bound =
613                        lower_bound.chars().take(prefix_length).collect::<String>();
614                    if datum < &truncated_lower_bound {
615                        return Ok(false);
616                    }
617                }
618
619                if let Some(upper_bound) = max {
620                    let PrimitiveLiteral::String(upper_bound) = upper_bound.literal() else {
621                        return Err(Error::new(
622                            ErrorKind::Unexpected,
623                            "Cannot use StartsWith operator on non-string upper_bound value",
624                        ));
625                    };
626
627                    let prefix_length = upper_bound.chars().count().min(datum.chars().count());
628
629                    // truncate upper bound so that its length
630                    // is not greater than the length of prefix
631                    let truncated_upper_bound =
632                        upper_bound.chars().take(prefix_length).collect::<String>();
633                    if datum > &truncated_upper_bound {
634                        return Ok(false);
635                    }
636                }
637
638                Ok(true)
639            },
640            MissingColBehavior::CantMatch,
641        )
642    }
643
644    fn not_starts_with(
645        &mut self,
646        reference: &BoundReference,
647        datum: &Datum,
648        _predicate: &BoundPredicate,
649    ) -> Result<RowSelection> {
650        let field_id = reference.field().id;
651
652        // notStartsWith will match unless all values must start with the prefix.
653        // This happens when the lower and upper bounds both start with the prefix.
654
655        let PrimitiveLiteral::String(prefix) = datum.literal() else {
656            return Err(Error::new(
657                ErrorKind::Unexpected,
658                "Cannot use StartsWith operator on non-string values",
659            ));
660        };
661
662        self.calc_row_selection(
663            field_id,
664            |min, max, nulls| {
665                if !matches!(nulls, PageNullCount::NoneNull) {
666                    return Ok(true);
667                }
668
669                let Some(lower_bound) = min else {
670                    return Ok(true);
671                };
672
673                let PrimitiveLiteral::String(lower_bound_str) = lower_bound.literal() else {
674                    return Err(Error::new(
675                        ErrorKind::Unexpected,
676                        "Cannot use NotStartsWith operator on non-string lower_bound value",
677                    ));
678                };
679
680                if lower_bound_str < prefix {
681                    // if lower is shorter than the prefix then lower doesn't start with the prefix
682                    return Ok(true);
683                }
684
685                let prefix_len = prefix.chars().count();
686
687                if lower_bound_str.chars().take(prefix_len).collect::<String>() == *prefix {
688                    // lower bound matches the prefix
689
690                    let Some(upper_bound) = max else {
691                        return Ok(true);
692                    };
693
694                    let PrimitiveLiteral::String(upper_bound) = upper_bound.literal() else {
695                        return Err(Error::new(
696                            ErrorKind::Unexpected,
697                            "Cannot use NotStartsWith operator on non-string upper_bound value",
698                        ));
699                    };
700
701                    // if upper is shorter than the prefix then upper can't start with the prefix
702                    if upper_bound.chars().count() < prefix_len {
703                        return Ok(true);
704                    }
705
706                    if upper_bound.chars().take(prefix_len).collect::<String>() == *prefix {
707                        // both bounds match the prefix, so all rows must match the
708                        // prefix and therefore do not satisfy the predicate
709                        return Ok(false);
710                    }
711                }
712
713                Ok(true)
714            },
715            MissingColBehavior::MightMatch,
716        )
717    }
718
719    fn r#in(
720        &mut self,
721        reference: &BoundReference,
722        literals: &FnvHashSet<Datum>,
723        _predicate: &BoundPredicate,
724    ) -> Result<RowSelection> {
725        let field_id = reference.field().id;
726
727        if literals.len() > IN_PREDICATE_LIMIT {
728            // skip evaluating the predicate if the number of values is too big
729            return self.select_all_rows();
730        }
731        self.calc_row_selection(
732            field_id,
733            |min, max, nulls| {
734                if matches!(nulls, PageNullCount::AllNull) {
735                    return Ok(false);
736                }
737
738                match (min, max) {
739                    (Some(min), Some(max)) => {
740                        if literals
741                            .iter()
742                            .all(|datum| datum.lt(&min) || datum.gt(&max))
743                        {
744                            // if all values are outside the bounds, rows cannot match.
745                            return Ok(false);
746                        }
747                    }
748                    (Some(min), _) => {
749                        if !literals.iter().any(|datum| datum.ge(&min)) {
750                            // if none of the values are greater than the min bound, rows cant match
751                            return Ok(false);
752                        }
753                    }
754                    (_, Some(max)) => {
755                        if !literals.iter().any(|datum| datum.le(&max)) {
756                            // if all values are greater than upper bound, rows cannot match.
757                            return Ok(false);
758                        }
759                    }
760
761                    _ => {}
762                }
763
764                Ok(true)
765            },
766            MissingColBehavior::CantMatch,
767        )
768    }
769
770    fn not_in(
771        &mut self,
772        _reference: &BoundReference,
773        _literals: &FnvHashSet<Datum>,
774        _predicate: &BoundPredicate,
775    ) -> Result<RowSelection> {
776        // Because the bounds are not necessarily a min or max value,
777        // this cannot be answered using them. notIn(col, {X, ...})
778        // with (X, Y) doesn't guarantee that X is a value in col.
779        self.select_all_rows()
780    }
781}
782
783#[cfg(test)]
784mod tests {
785    use std::collections::HashMap;
786    use std::sync::Arc;
787
788    use arrow_array::{ArrayRef, Float32Array, RecordBatch, StringArray};
789    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
790    use parquet::arrow::ArrowWriter;
791    use parquet::arrow::arrow_reader::{
792        ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelector,
793    };
794    use parquet::file::metadata::ParquetMetaData;
795    use parquet::file::properties::WriterProperties;
796    use rand::{Rng, thread_rng};
797    use tempfile::NamedTempFile;
798
799    use super::PageIndexEvaluator;
800    use crate::expr::{Bind, Reference};
801    use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
802    use crate::{ErrorKind, Result};
803
804    /// Helper function to create a test parquet file with page indexes
805    /// and return the metadata needed for testing
806    fn create_test_parquet_file() -> Result<(Arc<ParquetMetaData>, NamedTempFile)> {
807        let arrow_schema = Arc::new(ArrowSchema::new(vec![
808            Field::new("col_float", DataType::Float32, true),
809            Field::new("col_string", DataType::Utf8, true),
810        ]));
811
812        let temp_file = NamedTempFile::new().unwrap();
813        let file = temp_file.reopen().unwrap();
814
815        let props = WriterProperties::builder()
816            .set_data_page_row_count_limit(1024)
817            .set_write_batch_size(512)
818            .build();
819
820        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
821
822        let mut batches = vec![];
823
824        // Batch 1: 1024 rows - strings with AARDVARK, BEAR, BISON
825        let float_vals: Vec<Option<f32>> = vec![None; 1024];
826        let mut string_vals = vec![];
827        string_vals.push(Some("AARDVARK".to_string()));
828        for _ in 1..1023 {
829            string_vals.push(Some("BEAR".to_string()));
830        }
831        string_vals.push(Some("BISON".to_string()));
832
833        batches.push(
834            RecordBatch::try_new(arrow_schema.clone(), vec![
835                Arc::new(Float32Array::from(float_vals)),
836                Arc::new(StringArray::from(string_vals)),
837            ])
838            .unwrap(),
839        );
840
841        // Batch 2: 1024 rows - all DEER
842        let float_vals: Vec<Option<f32>> = vec![None; 1024];
843        let string_vals = vec![Some("DEER".to_string()); 1024];
844
845        batches.push(
846            RecordBatch::try_new(arrow_schema.clone(), vec![
847                Arc::new(Float32Array::from(float_vals)),
848                Arc::new(StringArray::from(string_vals)),
849            ])
850            .unwrap(),
851        );
852
853        // Batch 3: 1024 rows - float 0-10
854        let mut float_vals = vec![];
855        for i in 0..1024 {
856            float_vals.push(Some(i as f32 * 10.0 / 1024.0));
857        }
858        let mut string_vals = vec![];
859        string_vals.push(Some("GIRAFFE".to_string()));
860        string_vals.push(None);
861        for _ in 2..1024 {
862            string_vals.push(Some("HIPPO".to_string()));
863        }
864
865        batches.push(
866            RecordBatch::try_new(arrow_schema.clone(), vec![
867                Arc::new(Float32Array::from(float_vals)),
868                Arc::new(StringArray::from(string_vals)),
869            ])
870            .unwrap(),
871        );
872
873        // Batch 4: 1024 rows - float 10-20
874        let mut float_vals = vec![None];
875        for i in 1..1024 {
876            float_vals.push(Some(10.0 + i as f32 * 10.0 / 1024.0));
877        }
878        let string_vals = vec![Some("HIPPO".to_string()); 1024];
879
880        batches.push(
881            RecordBatch::try_new(arrow_schema.clone(), vec![
882                Arc::new(Float32Array::from(float_vals)),
883                Arc::new(StringArray::from(string_vals)),
884            ])
885            .unwrap(),
886        );
887
888        // Write rows one at a time to give the writer a chance to split into pages
889        for batch in &batches {
890            for i in 0..batch.num_rows() {
891                writer.write(&batch.slice(i, 1)).unwrap();
892            }
893        }
894
895        writer.close().unwrap();
896
897        let file = temp_file.reopen().unwrap();
898        let options = ArrowReaderOptions::new().with_page_index(true);
899        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
900        let metadata = reader.metadata().clone();
901
902        Ok((metadata, temp_file))
903    }
904
905    /// Get the test metadata components for testing
906    fn get_test_metadata(
907        metadata: &ParquetMetaData,
908    ) -> (
909        Vec<parquet::file::page_index::column_index::ColumnIndexMetaData>,
910        Vec<parquet::file::page_index::offset_index::OffsetIndexMetaData>,
911        &parquet::file::metadata::RowGroupMetaData,
912    ) {
913        let row_group_metadata = metadata.row_group(0);
914        let column_index = metadata.column_index().unwrap()[0].to_vec();
915        let offset_index = metadata.offset_index().unwrap()[0].to_vec();
916        (column_index, offset_index, row_group_metadata)
917    }
918
919    #[test]
920    fn eval_matches_no_rows_for_empty_row_group() -> Result<()> {
921        let arrow_schema = Arc::new(ArrowSchema::new(vec![
922            Field::new("col_float", DataType::Float32, true),
923            Field::new("col_string", DataType::Utf8, true),
924        ]));
925
926        let empty_float: ArrayRef = Arc::new(Float32Array::from(Vec::<Option<f32>>::new()));
927        let empty_string: ArrayRef = Arc::new(StringArray::from(Vec::<Option<String>>::new()));
928        let empty_batch =
929            RecordBatch::try_new(arrow_schema.clone(), vec![empty_float, empty_string]).unwrap();
930
931        let temp_file = NamedTempFile::new().unwrap();
932        let file = temp_file.reopen().unwrap();
933
934        let mut writer = ArrowWriter::try_new(file, arrow_schema, None).unwrap();
935        writer.write(&empty_batch).unwrap();
936        writer.close().unwrap();
937
938        let file = temp_file.reopen().unwrap();
939        let options = ArrowReaderOptions::new().with_page_index(true);
940        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
941        let metadata = reader.metadata();
942
943        if metadata.num_row_groups() == 0 || metadata.row_group(0).num_rows() == 0 {
944            return Ok(());
945        }
946
947        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
948
949        let filter = Reference::new("col_float")
950            .greater_than(Datum::float(1.0))
951            .bind(iceberg_schema_ref.clone(), false)?;
952
953        let row_group_metadata = metadata.row_group(0);
954        let column_index = metadata.column_index().unwrap()[0].to_vec();
955        let offset_index = metadata.offset_index().unwrap()[0].to_vec();
956
957        let result = PageIndexEvaluator::eval(
958            &filter,
959            &column_index,
960            &offset_index,
961            row_group_metadata,
962            &field_id_map,
963            iceberg_schema_ref.as_ref(),
964        )?;
965
966        assert_eq!(result.len(), 0);
967
968        Ok(())
969    }
970
971    #[test]
972    fn eval_is_null_select_only_pages_with_nulls() -> Result<()> {
973        let (metadata, _temp_file) = create_test_parquet_file()?;
974        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
975        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
976
977        let filter = Reference::new("col_float")
978            .is_null()
979            .bind(iceberg_schema_ref.clone(), false)?;
980
981        let result = PageIndexEvaluator::eval(
982            &filter,
983            &column_index,
984            &offset_index,
985            row_group_metadata,
986            &field_id_map,
987            iceberg_schema_ref.as_ref(),
988        )?;
989
990        let expected = vec![
991            RowSelector::select(2048),
992            RowSelector::skip(1024),
993            RowSelector::select(1024),
994        ];
995
996        assert_eq!(result, expected);
997
998        Ok(())
999    }
1000
1001    #[test]
1002    fn eval_is_not_null_dont_select_pages_with_all_nulls() -> Result<()> {
1003        let (metadata, _temp_file) = create_test_parquet_file()?;
1004        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1005        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1006
1007        let filter = Reference::new("col_float")
1008            .is_not_null()
1009            .bind(iceberg_schema_ref.clone(), false)?;
1010
1011        let result = PageIndexEvaluator::eval(
1012            &filter,
1013            &column_index,
1014            &offset_index,
1015            row_group_metadata,
1016            &field_id_map,
1017            iceberg_schema_ref.as_ref(),
1018        )?;
1019
1020        let expected = vec![RowSelector::skip(2048), RowSelector::select(2048)];
1021
1022        assert_eq!(result, expected);
1023
1024        Ok(())
1025    }
1026
1027    #[test]
1028    fn eval_is_nan_select_all() -> Result<()> {
1029        let (metadata, _temp_file) = create_test_parquet_file()?;
1030        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1031        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1032
1033        let filter = Reference::new("col_float")
1034            .is_nan()
1035            .bind(iceberg_schema_ref.clone(), false)?;
1036
1037        let result = PageIndexEvaluator::eval(
1038            &filter,
1039            &column_index,
1040            &offset_index,
1041            row_group_metadata,
1042            &field_id_map,
1043            iceberg_schema_ref.as_ref(),
1044        )?;
1045
1046        let expected = vec![RowSelector::select(4096)];
1047
1048        assert_eq!(result, expected);
1049
1050        Ok(())
1051    }
1052
1053    #[test]
1054    fn eval_not_nan_select_all() -> Result<()> {
1055        let (metadata, _temp_file) = create_test_parquet_file()?;
1056        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1057        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1058
1059        let filter = Reference::new("col_float")
1060            .is_not_nan()
1061            .bind(iceberg_schema_ref.clone(), false)?;
1062
1063        let result = PageIndexEvaluator::eval(
1064            &filter,
1065            &column_index,
1066            &offset_index,
1067            row_group_metadata,
1068            &field_id_map,
1069            iceberg_schema_ref.as_ref(),
1070        )?;
1071
1072        let expected = vec![RowSelector::select(4096)];
1073
1074        assert_eq!(result, expected);
1075
1076        Ok(())
1077    }
1078
1079    #[test]
1080    fn eval_inequality_nan_datum_all_rows_except_all_null_pages() -> Result<()> {
1081        let (metadata, _temp_file) = create_test_parquet_file()?;
1082        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1083        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1084
1085        let filter = Reference::new("col_float")
1086            .less_than(Datum::float(f32::NAN))
1087            .bind(iceberg_schema_ref.clone(), false)?;
1088
1089        let result = PageIndexEvaluator::eval(
1090            &filter,
1091            &column_index,
1092            &offset_index,
1093            row_group_metadata,
1094            &field_id_map,
1095            iceberg_schema_ref.as_ref(),
1096        )?;
1097
1098        let expected = vec![RowSelector::skip(2048), RowSelector::select(2048)];
1099
1100        assert_eq!(result, expected);
1101
1102        Ok(())
1103    }
1104
1105    #[test]
1106    fn eval_inequality_pages_containing_value_except_all_null_pages() -> Result<()> {
1107        let (metadata, _temp_file) = create_test_parquet_file()?;
1108        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1109        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1110
1111        let filter = Reference::new("col_float")
1112            .less_than(Datum::float(5.0))
1113            .bind(iceberg_schema_ref.clone(), false)?;
1114
1115        let result = PageIndexEvaluator::eval(
1116            &filter,
1117            &column_index,
1118            &offset_index,
1119            row_group_metadata,
1120            &field_id_map,
1121            iceberg_schema_ref.as_ref(),
1122        )?;
1123
1124        let expected = vec![
1125            RowSelector::skip(2048),
1126            RowSelector::select(1024),
1127            RowSelector::skip(1024),
1128        ];
1129
1130        assert_eq!(result, expected);
1131
1132        Ok(())
1133    }
1134
1135    #[test]
1136    fn eval_eq_pages_containing_value_except_all_null_pages() -> Result<()> {
1137        let (metadata, _temp_file) = create_test_parquet_file()?;
1138        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1139        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1140
1141        let filter = Reference::new("col_float")
1142            .equal_to(Datum::float(5.0))
1143            .bind(iceberg_schema_ref.clone(), false)?;
1144
1145        let result = PageIndexEvaluator::eval(
1146            &filter,
1147            &column_index,
1148            &offset_index,
1149            row_group_metadata,
1150            &field_id_map,
1151            iceberg_schema_ref.as_ref(),
1152        )?;
1153
1154        // Pages 0-1: all null (skip)
1155        // Page 2: 0-10 (select, might contain 5.0)
1156        // Page 3: 10-20 (skip, min > 5.0)
1157        let expected = vec![
1158            RowSelector::skip(2048),
1159            RowSelector::select(1024),
1160            RowSelector::skip(1024),
1161        ];
1162
1163        assert_eq!(result, expected);
1164
1165        Ok(())
1166    }
1167
1168    #[test]
1169    fn eval_not_eq_all_rows() -> Result<()> {
1170        let (metadata, _temp_file) = create_test_parquet_file()?;
1171        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1172        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1173
1174        let filter = Reference::new("col_float")
1175            .not_equal_to(Datum::float(5.0))
1176            .bind(iceberg_schema_ref.clone(), false)?;
1177
1178        let result = PageIndexEvaluator::eval(
1179            &filter,
1180            &column_index,
1181            &offset_index,
1182            row_group_metadata,
1183            &field_id_map,
1184            iceberg_schema_ref.as_ref(),
1185        )?;
1186
1187        let expected = vec![RowSelector::select(4096)];
1188
1189        assert_eq!(result, expected);
1190
1191        Ok(())
1192    }
1193
1194    #[test]
1195    fn eval_starts_with_error_float_col() -> Result<()> {
1196        let (metadata, _temp_file) = create_test_parquet_file()?;
1197        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1198        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1199
1200        let filter = Reference::new("col_float")
1201            .starts_with(Datum::float(5.0))
1202            .bind(iceberg_schema_ref.clone(), false)?;
1203
1204        let result = PageIndexEvaluator::eval(
1205            &filter,
1206            &column_index,
1207            &offset_index,
1208            row_group_metadata,
1209            &field_id_map,
1210            iceberg_schema_ref.as_ref(),
1211        );
1212
1213        assert_eq!(result.unwrap_err().kind(), ErrorKind::Unexpected);
1214
1215        Ok(())
1216    }
1217
1218    #[test]
1219    fn eval_starts_with_pages_containing_value_except_all_null_pages() -> Result<()> {
1220        let (metadata, _temp_file) = create_test_parquet_file()?;
1221        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1222        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1223
1224        // Test starts_with on string column where only some pages match
1225        // Our file has 4 pages: ["AARDVARK".."BISON"], ["DEER"], ["GIRAFFE".."HIPPO"], ["HIPPO"]
1226        // Testing starts_with("B") should select only page 0
1227        let filter = Reference::new("col_string")
1228            .starts_with(Datum::string("B"))
1229            .bind(iceberg_schema_ref.clone(), false)?;
1230
1231        let result = PageIndexEvaluator::eval(
1232            &filter,
1233            &column_index,
1234            &offset_index,
1235            row_group_metadata,
1236            &field_id_map,
1237            iceberg_schema_ref.as_ref(),
1238        )?;
1239
1240        // Page 0 has "BEAR" and "BISON" (starts with B), rest don't
1241        let expected = vec![RowSelector::select(1024), RowSelector::skip(3072)];
1242
1243        assert_eq!(result, expected);
1244
1245        Ok(())
1246    }
1247
1248    #[test]
1249    fn eval_not_starts_with_pages_containing_value_except_pages_with_min_and_max_equal_to_prefix_and_all_null_pages()
1250    -> Result<()> {
1251        let (metadata, _temp_file) = create_test_parquet_file()?;
1252        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1253        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1254
1255        // Test not_starts_with where one page has ALL values starting with prefix
1256        // Our file has page 1 with all "DEER" (min="DEER", max="DEER")
1257        // Testing not_starts_with("DE") should skip page 1 where all values start with "DE"
1258        let filter = Reference::new("col_string")
1259            .not_starts_with(Datum::string("DE"))
1260            .bind(iceberg_schema_ref.clone(), false)?;
1261
1262        let result = PageIndexEvaluator::eval(
1263            &filter,
1264            &column_index,
1265            &offset_index,
1266            row_group_metadata,
1267            &field_id_map,
1268            iceberg_schema_ref.as_ref(),
1269        )?;
1270
1271        // Page 0: mixed values (select)
1272        // Page 1: all "DEER" starting with "DE" (skip)
1273        // Pages 2-3: other values not all starting with "DE" (select)
1274        let expected = vec![
1275            RowSelector::select(1024),
1276            RowSelector::skip(1024),
1277            RowSelector::select(2048),
1278        ];
1279
1280        assert_eq!(result, expected);
1281
1282        Ok(())
1283    }
1284
1285    #[test]
1286    fn eval_in_length_of_set_above_limit_all_rows() -> Result<()> {
1287        let mut rng = thread_rng();
1288        let (metadata, _temp_file) = create_test_parquet_file()?;
1289        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1290        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1291
1292        let filter = Reference::new("col_float")
1293            .is_in(std::iter::repeat_with(|| Datum::float(rng.gen_range(0.0..10.0))).take(1000))
1294            .bind(iceberg_schema_ref.clone(), false)?;
1295
1296        let result = PageIndexEvaluator::eval(
1297            &filter,
1298            &column_index,
1299            &offset_index,
1300            row_group_metadata,
1301            &field_id_map,
1302            iceberg_schema_ref.as_ref(),
1303        )?;
1304
1305        let expected = vec![RowSelector::select(4096)];
1306
1307        assert_eq!(result, expected);
1308
1309        Ok(())
1310    }
1311
1312    #[test]
1313    fn eval_in_valid_set_size_some_rows() -> Result<()> {
1314        let (metadata, _temp_file) = create_test_parquet_file()?;
1315        let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1316        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1317
1318        // Test is_in with multiple values using min/max bounds
1319        // Our file has 4 pages: ["AARDVARK".."BISON"], ["DEER"], ["GIRAFFE".."HIPPO"], ["HIPPO"]
1320        // Testing is_in(["AARDVARK", "GIRAFFE"]) - both are in different pages
1321        let filter = Reference::new("col_string")
1322            .is_in([Datum::string("AARDVARK"), Datum::string("GIRAFFE")])
1323            .bind(iceberg_schema_ref.clone(), false)?;
1324
1325        let result = PageIndexEvaluator::eval(
1326            &filter,
1327            &column_index,
1328            &offset_index,
1329            row_group_metadata,
1330            &field_id_map,
1331            iceberg_schema_ref.as_ref(),
1332        )?;
1333
1334        // Page 0 contains "AARDVARK", page 1 doesn't contain either, page 2 contains "GIRAFFE", page 3 doesn't
1335        let expected = vec![
1336            RowSelector::select(1024),
1337            RowSelector::skip(1024),
1338            RowSelector::select(1024),
1339            RowSelector::skip(1024),
1340        ];
1341
1342        assert_eq!(result, expected);
1343
1344        Ok(())
1345    }
1346
1347    fn build_iceberg_schema_and_field_map() -> Result<(Arc<Schema>, HashMap<i32, usize>)> {
1348        let iceberg_schema = Schema::builder()
1349            .with_fields([
1350                Arc::new(NestedField::new(
1351                    1,
1352                    "col_float",
1353                    Type::Primitive(PrimitiveType::Float),
1354                    false,
1355                )),
1356                Arc::new(NestedField::new(
1357                    2,
1358                    "col_string",
1359                    Type::Primitive(PrimitiveType::String),
1360                    false,
1361                )),
1362            ])
1363            .build()?;
1364        let iceberg_schema_ref = Arc::new(iceberg_schema);
1365
1366        let field_id_map = HashMap::from_iter([(1, 0), (2, 1)]);
1367
1368        Ok((iceberg_schema_ref, field_id_map))
1369    }
1370}