iceberg/expr/visitors/
row_group_metrics_evaluator.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Evaluates Parquet Row Group metrics
19
20use std::collections::HashMap;
21
22use fnv::FnvHashSet;
23use parquet::file::metadata::RowGroupMetaData;
24use parquet::file::statistics::Statistics;
25
26use crate::arrow::{get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum};
27use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
28use crate::expr::{BoundPredicate, BoundReference};
29use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema};
30use crate::{Error, ErrorKind, Result};
31
32pub(crate) struct RowGroupMetricsEvaluator<'a> {
33    row_group_metadata: &'a RowGroupMetaData,
34    iceberg_field_id_to_parquet_column_index: &'a HashMap<i32, usize>,
35    snapshot_schema: &'a Schema,
36}
37
38const IN_PREDICATE_LIMIT: usize = 200;
39const ROW_GROUP_MIGHT_MATCH: Result<bool> = Ok(true);
40const ROW_GROUP_CANT_MATCH: Result<bool> = Ok(false);
41
42impl<'a> RowGroupMetricsEvaluator<'a> {
43    fn new(
44        row_group_metadata: &'a RowGroupMetaData,
45        field_id_map: &'a HashMap<i32, usize>,
46        snapshot_schema: &'a Schema,
47    ) -> Self {
48        Self {
49            row_group_metadata,
50            iceberg_field_id_to_parquet_column_index: field_id_map,
51            snapshot_schema,
52        }
53    }
54
55    /// Evaluate this `RowGroupMetricsEvaluator`'s filter predicate against the
56    /// provided [`RowGroupMetaData`]'. Used by [`ArrowReader`] to
57    /// see if a Parquet file RowGroup could possibly contain data that matches
58    /// the scan's filter.
59    pub(crate) fn eval(
60        filter: &'a BoundPredicate,
61        row_group_metadata: &'a RowGroupMetaData,
62        field_id_map: &'a HashMap<i32, usize>,
63        snapshot_schema: &'a Schema,
64    ) -> Result<bool> {
65        if row_group_metadata.num_rows() == 0 {
66            return ROW_GROUP_CANT_MATCH;
67        }
68
69        let mut evaluator = Self::new(row_group_metadata, field_id_map, snapshot_schema);
70
71        visit(&mut evaluator, filter)
72    }
73
74    fn stats_for_field_id(&self, field_id: i32) -> Option<&Statistics> {
75        let parquet_column_index = *self
76            .iceberg_field_id_to_parquet_column_index
77            .get(&field_id)?;
78        self.row_group_metadata
79            .column(parquet_column_index)
80            .statistics()
81    }
82
83    fn null_count(&self, field_id: i32) -> Option<u64> {
84        self.stats_for_field_id(field_id)?.null_count_opt()
85    }
86
87    fn value_count(&self) -> u64 {
88        self.row_group_metadata.num_rows() as u64
89    }
90
91    fn contains_nulls_only(&self, field_id: i32) -> bool {
92        let null_count = self.null_count(field_id);
93        let value_count = self.value_count();
94
95        null_count == Some(value_count)
96    }
97
98    fn may_contain_null(&self, field_id: i32) -> bool {
99        if let Some(null_count) = self.null_count(field_id) {
100            null_count > 0
101        } else {
102            true
103        }
104    }
105
106    fn stats_and_type_for_field_id(
107        &self,
108        field_id: i32,
109    ) -> Result<Option<(&Statistics, PrimitiveType)>> {
110        let Some(stats) = self.stats_for_field_id(field_id) else {
111            // No statistics for column
112            return Ok(None);
113        };
114
115        let Some(field) = self.snapshot_schema.field_by_id(field_id) else {
116            return Err(Error::new(
117                ErrorKind::Unexpected,
118                format!(
119                    "Could not find a field with id '{}' in the snapshot schema",
120                    &field_id
121                ),
122            ));
123        };
124
125        let Some(primitive_type) = field.field_type.as_primitive_type() else {
126            return Err(Error::new(
127                ErrorKind::Unexpected,
128                format!(
129                    "Could not determine the PrimitiveType for field id '{}'",
130                    &field_id
131                ),
132            ));
133        };
134
135        Ok(Some((stats, primitive_type.clone())))
136    }
137
138    fn min_value(&self, field_id: i32) -> Result<Option<Datum>> {
139        let Some((stats, primitive_type)) = self.stats_and_type_for_field_id(field_id)? else {
140            return Ok(None);
141        };
142
143        get_parquet_stat_min_as_datum(&primitive_type, stats)
144    }
145
146    fn max_value(&self, field_id: i32) -> Result<Option<Datum>> {
147        let Some((stats, primitive_type)) = self.stats_and_type_for_field_id(field_id)? else {
148            return Ok(None);
149        };
150
151        get_parquet_stat_max_as_datum(&primitive_type, stats)
152    }
153
154    fn visit_inequality(
155        &mut self,
156        reference: &BoundReference,
157        datum: &Datum,
158        cmp_fn: fn(&Datum, &Datum) -> bool,
159        use_lower_bound: bool,
160    ) -> Result<bool> {
161        let field_id = reference.field().id;
162
163        if self.contains_nulls_only(field_id) {
164            return ROW_GROUP_CANT_MATCH;
165        }
166
167        if datum.is_nan() {
168            // NaN indicates unreliable bounds.
169            // See the InclusiveMetricsEvaluator docs for more.
170            return ROW_GROUP_MIGHT_MATCH;
171        }
172
173        let bound = if use_lower_bound {
174            self.min_value(field_id)
175        } else {
176            self.max_value(field_id)
177        }?;
178
179        if let Some(bound) = bound {
180            if cmp_fn(&bound, datum) {
181                return ROW_GROUP_MIGHT_MATCH;
182            }
183
184            return ROW_GROUP_CANT_MATCH;
185        }
186
187        ROW_GROUP_MIGHT_MATCH
188    }
189}
190
191impl BoundPredicateVisitor for RowGroupMetricsEvaluator<'_> {
192    type T = bool;
193
194    fn always_true(&mut self) -> Result<bool> {
195        ROW_GROUP_MIGHT_MATCH
196    }
197
198    fn always_false(&mut self) -> Result<bool> {
199        ROW_GROUP_CANT_MATCH
200    }
201
202    fn and(&mut self, lhs: bool, rhs: bool) -> Result<bool> {
203        Ok(lhs && rhs)
204    }
205
206    fn or(&mut self, lhs: bool, rhs: bool) -> Result<bool> {
207        Ok(lhs || rhs)
208    }
209
210    fn not(&mut self, inner: bool) -> Result<bool> {
211        Ok(!inner)
212    }
213
214    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<bool> {
215        let field_id = reference.field().id;
216
217        match self.null_count(field_id) {
218            Some(0) => ROW_GROUP_CANT_MATCH,
219            Some(_) => ROW_GROUP_MIGHT_MATCH,
220            None => ROW_GROUP_MIGHT_MATCH,
221        }
222    }
223
224    fn not_null(
225        &mut self,
226        reference: &BoundReference,
227        _predicate: &BoundPredicate,
228    ) -> Result<bool> {
229        let field_id = reference.field().id;
230
231        if self.contains_nulls_only(field_id) {
232            return ROW_GROUP_CANT_MATCH;
233        }
234
235        ROW_GROUP_MIGHT_MATCH
236    }
237
238    fn is_nan(&mut self, _reference: &BoundReference, _predicate: &BoundPredicate) -> Result<bool> {
239        // NaN counts not in ColumnChunkMetadata Statistics
240        ROW_GROUP_MIGHT_MATCH
241    }
242
243    fn not_nan(
244        &mut self,
245        _reference: &BoundReference,
246        _predicate: &BoundPredicate,
247    ) -> Result<bool> {
248        // NaN counts not in ColumnChunkMetadata Statistics
249        ROW_GROUP_MIGHT_MATCH
250    }
251
252    fn less_than(
253        &mut self,
254        reference: &BoundReference,
255        datum: &Datum,
256        _predicate: &BoundPredicate,
257    ) -> Result<bool> {
258        self.visit_inequality(reference, datum, PartialOrd::lt, true)
259    }
260
261    fn less_than_or_eq(
262        &mut self,
263        reference: &BoundReference,
264        datum: &Datum,
265        _predicate: &BoundPredicate,
266    ) -> Result<bool> {
267        self.visit_inequality(reference, datum, PartialOrd::le, true)
268    }
269
270    fn greater_than(
271        &mut self,
272        reference: &BoundReference,
273        datum: &Datum,
274        _predicate: &BoundPredicate,
275    ) -> Result<bool> {
276        self.visit_inequality(reference, datum, PartialOrd::gt, false)
277    }
278
279    fn greater_than_or_eq(
280        &mut self,
281        reference: &BoundReference,
282        datum: &Datum,
283        _predicate: &BoundPredicate,
284    ) -> Result<bool> {
285        self.visit_inequality(reference, datum, PartialOrd::ge, false)
286    }
287
288    fn eq(
289        &mut self,
290        reference: &BoundReference,
291        datum: &Datum,
292        _predicate: &BoundPredicate,
293    ) -> Result<bool> {
294        let field_id = reference.field().id;
295
296        if self.contains_nulls_only(field_id) {
297            return ROW_GROUP_CANT_MATCH;
298        }
299
300        if let Some(lower_bound) = self.min_value(field_id)? {
301            if lower_bound.is_nan() {
302                // NaN indicates unreliable bounds.
303                // See the InclusiveMetricsEvaluator docs for more.
304                return ROW_GROUP_MIGHT_MATCH;
305            } else if lower_bound.gt(datum) {
306                return ROW_GROUP_CANT_MATCH;
307            }
308        }
309
310        if let Some(upper_bound) = self.max_value(field_id)? {
311            if upper_bound.is_nan() {
312                // NaN indicates unreliable bounds.
313                // See the InclusiveMetricsEvaluator docs for more.
314                return ROW_GROUP_MIGHT_MATCH;
315            } else if upper_bound.lt(datum) {
316                return ROW_GROUP_CANT_MATCH;
317            }
318        }
319
320        ROW_GROUP_MIGHT_MATCH
321    }
322
323    fn not_eq(
324        &mut self,
325        _reference: &BoundReference,
326        _datum: &Datum,
327        _predicate: &BoundPredicate,
328    ) -> Result<bool> {
329        // Because the bounds are not necessarily a min or max value,
330        // this cannot be answered using them. notEq(col, X) with (X, Y)
331        // doesn't guarantee that X is a value in col.
332        ROW_GROUP_MIGHT_MATCH
333    }
334
335    fn starts_with(
336        &mut self,
337        reference: &BoundReference,
338        datum: &Datum,
339        _predicate: &BoundPredicate,
340    ) -> Result<bool> {
341        let field_id = reference.field().id;
342
343        if self.contains_nulls_only(field_id) {
344            return ROW_GROUP_CANT_MATCH;
345        }
346
347        let PrimitiveLiteral::String(datum) = datum.literal() else {
348            return Err(Error::new(
349                ErrorKind::Unexpected,
350                "Cannot use StartsWith operator on non-string values",
351            ));
352        };
353
354        if let Some(lower_bound) = self.min_value(field_id)? {
355            let PrimitiveLiteral::String(lower_bound) = lower_bound.literal() else {
356                return Err(Error::new(
357                    ErrorKind::Unexpected,
358                    "Cannot use StartsWith operator on non-string lower_bound value",
359                ));
360            };
361
362            let prefix_length = lower_bound.chars().count().min(datum.chars().count());
363
364            // truncate lower bound so that its length
365            // is not greater than the length of prefix
366            let truncated_lower_bound = lower_bound.chars().take(prefix_length).collect::<String>();
367            if datum < &truncated_lower_bound {
368                return ROW_GROUP_CANT_MATCH;
369            }
370        }
371
372        if let Some(upper_bound) = self.max_value(field_id)? {
373            let PrimitiveLiteral::String(upper_bound) = upper_bound.literal() else {
374                return Err(Error::new(
375                    ErrorKind::Unexpected,
376                    "Cannot use StartsWith operator on non-string upper_bound value",
377                ));
378            };
379
380            let prefix_length = upper_bound.chars().count().min(datum.chars().count());
381
382            // truncate upper bound so that its length
383            // is not greater than the length of prefix
384            let truncated_upper_bound = upper_bound.chars().take(prefix_length).collect::<String>();
385            if datum > &truncated_upper_bound {
386                return ROW_GROUP_CANT_MATCH;
387            }
388        }
389
390        ROW_GROUP_MIGHT_MATCH
391    }
392
393    fn not_starts_with(
394        &mut self,
395        reference: &BoundReference,
396        datum: &Datum,
397        _predicate: &BoundPredicate,
398    ) -> Result<bool> {
399        let field_id = reference.field().id;
400
401        if self.may_contain_null(field_id) {
402            return ROW_GROUP_MIGHT_MATCH;
403        }
404
405        // notStartsWith will match unless all values must start with the prefix.
406        // This happens when the lower and upper bounds both start with the prefix.
407
408        let PrimitiveLiteral::String(prefix) = datum.literal() else {
409            return Err(Error::new(
410                ErrorKind::Unexpected,
411                "Cannot use StartsWith operator on non-string values",
412            ));
413        };
414
415        let Some(lower_bound) = self.min_value(field_id)? else {
416            return ROW_GROUP_MIGHT_MATCH;
417        };
418
419        let PrimitiveLiteral::String(lower_bound_str) = lower_bound.literal() else {
420            return Err(Error::new(
421                ErrorKind::Unexpected,
422                "Cannot use NotStartsWith operator on non-string lower_bound value",
423            ));
424        };
425
426        if lower_bound_str < prefix {
427            // if lower is shorter than the prefix then lower doesn't start with the prefix
428            return ROW_GROUP_MIGHT_MATCH;
429        }
430
431        let prefix_len = prefix.chars().count();
432
433        if lower_bound_str.chars().take(prefix_len).collect::<String>() == *prefix {
434            // lower bound matches the prefix
435
436            let Some(upper_bound) = self.max_value(field_id)? else {
437                return ROW_GROUP_MIGHT_MATCH;
438            };
439
440            let PrimitiveLiteral::String(upper_bound) = upper_bound.literal() else {
441                return Err(Error::new(
442                    ErrorKind::Unexpected,
443                    "Cannot use NotStartsWith operator on non-string upper_bound value",
444                ));
445            };
446
447            // if upper is shorter than the prefix then upper can't start with the prefix
448            if upper_bound.chars().count() < prefix_len {
449                return ROW_GROUP_MIGHT_MATCH;
450            }
451
452            if upper_bound.chars().take(prefix_len).collect::<String>() == *prefix {
453                // both bounds match the prefix, so all rows must match the
454                // prefix and therefore do not satisfy the predicate
455                return ROW_GROUP_CANT_MATCH;
456            }
457        }
458
459        ROW_GROUP_MIGHT_MATCH
460    }
461
462    fn r#in(
463        &mut self,
464        reference: &BoundReference,
465        literals: &FnvHashSet<Datum>,
466        _predicate: &BoundPredicate,
467    ) -> Result<bool> {
468        let field_id = reference.field().id;
469
470        if self.contains_nulls_only(field_id) {
471            return ROW_GROUP_CANT_MATCH;
472        }
473
474        if literals.len() > IN_PREDICATE_LIMIT {
475            // skip evaluating the predicate if the number of values is too big
476            return ROW_GROUP_MIGHT_MATCH;
477        }
478
479        if let Some(lower_bound) = self.min_value(field_id)? {
480            if lower_bound.is_nan() {
481                // NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
482                return ROW_GROUP_MIGHT_MATCH;
483            }
484
485            if !literals.iter().any(|datum| datum.ge(&lower_bound)) {
486                // if all values are less than lower bound, rows cannot match.
487                return ROW_GROUP_CANT_MATCH;
488            }
489        }
490
491        if let Some(upper_bound) = self.max_value(field_id)? {
492            if upper_bound.is_nan() {
493                // NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
494                return ROW_GROUP_MIGHT_MATCH;
495            }
496
497            if !literals.iter().any(|datum| datum.le(&upper_bound)) {
498                // if all values are greater than upper bound, rows cannot match.
499                return ROW_GROUP_CANT_MATCH;
500            }
501        }
502
503        ROW_GROUP_MIGHT_MATCH
504    }
505
506    fn not_in(
507        &mut self,
508        _reference: &BoundReference,
509        _literals: &FnvHashSet<Datum>,
510        _predicate: &BoundPredicate,
511    ) -> Result<bool> {
512        // Because the bounds are not necessarily a min or max value,
513        // this cannot be answered using them. notIn(col, {X, ...})
514        // with (X, Y) doesn't guarantee that X is a value in col.
515        ROW_GROUP_MIGHT_MATCH
516    }
517}
518
519#[cfg(test)]
520mod tests {
521    use std::collections::HashMap;
522    use std::sync::Arc;
523
524    use parquet::basic::{LogicalType as ParquetLogicalType, Type as ParquetPhysicalType};
525    use parquet::data_type::ByteArray;
526    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
527    use parquet::file::statistics::Statistics;
528    use parquet::schema::types::{
529        ColumnDescriptor, ColumnPath, SchemaDescriptor, Type as parquetSchemaType,
530    };
531    use rand::{Rng, thread_rng};
532
533    use super::RowGroupMetricsEvaluator;
534    use crate::Result;
535    use crate::expr::{Bind, Reference};
536    use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
537
538    #[test]
539    fn eval_matches_no_rows_for_empty_row_group() -> Result<()> {
540        let row_group_metadata = create_row_group_metadata(0, 0, None, 0, None)?;
541
542        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
543
544        let filter = Reference::new("col_float")
545            .greater_than(Datum::float(1.0))
546            .bind(iceberg_schema_ref.clone(), false)?;
547
548        let result = RowGroupMetricsEvaluator::eval(
549            &filter,
550            &row_group_metadata,
551            &field_id_map,
552            iceberg_schema_ref.as_ref(),
553        )?;
554
555        assert!(!result);
556
557        Ok(())
558    }
559
560    #[test]
561    fn eval_true_for_row_group_no_bounds_present() -> Result<()> {
562        let row_group_metadata = create_row_group_metadata(1, 1, None, 1, None)?;
563
564        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
565
566        let filter = Reference::new("col_float")
567            .greater_than(Datum::float(1.0))
568            .bind(iceberg_schema_ref.clone(), false)?;
569
570        let result = RowGroupMetricsEvaluator::eval(
571            &filter,
572            &row_group_metadata,
573            &field_id_map,
574            iceberg_schema_ref.as_ref(),
575        )?;
576
577        assert!(result);
578
579        Ok(())
580    }
581
582    #[test]
583    fn eval_false_for_meta_all_null_filter_not_null() -> Result<()> {
584        let row_group_metadata = create_row_group_metadata(
585            1,
586            1,
587            Some(Statistics::float(None, None, None, Some(1), false)),
588            1,
589            None,
590        )?;
591
592        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
593
594        let filter = Reference::new("col_float")
595            .is_not_null()
596            .bind(iceberg_schema_ref.clone(), false)?;
597
598        let result = RowGroupMetricsEvaluator::eval(
599            &filter,
600            &row_group_metadata,
601            &field_id_map,
602            iceberg_schema_ref.as_ref(),
603        )?;
604
605        assert!(!result);
606        Ok(())
607    }
608
609    #[test]
610    fn eval_true_for_meta_all_null_filter_is_null() -> Result<()> {
611        let row_group_metadata = create_row_group_metadata(
612            1,
613            1,
614            Some(Statistics::float(None, None, None, Some(1), false)),
615            1,
616            None,
617        )?;
618
619        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
620
621        let filter = Reference::new("col_float")
622            .is_null()
623            .bind(iceberg_schema_ref.clone(), false)?;
624
625        let result = RowGroupMetricsEvaluator::eval(
626            &filter,
627            &row_group_metadata,
628            &field_id_map,
629            iceberg_schema_ref.as_ref(),
630        )?;
631
632        assert!(result);
633        Ok(())
634    }
635
636    #[test]
637    fn eval_true_for_meta_none_null_filter_not_null() -> Result<()> {
638        let row_group_metadata = create_row_group_metadata(
639            1,
640            1,
641            Some(Statistics::float(None, None, None, Some(0), false)),
642            1,
643            None,
644        )?;
645
646        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
647
648        let filter = Reference::new("col_float")
649            .is_not_null()
650            .bind(iceberg_schema_ref.clone(), false)?;
651
652        let result = RowGroupMetricsEvaluator::eval(
653            &filter,
654            &row_group_metadata,
655            &field_id_map,
656            iceberg_schema_ref.as_ref(),
657        )?;
658
659        assert!(result);
660        Ok(())
661    }
662
663    #[test]
664    fn eval_false_for_meta_none_null_filter_is_null() -> Result<()> {
665        let row_group_metadata = create_row_group_metadata(
666            1,
667            1,
668            Some(Statistics::float(None, None, None, Some(0), false)),
669            1,
670            None,
671        )?;
672
673        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
674
675        let filter = Reference::new("col_float")
676            .is_null()
677            .bind(iceberg_schema_ref.clone(), false)?;
678
679        let result = RowGroupMetricsEvaluator::eval(
680            &filter,
681            &row_group_metadata,
682            &field_id_map,
683            iceberg_schema_ref.as_ref(),
684        )?;
685
686        assert!(!result);
687        Ok(())
688    }
689
690    #[test]
691    fn eval_false_for_meta_all_nulls_filter_inequality() -> Result<()> {
692        let row_group_metadata = create_row_group_metadata(
693            1,
694            1,
695            Some(Statistics::float(None, None, None, Some(1), false)),
696            1,
697            None,
698        )?;
699
700        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
701
702        let filter = Reference::new("col_float")
703            .greater_than(Datum::float(1.0))
704            .bind(iceberg_schema_ref.clone(), false)?;
705
706        let result = RowGroupMetricsEvaluator::eval(
707            &filter,
708            &row_group_metadata,
709            &field_id_map,
710            iceberg_schema_ref.as_ref(),
711        )?;
712
713        assert!(!result);
714        Ok(())
715    }
716
717    #[test]
718    fn eval_true_for_datum_nan_filter_inequality() -> Result<()> {
719        let row_group_metadata = create_row_group_metadata(
720            1,
721            1,
722            Some(Statistics::float(
723                Some(0.0),
724                Some(2.0),
725                None,
726                Some(0),
727                false,
728            )),
729            1,
730            None,
731        )?;
732
733        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
734
735        let filter = Reference::new("col_float")
736            .greater_than(Datum::float(f32::NAN))
737            .bind(iceberg_schema_ref.clone(), false)?;
738
739        let result = RowGroupMetricsEvaluator::eval(
740            &filter,
741            &row_group_metadata,
742            &field_id_map,
743            iceberg_schema_ref.as_ref(),
744        )?;
745
746        assert!(result);
747        Ok(())
748    }
749
750    #[test]
751    fn eval_true_for_meta_missing_bound_valid_other_bound_filter_inequality() -> Result<()> {
752        let row_group_metadata = create_row_group_metadata(
753            1,
754            1,
755            Some(Statistics::float(None, Some(2.0), None, Some(0), false)),
756            1,
757            None,
758        )?;
759
760        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
761
762        let filter = Reference::new("col_float")
763            .greater_than(Datum::float(1.0))
764            .bind(iceberg_schema_ref.clone(), false)?;
765
766        let result = RowGroupMetricsEvaluator::eval(
767            &filter,
768            &row_group_metadata,
769            &field_id_map,
770            iceberg_schema_ref.as_ref(),
771        )?;
772
773        assert!(result);
774        Ok(())
775    }
776
777    #[test]
778    fn eval_false_for_meta_failing_bound_filter_inequality() -> Result<()> {
779        let row_group_metadata = create_row_group_metadata(
780            1,
781            1,
782            Some(Statistics::float(
783                Some(0.0),
784                Some(0.9),
785                None,
786                Some(0),
787                false,
788            )),
789            1,
790            None,
791        )?;
792
793        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
794
795        let filter = Reference::new("col_float")
796            .greater_than(Datum::float(1.0))
797            .bind(iceberg_schema_ref.clone(), false)?;
798
799        let result = RowGroupMetricsEvaluator::eval(
800            &filter,
801            &row_group_metadata,
802            &field_id_map,
803            iceberg_schema_ref.as_ref(),
804        )?;
805
806        assert!(!result);
807        Ok(())
808    }
809
810    #[test]
811    fn eval_true_for_meta_passing_bound_filter_inequality() -> Result<()> {
812        let row_group_metadata = create_row_group_metadata(
813            1,
814            1,
815            Some(Statistics::float(
816                Some(0.0),
817                Some(2.0),
818                None,
819                Some(0),
820                false,
821            )),
822            1,
823            None,
824        )?;
825
826        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
827
828        let filter = Reference::new("col_float")
829            .greater_than(Datum::float(1.0))
830            .bind(iceberg_schema_ref.clone(), false)?;
831
832        let result = RowGroupMetricsEvaluator::eval(
833            &filter,
834            &row_group_metadata,
835            &field_id_map,
836            iceberg_schema_ref.as_ref(),
837        )?;
838
839        assert!(result);
840        Ok(())
841    }
842
843    #[test]
844    fn eval_false_for_meta_all_nulls_filter_eq() -> Result<()> {
845        let row_group_metadata = create_row_group_metadata(
846            1,
847            1,
848            Some(Statistics::float(None, None, None, Some(1), false)),
849            1,
850            None,
851        )?;
852
853        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
854
855        let filter = Reference::new("col_float")
856            .equal_to(Datum::float(1.0))
857            .bind(iceberg_schema_ref.clone(), false)?;
858
859        let result = RowGroupMetricsEvaluator::eval(
860            &filter,
861            &row_group_metadata,
862            &field_id_map,
863            iceberg_schema_ref.as_ref(),
864        )?;
865
866        assert!(!result);
867        Ok(())
868    }
869
870    #[test]
871    fn eval_true_for_meta_lower_nan_filter_eq() -> Result<()> {
872        let row_group_metadata = create_row_group_metadata(
873            1,
874            1,
875            Some(Statistics::float(
876                Some(f32::NAN),
877                Some(2.0),
878                None,
879                Some(0),
880                false,
881            )),
882            1,
883            None,
884        )?;
885
886        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
887
888        let filter = Reference::new("col_float")
889            .equal_to(Datum::float(1.0))
890            .bind(iceberg_schema_ref.clone(), false)?;
891
892        let result = RowGroupMetricsEvaluator::eval(
893            &filter,
894            &row_group_metadata,
895            &field_id_map,
896            iceberg_schema_ref.as_ref(),
897        )?;
898
899        assert!(result);
900        Ok(())
901    }
902
903    #[test]
904    fn eval_false_for_meta_lower_gt_than_datum_filter_eq() -> Result<()> {
905        let row_group_metadata = create_row_group_metadata(
906            1,
907            1,
908            Some(Statistics::float(
909                Some(1.5),
910                Some(2.0),
911                None,
912                Some(0),
913                false,
914            )),
915            1,
916            None,
917        )?;
918
919        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
920
921        let filter = Reference::new("col_float")
922            .equal_to(Datum::float(1.0))
923            .bind(iceberg_schema_ref.clone(), false)?;
924
925        let result = RowGroupMetricsEvaluator::eval(
926            &filter,
927            &row_group_metadata,
928            &field_id_map,
929            iceberg_schema_ref.as_ref(),
930        )?;
931
932        assert!(!result);
933        Ok(())
934    }
935
936    #[test]
937    fn eval_true_for_meta_upper_nan_filter_eq() -> Result<()> {
938        let row_group_metadata = create_row_group_metadata(
939            1,
940            1,
941            Some(Statistics::float(
942                Some(0.0),
943                Some(f32::NAN),
944                None,
945                Some(0),
946                false,
947            )),
948            1,
949            None,
950        )?;
951
952        let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
953
954        let filter = Reference::new("col_float")
955            .equal_to(Datum::float(1.0))
956            .bind(iceberg_schema_ref.clone(), false)?;
957
958        let result = RowGroupMetricsEvaluator::eval(
959            &filter,
960            &row_group_metadata,
961            &field_id_map,
962            iceberg_schema_ref.as_ref(),
963        )?;
964
965        assert!(result);
966        Ok(())
967    }
968
// `col_float` stats: min 0.0, max 0.5, null count 0. The filter
// `col_float == 1.0` lies above the upper bound, so the test expects the
// row group to be reported as unable to match.
#[test]
fn eval_false_for_meta_upper_lt_than_datum_filter_eq() -> Result<()> {
    let float_stats = Statistics::float(Some(0.0), Some(0.5), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, Some(float_stats), 1, None)?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_float")
        .equal_to(Datum::float(1.0))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(!might_match);
    Ok(())
}
1001
// `col_float` stats: min 0.0, max 2.0, null count 0. The filter
// `col_float == 1.0` falls inside the bounds, so the test expects a
// possible match.
#[test]
fn eval_true_for_meta_good_bounds_than_datum_filter_eq() -> Result<()> {
    let float_stats = Statistics::float(Some(0.0), Some(2.0), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, Some(float_stats), 1, None)?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_float")
        .equal_to(Datum::float(1.0))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1034
// `col_float` stats: min 1.0 and max 1.0, null count 0. Even though both
// bounds equal the literal, the test expects `col_float != 1.0` to be
// reported as a possible match.
#[test]
fn eval_true_for_meta_bounds_eq_datum_filter_neq() -> Result<()> {
    let float_stats = Statistics::float(Some(1.0), Some(1.0), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, Some(float_stats), 1, None)?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_float")
        .not_equal_to(Datum::float(1.0))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1067
// `col_string` stats: no bounds, null count 1 for a column chunk holding a
// single value. The test expects `col_string STARTS WITH "iceberg"` to be
// reported as unable to match.
#[test]
fn eval_false_for_meta_all_nulls_filter_starts_with() -> Result<()> {
    let string_stats = Statistics::byte_array(None, None, None, Some(1), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(!might_match);
    Ok(())
}
1094
// Applies STARTS WITH to `col_float` with a float literal. Binding succeeds
// (it is unwrapped with `?`), and the test expects the evaluation itself to
// return an error.
#[test]
fn eval_error_for_starts_with_non_string_filter_datum() -> Result<()> {
    let string_stats = Statistics::byte_array(None, None, None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_float")
        .starts_with(Datum::float(1.0))
        .bind(Arc::clone(&schema), false)?;

    let outcome = RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema);

    assert!(outcome.is_err());
    Ok(())
}
1121
// `col_string` stats whose lower bound is the single byte 0xFF (not valid
// UTF-8) and whose upper bound is 0x20 (valid UTF-8). The test expects the
// STARTS WITH evaluation to return an error.
#[test]
fn eval_error_for_starts_with_non_utf8_lower_bound() -> Result<()> {
    let invalid_lower = ByteArray::from(vec![0xFF_u8]);
    let valid_upper = ByteArray::from(vec![0x20_u8]);
    let string_stats =
        Statistics::byte_array(Some(invalid_lower), Some(valid_upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let outcome = RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema);

    assert!(outcome.is_err());
    Ok(())
}
1155
// `col_string` stats with a valid lower bound ("ice") and an upper bound of
// the single byte 0xFF, which is not valid UTF-8. The test expects the
// STARTS WITH evaluation to return an error.
#[test]
fn eval_error_for_starts_with_non_utf8_upper_bound() -> Result<()> {
    let valid_lower = ByteArray::from("ice".as_bytes());
    let invalid_upper = ByteArray::from(vec![0xFF_u8]);
    let string_stats =
        Statistics::byte_array(Some(valid_lower), Some(invalid_upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let outcome = RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema);

    assert!(outcome.is_err());
    Ok(())
}
1189
// `col_string` stats: no bounds, null count 1 for a column chunk holding a
// single value. The test expects `col_string STARTS WITH "iceberg"` to be
// reported as unable to match. (Same setup as
// `eval_false_for_meta_all_nulls_filter_starts_with`.)
#[test]
fn eval_false_for_starts_with_meta_all_nulls() -> Result<()> {
    let string_stats = Statistics::byte_array(None, None, None, Some(1), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(!might_match);
    Ok(())
}
1217
// `col_string` stats: min "id", max "ie". The prefix "iceberg" sorts before
// the lower bound, so the test expects STARTS WITH to be reported as unable
// to match.
#[test]
fn eval_false_for_starts_with_datum_below_min_bound() -> Result<()> {
    let lower = ByteArray::from("id".as_bytes());
    let upper = ByteArray::from("ie".as_bytes());
    let string_stats = Statistics::byte_array(Some(lower), Some(upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(!might_match);
    Ok(())
}
1251
// `col_string` stats: min "h", max "ib". The prefix "iceberg" sorts after
// the upper bound, so the test expects STARTS WITH to be reported as unable
// to match.
#[test]
fn eval_false_for_starts_with_datum_above_max_bound() -> Result<()> {
    let lower = ByteArray::from("h".as_bytes());
    let upper = ByteArray::from("ib".as_bytes());
    let string_stats = Statistics::byte_array(Some(lower), Some(upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(!might_match);
    Ok(())
}
1285
// `col_string` stats: min "h", max "j". The prefix "iceberg" sorts between
// the two bounds, so the test expects STARTS WITH to be reported as a
// possible match.
#[test]
fn eval_true_for_starts_with_datum_between_bounds() -> Result<()> {
    let lower = ByteArray::from("h".as_bytes());
    let upper = ByteArray::from("j".as_bytes());
    let string_stats = Statistics::byte_array(Some(lower), Some(upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1319
// `col_string` stats: no bounds, null count 1 for a column chunk holding a
// single value. The test expects `col_string NOT STARTS WITH "iceberg"` to
// be reported as a possible match.
#[test]
fn eval_true_for_meta_all_nulls_filter_not_starts_with() -> Result<()> {
    let string_stats = Statistics::byte_array(None, None, None, Some(1), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .not_starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1346
// `col_string` stats whose lower bound is the single byte 0xFF (not valid
// UTF-8) and whose upper bound is 0x20 (valid UTF-8). The test expects the
// NOT STARTS WITH evaluation to return an error.
#[test]
fn eval_error_for_not_starts_with_non_utf8_lower_bound() -> Result<()> {
    let invalid_lower = ByteArray::from(vec![0xFF_u8]);
    let valid_upper = ByteArray::from(vec![0x20_u8]);
    let string_stats =
        Statistics::byte_array(Some(invalid_lower), Some(valid_upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .not_starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let outcome = RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema);

    assert!(outcome.is_err());
    Ok(())
}
1380
// `col_string` stats with a valid lower bound ("iceberg") and an upper bound
// of the single byte 0xFF, which is not valid UTF-8. The test expects the
// NOT STARTS WITH evaluation to return an error.
#[test]
fn eval_error_for_not_starts_with_non_utf8_upper_bound() -> Result<()> {
    let valid_lower = ByteArray::from("iceberg".as_bytes());
    let invalid_upper = ByteArray::from(vec![0xFF_u8]);
    let string_stats =
        Statistics::byte_array(Some(valid_lower), Some(invalid_upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .not_starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let outcome = RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema);

    assert!(outcome.is_err());
    Ok(())
}
1414
// `col_string` stats: no lower bound, upper bound "iceberg". Without a
// lower bound the test expects NOT STARTS WITH "iceberg" to be reported as a
// possible match.
#[test]
fn eval_true_for_not_starts_with_no_min_bound() -> Result<()> {
    let upper = ByteArray::from("iceberg".as_bytes());
    let string_stats = Statistics::byte_array(None, Some(upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .not_starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1448
// `col_string` stats: min "ice", max "iceberg". The lower bound is shorter
// than the prefix "iceberg", and the test expects NOT STARTS WITH to be
// reported as a possible match.
#[test]
fn eval_true_for_not_starts_with_datum_longer_min_max_bound() -> Result<()> {
    let lower = ByteArray::from("ice".as_bytes());
    let upper = ByteArray::from("iceberg".as_bytes());
    let string_stats = Statistics::byte_array(Some(lower), Some(upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .not_starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1482
// `col_string` stats: lower bound "iceberg" (equal to the prefix), no upper
// bound. Without an upper bound the test expects NOT STARTS WITH to be
// reported as a possible match.
#[test]
fn eval_true_for_not_starts_with_datum_matches_lower_no_upper() -> Result<()> {
    let lower = ByteArray::from("iceberg".as_bytes());
    let string_stats = Statistics::byte_array(Some(lower), None, None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .not_starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1516
// `col_string` stats: min "iceberg" (equal to the prefix), max "icy" (shorter
// than the prefix). The test expects NOT STARTS WITH "iceberg" to be reported
// as a possible match.
#[test]
fn eval_true_for_not_starts_with_datum_matches_lower_upper_shorter() -> Result<()> {
    let lower = ByteArray::from("iceberg".as_bytes());
    let upper = ByteArray::from("icy".as_bytes());
    let string_stats = Statistics::byte_array(Some(lower), Some(upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .not_starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1550
// `col_string` stats: min and max are both "iceberg", i.e. both bounds begin
// with the prefix. The test expects NOT STARTS WITH "iceberg" to be reported
// as unable to match.
#[test]
fn eval_false_for_not_starts_with_datum_matches_lower_and_upper() -> Result<()> {
    let lower = ByteArray::from("iceberg".as_bytes());
    let upper = ByteArray::from("iceberg".as_bytes());
    let string_stats = Statistics::byte_array(Some(lower), Some(upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .not_starts_with(Datum::string("iceberg"))
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(!might_match);
    Ok(())
}
1584
// `col_string` stats: min and max "iceberg", null count 1 for a column chunk
// holding a single value. The test expects `col_string IN ("ice", "berg")`
// to be reported as unable to match.
// NOTE(review): neither literal equals the only bound value "iceberg", so
// this setup may not isolate the all-nulls path on its own - confirm against
// the evaluator and consider using `None` bounds here.
#[test]
fn eval_false_for_meta_all_nulls_filter_is_in() -> Result<()> {
    let lower = ByteArray::from("iceberg".as_bytes());
    let upper = ByteArray::from("iceberg".as_bytes());
    let string_stats = Statistics::byte_array(Some(lower), Some(upper), None, Some(1), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .is_in([Datum::string("ice"), Datum::string("berg")])
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(!might_match);
    Ok(())
}
1617
// `col_float` stats: min 11.0, max 12.0. The IN set is built from 1000 random
// floats drawn from [0.0, 10.0), all of which fall below the lower bound, yet
// the test expects a possible match.
// NOTE(review): presumably this is because the set size exceeds
// IN_PREDICATE_LIMIT (200) - confirm against the evaluator.
#[test]
fn eval_true_for_too_many_literals_filter_is_in() -> Result<()> {
    let mut rng = thread_rng();

    let float_stats = Statistics::float(Some(11.0), Some(12.0), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, Some(float_stats), 1, None)?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;

    let random_literals =
        std::iter::repeat_with(|| Datum::float(rng.gen_range(0.0..10.0))).take(1000);
    let predicate = Reference::new("col_float")
        .is_in(random_literals)
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1652
// `col_string` stats: neither bound present, null count 0. With no bounds to
// compare against, the test expects `col_string IN ("ice")` to be reported as
// a possible match.
#[test]
fn eval_true_for_missing_bounds_filter_is_in() -> Result<()> {
    let string_stats = Statistics::byte_array(None, None, None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .is_in([Datum::string("ice")])
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1679
// `col_float` stats: min NaN, max 1.0. The test currently expects
// `col_float IN (2.0, 3.0)` to be reported as a possible match.
#[test]
fn eval_true_for_lower_bound_is_nan_filter_is_in() -> Result<()> {
    // TODO: should this be false, since the max stat
    //       is lower than the min val in the set?
    let float_stats = Statistics::float(Some(f32::NAN), Some(1.0), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, Some(float_stats), 1, None)?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_float")
        .is_in([Datum::float(2.0), Datum::float(3.0)])
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1714
// `col_float` stats: min 4.0, no upper bound. Every literal in
// `col_float IN (2.0, 3.0)` is below the lower bound, so the test expects the
// row group to be reported as unable to match.
#[test]
fn eval_false_for_lower_bound_greater_than_all_vals_is_in() -> Result<()> {
    let float_stats = Statistics::float(Some(4.0), None, None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, Some(float_stats), 1, None)?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_float")
        .is_in([Datum::float(2.0), Datum::float(3.0)])
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(!might_match);
    Ok(())
}
1741
// `col_float` stats: min 0.0, max NaN. With a NaN upper bound the test
// expects `col_float IN (2.0, 3.0)` to be reported as a possible match.
#[test]
fn eval_true_for_nan_upper_bound_is_in() -> Result<()> {
    let float_stats = Statistics::float(Some(0.0), Some(f32::NAN), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, Some(float_stats), 1, None)?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_float")
        .is_in([Datum::float(2.0), Datum::float(3.0)])
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1774
// `col_float` stats: min 0.0, max 1.0. Every literal in
// `col_float IN (2.0, 3.0)` is above the upper bound, so the test expects the
// row group to be reported as unable to match.
#[test]
fn eval_false_for_upper_bound_below_all_vals_is_in() -> Result<()> {
    let float_stats = Statistics::float(Some(0.0), Some(1.0), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, Some(float_stats), 1, None)?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_float")
        .is_in([Datum::float(2.0), Datum::float(3.0)])
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(!might_match);
    Ok(())
}
1807
// `col_string` stats: min and max are both "iceberg". Even so, the test
// expects `col_string NOT IN ("iceberg")` to be reported as a possible match.
#[test]
fn eval_true_for_not_in() -> Result<()> {
    let lower = ByteArray::from("iceberg".as_bytes());
    let upper = ByteArray::from("iceberg".as_bytes());
    let string_stats = Statistics::byte_array(Some(lower), Some(upper), None, Some(0), false);
    let metadata = create_row_group_metadata(1, 1, None, 1, Some(string_stats))?;

    let (schema, field_ids) = build_iceberg_schema_and_field_map()?;
    let predicate = Reference::new("col_string")
        .is_not_in([Datum::string("iceberg")])
        .bind(Arc::clone(&schema), false)?;

    let might_match =
        RowGroupMetricsEvaluator::eval(&predicate, &metadata, &field_ids, &schema)?;

    assert!(might_match);
    Ok(())
}
1841
1842    fn build_iceberg_schema_and_field_map() -> Result<(Arc<Schema>, HashMap<i32, usize>)> {
1843        let iceberg_schema = Schema::builder()
1844            .with_fields([
1845                Arc::new(NestedField::new(
1846                    1,
1847                    "col_float",
1848                    Type::Primitive(PrimitiveType::Float),
1849                    false,
1850                )),
1851                Arc::new(NestedField::new(
1852                    2,
1853                    "col_string",
1854                    Type::Primitive(PrimitiveType::String),
1855                    false,
1856                )),
1857            ])
1858            .build()?;
1859        let iceberg_schema_ref = Arc::new(iceberg_schema);
1860
1861        let field_id_map = HashMap::from_iter([(1, 0), (2, 1)]);
1862
1863        Ok((iceberg_schema_ref, field_id_map))
1864    }
1865
1866    fn build_parquet_schema_descriptor() -> Result<Arc<SchemaDescriptor>> {
1867        let field_1 = Arc::new(
1868            parquetSchemaType::primitive_type_builder("col_float", ParquetPhysicalType::FLOAT)
1869                .with_id(Some(1))
1870                .build()?,
1871        );
1872
1873        let field_2 = Arc::new(
1874            parquetSchemaType::primitive_type_builder(
1875                "col_string",
1876                ParquetPhysicalType::BYTE_ARRAY,
1877            )
1878            .with_id(Some(2))
1879            .with_logical_type(Some(ParquetLogicalType::String))
1880            .build()?,
1881        );
1882
1883        let group_type = Arc::new(
1884            parquetSchemaType::group_type_builder("all")
1885                .with_id(Some(1000))
1886                .with_fields(vec![field_1, field_2])
1887                .build()?,
1888        );
1889
1890        let schema_descriptor = SchemaDescriptor::new(group_type);
1891        let schema_descriptor_arc = Arc::new(schema_descriptor);
1892        Ok(schema_descriptor_arc)
1893    }
1894
1895    fn create_row_group_metadata(
1896        num_rows: i64,
1897        col_1_num_vals: i64,
1898        col_1_stats: Option<Statistics>,
1899        col_2_num_vals: i64,
1900        col_2_stats: Option<Statistics>,
1901    ) -> Result<RowGroupMetaData> {
1902        let schema_descriptor_arc = build_parquet_schema_descriptor()?;
1903
1904        let column_1_desc_ptr = Arc::new(ColumnDescriptor::new(
1905            schema_descriptor_arc.column(0).self_type_ptr(),
1906            1,
1907            1,
1908            ColumnPath::new(vec!["col_float".to_string()]),
1909        ));
1910
1911        let column_2_desc_ptr = Arc::new(ColumnDescriptor::new(
1912            schema_descriptor_arc.column(1).self_type_ptr(),
1913            1,
1914            1,
1915            ColumnPath::new(vec!["col_string".to_string()]),
1916        ));
1917
1918        let mut col_1_meta =
1919            ColumnChunkMetaData::builder(column_1_desc_ptr).set_num_values(col_1_num_vals);
1920        if let Some(stats1) = col_1_stats {
1921            col_1_meta = col_1_meta.set_statistics(stats1)
1922        }
1923
1924        let mut col_2_meta =
1925            ColumnChunkMetaData::builder(column_2_desc_ptr).set_num_values(col_2_num_vals);
1926        if let Some(stats2) = col_2_stats {
1927            col_2_meta = col_2_meta.set_statistics(stats2)
1928        }
1929
1930        let row_group_metadata = RowGroupMetaData::builder(schema_descriptor_arc)
1931            .set_num_rows(num_rows)
1932            .set_column_metadata(vec![
1933                col_1_meta.build()?,
1934                // .set_statistics(Statistics::float(None, None, None, Some(1), false))
1935                col_2_meta.build()?,
1936            ])
1937            .build();
1938
1939        Ok(row_group_metadata?)
1940    }
1941}