1use std::collections::HashMap;
21
22use fnv::FnvHashSet;
23use ordered_float::OrderedFloat;
24use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
25use parquet::file::metadata::RowGroupMetaData;
26use parquet::file::page_index::column_index::ColumnIndexMetaData;
27use parquet::file::page_index::offset_index::OffsetIndexMetaData;
28
29use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
30use crate::expr::{BoundPredicate, BoundReference};
31use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema};
32use crate::{Error, ErrorKind, Result};
33
/// Offset-index metadata for a row group; the evaluator looks entries up by Parquet
/// column index.
type OffsetIndex = Vec<OffsetIndexMetaData>;

/// Largest `IN` literal set that is still checked against page bounds. Bigger sets make
/// the `IN` predicate select every row without inspecting the page index.
const IN_PREDICATE_LIMIT: usize = 200;
37
/// Outcome of a predicate whose field id has no entry in the field-id-to-Parquet-column
/// map.
enum MissingColBehavior {
    /// Skip every row of the row group.
    CantMatch,
    /// Select every row of the row group.
    MightMatch,
}
42
/// Coarse classification of how many values of a page are null.
enum PageNullCount {
    /// The null count equals the page's row count.
    AllNull,
    /// The null count is zero (and the page is not empty).
    NoneNull,
    /// The null count is known but is neither zero nor the row count.
    SomeNull,
    /// No null count was recorded for the page.
    Unknown,
}

impl PageNullCount {
    /// Classifies a page from its row count and optional null count.
    ///
    /// The "all null" comparison is made first, so a page with zero rows and zero nulls
    /// is reported as `AllNull`.
    fn from_row_and_null_counts(num_rows: usize, null_count: Option<i64>) -> Self {
        let Some(nulls) = null_count else {
            return Self::Unknown;
        };

        if nulls as usize == num_rows {
            Self::AllNull
        } else if nulls == 0 {
            Self::NoneNull
        } else {
            Self::SomeNull
        }
    }
}
60
/// Visitor that evaluates a bound predicate against the page index of a single row
/// group and produces a `RowSelection` of pages to read or skip.
pub(crate) struct PageIndexEvaluator<'a> {
    /// Column-index metadata, looked up by Parquet column index.
    column_index: &'a [ColumnIndexMetaData],
    /// Offset-index metadata, looked up by Parquet column index.
    offset_index: &'a OffsetIndex,
    /// Metadata of the row group being evaluated; supplies the total row count.
    row_group_metadata: &'a RowGroupMetaData,
    /// Maps an Iceberg field id to the index of its Parquet column.
    iceberg_field_id_to_parquet_column_index: &'a HashMap<i32, usize>,
    /// Schema used to resolve a field id to its primitive type.
    snapshot_schema: &'a Schema,
    /// Per-page row counts already computed, keyed by Parquet column index.
    row_count_cache: HashMap<usize, Vec<usize>>,
}
69
70impl<'a> PageIndexEvaluator<'a> {
71 pub(crate) fn new(
72 column_index: &'a [ColumnIndexMetaData],
73 offset_index: &'a OffsetIndex,
74 row_group_metadata: &'a RowGroupMetaData,
75 field_id_map: &'a HashMap<i32, usize>,
76 snapshot_schema: &'a Schema,
77 ) -> Self {
78 Self {
79 column_index,
80 offset_index,
81 row_group_metadata,
82 iceberg_field_id_to_parquet_column_index: field_id_map,
83 snapshot_schema,
84 row_count_cache: HashMap::new(),
85 }
86 }
87
88 pub(crate) fn eval(
94 filter: &'a BoundPredicate,
95 column_index: &'a [ColumnIndexMetaData],
96 offset_index: &'a OffsetIndex,
97 row_group_metadata: &'a RowGroupMetaData,
98 field_id_map: &'a HashMap<i32, usize>,
99 snapshot_schema: &'a Schema,
100 ) -> Result<Vec<RowSelector>> {
101 if row_group_metadata.num_rows() == 0 {
102 return Ok(vec![]);
103 }
104
105 let mut evaluator = Self::new(
106 column_index,
107 offset_index,
108 row_group_metadata,
109 field_id_map,
110 snapshot_schema,
111 );
112
113 Ok(visit(&mut evaluator, filter)?.iter().copied().collect())
114 }
115
116 fn select_all_rows(&self) -> Result<RowSelection> {
117 Ok(vec![RowSelector::select(
118 self.row_group_metadata.num_rows() as usize
119 )]
120 .into())
121 }
122
123 fn skip_all_rows(&self) -> Result<RowSelection> {
124 Ok(vec![RowSelector::skip(
125 self.row_group_metadata.num_rows() as usize
126 )]
127 .into())
128 }
129
130 fn calc_row_selection<F>(
131 &mut self,
132 field_id: i32,
133 predicate: F,
134 missing_col_behavior: MissingColBehavior,
135 ) -> Result<RowSelection>
136 where
137 F: Fn(Option<Datum>, Option<Datum>, PageNullCount) -> Result<bool>,
138 {
139 let Some(&parquet_column_index) =
140 self.iceberg_field_id_to_parquet_column_index.get(&field_id)
141 else {
142 return match missing_col_behavior {
145 MissingColBehavior::CantMatch => self.skip_all_rows(),
146 MissingColBehavior::MightMatch => self.select_all_rows(),
147 };
148 };
149
150 let Some(field) = self.snapshot_schema.field_by_id(field_id) else {
151 return Err(Error::new(
152 ErrorKind::Unexpected,
153 format!("Field with id {field_id} missing from snapshot schema"),
154 ));
155 };
156
157 let Some(field_type) = field.field_type.as_primitive_type() else {
158 return Err(Error::new(
159 ErrorKind::Unexpected,
160 format!("Field with id {field_id} not convertible to primitive type"),
161 ));
162 };
163
164 let Some(column_index) = self.column_index.get(parquet_column_index) else {
165 return self.select_all_rows();
168 };
169
170 let row_counts = {
171 match self.row_count_cache.get(&parquet_column_index) {
174 Some(count) => count.clone(),
175 None => {
176 let Some(offset_index) = self.offset_index.get(parquet_column_index) else {
177 return Err(Error::new(
179 ErrorKind::Unexpected,
180 format!("Missing offset index for field id {field_id}"),
181 ));
182 };
183
184 let count = self.calc_row_counts(offset_index);
185 self.row_count_cache
186 .insert(parquet_column_index, count.clone());
187
188 count
189 }
190 }
191 };
192
193 let Some(page_filter) = Self::apply_predicate_to_column_index(
194 predicate,
195 field_type,
196 column_index,
197 &row_counts,
198 )?
199 else {
200 return self.select_all_rows();
201 };
202
203 let row_selectors: Vec<_> = row_counts
204 .iter()
205 .zip(page_filter.iter())
206 .map(|(&row_count, &is_selected)| {
207 if is_selected {
208 RowSelector::select(row_count)
209 } else {
210 RowSelector::skip(row_count)
211 }
212 })
213 .collect();
214
215 Ok(row_selectors.into())
216 }
217
218 fn calc_row_counts(&self, offset_index: &OffsetIndexMetaData) -> Vec<usize> {
220 let mut remaining_rows = self.row_group_metadata.num_rows() as usize;
221 let mut row_counts = Vec::with_capacity(self.offset_index.len());
222
223 let page_locations = offset_index.page_locations();
224 for (idx, page_location) in page_locations.iter().enumerate() {
225 let row_count = if idx < page_locations.len() - 1 {
226 let row_count = (page_locations[idx + 1].first_row_index
227 - page_location.first_row_index) as usize;
228 remaining_rows -= row_count;
229 row_count
230 } else {
231 remaining_rows
232 };
233
234 row_counts.push(row_count);
235 }
236
237 row_counts
238 }
239
240 fn apply_predicate_to_column_index<F>(
241 predicate: F,
242 field_type: &PrimitiveType,
243 column_index: &ColumnIndexMetaData,
244 row_counts: &[usize],
245 ) -> Result<Option<Vec<bool>>>
246 where
247 F: Fn(Option<Datum>, Option<Datum>, PageNullCount) -> Result<bool>,
248 {
249 let result: Result<Vec<bool>> = match column_index {
250 ColumnIndexMetaData::NONE => {
251 return Ok(None);
252 }
253 ColumnIndexMetaData::BOOLEAN(idx) => idx
254 .min_values_iter()
255 .zip(idx.max_values_iter())
256 .enumerate()
257 .zip(row_counts.iter())
258 .map(|((i, (min, max)), &row_count)| {
259 predicate(
260 min.map(|&val| {
261 Datum::new(field_type.clone(), PrimitiveLiteral::Boolean(val))
262 }),
263 max.map(|&val| {
264 Datum::new(field_type.clone(), PrimitiveLiteral::Boolean(val))
265 }),
266 PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
267 )
268 })
269 .collect(),
270 ColumnIndexMetaData::INT32(idx) => idx
271 .min_values_iter()
272 .zip(idx.max_values_iter())
273 .enumerate()
274 .zip(row_counts.iter())
275 .map(|((i, (min, max)), &row_count)| {
276 predicate(
277 min.map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))),
278 max.map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))),
279 PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
280 )
281 })
282 .collect(),
283 ColumnIndexMetaData::INT64(idx) => idx
284 .min_values_iter()
285 .zip(idx.max_values_iter())
286 .enumerate()
287 .zip(row_counts.iter())
288 .map(|((i, (min, max)), &row_count)| {
289 predicate(
290 min.map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))),
291 max.map(|&val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))),
292 PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
293 )
294 })
295 .collect(),
296 ColumnIndexMetaData::FLOAT(idx) => idx
297 .min_values_iter()
298 .zip(idx.max_values_iter())
299 .enumerate()
300 .zip(row_counts.iter())
301 .map(|((i, (min, max)), &row_count)| {
302 predicate(
303 min.map(|&val| {
304 Datum::new(
305 field_type.clone(),
306 PrimitiveLiteral::Float(OrderedFloat::from(val)),
307 )
308 }),
309 max.map(|&val| {
310 Datum::new(
311 field_type.clone(),
312 PrimitiveLiteral::Float(OrderedFloat::from(val)),
313 )
314 }),
315 PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
316 )
317 })
318 .collect(),
319 ColumnIndexMetaData::DOUBLE(idx) => idx
320 .min_values_iter()
321 .zip(idx.max_values_iter())
322 .enumerate()
323 .zip(row_counts.iter())
324 .map(|((i, (min, max)), &row_count)| {
325 predicate(
326 min.map(|&val| {
327 Datum::new(
328 field_type.clone(),
329 PrimitiveLiteral::Double(OrderedFloat::from(val)),
330 )
331 }),
332 max.map(|&val| {
333 Datum::new(
334 field_type.clone(),
335 PrimitiveLiteral::Double(OrderedFloat::from(val)),
336 )
337 }),
338 PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
339 )
340 })
341 .collect(),
342 ColumnIndexMetaData::BYTE_ARRAY(idx) => idx
343 .min_values_iter()
344 .zip(idx.max_values_iter())
345 .enumerate()
346 .zip(row_counts.iter())
347 .map(|((i, (min, max)), &row_count)| {
348 predicate(
349 min.map(|val| {
350 Datum::new(
351 field_type.clone(),
352 PrimitiveLiteral::String(String::from_utf8(val.to_vec()).unwrap()),
353 )
354 }),
355 max.map(|val| {
356 Datum::new(
357 field_type.clone(),
358 PrimitiveLiteral::String(String::from_utf8(val.to_vec()).unwrap()),
359 )
360 }),
361 PageNullCount::from_row_and_null_counts(row_count, idx.null_count(i)),
362 )
363 })
364 .collect(),
365 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(_) => {
366 return Err(Error::new(
367 ErrorKind::FeatureUnsupported,
368 "unsupported 'FIXED_LEN_BYTE_ARRAY' index type in column_index",
369 ));
370 }
371 ColumnIndexMetaData::INT96(_) => {
372 return Err(Error::new(
373 ErrorKind::FeatureUnsupported,
374 "unsupported 'INT96' index type in column_index",
375 ));
376 }
377 };
378
379 Ok(Some(result?))
380 }
381
382 fn visit_inequality(
383 &mut self,
384 reference: &BoundReference,
385 datum: &Datum,
386 cmp_fn: fn(&Datum, &Datum) -> bool,
387 use_lower_bound: bool,
388 ) -> Result<RowSelection> {
389 let field_id = reference.field().id;
390
391 self.calc_row_selection(
392 field_id,
393 |min, max, null_count| {
394 if matches!(null_count, PageNullCount::AllNull) {
395 return Ok(false);
396 }
397
398 if datum.is_nan() {
399 return Ok(true);
401 }
402
403 let bound = if use_lower_bound { min } else { max };
404
405 if let Some(bound) = bound {
406 if cmp_fn(&bound, datum) {
407 return Ok(true);
408 }
409
410 return Ok(false);
411 }
412
413 Ok(true)
414 },
415 MissingColBehavior::MightMatch,
416 )
417 }
418}
419
420impl BoundPredicateVisitor for PageIndexEvaluator<'_> {
421 type T = RowSelection;
422
423 fn always_true(&mut self) -> Result<RowSelection> {
424 self.select_all_rows()
425 }
426
427 fn always_false(&mut self) -> Result<RowSelection> {
428 self.skip_all_rows()
429 }
430
431 fn and(&mut self, lhs: RowSelection, rhs: RowSelection) -> Result<RowSelection> {
432 Ok(lhs.intersection(&rhs))
433 }
434
435 fn or(&mut self, lhs: RowSelection, rhs: RowSelection) -> Result<RowSelection> {
436 Ok(lhs.union(&rhs))
437 }
438
439 fn not(&mut self, _: RowSelection) -> Result<RowSelection> {
440 Err(Error::new(
441 ErrorKind::Unexpected,
442 "NOT unsupported at this point. NOT-rewrite should be performed first",
443 ))
444 }
445
446 fn is_null(
447 &mut self,
448 reference: &BoundReference,
449 _predicate: &BoundPredicate,
450 ) -> Result<RowSelection> {
451 let field_id = reference.field().id;
452
453 self.calc_row_selection(
454 field_id,
455 |_max, _min, null_count| Ok(!matches!(null_count, PageNullCount::NoneNull)),
456 MissingColBehavior::MightMatch,
457 )
458 }
459
460 fn not_null(
461 &mut self,
462 reference: &BoundReference,
463 _predicate: &BoundPredicate,
464 ) -> Result<RowSelection> {
465 let field_id = reference.field().id;
466
467 self.calc_row_selection(
468 field_id,
469 |_max, _min, null_count| Ok(!matches!(null_count, PageNullCount::AllNull)),
470 MissingColBehavior::CantMatch,
471 )
472 }
473
474 fn is_nan(
475 &mut self,
476 reference: &BoundReference,
477 _predicate: &BoundPredicate,
478 ) -> Result<RowSelection> {
479 if reference.field().field_type.is_floating_type() {
482 self.select_all_rows()
483 } else {
484 self.skip_all_rows()
485 }
486 }
487
488 fn not_nan(
489 &mut self,
490 _reference: &BoundReference,
491 _predicate: &BoundPredicate,
492 ) -> Result<RowSelection> {
493 self.select_all_rows()
495 }
496
497 fn less_than(
498 &mut self,
499 reference: &BoundReference,
500 datum: &Datum,
501 _predicate: &BoundPredicate,
502 ) -> Result<RowSelection> {
503 self.visit_inequality(reference, datum, PartialOrd::lt, true)
504 }
505
506 fn less_than_or_eq(
507 &mut self,
508 reference: &BoundReference,
509 datum: &Datum,
510 _predicate: &BoundPredicate,
511 ) -> Result<RowSelection> {
512 self.visit_inequality(reference, datum, PartialOrd::le, true)
513 }
514
515 fn greater_than(
516 &mut self,
517 reference: &BoundReference,
518 datum: &Datum,
519 _predicate: &BoundPredicate,
520 ) -> Result<RowSelection> {
521 self.visit_inequality(reference, datum, PartialOrd::gt, false)
522 }
523
524 fn greater_than_or_eq(
525 &mut self,
526 reference: &BoundReference,
527 datum: &Datum,
528 _predicate: &BoundPredicate,
529 ) -> Result<RowSelection> {
530 self.visit_inequality(reference, datum, PartialOrd::ge, false)
531 }
532
533 fn eq(
534 &mut self,
535 reference: &BoundReference,
536 datum: &Datum,
537 _predicate: &BoundPredicate,
538 ) -> Result<RowSelection> {
539 let field_id = reference.field().id;
540
541 self.calc_row_selection(
542 field_id,
543 |min, max, nulls| {
544 if matches!(nulls, PageNullCount::AllNull) {
545 return Ok(false);
546 }
547
548 if let Some(min) = min
549 && min.gt(datum)
550 {
551 return Ok(false);
552 }
553
554 if let Some(max) = max
555 && max.lt(datum)
556 {
557 return Ok(false);
558 }
559
560 Ok(true)
561 },
562 MissingColBehavior::CantMatch,
563 )
564 }
565
566 fn not_eq(
567 &mut self,
568 _reference: &BoundReference,
569 _datum: &Datum,
570 _predicate: &BoundPredicate,
571 ) -> Result<RowSelection> {
572 self.select_all_rows()
576 }
577
578 fn starts_with(
579 &mut self,
580 reference: &BoundReference,
581 datum: &Datum,
582 _predicate: &BoundPredicate,
583 ) -> Result<RowSelection> {
584 let field_id = reference.field().id;
585
586 let PrimitiveLiteral::String(datum) = datum.literal() else {
587 return Err(Error::new(
588 ErrorKind::Unexpected,
589 "Cannot use StartsWith operator on non-string values",
590 ));
591 };
592
593 self.calc_row_selection(
594 field_id,
595 |min, max, nulls| {
596 if matches!(nulls, PageNullCount::AllNull) {
597 return Ok(false);
598 }
599
600 if let Some(lower_bound) = min {
601 let PrimitiveLiteral::String(lower_bound) = lower_bound.literal() else {
602 return Err(Error::new(
603 ErrorKind::Unexpected,
604 "Cannot use StartsWith operator on non-string lower_bound value",
605 ));
606 };
607
608 let prefix_length = lower_bound.chars().count().min(datum.chars().count());
609
610 let truncated_lower_bound =
613 lower_bound.chars().take(prefix_length).collect::<String>();
614 if datum < &truncated_lower_bound {
615 return Ok(false);
616 }
617 }
618
619 if let Some(upper_bound) = max {
620 let PrimitiveLiteral::String(upper_bound) = upper_bound.literal() else {
621 return Err(Error::new(
622 ErrorKind::Unexpected,
623 "Cannot use StartsWith operator on non-string upper_bound value",
624 ));
625 };
626
627 let prefix_length = upper_bound.chars().count().min(datum.chars().count());
628
629 let truncated_upper_bound =
632 upper_bound.chars().take(prefix_length).collect::<String>();
633 if datum > &truncated_upper_bound {
634 return Ok(false);
635 }
636 }
637
638 Ok(true)
639 },
640 MissingColBehavior::CantMatch,
641 )
642 }
643
644 fn not_starts_with(
645 &mut self,
646 reference: &BoundReference,
647 datum: &Datum,
648 _predicate: &BoundPredicate,
649 ) -> Result<RowSelection> {
650 let field_id = reference.field().id;
651
652 let PrimitiveLiteral::String(prefix) = datum.literal() else {
656 return Err(Error::new(
657 ErrorKind::Unexpected,
658 "Cannot use StartsWith operator on non-string values",
659 ));
660 };
661
662 self.calc_row_selection(
663 field_id,
664 |min, max, nulls| {
665 if !matches!(nulls, PageNullCount::NoneNull) {
666 return Ok(true);
667 }
668
669 let Some(lower_bound) = min else {
670 return Ok(true);
671 };
672
673 let PrimitiveLiteral::String(lower_bound_str) = lower_bound.literal() else {
674 return Err(Error::new(
675 ErrorKind::Unexpected,
676 "Cannot use NotStartsWith operator on non-string lower_bound value",
677 ));
678 };
679
680 if lower_bound_str < prefix {
681 return Ok(true);
683 }
684
685 let prefix_len = prefix.chars().count();
686
687 if lower_bound_str.chars().take(prefix_len).collect::<String>() == *prefix {
688 let Some(upper_bound) = max else {
691 return Ok(true);
692 };
693
694 let PrimitiveLiteral::String(upper_bound) = upper_bound.literal() else {
695 return Err(Error::new(
696 ErrorKind::Unexpected,
697 "Cannot use NotStartsWith operator on non-string upper_bound value",
698 ));
699 };
700
701 if upper_bound.chars().count() < prefix_len {
703 return Ok(true);
704 }
705
706 if upper_bound.chars().take(prefix_len).collect::<String>() == *prefix {
707 return Ok(false);
710 }
711 }
712
713 Ok(true)
714 },
715 MissingColBehavior::MightMatch,
716 )
717 }
718
719 fn r#in(
720 &mut self,
721 reference: &BoundReference,
722 literals: &FnvHashSet<Datum>,
723 _predicate: &BoundPredicate,
724 ) -> Result<RowSelection> {
725 let field_id = reference.field().id;
726
727 if literals.len() > IN_PREDICATE_LIMIT {
728 return self.select_all_rows();
730 }
731 self.calc_row_selection(
732 field_id,
733 |min, max, nulls| {
734 if matches!(nulls, PageNullCount::AllNull) {
735 return Ok(false);
736 }
737
738 match (min, max) {
739 (Some(min), Some(max)) => {
740 if literals
741 .iter()
742 .all(|datum| datum.lt(&min) || datum.gt(&max))
743 {
744 return Ok(false);
746 }
747 }
748 (Some(min), _) => {
749 if !literals.iter().any(|datum| datum.ge(&min)) {
750 return Ok(false);
752 }
753 }
754 (_, Some(max)) => {
755 if !literals.iter().any(|datum| datum.le(&max)) {
756 return Ok(false);
758 }
759 }
760
761 _ => {}
762 }
763
764 Ok(true)
765 },
766 MissingColBehavior::CantMatch,
767 )
768 }
769
770 fn not_in(
771 &mut self,
772 _reference: &BoundReference,
773 _literals: &FnvHashSet<Datum>,
774 _predicate: &BoundPredicate,
775 ) -> Result<RowSelection> {
776 self.select_all_rows()
780 }
781}
782
783#[cfg(test)]
784mod tests {
785 use std::collections::HashMap;
786 use std::sync::Arc;
787
788 use arrow_array::{ArrayRef, Float32Array, RecordBatch, StringArray};
789 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
790 use parquet::arrow::ArrowWriter;
791 use parquet::arrow::arrow_reader::{
792 ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelector,
793 };
794 use parquet::file::metadata::ParquetMetaData;
795 use parquet::file::properties::WriterProperties;
796 use rand::{Rng, thread_rng};
797 use tempfile::NamedTempFile;
798
799 use super::PageIndexEvaluator;
800 use crate::expr::{Bind, Reference};
801 use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
802 use crate::{ErrorKind, Result};
803
804 fn create_test_parquet_file() -> Result<(Arc<ParquetMetaData>, NamedTempFile)> {
807 let arrow_schema = Arc::new(ArrowSchema::new(vec![
808 Field::new("col_float", DataType::Float32, true),
809 Field::new("col_string", DataType::Utf8, true),
810 ]));
811
812 let temp_file = NamedTempFile::new().unwrap();
813 let file = temp_file.reopen().unwrap();
814
815 let props = WriterProperties::builder()
816 .set_data_page_row_count_limit(1024)
817 .set_write_batch_size(512)
818 .build();
819
820 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
821
822 let mut batches = vec![];
823
824 let float_vals: Vec<Option<f32>> = vec![None; 1024];
826 let mut string_vals = vec![];
827 string_vals.push(Some("AARDVARK".to_string()));
828 for _ in 1..1023 {
829 string_vals.push(Some("BEAR".to_string()));
830 }
831 string_vals.push(Some("BISON".to_string()));
832
833 batches.push(
834 RecordBatch::try_new(arrow_schema.clone(), vec![
835 Arc::new(Float32Array::from(float_vals)),
836 Arc::new(StringArray::from(string_vals)),
837 ])
838 .unwrap(),
839 );
840
841 let float_vals: Vec<Option<f32>> = vec![None; 1024];
843 let string_vals = vec![Some("DEER".to_string()); 1024];
844
845 batches.push(
846 RecordBatch::try_new(arrow_schema.clone(), vec![
847 Arc::new(Float32Array::from(float_vals)),
848 Arc::new(StringArray::from(string_vals)),
849 ])
850 .unwrap(),
851 );
852
853 let mut float_vals = vec![];
855 for i in 0..1024 {
856 float_vals.push(Some(i as f32 * 10.0 / 1024.0));
857 }
858 let mut string_vals = vec![];
859 string_vals.push(Some("GIRAFFE".to_string()));
860 string_vals.push(None);
861 for _ in 2..1024 {
862 string_vals.push(Some("HIPPO".to_string()));
863 }
864
865 batches.push(
866 RecordBatch::try_new(arrow_schema.clone(), vec![
867 Arc::new(Float32Array::from(float_vals)),
868 Arc::new(StringArray::from(string_vals)),
869 ])
870 .unwrap(),
871 );
872
873 let mut float_vals = vec![None];
875 for i in 1..1024 {
876 float_vals.push(Some(10.0 + i as f32 * 10.0 / 1024.0));
877 }
878 let string_vals = vec![Some("HIPPO".to_string()); 1024];
879
880 batches.push(
881 RecordBatch::try_new(arrow_schema.clone(), vec![
882 Arc::new(Float32Array::from(float_vals)),
883 Arc::new(StringArray::from(string_vals)),
884 ])
885 .unwrap(),
886 );
887
888 for batch in &batches {
890 for i in 0..batch.num_rows() {
891 writer.write(&batch.slice(i, 1)).unwrap();
892 }
893 }
894
895 writer.close().unwrap();
896
897 let file = temp_file.reopen().unwrap();
898 let options = ArrowReaderOptions::new().with_page_index(true);
899 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
900 let metadata = reader.metadata().clone();
901
902 Ok((metadata, temp_file))
903 }
904
905 fn get_test_metadata(
907 metadata: &ParquetMetaData,
908 ) -> (
909 Vec<parquet::file::page_index::column_index::ColumnIndexMetaData>,
910 Vec<parquet::file::page_index::offset_index::OffsetIndexMetaData>,
911 &parquet::file::metadata::RowGroupMetaData,
912 ) {
913 let row_group_metadata = metadata.row_group(0);
914 let column_index = metadata.column_index().unwrap()[0].to_vec();
915 let offset_index = metadata.offset_index().unwrap()[0].to_vec();
916 (column_index, offset_index, row_group_metadata)
917 }
918
919 #[test]
920 fn eval_matches_no_rows_for_empty_row_group() -> Result<()> {
921 let arrow_schema = Arc::new(ArrowSchema::new(vec![
922 Field::new("col_float", DataType::Float32, true),
923 Field::new("col_string", DataType::Utf8, true),
924 ]));
925
926 let empty_float: ArrayRef = Arc::new(Float32Array::from(Vec::<Option<f32>>::new()));
927 let empty_string: ArrayRef = Arc::new(StringArray::from(Vec::<Option<String>>::new()));
928 let empty_batch =
929 RecordBatch::try_new(arrow_schema.clone(), vec![empty_float, empty_string]).unwrap();
930
931 let temp_file = NamedTempFile::new().unwrap();
932 let file = temp_file.reopen().unwrap();
933
934 let mut writer = ArrowWriter::try_new(file, arrow_schema, None).unwrap();
935 writer.write(&empty_batch).unwrap();
936 writer.close().unwrap();
937
938 let file = temp_file.reopen().unwrap();
939 let options = ArrowReaderOptions::new().with_page_index(true);
940 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
941 let metadata = reader.metadata();
942
943 if metadata.num_row_groups() == 0 || metadata.row_group(0).num_rows() == 0 {
944 return Ok(());
945 }
946
947 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
948
949 let filter = Reference::new("col_float")
950 .greater_than(Datum::float(1.0))
951 .bind(iceberg_schema_ref.clone(), false)?;
952
953 let row_group_metadata = metadata.row_group(0);
954 let column_index = metadata.column_index().unwrap()[0].to_vec();
955 let offset_index = metadata.offset_index().unwrap()[0].to_vec();
956
957 let result = PageIndexEvaluator::eval(
958 &filter,
959 &column_index,
960 &offset_index,
961 row_group_metadata,
962 &field_id_map,
963 iceberg_schema_ref.as_ref(),
964 )?;
965
966 assert_eq!(result.len(), 0);
967
968 Ok(())
969 }
970
971 #[test]
972 fn eval_is_null_select_only_pages_with_nulls() -> Result<()> {
973 let (metadata, _temp_file) = create_test_parquet_file()?;
974 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
975 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
976
977 let filter = Reference::new("col_float")
978 .is_null()
979 .bind(iceberg_schema_ref.clone(), false)?;
980
981 let result = PageIndexEvaluator::eval(
982 &filter,
983 &column_index,
984 &offset_index,
985 row_group_metadata,
986 &field_id_map,
987 iceberg_schema_ref.as_ref(),
988 )?;
989
990 let expected = vec![
991 RowSelector::select(2048),
992 RowSelector::skip(1024),
993 RowSelector::select(1024),
994 ];
995
996 assert_eq!(result, expected);
997
998 Ok(())
999 }
1000
1001 #[test]
1002 fn eval_is_not_null_dont_select_pages_with_all_nulls() -> Result<()> {
1003 let (metadata, _temp_file) = create_test_parquet_file()?;
1004 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1005 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1006
1007 let filter = Reference::new("col_float")
1008 .is_not_null()
1009 .bind(iceberg_schema_ref.clone(), false)?;
1010
1011 let result = PageIndexEvaluator::eval(
1012 &filter,
1013 &column_index,
1014 &offset_index,
1015 row_group_metadata,
1016 &field_id_map,
1017 iceberg_schema_ref.as_ref(),
1018 )?;
1019
1020 let expected = vec![RowSelector::skip(2048), RowSelector::select(2048)];
1021
1022 assert_eq!(result, expected);
1023
1024 Ok(())
1025 }
1026
1027 #[test]
1028 fn eval_is_nan_select_all() -> Result<()> {
1029 let (metadata, _temp_file) = create_test_parquet_file()?;
1030 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1031 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1032
1033 let filter = Reference::new("col_float")
1034 .is_nan()
1035 .bind(iceberg_schema_ref.clone(), false)?;
1036
1037 let result = PageIndexEvaluator::eval(
1038 &filter,
1039 &column_index,
1040 &offset_index,
1041 row_group_metadata,
1042 &field_id_map,
1043 iceberg_schema_ref.as_ref(),
1044 )?;
1045
1046 let expected = vec![RowSelector::select(4096)];
1047
1048 assert_eq!(result, expected);
1049
1050 Ok(())
1051 }
1052
1053 #[test]
1054 fn eval_not_nan_select_all() -> Result<()> {
1055 let (metadata, _temp_file) = create_test_parquet_file()?;
1056 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1057 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1058
1059 let filter = Reference::new("col_float")
1060 .is_not_nan()
1061 .bind(iceberg_schema_ref.clone(), false)?;
1062
1063 let result = PageIndexEvaluator::eval(
1064 &filter,
1065 &column_index,
1066 &offset_index,
1067 row_group_metadata,
1068 &field_id_map,
1069 iceberg_schema_ref.as_ref(),
1070 )?;
1071
1072 let expected = vec![RowSelector::select(4096)];
1073
1074 assert_eq!(result, expected);
1075
1076 Ok(())
1077 }
1078
1079 #[test]
1080 fn eval_inequality_nan_datum_all_rows_except_all_null_pages() -> Result<()> {
1081 let (metadata, _temp_file) = create_test_parquet_file()?;
1082 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1083 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1084
1085 let filter = Reference::new("col_float")
1086 .less_than(Datum::float(f32::NAN))
1087 .bind(iceberg_schema_ref.clone(), false)?;
1088
1089 let result = PageIndexEvaluator::eval(
1090 &filter,
1091 &column_index,
1092 &offset_index,
1093 row_group_metadata,
1094 &field_id_map,
1095 iceberg_schema_ref.as_ref(),
1096 )?;
1097
1098 let expected = vec![RowSelector::skip(2048), RowSelector::select(2048)];
1099
1100 assert_eq!(result, expected);
1101
1102 Ok(())
1103 }
1104
1105 #[test]
1106 fn eval_inequality_pages_containing_value_except_all_null_pages() -> Result<()> {
1107 let (metadata, _temp_file) = create_test_parquet_file()?;
1108 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1109 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1110
1111 let filter = Reference::new("col_float")
1112 .less_than(Datum::float(5.0))
1113 .bind(iceberg_schema_ref.clone(), false)?;
1114
1115 let result = PageIndexEvaluator::eval(
1116 &filter,
1117 &column_index,
1118 &offset_index,
1119 row_group_metadata,
1120 &field_id_map,
1121 iceberg_schema_ref.as_ref(),
1122 )?;
1123
1124 let expected = vec![
1125 RowSelector::skip(2048),
1126 RowSelector::select(1024),
1127 RowSelector::skip(1024),
1128 ];
1129
1130 assert_eq!(result, expected);
1131
1132 Ok(())
1133 }
1134
1135 #[test]
1136 fn eval_eq_pages_containing_value_except_all_null_pages() -> Result<()> {
1137 let (metadata, _temp_file) = create_test_parquet_file()?;
1138 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1139 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1140
1141 let filter = Reference::new("col_float")
1142 .equal_to(Datum::float(5.0))
1143 .bind(iceberg_schema_ref.clone(), false)?;
1144
1145 let result = PageIndexEvaluator::eval(
1146 &filter,
1147 &column_index,
1148 &offset_index,
1149 row_group_metadata,
1150 &field_id_map,
1151 iceberg_schema_ref.as_ref(),
1152 )?;
1153
1154 let expected = vec![
1158 RowSelector::skip(2048),
1159 RowSelector::select(1024),
1160 RowSelector::skip(1024),
1161 ];
1162
1163 assert_eq!(result, expected);
1164
1165 Ok(())
1166 }
1167
1168 #[test]
1169 fn eval_not_eq_all_rows() -> Result<()> {
1170 let (metadata, _temp_file) = create_test_parquet_file()?;
1171 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1172 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1173
1174 let filter = Reference::new("col_float")
1175 .not_equal_to(Datum::float(5.0))
1176 .bind(iceberg_schema_ref.clone(), false)?;
1177
1178 let result = PageIndexEvaluator::eval(
1179 &filter,
1180 &column_index,
1181 &offset_index,
1182 row_group_metadata,
1183 &field_id_map,
1184 iceberg_schema_ref.as_ref(),
1185 )?;
1186
1187 let expected = vec![RowSelector::select(4096)];
1188
1189 assert_eq!(result, expected);
1190
1191 Ok(())
1192 }
1193
1194 #[test]
1195 fn eval_starts_with_error_float_col() -> Result<()> {
1196 let (metadata, _temp_file) = create_test_parquet_file()?;
1197 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1198 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1199
1200 let filter = Reference::new("col_float")
1201 .starts_with(Datum::float(5.0))
1202 .bind(iceberg_schema_ref.clone(), false)?;
1203
1204 let result = PageIndexEvaluator::eval(
1205 &filter,
1206 &column_index,
1207 &offset_index,
1208 row_group_metadata,
1209 &field_id_map,
1210 iceberg_schema_ref.as_ref(),
1211 );
1212
1213 assert_eq!(result.unwrap_err().kind(), ErrorKind::Unexpected);
1214
1215 Ok(())
1216 }
1217
1218 #[test]
1219 fn eval_starts_with_pages_containing_value_except_all_null_pages() -> Result<()> {
1220 let (metadata, _temp_file) = create_test_parquet_file()?;
1221 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1222 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1223
1224 let filter = Reference::new("col_string")
1228 .starts_with(Datum::string("B"))
1229 .bind(iceberg_schema_ref.clone(), false)?;
1230
1231 let result = PageIndexEvaluator::eval(
1232 &filter,
1233 &column_index,
1234 &offset_index,
1235 row_group_metadata,
1236 &field_id_map,
1237 iceberg_schema_ref.as_ref(),
1238 )?;
1239
1240 let expected = vec![RowSelector::select(1024), RowSelector::skip(3072)];
1242
1243 assert_eq!(result, expected);
1244
1245 Ok(())
1246 }
1247
1248 #[test]
1249 fn eval_not_starts_with_pages_containing_value_except_pages_with_min_and_max_equal_to_prefix_and_all_null_pages()
1250 -> Result<()> {
1251 let (metadata, _temp_file) = create_test_parquet_file()?;
1252 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1253 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1254
1255 let filter = Reference::new("col_string")
1259 .not_starts_with(Datum::string("DE"))
1260 .bind(iceberg_schema_ref.clone(), false)?;
1261
1262 let result = PageIndexEvaluator::eval(
1263 &filter,
1264 &column_index,
1265 &offset_index,
1266 row_group_metadata,
1267 &field_id_map,
1268 iceberg_schema_ref.as_ref(),
1269 )?;
1270
1271 let expected = vec![
1275 RowSelector::select(1024),
1276 RowSelector::skip(1024),
1277 RowSelector::select(2048),
1278 ];
1279
1280 assert_eq!(result, expected);
1281
1282 Ok(())
1283 }
1284
1285 #[test]
1286 fn eval_in_length_of_set_above_limit_all_rows() -> Result<()> {
1287 let mut rng = thread_rng();
1288 let (metadata, _temp_file) = create_test_parquet_file()?;
1289 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1290 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1291
1292 let filter = Reference::new("col_float")
1293 .is_in(std::iter::repeat_with(|| Datum::float(rng.gen_range(0.0..10.0))).take(1000))
1294 .bind(iceberg_schema_ref.clone(), false)?;
1295
1296 let result = PageIndexEvaluator::eval(
1297 &filter,
1298 &column_index,
1299 &offset_index,
1300 row_group_metadata,
1301 &field_id_map,
1302 iceberg_schema_ref.as_ref(),
1303 )?;
1304
1305 let expected = vec![RowSelector::select(4096)];
1306
1307 assert_eq!(result, expected);
1308
1309 Ok(())
1310 }
1311
1312 #[test]
1313 fn eval_in_valid_set_size_some_rows() -> Result<()> {
1314 let (metadata, _temp_file) = create_test_parquet_file()?;
1315 let (column_index, offset_index, row_group_metadata) = get_test_metadata(&metadata);
1316 let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;
1317
1318 let filter = Reference::new("col_string")
1322 .is_in([Datum::string("AARDVARK"), Datum::string("GIRAFFE")])
1323 .bind(iceberg_schema_ref.clone(), false)?;
1324
1325 let result = PageIndexEvaluator::eval(
1326 &filter,
1327 &column_index,
1328 &offset_index,
1329 row_group_metadata,
1330 &field_id_map,
1331 iceberg_schema_ref.as_ref(),
1332 )?;
1333
1334 let expected = vec![
1336 RowSelector::select(1024),
1337 RowSelector::skip(1024),
1338 RowSelector::select(1024),
1339 RowSelector::skip(1024),
1340 ];
1341
1342 assert_eq!(result, expected);
1343
1344 Ok(())
1345 }
1346
1347 fn build_iceberg_schema_and_field_map() -> Result<(Arc<Schema>, HashMap<i32, usize>)> {
1348 let iceberg_schema = Schema::builder()
1349 .with_fields([
1350 Arc::new(NestedField::new(
1351 1,
1352 "col_float",
1353 Type::Primitive(PrimitiveType::Float),
1354 false,
1355 )),
1356 Arc::new(NestedField::new(
1357 2,
1358 "col_string",
1359 Type::Primitive(PrimitiveType::String),
1360 false,
1361 )),
1362 ])
1363 .build()?;
1364 let iceberg_schema_ref = Arc::new(iceberg_schema);
1365
1366 let field_id_map = HashMap::from_iter([(1, 0), (2, 1)]);
1367
1368 Ok((iceberg_schema_ref, field_id_map))
1369 }
1370}