use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;

use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
use arrow_cast::cast::cast;
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{
    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
};
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};
62
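/// Builder for [`ArrowReader`].
///
/// A minimal usage sketch (assumes an already-configured `FileIO` named `file_io` and a
/// `FileScanTaskStream` named `tasks`):
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .with_row_group_filtering_enabled(true)
///     .with_row_selection_enabled(true)
///     .build();
/// let batches = reader.read(tasks)?;
/// ```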
63pub struct ArrowReaderBuilder {
65 batch_size: Option<usize>,
66 file_io: FileIO,
67 concurrency_limit_data_files: usize,
68 row_group_filtering_enabled: bool,
69 row_selection_enabled: bool,
70 metadata_size_hint: Option<usize>,
71}
72
73impl ArrowReaderBuilder {
74 pub fn new(file_io: FileIO) -> Self {
76 let num_cpus = available_parallelism().get();
77
78 ArrowReaderBuilder {
79 batch_size: None,
80 file_io,
81 concurrency_limit_data_files: num_cpus,
82 row_group_filtering_enabled: true,
83 row_selection_enabled: false,
84 metadata_size_hint: None,
85 }
86 }
87
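    /// Sets the maximum number of data files that are processed concurrently.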
88 pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
90 self.concurrency_limit_data_files = val;
91 self
92 }
93
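    /// Sets the maximum number of rows per `RecordBatch` produced by the reader.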
94 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
97 self.batch_size = Some(batch_size);
98 self
99 }
100
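    /// Enables or disables pruning of entire row groups using Parquet row-group statistics.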
101 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
103 self.row_group_filtering_enabled = row_group_filtering_enabled;
104 self
105 }
106
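    /// Enables or disables page-index based row selection within row groups.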
107 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
109 self.row_selection_enabled = row_selection_enabled;
110 self
111 }
112
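    /// Provides a size hint (in bytes) for fetching the Parquet footer and metadata,
    /// reducing the number of small reads needed to load file metadata.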
113 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
118 self.metadata_size_hint = Some(metadata_size_hint);
119 self
120 }
121
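    /// Builds the [`ArrowReader`] with this builder's configuration.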
122 pub fn build(self) -> ArrowReader {
124 ArrowReader {
125 batch_size: self.batch_size,
126 file_io: self.file_io.clone(),
127 delete_file_loader: CachingDeleteFileLoader::new(
128 self.file_io.clone(),
129 self.concurrency_limit_data_files,
130 ),
131 concurrency_limit_data_files: self.concurrency_limit_data_files,
132 row_group_filtering_enabled: self.row_group_filtering_enabled,
133 row_selection_enabled: self.row_selection_enabled,
134 metadata_size_hint: self.metadata_size_hint,
135 }
136 }
137}
138
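/// Reads data files of an Iceberg table as a stream of Arrow [`RecordBatch`]es, applying
/// column projection, filter predicates, delete files, and optional row-group and
/// row-level pruning along the way.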
139#[derive(Clone)]
141pub struct ArrowReader {
142 batch_size: Option<usize>,
143 file_io: FileIO,
144 delete_file_loader: CachingDeleteFileLoader,
145
146 concurrency_limit_data_files: usize,
148
149 row_group_filtering_enabled: bool,
150 row_selection_enabled: bool,
151 metadata_size_hint: Option<usize>,
152}
153
154impl ArrowReader {
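    /// Takes a stream of `FileScanTask`s and reads all the files, returning a stream of
    /// Arrow `RecordBatch`es. Tasks are processed concurrently up to the configured data
    /// file concurrency limit.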
155 pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
158 let file_io = self.file_io.clone();
159 let batch_size = self.batch_size;
160 let concurrency_limit_data_files = self.concurrency_limit_data_files;
161 let row_group_filtering_enabled = self.row_group_filtering_enabled;
162 let row_selection_enabled = self.row_selection_enabled;
163 let metadata_size_hint = self.metadata_size_hint;
164
165 let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
167 Box::pin(
168 tasks
169 .and_then(move |task| {
170 let file_io = file_io.clone();
171
172 Self::process_file_scan_task(
173 task,
174 batch_size,
175 file_io,
176 self.delete_file_loader.clone(),
177 row_group_filtering_enabled,
178 row_selection_enabled,
179 metadata_size_hint,
180 )
181 })
182 .map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "failed to generate file scan task")
184 .with_source(err)
185 })
186 .try_flatten(),
187 )
188 } else {
189 Box::pin(
190 tasks
191 .map_ok(move |task| {
192 let file_io = file_io.clone();
193
194 Self::process_file_scan_task(
195 task,
196 batch_size,
197 file_io,
198 self.delete_file_loader.clone(),
199 row_group_filtering_enabled,
200 row_selection_enabled,
201 metadata_size_hint,
202 )
203 })
204 .map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "failed to generate file scan task")
206 .with_source(err)
207 })
208 .try_buffer_unordered(concurrency_limit_data_files)
209 .try_flatten_unordered(concurrency_limit_data_files),
210 )
211 };
212
213 Ok(stream)
214 }
215
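    /// Processes a single `FileScanTask`: opens the Parquet data file, resolves field IDs
    /// (falling back to name mapping or positional IDs when the file carries none), applies
    /// the projection, filter predicates, delete files, and byte-range row-group selection,
    /// and returns the resulting record batch stream.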
216 #[allow(clippy::too_many_arguments)]
217 async fn process_file_scan_task(
218 task: FileScanTask,
219 batch_size: Option<usize>,
220 file_io: FileIO,
221 delete_file_loader: CachingDeleteFileLoader,
222 row_group_filtering_enabled: bool,
223 row_selection_enabled: bool,
224 metadata_size_hint: Option<usize>,
225 ) -> Result<ArrowRecordBatchStream> {
226 let should_load_page_index =
227 (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
228
229 let delete_filter_rx =
230 delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
231
232 let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
235 &task.data_file_path,
236 file_io.clone(),
237 should_load_page_index,
238 None,
239 metadata_size_hint,
240 task.file_size_in_bytes,
241 )
242 .await?;
243
244 let missing_field_ids = initial_stream_builder
248 .schema()
249 .fields()
250 .iter()
251 .next()
252 .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
253
254 let mut record_batch_stream_builder = if missing_field_ids {
270 let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
272 apply_name_mapping_to_arrow_schema(
277 Arc::clone(initial_stream_builder.schema()),
278 name_mapping,
279 )?
280 } else {
281 add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
284 };
285
286 let options = ArrowReaderOptions::new().with_schema(arrow_schema);
287
288 Self::create_parquet_record_batch_stream_builder(
289 &task.data_file_path,
290 file_io.clone(),
291 should_load_page_index,
292 Some(options),
293 metadata_size_hint,
294 task.file_size_in_bytes,
295 )
296 .await?
297 } else {
298 initial_stream_builder
300 };
301
302 let project_field_ids_without_metadata: Vec<i32> = task
304 .project_field_ids
305 .iter()
306 .filter(|&&id| !is_metadata_field(id))
307 .copied()
308 .collect();
309
310 let projection_mask = Self::get_arrow_projection_mask(
315 &project_field_ids_without_metadata,
316 &task.schema,
317 record_batch_stream_builder.parquet_schema(),
318 record_batch_stream_builder.schema(),
            missing_field_ids,
        )?;
321
322 record_batch_stream_builder =
323 record_batch_stream_builder.with_projection(projection_mask.clone());
324
325 let mut record_batch_transformer_builder =
329 RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
330
331 if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
333 let file_datum = Datum::string(task.data_file_path.clone());
334 record_batch_transformer_builder =
335 record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
336 }
337
338 if let (Some(partition_spec), Some(partition_data)) =
339 (task.partition_spec.clone(), task.partition.clone())
340 {
341 record_batch_transformer_builder =
342 record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
343 }
344
345 let mut record_batch_transformer = record_batch_transformer_builder.build();
346
347 if let Some(batch_size) = batch_size {
348 record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
349 }
350
351 let delete_filter = delete_filter_rx.await.unwrap()?;
352 let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
353
354 let final_predicate = match (&task.predicate, delete_predicate) {
359 (None, None) => None,
360 (Some(predicate), None) => Some(predicate.clone()),
361 (None, Some(ref predicate)) => Some(predicate.clone()),
362 (Some(filter_predicate), Some(delete_predicate)) => {
363 Some(filter_predicate.clone().and(delete_predicate))
364 }
365 };
366
367 let mut selected_row_group_indices = None;
383 let mut row_selection = None;
384
385 if task.start != 0 || task.length != 0 {
388 let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
389 record_batch_stream_builder.metadata(),
390 task.start,
391 task.length,
392 )?;
393 selected_row_group_indices = Some(byte_range_filtered_row_groups);
394 }
395
396 if let Some(predicate) = final_predicate {
397 let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
398 record_batch_stream_builder.parquet_schema(),
399 &predicate,
400 )?;
401
402 let row_filter = Self::get_row_filter(
403 &predicate,
404 record_batch_stream_builder.parquet_schema(),
405 &iceberg_field_ids,
406 &field_id_map,
407 )?;
408 record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
409
410 if row_group_filtering_enabled {
411 let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
412 &predicate,
413 record_batch_stream_builder.metadata(),
414 &field_id_map,
415 &task.schema,
416 )?;
417
418 selected_row_group_indices = match selected_row_group_indices {
421 Some(byte_range_filtered) => {
422 let intersection: Vec<usize> = byte_range_filtered
424 .into_iter()
425 .filter(|idx| predicate_filtered_row_groups.contains(idx))
426 .collect();
427 Some(intersection)
428 }
429 None => Some(predicate_filtered_row_groups),
430 };
431 }
432
433 if row_selection_enabled {
434 row_selection = Some(Self::get_row_selection_for_filter_predicate(
435 &predicate,
436 record_batch_stream_builder.metadata(),
437 &selected_row_group_indices,
438 &field_id_map,
439 &task.schema,
440 )?);
441 }
442 }
443
444 let positional_delete_indexes = delete_filter.get_delete_vector(&task);
445
446 if let Some(positional_delete_indexes) = positional_delete_indexes {
447 let delete_row_selection = {
448 let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
449
450 Self::build_deletes_row_selection(
451 record_batch_stream_builder.metadata().row_groups(),
452 &selected_row_group_indices,
453 &positional_delete_indexes,
454 )
455 }?;
456
457 row_selection = match row_selection {
460 None => Some(delete_row_selection),
461 Some(filter_row_selection) => {
462 Some(filter_row_selection.intersection(&delete_row_selection))
463 }
464 };
465 }
466
467 if let Some(row_selection) = row_selection {
468 record_batch_stream_builder =
469 record_batch_stream_builder.with_row_selection(row_selection);
470 }
471
472 if let Some(selected_row_group_indices) = selected_row_group_indices {
473 record_batch_stream_builder =
474 record_batch_stream_builder.with_row_groups(selected_row_group_indices);
475 }
476
477 let record_batch_stream =
480 record_batch_stream_builder
481 .build()?
482 .map(move |batch| match batch {
483 Ok(batch) => {
484 record_batch_transformer.process_record_batch(batch)
486 }
487 Err(err) => Err(err.into()),
488 });
489
490 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
491 }
492
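    /// Opens the data file and constructs a `ParquetRecordBatchStreamBuilder`, preloading the
    /// column and offset indexes and, when requested, the page index, and forwarding the
    /// optional metadata size hint.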
493 pub(crate) async fn create_parquet_record_batch_stream_builder(
494 data_file_path: &str,
495 file_io: FileIO,
496 should_load_page_index: bool,
497 arrow_reader_options: Option<ArrowReaderOptions>,
498 metadata_size_hint: Option<usize>,
499 file_size_in_bytes: u64,
500 ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
501 let parquet_file = file_io.new_input(data_file_path)?;
504 let parquet_reader = parquet_file.reader().await?;
505 let mut parquet_file_reader = ArrowFileReader::new(
506 FileMetadata {
507 size: file_size_in_bytes,
508 },
509 parquet_reader,
510 )
511 .with_preload_column_index(true)
512 .with_preload_offset_index(true)
513 .with_preload_page_index(should_load_page_index);
514
515 if let Some(hint) = metadata_size_hint {
516 parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint);
517 }
518
519 let options = arrow_reader_options.unwrap_or_default();
521 let record_batch_stream_builder =
522 ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
523 Ok(record_batch_stream_builder)
524 }
525
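    /// Converts a `DeleteVector` of positional deletes into a `RowSelection` over the given
    /// row groups, skipping row groups that are not in `selected_row_groups`.
    ///
    /// Illustrative sketch (not taken from a real file): for a single 10-row row group with
    /// deleted positions {2, 3}, the result is equivalent to
    /// `[select(2), skip(2), select(6)]`.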
526 fn build_deletes_row_selection(
532 row_group_metadata_list: &[RowGroupMetaData],
533 selected_row_groups: &Option<Vec<usize>>,
534 positional_deletes: &DeleteVector,
535 ) -> Result<RowSelection> {
536 let mut results: Vec<RowSelector> = Vec::new();
537 let mut selected_row_groups_idx = 0;
538 let mut current_row_group_base_idx: u64 = 0;
539 let mut delete_vector_iter = positional_deletes.iter();
540 let mut next_deleted_row_idx_opt = delete_vector_iter.next();
541
542 for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
543 let row_group_num_rows = row_group_metadata.num_rows() as u64;
544 let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
545
546 if let Some(selected_row_groups) = selected_row_groups {
548 if selected_row_groups_idx == selected_row_groups.len() {
550 break;
551 }
552
553 if idx == selected_row_groups[selected_row_groups_idx] {
554 selected_row_groups_idx += 1;
558 } else {
559 delete_vector_iter.advance_to(next_row_group_base_idx);
564 if let Some(cached_idx) = next_deleted_row_idx_opt
566 && cached_idx < next_row_group_base_idx
567 {
568 next_deleted_row_idx_opt = delete_vector_iter.next();
569 }
570
571 current_row_group_base_idx += row_group_num_rows;
574 continue;
575 }
576 }
577
578 let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
579 Some(next_deleted_row_idx) => {
580 if next_deleted_row_idx >= next_row_group_base_idx {
583 results.push(RowSelector::select(row_group_num_rows as usize));
584 current_row_group_base_idx += row_group_num_rows;
585 continue;
586 }
587
588 next_deleted_row_idx
589 }
590
591 _ => {
593 results.push(RowSelector::select(row_group_num_rows as usize));
594 current_row_group_base_idx += row_group_num_rows;
595 continue;
596 }
597 };
598
599 let mut current_idx = current_row_group_base_idx;
600 'chunks: while next_deleted_row_idx < next_row_group_base_idx {
601 if current_idx < next_deleted_row_idx {
603 let run_length = next_deleted_row_idx - current_idx;
604 results.push(RowSelector::select(run_length as usize));
605 current_idx += run_length;
606 }
607
608 let mut run_length = 0;
610 while next_deleted_row_idx == current_idx
611 && next_deleted_row_idx < next_row_group_base_idx
612 {
613 run_length += 1;
614 current_idx += 1;
615
616 next_deleted_row_idx_opt = delete_vector_iter.next();
617 next_deleted_row_idx = match next_deleted_row_idx_opt {
618 Some(next_deleted_row_idx) => next_deleted_row_idx,
619 _ => {
620 results.push(RowSelector::skip(run_length));
624 break 'chunks;
625 }
626 };
627 }
628 if run_length > 0 {
629 results.push(RowSelector::skip(run_length));
630 }
631 }
632
633 if current_idx < next_row_group_base_idx {
634 results.push(RowSelector::select(
635 (next_row_group_base_idx - current_idx) as usize,
636 ));
637 }
638
639 current_row_group_base_idx += row_group_num_rows;
640 }
641
642 Ok(results.into())
643 }
644
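    /// Collects the Iceberg field IDs referenced by the predicate and builds a map from field
    /// ID to leaf column index in the Parquet schema (positional fallback when the file
    /// carries no field IDs).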
645 fn build_field_id_set_and_map(
646 parquet_schema: &SchemaDescriptor,
647 predicate: &BoundPredicate,
648 ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
649 let mut collector = CollectFieldIdVisitor {
651 field_ids: HashSet::default(),
652 };
653 visit(&mut collector, predicate)?;
654
655 let iceberg_field_ids = collector.field_ids();
656
657 let field_id_map = match build_field_id_map(parquet_schema)? {
659 Some(map) => map,
660 None => build_fallback_field_id_map(parquet_schema),
661 };
662
663 Ok((iceberg_field_ids, field_id_map))
664 }
665
666 fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
669 match field.field_type.as_ref() {
670 Type::Primitive(_) => {
671 field_ids.push(field.id);
672 }
673 Type::Struct(struct_type) => {
674 for nested_field in struct_type.fields() {
675 Self::include_leaf_field_id(nested_field, field_ids);
676 }
677 }
678 Type::List(list_type) => {
679 Self::include_leaf_field_id(&list_type.element_field, field_ids);
680 }
681 Type::Map(map_type) => {
682 Self::include_leaf_field_id(&map_type.key_field, field_ids);
683 Self::include_leaf_field_id(&map_type.value_field, field_ids);
684 }
685 }
686 }
687
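    /// Computes the Arrow `ProjectionMask` for the requested field IDs, expanding nested
    /// fields to their leaves and dispatching to the positional fallback when the file has no
    /// field IDs. An empty field ID list projects all columns.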
688 fn get_arrow_projection_mask(
689 field_ids: &[i32],
690 iceberg_schema_of_task: &Schema,
691 parquet_schema: &SchemaDescriptor,
692 arrow_schema: &ArrowSchemaRef,
        use_fallback: bool,
    ) -> Result<ProjectionMask> {
695 fn type_promotion_is_valid(
696 file_type: Option<&PrimitiveType>,
697 projected_type: Option<&PrimitiveType>,
698 ) -> bool {
699 match (file_type, projected_type) {
700 (Some(lhs), Some(rhs)) if lhs == rhs => true,
701 (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
702 (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
703 (
704 Some(PrimitiveType::Decimal {
705 precision: file_precision,
706 scale: file_scale,
707 }),
708 Some(PrimitiveType::Decimal {
709 precision: requested_precision,
710 scale: requested_scale,
711 }),
712 ) if requested_precision >= file_precision && file_scale == requested_scale => true,
713 (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
715 _ => false,
716 }
717 }
718
719 if field_ids.is_empty() {
720 return Ok(ProjectionMask::all());
721 }
722
723 if use_fallback {
724 Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
726 } else {
727 let mut leaf_field_ids = vec![];
731 for field_id in field_ids {
732 let field = iceberg_schema_of_task.field_by_id(*field_id);
733 if let Some(field) = field {
734 Self::include_leaf_field_id(field, &mut leaf_field_ids);
735 }
736 }
737
738 Self::get_arrow_projection_mask_with_field_ids(
739 &leaf_field_ids,
740 iceberg_schema_of_task,
741 parquet_schema,
742 arrow_schema,
743 type_promotion_is_valid,
744 )
745 }
746 }
747
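    /// Matches Parquet leaf columns to Iceberg field IDs via the `PARQUET_FIELD_ID_META_KEY`
    /// metadata on the Arrow schema and verifies that any type promotion from the file type
    /// to the requested type is allowed.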
748 fn get_arrow_projection_mask_with_field_ids(
751 leaf_field_ids: &[i32],
752 iceberg_schema_of_task: &Schema,
753 parquet_schema: &SchemaDescriptor,
754 arrow_schema: &ArrowSchemaRef,
755 type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
756 ) -> Result<ProjectionMask> {
757 let mut column_map = HashMap::new();
758 let fields = arrow_schema.fields();
759
760 let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
763 let projected_arrow_schema = ArrowSchema::new_with_metadata(
764 fields.filter_leaves(|_, f| {
765 f.metadata()
766 .get(PARQUET_FIELD_ID_META_KEY)
767 .and_then(|field_id| i32::from_str(field_id).ok())
768 .is_some_and(|field_id| {
769 projected_fields.insert((*f).clone(), field_id);
770 leaf_field_ids.contains(&field_id)
771 })
772 }),
773 arrow_schema.metadata().clone(),
774 );
775 let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
776
777 fields.filter_leaves(|idx, field| {
778 let Some(field_id) = projected_fields.get(field).cloned() else {
779 return false;
780 };
781
782 let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
783 let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
784
785 if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
786 return false;
787 }
788
789 if !type_promotion_is_valid(
790 parquet_iceberg_field
791 .unwrap()
792 .field_type
793 .as_primitive_type(),
794 iceberg_field.unwrap().field_type.as_primitive_type(),
795 ) {
796 return false;
797 }
798
799 column_map.insert(field_id, idx);
800 true
801 });
802
803 let mut indices = vec![];
806 for field_id in leaf_field_ids {
807 if let Some(col_idx) = column_map.get(field_id) {
808 indices.push(*col_idx);
809 }
810 }
811
812 if indices.is_empty() {
813 Ok(ProjectionMask::all())
816 } else {
817 Ok(ProjectionMask::leaves(parquet_schema, indices))
818 }
819 }
820
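    /// Fallback projection for files without field IDs: field ID `n` is assumed to map to the
    /// n-th (1-based) root column of the Parquet schema.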
821 fn get_arrow_projection_mask_fallback(
825 field_ids: &[i32],
826 parquet_schema: &SchemaDescriptor,
827 ) -> Result<ProjectionMask> {
828 let parquet_root_fields = parquet_schema.root_schema().get_fields();
830 let mut root_indices = vec![];
831
832 for field_id in field_ids.iter() {
833 let parquet_pos = (*field_id - 1) as usize;
834
835 if parquet_pos < parquet_root_fields.len() {
836 root_indices.push(parquet_pos);
837 }
838 }
840
841 if root_indices.is_empty() {
842 Ok(ProjectionMask::all())
843 } else {
844 Ok(ProjectionMask::roots(parquet_schema, root_indices))
845 }
846 }
847
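    /// Converts the bound predicate into a Parquet `RowFilter` that evaluates over the leaf
    /// columns referenced by the predicate.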
848 fn get_row_filter(
849 predicates: &BoundPredicate,
850 parquet_schema: &SchemaDescriptor,
851 iceberg_field_ids: &HashSet<i32>,
852 field_id_map: &HashMap<i32, usize>,
853 ) -> Result<RowFilter> {
854 let mut column_indices = iceberg_field_ids
857 .iter()
858 .filter_map(|field_id| field_id_map.get(field_id).cloned())
859 .collect::<Vec<_>>();
860 column_indices.sort();
861
862 let mut converter = PredicateConverter {
864 parquet_schema,
865 column_map: field_id_map,
866 column_indices: &column_indices,
867 };
868
869 let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
872 let predicate_func = visit(&mut converter, predicates)?;
873 let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
874 Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
875 }
876
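    /// Evaluates the predicate against each row group's statistics and returns the indices of
    /// row groups that may contain matching rows.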
877 fn get_selected_row_group_indices(
878 predicate: &BoundPredicate,
879 parquet_metadata: &Arc<ParquetMetaData>,
880 field_id_map: &HashMap<i32, usize>,
881 snapshot_schema: &Schema,
882 ) -> Result<Vec<usize>> {
883 let row_groups_metadata = parquet_metadata.row_groups();
884 let mut results = Vec::with_capacity(row_groups_metadata.len());
885
886 for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
887 if RowGroupMetricsEvaluator::eval(
888 predicate,
889 row_group_metadata,
890 field_id_map,
891 snapshot_schema,
892 )? {
893 results.push(idx);
894 }
895 }
896
897 Ok(results)
898 }
899
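    /// Uses the Parquet page index (column and offset indexes) to compute a `RowSelection`
    /// for the predicate, restricted to the selected row groups when present.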
900 fn get_row_selection_for_filter_predicate(
901 predicate: &BoundPredicate,
902 parquet_metadata: &Arc<ParquetMetaData>,
903 selected_row_groups: &Option<Vec<usize>>,
904 field_id_map: &HashMap<i32, usize>,
905 snapshot_schema: &Schema,
906 ) -> Result<RowSelection> {
907 let Some(column_index) = parquet_metadata.column_index() else {
908 return Err(Error::new(
909 ErrorKind::Unexpected,
910 "Parquet file metadata does not contain a column index",
911 ));
912 };
913
914 let Some(offset_index) = parquet_metadata.offset_index() else {
915 return Err(Error::new(
916 ErrorKind::Unexpected,
917 "Parquet file metadata does not contain an offset index",
918 ));
919 };
920
921 if let Some(selected_row_groups) = selected_row_groups
923 && selected_row_groups.is_empty()
924 {
925 return Ok(RowSelection::from(Vec::new()));
926 }
927
928 let mut selected_row_groups_idx = 0;
929
930 let page_index = column_index
931 .iter()
932 .enumerate()
933 .zip(offset_index)
934 .zip(parquet_metadata.row_groups());
935
936 let mut results = Vec::new();
937 for (((idx, column_index), offset_index), row_group_metadata) in page_index {
938 if let Some(selected_row_groups) = selected_row_groups {
939 if idx == selected_row_groups[selected_row_groups_idx] {
941 selected_row_groups_idx += 1;
942 } else {
943 continue;
944 }
945 }
946
947 let selections_for_page = PageIndexEvaluator::eval(
948 predicate,
949 column_index,
950 offset_index,
951 row_group_metadata,
952 field_id_map,
953 snapshot_schema,
954 )?;
955
956 results.push(selections_for_page);
957
958 if let Some(selected_row_groups) = selected_row_groups
959 && selected_row_groups_idx == selected_row_groups.len()
960 {
961 break;
962 }
963 }
964
965 Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
966 }
967
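    /// Selects the row groups whose byte ranges overlap `[start, start + length)`. Byte
    /// offsets are computed from compressed row-group sizes, starting after the 4-byte
    /// Parquet magic header.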
968 fn filter_row_groups_by_byte_range(
973 parquet_metadata: &Arc<ParquetMetaData>,
974 start: u64,
975 length: u64,
976 ) -> Result<Vec<usize>> {
977 let row_groups = parquet_metadata.row_groups();
978 let mut selected = Vec::new();
979 let end = start + length;
980
981 let mut current_byte_offset = 4u64;
983
984 for (idx, row_group) in row_groups.iter().enumerate() {
985 let row_group_size = row_group.compressed_size() as u64;
986 let row_group_end = current_byte_offset + row_group_size;
987
988 if current_byte_offset < end && start < row_group_end {
989 selected.push(idx);
990 }
991
992 current_byte_offset = row_group_end;
993 }
994
995 Ok(selected)
996 }
997}
998
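/// Builds a map from field ID to leaf column index in the Parquet schema. Returns `Ok(None)`
/// when any leaf column lacks a field ID, signalling that the positional fallback mapping
/// should be used instead.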
999fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
1002 let mut column_map = HashMap::new();
1003
1004 for (idx, field) in parquet_schema.columns().iter().enumerate() {
1005 let field_type = field.self_type();
1006 match field_type {
1007 ParquetType::PrimitiveType { basic_info, .. } => {
1008 if !basic_info.has_id() {
1009 return Ok(None);
1010 }
1011 column_map.insert(basic_info.id(), idx);
1012 }
1013 ParquetType::GroupType { .. } => {
1014 return Err(Error::new(
1015 ErrorKind::DataInvalid,
1016 format!(
1017 "Leave column in schema should be primitive type but got {field_type:?}"
1018 ),
1019 ));
1020 }
1021 };
1022 }
1023
1024 Ok(Some(column_map))
1025}
1026
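/// Positional fallback mapping: the leaf column at index `idx` is assigned field ID `idx + 1`.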
1027fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
1030 let mut column_map = HashMap::new();
1031
1032 for (idx, _field) in parquet_schema.columns().iter().enumerate() {
1034 let field_id = (idx + 1) as i32;
1035 column_map.insert(field_id, idx);
1036 }
1037
1038 column_map
1039}
1040
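/// Applies an Iceberg name mapping to an Arrow schema that lacks field IDs, attaching the
/// mapped field ID to each top-level field's metadata when a mapping entry matches its name.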
1041fn apply_name_mapping_to_arrow_schema(
1060 arrow_schema: ArrowSchemaRef,
1061 name_mapping: &NameMapping,
1062) -> Result<Arc<ArrowSchema>> {
1063 debug_assert!(
1064 arrow_schema
1065 .fields()
1066 .iter()
1067 .next()
1068 .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1069 "Schema already has field IDs - name mapping should not be applied"
1070 );
1071
1072 use arrow_schema::Field;
1073
1074 let fields_with_mapped_ids: Vec<_> = arrow_schema
1075 .fields()
1076 .iter()
1077 .map(|field| {
1078 let mapped_field_opt = name_mapping
1086 .fields()
1087 .iter()
1088 .find(|f| f.names().contains(&field.name().to_string()));
1089
1090 let mut metadata = field.metadata().clone();
1091
1092 if let Some(mapped_field) = mapped_field_opt
1093 && let Some(field_id) = mapped_field.field_id()
1094 {
1095 metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1097 }
1098 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1101 .with_metadata(metadata)
1102 })
1103 .collect();
1104
1105 Ok(Arc::new(ArrowSchema::new_with_metadata(
1106 fields_with_mapped_ids,
1107 arrow_schema.metadata().clone(),
1108 )))
1109}
1110
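/// Adds positional fallback field IDs (1-based, by field position) to an Arrow schema that
/// has none.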
1111fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
1118 debug_assert!(
1119 arrow_schema
1120 .fields()
1121 .iter()
1122 .next()
1123 .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1124 "Schema already has field IDs"
1125 );
1126
1127 use arrow_schema::Field;
1128
1129 let fields_with_fallback_ids: Vec<_> = arrow_schema
1130 .fields()
1131 .iter()
1132 .enumerate()
1133 .map(|(pos, field)| {
1134 let mut metadata = field.metadata().clone();
            let field_id = (pos + 1) as i32;
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1137
1138 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1139 .with_metadata(metadata)
1140 })
1141 .collect();
1142
1143 Arc::new(ArrowSchema::new_with_metadata(
1144 fields_with_fallback_ids,
1145 arrow_schema.metadata().clone(),
1146 ))
1147}
1148
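/// Visitor that collects the set of field IDs referenced by a bound predicate.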
1149struct CollectFieldIdVisitor {
1151 field_ids: HashSet<i32>,
1152}
1153
1154impl CollectFieldIdVisitor {
1155 fn field_ids(self) -> HashSet<i32> {
1156 self.field_ids
1157 }
1158}
1159
1160impl BoundPredicateVisitor for CollectFieldIdVisitor {
1161 type T = ();
1162
1163 fn always_true(&mut self) -> Result<()> {
1164 Ok(())
1165 }
1166
1167 fn always_false(&mut self) -> Result<()> {
1168 Ok(())
1169 }
1170
1171 fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1172 Ok(())
1173 }
1174
1175 fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1176 Ok(())
1177 }
1178
1179 fn not(&mut self, _inner: ()) -> Result<()> {
1180 Ok(())
1181 }
1182
1183 fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1184 self.field_ids.insert(reference.field().id);
1185 Ok(())
1186 }
1187
1188 fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1189 self.field_ids.insert(reference.field().id);
1190 Ok(())
1191 }
1192
1193 fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1194 self.field_ids.insert(reference.field().id);
1195 Ok(())
1196 }
1197
1198 fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1199 self.field_ids.insert(reference.field().id);
1200 Ok(())
1201 }
1202
1203 fn less_than(
1204 &mut self,
1205 reference: &BoundReference,
1206 _literal: &Datum,
1207 _predicate: &BoundPredicate,
1208 ) -> Result<()> {
1209 self.field_ids.insert(reference.field().id);
1210 Ok(())
1211 }
1212
1213 fn less_than_or_eq(
1214 &mut self,
1215 reference: &BoundReference,
1216 _literal: &Datum,
1217 _predicate: &BoundPredicate,
1218 ) -> Result<()> {
1219 self.field_ids.insert(reference.field().id);
1220 Ok(())
1221 }
1222
1223 fn greater_than(
1224 &mut self,
1225 reference: &BoundReference,
1226 _literal: &Datum,
1227 _predicate: &BoundPredicate,
1228 ) -> Result<()> {
1229 self.field_ids.insert(reference.field().id);
1230 Ok(())
1231 }
1232
1233 fn greater_than_or_eq(
1234 &mut self,
1235 reference: &BoundReference,
1236 _literal: &Datum,
1237 _predicate: &BoundPredicate,
1238 ) -> Result<()> {
1239 self.field_ids.insert(reference.field().id);
1240 Ok(())
1241 }
1242
1243 fn eq(
1244 &mut self,
1245 reference: &BoundReference,
1246 _literal: &Datum,
1247 _predicate: &BoundPredicate,
1248 ) -> Result<()> {
1249 self.field_ids.insert(reference.field().id);
1250 Ok(())
1251 }
1252
1253 fn not_eq(
1254 &mut self,
1255 reference: &BoundReference,
1256 _literal: &Datum,
1257 _predicate: &BoundPredicate,
1258 ) -> Result<()> {
1259 self.field_ids.insert(reference.field().id);
1260 Ok(())
1261 }
1262
1263 fn starts_with(
1264 &mut self,
1265 reference: &BoundReference,
1266 _literal: &Datum,
1267 _predicate: &BoundPredicate,
1268 ) -> Result<()> {
1269 self.field_ids.insert(reference.field().id);
1270 Ok(())
1271 }
1272
1273 fn not_starts_with(
1274 &mut self,
1275 reference: &BoundReference,
1276 _literal: &Datum,
1277 _predicate: &BoundPredicate,
1278 ) -> Result<()> {
1279 self.field_ids.insert(reference.field().id);
1280 Ok(())
1281 }
1282
1283 fn r#in(
1284 &mut self,
1285 reference: &BoundReference,
1286 _literals: &FnvHashSet<Datum>,
1287 _predicate: &BoundPredicate,
1288 ) -> Result<()> {
1289 self.field_ids.insert(reference.field().id);
1290 Ok(())
1291 }
1292
1293 fn not_in(
1294 &mut self,
1295 reference: &BoundReference,
1296 _literals: &FnvHashSet<Datum>,
1297 _predicate: &BoundPredicate,
1298 ) -> Result<()> {
1299 self.field_ids.insert(reference.field().id);
1300 Ok(())
1301 }
1302}
1303
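/// Converts a bound Iceberg predicate into an Arrow predicate function usable in a Parquet
/// `RowFilter`. References to columns that are missing from the file resolve to constant
/// true/false predicates appropriate for each operator.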
1304struct PredicateConverter<'a> {
1306 pub parquet_schema: &'a SchemaDescriptor,
1308 pub column_map: &'a HashMap<i32, usize>,
1310 pub column_indices: &'a Vec<usize>,
1312}
1313
1314impl PredicateConverter<'_> {
1315 fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1320 if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1322 if self.parquet_schema.get_column_root(*column_idx).is_group() {
1323 return Err(Error::new(
1324 ErrorKind::DataInvalid,
1325 format!(
1326 "Leave column `{}` in predicates isn't a root column in Parquet schema.",
1327 reference.field().name
1328 ),
1329 ));
1330 }
1331
1332 let index = self
1334 .column_indices
1335 .iter()
1336 .position(|&idx| idx == *column_idx)
1337 .ok_or(Error::new(
1338 ErrorKind::DataInvalid,
1339 format!(
1340 "Leave column `{}` in predicates cannot be found in the required column indices.",
1341 reference.field().name
1342 ),
1343 ))?;
1344
1345 Ok(Some(index))
1346 } else {
1347 Ok(None)
1348 }
1349 }
1350
1351 fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1353 Ok(Box::new(|batch| {
1354 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1355 }))
1356 }
1357
1358 fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1360 Ok(Box::new(|batch| {
1361 Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1362 }))
1363 }
1364}
1365
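/// Extracts the referenced column from a record batch for predicate evaluation; struct
/// columns are not yet supported.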
1366fn project_column(
1369 batch: &RecordBatch,
1370 column_idx: usize,
1371) -> std::result::Result<ArrayRef, ArrowError> {
1372 let column = batch.column(column_idx);
1373
1374 match column.data_type() {
1375 DataType::Struct(_) => Err(ArrowError::SchemaError(
1376 "Does not support struct column yet.".to_string(),
1377 )),
1378 _ => Ok(column.clone()),
1379 }
1380}
1381
1382type PredicateResult =
1383 dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1384
1385impl BoundPredicateVisitor for PredicateConverter<'_> {
1386 type T = Box<PredicateResult>;
1387
1388 fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1389 self.build_always_true()
1390 }
1391
1392 fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1393 self.build_always_false()
1394 }
1395
1396 fn and(
1397 &mut self,
1398 mut lhs: Box<PredicateResult>,
1399 mut rhs: Box<PredicateResult>,
1400 ) -> Result<Box<PredicateResult>> {
1401 Ok(Box::new(move |batch| {
1402 let left = lhs(batch.clone())?;
1403 let right = rhs(batch)?;
1404 and_kleene(&left, &right)
1405 }))
1406 }
1407
1408 fn or(
1409 &mut self,
1410 mut lhs: Box<PredicateResult>,
1411 mut rhs: Box<PredicateResult>,
1412 ) -> Result<Box<PredicateResult>> {
1413 Ok(Box::new(move |batch| {
1414 let left = lhs(batch.clone())?;
1415 let right = rhs(batch)?;
1416 or_kleene(&left, &right)
1417 }))
1418 }
1419
1420 fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1421 Ok(Box::new(move |batch| {
1422 let pred_ret = inner(batch)?;
1423 not(&pred_ret)
1424 }))
1425 }
1426
1427 fn is_null(
1428 &mut self,
1429 reference: &BoundReference,
1430 _predicate: &BoundPredicate,
1431 ) -> Result<Box<PredicateResult>> {
1432 if let Some(idx) = self.bound_reference(reference)? {
1433 Ok(Box::new(move |batch| {
1434 let column = project_column(&batch, idx)?;
1435 is_null(&column)
1436 }))
1437 } else {
1438 self.build_always_true()
1440 }
1441 }
1442
1443 fn not_null(
1444 &mut self,
1445 reference: &BoundReference,
1446 _predicate: &BoundPredicate,
1447 ) -> Result<Box<PredicateResult>> {
1448 if let Some(idx) = self.bound_reference(reference)? {
1449 Ok(Box::new(move |batch| {
1450 let column = project_column(&batch, idx)?;
1451 is_not_null(&column)
1452 }))
1453 } else {
1454 self.build_always_false()
1456 }
1457 }
1458
1459 fn is_nan(
1460 &mut self,
1461 reference: &BoundReference,
1462 _predicate: &BoundPredicate,
1463 ) -> Result<Box<PredicateResult>> {
1464 if self.bound_reference(reference)?.is_some() {
1465 self.build_always_true()
1466 } else {
1467 self.build_always_false()
1469 }
1470 }
1471
1472 fn not_nan(
1473 &mut self,
1474 reference: &BoundReference,
1475 _predicate: &BoundPredicate,
1476 ) -> Result<Box<PredicateResult>> {
1477 if self.bound_reference(reference)?.is_some() {
1478 self.build_always_false()
1479 } else {
1480 self.build_always_true()
1482 }
1483 }
1484
1485 fn less_than(
1486 &mut self,
1487 reference: &BoundReference,
1488 literal: &Datum,
1489 _predicate: &BoundPredicate,
1490 ) -> Result<Box<PredicateResult>> {
1491 if let Some(idx) = self.bound_reference(reference)? {
1492 let literal = get_arrow_datum(literal)?;
1493
1494 Ok(Box::new(move |batch| {
1495 let left = project_column(&batch, idx)?;
1496 let literal = try_cast_literal(&literal, left.data_type())?;
1497 lt(&left, literal.as_ref())
1498 }))
1499 } else {
1500 self.build_always_true()
1502 }
1503 }
1504
1505 fn less_than_or_eq(
1506 &mut self,
1507 reference: &BoundReference,
1508 literal: &Datum,
1509 _predicate: &BoundPredicate,
1510 ) -> Result<Box<PredicateResult>> {
1511 if let Some(idx) = self.bound_reference(reference)? {
1512 let literal = get_arrow_datum(literal)?;
1513
1514 Ok(Box::new(move |batch| {
1515 let left = project_column(&batch, idx)?;
1516 let literal = try_cast_literal(&literal, left.data_type())?;
1517 lt_eq(&left, literal.as_ref())
1518 }))
1519 } else {
1520 self.build_always_true()
1522 }
1523 }
1524
1525 fn greater_than(
1526 &mut self,
1527 reference: &BoundReference,
1528 literal: &Datum,
1529 _predicate: &BoundPredicate,
1530 ) -> Result<Box<PredicateResult>> {
1531 if let Some(idx) = self.bound_reference(reference)? {
1532 let literal = get_arrow_datum(literal)?;
1533
1534 Ok(Box::new(move |batch| {
1535 let left = project_column(&batch, idx)?;
1536 let literal = try_cast_literal(&literal, left.data_type())?;
1537 gt(&left, literal.as_ref())
1538 }))
1539 } else {
1540 self.build_always_false()
1542 }
1543 }
1544
1545 fn greater_than_or_eq(
1546 &mut self,
1547 reference: &BoundReference,
1548 literal: &Datum,
1549 _predicate: &BoundPredicate,
1550 ) -> Result<Box<PredicateResult>> {
1551 if let Some(idx) = self.bound_reference(reference)? {
1552 let literal = get_arrow_datum(literal)?;
1553
1554 Ok(Box::new(move |batch| {
1555 let left = project_column(&batch, idx)?;
1556 let literal = try_cast_literal(&literal, left.data_type())?;
1557 gt_eq(&left, literal.as_ref())
1558 }))
1559 } else {
1560 self.build_always_false()
1562 }
1563 }
1564
1565 fn eq(
1566 &mut self,
1567 reference: &BoundReference,
1568 literal: &Datum,
1569 _predicate: &BoundPredicate,
1570 ) -> Result<Box<PredicateResult>> {
1571 if let Some(idx) = self.bound_reference(reference)? {
1572 let literal = get_arrow_datum(literal)?;
1573
1574 Ok(Box::new(move |batch| {
1575 let left = project_column(&batch, idx)?;
1576 let literal = try_cast_literal(&literal, left.data_type())?;
1577 eq(&left, literal.as_ref())
1578 }))
1579 } else {
1580 self.build_always_false()
1582 }
1583 }
1584
1585 fn not_eq(
1586 &mut self,
1587 reference: &BoundReference,
1588 literal: &Datum,
1589 _predicate: &BoundPredicate,
1590 ) -> Result<Box<PredicateResult>> {
1591 if let Some(idx) = self.bound_reference(reference)? {
1592 let literal = get_arrow_datum(literal)?;
1593
1594 Ok(Box::new(move |batch| {
1595 let left = project_column(&batch, idx)?;
1596 let literal = try_cast_literal(&literal, left.data_type())?;
1597 neq(&left, literal.as_ref())
1598 }))
1599 } else {
1600 self.build_always_false()
1602 }
1603 }
1604
1605 fn starts_with(
1606 &mut self,
1607 reference: &BoundReference,
1608 literal: &Datum,
1609 _predicate: &BoundPredicate,
1610 ) -> Result<Box<PredicateResult>> {
1611 if let Some(idx) = self.bound_reference(reference)? {
1612 let literal = get_arrow_datum(literal)?;
1613
1614 Ok(Box::new(move |batch| {
1615 let left = project_column(&batch, idx)?;
1616 let literal = try_cast_literal(&literal, left.data_type())?;
1617 starts_with(&left, literal.as_ref())
1618 }))
1619 } else {
1620 self.build_always_false()
1622 }
1623 }
1624
1625 fn not_starts_with(
1626 &mut self,
1627 reference: &BoundReference,
1628 literal: &Datum,
1629 _predicate: &BoundPredicate,
1630 ) -> Result<Box<PredicateResult>> {
1631 if let Some(idx) = self.bound_reference(reference)? {
1632 let literal = get_arrow_datum(literal)?;
1633
1634 Ok(Box::new(move |batch| {
1635 let left = project_column(&batch, idx)?;
1636 let literal = try_cast_literal(&literal, left.data_type())?;
1637 not(&starts_with(&left, literal.as_ref())?)
1639 }))
1640 } else {
1641 self.build_always_true()
1643 }
1644 }
1645
1646 fn r#in(
1647 &mut self,
1648 reference: &BoundReference,
1649 literals: &FnvHashSet<Datum>,
1650 _predicate: &BoundPredicate,
1651 ) -> Result<Box<PredicateResult>> {
1652 if let Some(idx) = self.bound_reference(reference)? {
1653 let literals: Vec<_> = literals
1654 .iter()
1655 .map(|lit| get_arrow_datum(lit).unwrap())
1656 .collect();
1657
1658 Ok(Box::new(move |batch| {
1659 let left = project_column(&batch, idx)?;
1661
1662 let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1663 for literal in &literals {
1664 let literal = try_cast_literal(literal, left.data_type())?;
1665 acc = or(&acc, &eq(&left, literal.as_ref())?)?
1666 }
1667
1668 Ok(acc)
1669 }))
1670 } else {
1671 self.build_always_false()
1673 }
1674 }
1675
1676 fn not_in(
1677 &mut self,
1678 reference: &BoundReference,
1679 literals: &FnvHashSet<Datum>,
1680 _predicate: &BoundPredicate,
1681 ) -> Result<Box<PredicateResult>> {
1682 if let Some(idx) = self.bound_reference(reference)? {
1683 let literals: Vec<_> = literals
1684 .iter()
1685 .map(|lit| get_arrow_datum(lit).unwrap())
1686 .collect();
1687
1688 Ok(Box::new(move |batch| {
1689 let left = project_column(&batch, idx)?;
1691 let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1692 for literal in &literals {
1693 let literal = try_cast_literal(literal, left.data_type())?;
1694 acc = and(&acc, &neq(&left, literal.as_ref())?)?
1695 }
1696
1697 Ok(acc)
1698 }))
1699 } else {
1700 self.build_always_true()
1702 }
1703 }
1704}
1705
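/// `AsyncFileReader` implementation backed by an Iceberg `FileRead`, with optional preloading
/// of the Parquet column, offset, and page indexes and an optional metadata size hint.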
1706pub struct ArrowFileReader {
1708 meta: FileMetadata,
1709 preload_column_index: bool,
1710 preload_offset_index: bool,
1711 preload_page_index: bool,
1712 metadata_size_hint: Option<usize>,
1713 r: Box<dyn FileRead>,
1714}
1715
1716impl ArrowFileReader {
1717 pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
1719 Self {
1720 meta,
1721 preload_column_index: false,
1722 preload_offset_index: false,
1723 preload_page_index: false,
1724 metadata_size_hint: None,
1725 r,
1726 }
1727 }
1728
1729 pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1731 self.preload_column_index = preload;
1732 self
1733 }
1734
1735 pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1737 self.preload_offset_index = preload;
1738 self
1739 }
1740
1741 pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1743 self.preload_page_index = preload;
1744 self
1745 }
1746
1747 pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1752 self.metadata_size_hint = Some(hint);
1753 self
1754 }
1755}
1756
1757impl AsyncFileReader for ArrowFileReader {
1758 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
1759 Box::pin(
1760 self.r
1761 .read(range.start..range.end)
1762 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
1763 )
1764 }
1765
1766 fn get_metadata(
1769 &mut self,
1770 _options: Option<&'_ ArrowReaderOptions>,
1771 ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
1772 async move {
1773 let reader = ParquetMetaDataReader::new()
1774 .with_prefetch_hint(self.metadata_size_hint)
1775 .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
1777 .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
1778 .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
1779 let size = self.meta.size;
1780 let meta = reader.load_and_finish(self, size).await?;
1781
1782 Ok(Arc::new(meta))
1783 }
1784 .boxed()
1785 }
1786}
1787
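/// Casts the literal to the column's Arrow data type when they differ (e.g. a `Utf8` literal
/// compared against a `LargeUtf8` column), so that the Arrow comparison kernels can be
/// applied.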
1788fn try_cast_literal(
1795 literal: &Arc<dyn ArrowDatum + Send + Sync>,
1796 column_type: &DataType,
1797) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1798 let literal_array = literal.get().0;
1799
1800 if literal_array.data_type() == column_type {
1802 return Ok(Arc::clone(literal));
1803 }
1804
1805 let literal_array = cast(literal_array, column_type)?;
1806 Ok(Arc::new(Scalar::new(literal_array)))
1807}
1808
1809#[cfg(test)]
1810mod tests {
1811 use std::collections::{HashMap, HashSet};
1812 use std::fs::File;
1813 use std::sync::Arc;
1814
1815 use arrow_array::cast::AsArray;
1816 use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1817 use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1818 use futures::TryStreamExt;
1819 use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1820 use parquet::arrow::{ArrowWriter, ProjectionMask};
1821 use parquet::basic::Compression;
1822 use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1823 use parquet::file::properties::WriterProperties;
1824 use parquet::schema::parser::parse_message_type;
1825 use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1826 use roaring::RoaringTreemap;
1827 use tempfile::TempDir;
1828
1829 use crate::ErrorKind;
1830 use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1831 use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1832 use crate::delete_vector::DeleteVector;
1833 use crate::expr::visitors::bound_predicate_visitor::visit;
1834 use crate::expr::{Bind, Predicate, Reference};
1835 use crate::io::FileIO;
1836 use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1837 use crate::spec::{
1838 DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1839 };
1840
1841 fn table_schema_simple() -> SchemaRef {
1842 Arc::new(
1843 Schema::builder()
1844 .with_schema_id(1)
1845 .with_identifier_field_ids(vec![2])
1846 .with_fields(vec![
1847 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1848 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1849 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1850 NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1851 ])
1852 .build()
1853 .unwrap(),
1854 )
1855 }
1856
1857 #[test]
1858 fn test_collect_field_id() {
1859 let schema = table_schema_simple();
1860 let expr = Reference::new("qux").is_null();
1861 let bound_expr = expr.bind(schema, true).unwrap();
1862
1863 let mut visitor = CollectFieldIdVisitor {
1864 field_ids: HashSet::default(),
1865 };
1866 visit(&mut visitor, &bound_expr).unwrap();
1867
1868 let mut expected = HashSet::default();
1869 expected.insert(4_i32);
1870
1871 assert_eq!(visitor.field_ids, expected);
1872 }
1873
1874 #[test]
1875 fn test_collect_field_id_with_and() {
1876 let schema = table_schema_simple();
1877 let expr = Reference::new("qux")
1878 .is_null()
1879 .and(Reference::new("baz").is_null());
1880 let bound_expr = expr.bind(schema, true).unwrap();
1881
1882 let mut visitor = CollectFieldIdVisitor {
1883 field_ids: HashSet::default(),
1884 };
1885 visit(&mut visitor, &bound_expr).unwrap();
1886
1887 let mut expected = HashSet::default();
1888 expected.insert(4_i32);
1889 expected.insert(3);
1890
1891 assert_eq!(visitor.field_ids, expected);
1892 }
1893
1894 #[test]
1895 fn test_collect_field_id_with_or() {
1896 let schema = table_schema_simple();
1897 let expr = Reference::new("qux")
1898 .is_null()
1899 .or(Reference::new("baz").is_null());
1900 let bound_expr = expr.bind(schema, true).unwrap();
1901
1902 let mut visitor = CollectFieldIdVisitor {
1903 field_ids: HashSet::default(),
1904 };
1905 visit(&mut visitor, &bound_expr).unwrap();
1906
1907 let mut expected = HashSet::default();
1908 expected.insert(4_i32);
1909 expected.insert(3);
1910
1911 assert_eq!(visitor.field_ids, expected);
1912 }
1913
1914 #[test]
1915 fn test_arrow_projection_mask() {
1916 let schema = Arc::new(
1917 Schema::builder()
1918 .with_schema_id(1)
1919 .with_identifier_field_ids(vec![1])
1920 .with_fields(vec![
1921 NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
1922 NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
1923 NestedField::optional(
1924 3,
1925 "c3",
1926 Type::Primitive(PrimitiveType::Decimal {
1927 precision: 38,
1928 scale: 3,
1929 }),
1930 )
1931 .into(),
1932 ])
1933 .build()
1934 .unwrap(),
1935 );
1936 let arrow_schema = Arc::new(ArrowSchema::new(vec![
1937 Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
1938 PARQUET_FIELD_ID_META_KEY.to_string(),
1939 "1".to_string(),
1940 )])),
1941 Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
1943 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1944 ),
1945 Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
1947 PARQUET_FIELD_ID_META_KEY.to_string(),
1948 "3".to_string(),
1949 )])),
1950 ]));
1951
1952 let message_type = "
1953message schema {
1954 required binary c1 (STRING) = 1;
1955 optional int32 c2 (INTEGER(8,true)) = 2;
1956 optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
1957}
1958 ";
1959 let parquet_type = parse_message_type(message_type).expect("should parse schema");
1960 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
1961
1962 let err = ArrowReader::get_arrow_projection_mask(
1964 &[1, 2, 3],
1965 &schema,
1966 &parquet_schema,
1967 &arrow_schema,
1968 false,
1969 )
1970 .unwrap_err();
1971
1972 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1973 assert_eq!(
1974 err.to_string(),
1975 "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
1976 );
1977
1978 let err = ArrowReader::get_arrow_projection_mask(
1980 &[1, 3],
1981 &schema,
1982 &parquet_schema,
1983 &arrow_schema,
1984 false,
1985 )
1986 .unwrap_err();
1987
1988 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1989 assert_eq!(
1990 err.to_string(),
1991 "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
1992 );
1993
1994 let mask = ArrowReader::get_arrow_projection_mask(
1996 &[1],
1997 &schema,
1998 &parquet_schema,
1999 &arrow_schema,
2000 false,
2001 )
2002 .expect("Some ProjectionMask");
2003 assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
2004 }
2005
2006 #[tokio::test]
2007 async fn test_kleene_logic_or_behaviour() {
2008 let predicate = Reference::new("a")
2010 .is_null()
2011 .or(Reference::new("a").equal_to(Datum::string("foo")));
2012
2013 let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
2015
2016 let expected = vec![None, Some("foo".to_string())];
2018
2019 let (file_io, schema, table_location, _temp_dir) =
2020 setup_kleene_logic(data_for_col_a, DataType::Utf8);
2021 let reader = ArrowReaderBuilder::new(file_io).build();
2022
2023 let result_data = test_perform_read(predicate, schema, table_location, reader).await;
2024
2025 assert_eq!(result_data, expected);
2026 }
2027
2028 #[tokio::test]
2029 async fn test_kleene_logic_and_behaviour() {
2030 let predicate = Reference::new("a")
2032 .is_not_null()
2033 .and(Reference::new("a").not_equal_to(Datum::string("foo")));
2034
2035 let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
2037
2038 let expected = vec![Some("bar".to_string())];
2040
2041 let (file_io, schema, table_location, _temp_dir) =
2042 setup_kleene_logic(data_for_col_a, DataType::Utf8);
2043 let reader = ArrowReaderBuilder::new(file_io).build();
2044
2045 let result_data = test_perform_read(predicate, schema, table_location, reader).await;
2046
2047 assert_eq!(result_data, expected);
2048 }
2049
2050 #[tokio::test]
2051 async fn test_predicate_cast_literal() {
2052 let predicates = vec![
2053 (Reference::new("a").equal_to(Datum::string("foo")), vec![
2055 Some("foo".to_string()),
2056 ]),
2057 (
2059 Reference::new("a").not_equal_to(Datum::string("foo")),
2060 vec![Some("bar".to_string())],
2061 ),
2062 (Reference::new("a").starts_with(Datum::string("f")), vec![
2064 Some("foo".to_string()),
2065 ]),
2066 (
2068 Reference::new("a").not_starts_with(Datum::string("f")),
2069 vec![Some("bar".to_string())],
2070 ),
2071 (Reference::new("a").less_than(Datum::string("foo")), vec![
2073 Some("bar".to_string()),
2074 ]),
2075 (
2077 Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
2078 vec![Some("foo".to_string()), Some("bar".to_string())],
2079 ),
2080 (
2082 Reference::new("a").greater_than(Datum::string("bar")),
2083 vec![Some("foo".to_string())],
2084 ),
2085 (
2087 Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
2088 vec![Some("foo".to_string())],
2089 ),
2090 (
2092 Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
2093 vec![Some("foo".to_string())],
2094 ),
2095 (
2097 Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
2098 vec![Some("bar".to_string())],
2099 ),
2100 ];
2101
2102 let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
2104
2105 let (file_io, schema, table_location, _temp_dir) =
2106 setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
2107 let reader = ArrowReaderBuilder::new(file_io).build();
2108
2109 for (predicate, expected) in predicates {
2110 println!("testing predicate {predicate}");
2111 let result_data = test_perform_read(
2112 predicate.clone(),
2113 schema.clone(),
2114 table_location.clone(),
2115 reader.clone(),
2116 )
2117 .await;
2118
2119 assert_eq!(result_data, expected, "predicate={predicate}");
2120 }
2121 }
2122
2123 async fn test_perform_read(
2124 predicate: Predicate,
2125 schema: SchemaRef,
2126 table_location: String,
2127 reader: ArrowReader,
2128 ) -> Vec<Option<String>> {
2129 let tasks = Box::pin(futures::stream::iter(
2130 vec![Ok(FileScanTask {
2131 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
2132 .unwrap()
2133 .len(),
2134 start: 0,
2135 length: 0,
2136 record_count: None,
2137 data_file_path: format!("{table_location}/1.parquet"),
2138 data_file_format: DataFileFormat::Parquet,
2139 schema: schema.clone(),
2140 project_field_ids: vec![1],
2141 predicate: Some(predicate.bind(schema, true).unwrap()),
2142 deletes: vec![],
2143 partition: None,
2144 partition_spec: None,
2145 name_mapping: None,
2146 case_sensitive: false,
2147 })]
2148 .into_iter(),
2149 )) as FileScanTaskStream;
2150
2151 let result = reader
2152 .read(tasks)
2153 .unwrap()
2154 .try_collect::<Vec<RecordBatch>>()
2155 .await
2156 .unwrap();
2157
2158 result[0].columns()[0]
2159 .as_string_opt::<i32>()
2160 .unwrap()
2161 .iter()
2162 .map(|v| v.map(ToOwned::to_owned))
2163 .collect::<Vec<_>>()
2164 }
2165
2166 fn setup_kleene_logic(
2167 data_for_col_a: Vec<Option<String>>,
2168 col_a_type: DataType,
2169 ) -> (FileIO, SchemaRef, String, TempDir) {
2170 let schema = Arc::new(
2171 Schema::builder()
2172 .with_schema_id(1)
2173 .with_fields(vec![
2174 NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2175 ])
2176 .build()
2177 .unwrap(),
2178 );
2179
2180 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2181 Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2182 PARQUET_FIELD_ID_META_KEY.to_string(),
2183 "1".to_string(),
2184 )])),
2185 ]));
2186
2187 let tmp_dir = TempDir::new().unwrap();
2188 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2189
2190 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2191
2192 let col = match col_a_type {
2193 DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2194 DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2195 _ => panic!("unexpected col_a_type"),
2196 };
2197
2198 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2199
2200 let props = WriterProperties::builder()
2202 .set_compression(Compression::SNAPPY)
2203 .build();
2204
2205 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
2206 let mut writer =
2207 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2208
2209 writer.write(&to_write).expect("Writing batch");
2210
2211 writer.close().unwrap();
2213
2214 (file_io, schema, table_location, tmp_dir)
2215 }
2216
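    // Verifies that build_deletes_row_selection converts a DeleteVector of file-level
    // row positions into the expected RowSelection, both with and without an explicit
    // row-group selection.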
2217 #[test]
2218 fn test_build_deletes_row_selection() {
2219 let schema_descr = get_test_schema_descr();
2220
2221 let mut columns = vec![];
2222 for ptr in schema_descr.columns() {
2223 let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
2224 columns.push(column);
2225 }
2226
2227 let row_groups_metadata = vec![
2228 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
2229 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
2230 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
2231 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
2232 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
2233 ];
2234
2235 let selected_row_groups = Some(vec![1, 3]);
2236
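        // Row groups hold 1000, 500, 500, 1000 and 500 rows, so row-group boundaries
        // fall at positions 1000, 1500, 2000, 3000 and 3500; the delete positions below
        // are grouped by the row group they land in and deliberately straddle those
        // boundaries.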
2237 let positional_deletes = RoaringTreemap::from_iter(&[
            1, 3, 4, 5, 998, 999,
            1000, 1010, 1011, 1012, 1498, 1499,
            1500, 1501, 1600, 1999,
            2000, 2001, 2100, 2200, 2201, 2202, 2999,
            3000,
        ]);
2262
2263 let positional_deletes = DeleteVector::new(positional_deletes);
2264
2265 let result = ArrowReader::build_deletes_row_selection(
2267 &row_groups_metadata,
2268 &selected_row_groups,
2269 &positional_deletes,
2270 )
2271 .unwrap();
2272
2273 let expected = RowSelection::from(vec![
2274 RowSelector::skip(1),
2275 RowSelector::select(9),
2276 RowSelector::skip(3),
2277 RowSelector::select(485),
2278 RowSelector::skip(4),
2279 RowSelector::select(98),
2280 RowSelector::skip(1),
2281 RowSelector::select(99),
2282 RowSelector::skip(3),
2283 RowSelector::select(796),
2284 RowSelector::skip(1),
2285 ]);
2286
2287 assert_eq!(result, expected);
2288
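        // With no explicit row-group selection, the resulting RowSelection must cover
        // every row group in the file.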
2289 let result = ArrowReader::build_deletes_row_selection(
2291 &row_groups_metadata,
2292 &None,
2293 &positional_deletes,
2294 )
2295 .unwrap();
2296
2297 let expected = RowSelection::from(vec![
2298 RowSelector::select(1),
2299 RowSelector::skip(1),
2300 RowSelector::select(1),
2301 RowSelector::skip(3),
2302 RowSelector::select(992),
2303 RowSelector::skip(3),
2304 RowSelector::select(9),
2305 RowSelector::skip(3),
2306 RowSelector::select(485),
2307 RowSelector::skip(4),
2308 RowSelector::select(98),
2309 RowSelector::skip(1),
2310 RowSelector::select(398),
2311 RowSelector::skip(3),
2312 RowSelector::select(98),
2313 RowSelector::skip(1),
2314 RowSelector::select(99),
2315 RowSelector::skip(3),
2316 RowSelector::select(796),
2317 RowSelector::skip(2),
2318 RowSelector::select(499),
2319 ]);
2320
2321 assert_eq!(result, expected);
2322 }
2323
2324 fn build_test_row_group_meta(
2325 schema_descr: SchemaDescPtr,
2326 columns: Vec<ColumnChunkMetaData>,
2327 num_rows: i64,
2328 ordinal: i16,
2329 ) -> RowGroupMetaData {
2330 RowGroupMetaData::builder(schema_descr.clone())
2331 .set_num_rows(num_rows)
2332 .set_total_byte_size(2000)
2333 .set_column_metadata(columns)
2334 .set_ordinal(ordinal)
2335 .build()
2336 .unwrap()
2337 }
2338
2339 fn get_test_schema_descr() -> SchemaDescPtr {
2340 use parquet::schema::types::Type as SchemaType;
2341
2342 let schema = SchemaType::group_type_builder("schema")
2343 .with_fields(vec![
2344 Arc::new(
2345 SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2346 .build()
2347 .unwrap(),
2348 ),
2349 Arc::new(
2350 SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2351 .build()
2352 .unwrap(),
2353 ),
2354 ])
2355 .build()
2356 .unwrap();
2357
2358 Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2359 }
2360
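    // Writes a Parquet file containing three 100-row row groups and checks that scan
    // tasks restricted to a byte range only return rows from the row groups whose
    // start offsets fall inside that range.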
2361 #[tokio::test]
2363 async fn test_file_splits_respect_byte_ranges() {
2364 use arrow_array::Int32Array;
2365 use parquet::file::reader::{FileReader, SerializedFileReader};
2366
2367 let schema = Arc::new(
2368 Schema::builder()
2369 .with_schema_id(1)
2370 .with_fields(vec![
2371 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2372 ])
2373 .build()
2374 .unwrap(),
2375 );
2376
2377 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2378 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2379 PARQUET_FIELD_ID_META_KEY.to_string(),
2380 "1".to_string(),
2381 )])),
2382 ]));
2383
2384 let tmp_dir = TempDir::new().unwrap();
2385 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2386 let file_path = format!("{table_location}/multi_row_group.parquet");
2387
2388 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2390 (0..100).collect::<Vec<i32>>(),
2391 ))])
2392 .unwrap();
2393 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2394 (100..200).collect::<Vec<i32>>(),
2395 ))])
2396 .unwrap();
2397 let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2398 (200..300).collect::<Vec<i32>>(),
2399 ))])
2400 .unwrap();
2401
2402 let props = WriterProperties::builder()
2403 .set_compression(Compression::SNAPPY)
2404 .set_max_row_group_size(100)
2405 .build();
2406
2407 let file = File::create(&file_path).unwrap();
2408 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2409 writer.write(&batch1).expect("Writing batch 1");
2410 writer.write(&batch2).expect("Writing batch 2");
2411 writer.write(&batch3).expect("Writing batch 3");
2412 writer.close().unwrap();
2413
2414 let file = File::open(&file_path).unwrap();
2416 let reader = SerializedFileReader::new(file).unwrap();
2417 let metadata = reader.metadata();
2418
2419 println!("File has {} row groups", metadata.num_row_groups());
2420 assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2421
2422 let row_group_0 = metadata.row_group(0);
2424 let row_group_1 = metadata.row_group(1);
2425 let row_group_2 = metadata.row_group(2);
2426
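        // A Parquet file begins with the 4-byte "PAR1" magic, so the first row
        // group's data starts at byte offset 4.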
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2429 let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
2430 let file_end = rg2_start + row_group_2.compressed_size() as u64;
2431
2432 println!(
2433 "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2434 row_group_0.num_rows(),
2435 rg0_start,
2436 row_group_0.compressed_size()
2437 );
2438 println!(
2439 "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2440 row_group_1.num_rows(),
2441 rg1_start,
2442 row_group_1.compressed_size()
2443 );
2444 println!(
2445 "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2446 row_group_2.num_rows(),
2447 rg2_start,
2448 row_group_2.compressed_size()
2449 );
2450
2451 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2452 let reader = ArrowReaderBuilder::new(file_io).build();
2453
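        // Task 1 covers only the byte range of row group 0 (ids 0..100).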
2454 let task1 = FileScanTask {
2456 file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
2457 start: rg0_start,
2458 length: row_group_0.compressed_size() as u64,
2459 record_count: Some(100),
2460 data_file_path: file_path.clone(),
2461 data_file_format: DataFileFormat::Parquet,
2462 schema: schema.clone(),
2463 project_field_ids: vec![1],
2464 predicate: None,
2465 deletes: vec![],
2466 partition: None,
2467 partition_spec: None,
2468 name_mapping: None,
2469 case_sensitive: false,
2470 };
2471
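        // Task 2 covers the byte ranges of row groups 1 and 2 (ids 100..300).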
2472 let task2 = FileScanTask {
2474 file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
2475 start: rg1_start,
2476 length: file_end - rg1_start,
2477 record_count: Some(200),
2478 data_file_path: file_path.clone(),
2479 data_file_format: DataFileFormat::Parquet,
2480 schema: schema.clone(),
2481 project_field_ids: vec![1],
2482 predicate: None,
2483 deletes: vec![],
2484 partition: None,
2485 partition_spec: None,
2486 name_mapping: None,
2487 case_sensitive: false,
2488 };
2489
2490 let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2491 let result1 = reader
2492 .clone()
2493 .read(tasks1)
2494 .unwrap()
2495 .try_collect::<Vec<RecordBatch>>()
2496 .await
2497 .unwrap();
2498
2499 let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2500 println!(
2501 "Task 1 (bytes {}-{}) returned {} rows",
2502 rg0_start,
2503 rg0_start + row_group_0.compressed_size() as u64,
2504 total_rows_task1
2505 );
2506
2507 let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2508 let result2 = reader
2509 .read(tasks2)
2510 .unwrap()
2511 .try_collect::<Vec<RecordBatch>>()
2512 .await
2513 .unwrap();
2514
2515 let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2516 println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2517
2518 assert_eq!(
2519 total_rows_task1, 100,
2520 "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2521 );
2522
2523 assert_eq!(
2524 total_rows_task2, 200,
2525 "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2526 );
2527
2528 if total_rows_task1 > 0 {
2530 let first_batch = &result1[0];
2531 let id_col = first_batch
2532 .column(0)
2533 .as_primitive::<arrow_array::types::Int32Type>();
2534 let first_val = id_col.value(0);
2535 let last_val = id_col.value(id_col.len() - 1);
2536 println!("Task 1 data range: {first_val} to {last_val}");
2537
2538 assert_eq!(first_val, 0, "Task 1 should start with id=0");
2539 assert_eq!(last_val, 99, "Task 1 should end with id=99");
2540 }
2541
2542 if total_rows_task2 > 0 {
2543 let first_batch = &result2[0];
2544 let id_col = first_batch
2545 .column(0)
2546 .as_primitive::<arrow_array::types::Int32Type>();
2547 let first_val = id_col.value(0);
2548 println!("Task 2 first value: {first_val}");
2549
2550 assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2551 }
2552 }
2553
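    // Writes a file containing only column `a`, then reads it with a newer schema that
    // adds optional column `b`; the reader should return `b` as all-null.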
2554 #[tokio::test]
2560 async fn test_schema_evolution_add_column() {
2561 use arrow_array::{Array, Int32Array};
2562
2563 let new_schema = Arc::new(
2565 Schema::builder()
2566 .with_schema_id(2)
2567 .with_fields(vec![
2568 NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2569 NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2570 ])
2571 .build()
2572 .unwrap(),
2573 );
2574
2575 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2577 Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2578 PARQUET_FIELD_ID_META_KEY.to_string(),
2579 "1".to_string(),
2580 )])),
2581 ]));
2582
2583 let tmp_dir = TempDir::new().unwrap();
2585 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2586 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2587
2588 let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2589 let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2590
2591 let props = WriterProperties::builder()
2592 .set_compression(Compression::SNAPPY)
2593 .build();
2594 let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
2595 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2596 writer.write(&to_write).expect("Writing batch");
2597 writer.close().unwrap();
2598
2599 let reader = ArrowReaderBuilder::new(file_io).build();
2601 let tasks = Box::pin(futures::stream::iter(
2602 vec![Ok(FileScanTask {
2603 file_size_in_bytes: std::fs::metadata(format!("{table_location}/old_file.parquet"))
2604 .unwrap()
2605 .len(),
2606 start: 0,
2607 length: 0,
2608 record_count: None,
2609 data_file_path: format!("{table_location}/old_file.parquet"),
2610 data_file_format: DataFileFormat::Parquet,
2611 schema: new_schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
2614 deletes: vec![],
2615 partition: None,
2616 partition_spec: None,
2617 name_mapping: None,
2618 case_sensitive: false,
2619 })]
2620 .into_iter(),
2621 )) as FileScanTaskStream;
2622
2623 let result = reader
2624 .read(tasks)
2625 .unwrap()
2626 .try_collect::<Vec<RecordBatch>>()
2627 .await
2628 .unwrap();
2629
2630 assert_eq!(result.len(), 1);
2632 let batch = &result[0];
2633
2634 assert_eq!(batch.num_columns(), 2);
2636 assert_eq!(batch.num_rows(), 3);
2637
2638 let col_a = batch
2640 .column(0)
2641 .as_primitive::<arrow_array::types::Int32Type>();
2642 assert_eq!(col_a.values(), &[1, 2, 3]);
2643
2644 let col_b = batch
2646 .column(1)
2647 .as_primitive::<arrow_array::types::Int32Type>();
2648 assert_eq!(col_b.null_count(), 3);
2649 assert!(col_b.is_null(0));
2650 assert!(col_b.is_null(1));
2651 assert!(col_b.is_null(2));
2652 }
2653
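    // Writes a data file with two 100-row row groups plus a positional delete file that
    // removes position 199 (the last row, id=200), and checks that a delete landing in
    // the second row group is actually applied.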
2654 #[tokio::test]
2672 async fn test_position_delete_across_multiple_row_groups() {
2673 use arrow_array::{Int32Array, Int64Array};
2674 use parquet::file::reader::{FileReader, SerializedFileReader};
2675
2676 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2678 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2679
2680 let tmp_dir = TempDir::new().unwrap();
2681 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2682
2683 let table_schema = Arc::new(
2685 Schema::builder()
2686 .with_schema_id(1)
2687 .with_fields(vec![
2688 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2689 ])
2690 .build()
2691 .unwrap(),
2692 );
2693
2694 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2695 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2696 PARQUET_FIELD_ID_META_KEY.to_string(),
2697 "1".to_string(),
2698 )])),
2699 ]));
2700
2701 let data_file_path = format!("{table_location}/data.parquet");
2705
2706 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2707 Int32Array::from_iter_values(1..=100),
2708 )])
2709 .unwrap();
2710
2711 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2712 Int32Array::from_iter_values(101..=200),
2713 )])
2714 .unwrap();
2715
2716 let props = WriterProperties::builder()
2718 .set_compression(Compression::SNAPPY)
2719 .set_max_row_group_size(100)
2720 .build();
2721
2722 let file = File::create(&data_file_path).unwrap();
2723 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2724 writer.write(&batch1).expect("Writing batch 1");
2725 writer.write(&batch2).expect("Writing batch 2");
2726 writer.close().unwrap();
2727
2728 let verify_file = File::open(&data_file_path).unwrap();
2730 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2731 assert_eq!(
2732 verify_reader.metadata().num_row_groups(),
2733 2,
2734 "Should have 2 row groups"
2735 );
2736
2737 let delete_file_path = format!("{table_location}/deletes.parquet");
2739
2740 let delete_schema = Arc::new(ArrowSchema::new(vec![
2741 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2742 PARQUET_FIELD_ID_META_KEY.to_string(),
2743 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2744 )])),
2745 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2746 PARQUET_FIELD_ID_META_KEY.to_string(),
2747 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2748 )])),
2749 ]));
2750
2751 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2753 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2754 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2755 ])
2756 .unwrap();
2757
2758 let delete_props = WriterProperties::builder()
2759 .set_compression(Compression::SNAPPY)
2760 .build();
2761
2762 let delete_file = File::create(&delete_file_path).unwrap();
2763 let mut delete_writer =
2764 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2765 delete_writer.write(&delete_batch).unwrap();
2766 delete_writer.close().unwrap();
2767
2768 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2770 let reader = ArrowReaderBuilder::new(file_io).build();
2771
2772 let task = FileScanTask {
2773 file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
2774 start: 0,
2775 length: 0,
2776 record_count: Some(200),
2777 data_file_path: data_file_path.clone(),
2778 data_file_format: DataFileFormat::Parquet,
2779 schema: table_schema.clone(),
2780 project_field_ids: vec![1],
2781 predicate: None,
2782 deletes: vec![FileScanTaskDeleteFile {
2783 file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
2784 file_path: delete_file_path,
2785 file_type: DataContentType::PositionDeletes,
2786 partition_spec_id: 0,
2787 equality_ids: None,
2788 }],
2789 partition: None,
2790 partition_spec: None,
2791 name_mapping: None,
2792 case_sensitive: false,
2793 };
2794
2795 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2796 let result = reader
2797 .read(tasks)
2798 .unwrap()
2799 .try_collect::<Vec<RecordBatch>>()
2800 .await
2801 .unwrap();
2802
2803 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2805
2806 println!("Total rows read: {total_rows}");
2807 println!("Expected: 199 rows (deleted row 199 which had id=200)");
2808
2809 assert_eq!(
2811 total_rows, 199,
2812 "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
2813 The bug causes position deletes in later row groups to be ignored."
2814 );
2815
2816 let all_ids: Vec<i32> = result
2818 .iter()
2819 .flat_map(|batch| {
2820 batch
2821 .column(0)
2822 .as_primitive::<arrow_array::types::Int32Type>()
2823 .values()
2824 .iter()
2825 .copied()
2826 })
2827 .collect();
2828
2829 assert!(
2830 !all_ids.contains(&200),
2831 "Row with id=200 should be deleted but was found in results"
2832 );
2833
2834 let expected_ids: Vec<i32> = (1..=199).collect();
2836 assert_eq!(
2837 all_ids, expected_ids,
2838 "Should have ids 1-199 but got different values"
2839 );
2840 }
2841
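    // Same two-row-group layout, but the scan task only covers the second row group via
    // its byte range; the positional delete at position 199 must still be applied even
    // though row group 0 is skipped.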
2842 #[tokio::test]
2868 async fn test_position_delete_with_row_group_selection() {
2869 use arrow_array::{Int32Array, Int64Array};
2870 use parquet::file::reader::{FileReader, SerializedFileReader};
2871
2872 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2874 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2875
2876 let tmp_dir = TempDir::new().unwrap();
2877 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2878
2879 let table_schema = Arc::new(
2881 Schema::builder()
2882 .with_schema_id(1)
2883 .with_fields(vec![
2884 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2885 ])
2886 .build()
2887 .unwrap(),
2888 );
2889
2890 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2891 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2892 PARQUET_FIELD_ID_META_KEY.to_string(),
2893 "1".to_string(),
2894 )])),
2895 ]));
2896
2897 let data_file_path = format!("{table_location}/data.parquet");
2901
2902 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2903 Int32Array::from_iter_values(1..=100),
2904 )])
2905 .unwrap();
2906
2907 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2908 Int32Array::from_iter_values(101..=200),
2909 )])
2910 .unwrap();
2911
2912 let props = WriterProperties::builder()
2914 .set_compression(Compression::SNAPPY)
2915 .set_max_row_group_size(100)
2916 .build();
2917
2918 let file = File::create(&data_file_path).unwrap();
2919 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2920 writer.write(&batch1).expect("Writing batch 1");
2921 writer.write(&batch2).expect("Writing batch 2");
2922 writer.close().unwrap();
2923
2924 let verify_file = File::open(&data_file_path).unwrap();
2926 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2927 assert_eq!(
2928 verify_reader.metadata().num_row_groups(),
2929 2,
2930 "Should have 2 row groups"
2931 );
2932
2933 let delete_file_path = format!("{table_location}/deletes.parquet");
2935
2936 let delete_schema = Arc::new(ArrowSchema::new(vec![
2937 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2938 PARQUET_FIELD_ID_META_KEY.to_string(),
2939 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2940 )])),
2941 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2942 PARQUET_FIELD_ID_META_KEY.to_string(),
2943 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2944 )])),
2945 ]));
2946
2947 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2949 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2950 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2951 ])
2952 .unwrap();
2953
2954 let delete_props = WriterProperties::builder()
2955 .set_compression(Compression::SNAPPY)
2956 .build();
2957
2958 let delete_file = File::create(&delete_file_path).unwrap();
2959 let mut delete_writer =
2960 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2961 delete_writer.write(&delete_batch).unwrap();
2962 delete_writer.close().unwrap();
2963
2964 let metadata_file = File::open(&data_file_path).unwrap();
2967 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
2968 let metadata = metadata_reader.metadata();
2969
2970 let row_group_0 = metadata.row_group(0);
2971 let row_group_1 = metadata.row_group(1);
2972
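        // Row group 0 starts right after the 4-byte Parquet magic.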
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2975 let rg1_length = row_group_1.compressed_size() as u64;
2976
2977 println!(
2978 "Row group 0: starts at byte {}, {} bytes compressed",
2979 rg0_start,
2980 row_group_0.compressed_size()
2981 );
2982 println!(
2983 "Row group 1: starts at byte {}, {} bytes compressed",
2984 rg1_start,
2985 row_group_1.compressed_size()
2986 );
2987
2988 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2989 let reader = ArrowReaderBuilder::new(file_io).build();
2990
2991 let task = FileScanTask {
2993 file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
2994 start: rg1_start,
2995 length: rg1_length,
            record_count: Some(100),
            data_file_path: data_file_path.clone(),
2998 data_file_format: DataFileFormat::Parquet,
2999 schema: table_schema.clone(),
3000 project_field_ids: vec![1],
3001 predicate: None,
3002 deletes: vec![FileScanTaskDeleteFile {
3003 file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
3004 file_path: delete_file_path,
3005 file_type: DataContentType::PositionDeletes,
3006 partition_spec_id: 0,
3007 equality_ids: None,
3008 }],
3009 partition: None,
3010 partition_spec: None,
3011 name_mapping: None,
3012 case_sensitive: false,
3013 };
3014
3015 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3016 let result = reader
3017 .read(tasks)
3018 .unwrap()
3019 .try_collect::<Vec<RecordBatch>>()
3020 .await
3021 .unwrap();
3022
3023 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3026
3027 println!("Total rows read from row group 1: {total_rows}");
3028 println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
3029
3030 assert_eq!(
3032 total_rows, 99,
3033 "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
            The bug causes position deletes to be lost when advance_to() is followed by next() \
            while skipping unselected row groups."
3036 );
3037
3038 let all_ids: Vec<i32> = result
3040 .iter()
3041 .flat_map(|batch| {
3042 batch
3043 .column(0)
3044 .as_primitive::<arrow_array::types::Int32Type>()
3045 .values()
3046 .iter()
3047 .copied()
3048 })
3049 .collect();
3050
3051 assert!(
3052 !all_ids.contains(&200),
3053 "Row with id=200 should be deleted but was found in results"
3054 );
3055
3056 let expected_ids: Vec<i32> = (101..=199).collect();
3058 assert_eq!(
3059 all_ids, expected_ids,
3060 "Should have ids 101-199 but got different values"
3061 );
3062 }
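
    // A positional delete that falls inside the skipped row group 0 must not remove any
    // rows from (or stall the read of) the selected row group 1.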
3063 #[tokio::test]
3092 async fn test_position_delete_in_skipped_row_group() {
3093 use arrow_array::{Int32Array, Int64Array};
3094 use parquet::file::reader::{FileReader, SerializedFileReader};
3095
3096 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3098 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3099
3100 let tmp_dir = TempDir::new().unwrap();
3101 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3102
3103 let table_schema = Arc::new(
3105 Schema::builder()
3106 .with_schema_id(1)
3107 .with_fields(vec![
3108 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3109 ])
3110 .build()
3111 .unwrap(),
3112 );
3113
3114 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3115 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3116 PARQUET_FIELD_ID_META_KEY.to_string(),
3117 "1".to_string(),
3118 )])),
3119 ]));
3120
3121 let data_file_path = format!("{table_location}/data.parquet");
3125
3126 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3127 Int32Array::from_iter_values(1..=100),
3128 )])
3129 .unwrap();
3130
3131 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3132 Int32Array::from_iter_values(101..=200),
3133 )])
3134 .unwrap();
3135
3136 let props = WriterProperties::builder()
3138 .set_compression(Compression::SNAPPY)
3139 .set_max_row_group_size(100)
3140 .build();
3141
3142 let file = File::create(&data_file_path).unwrap();
3143 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3144 writer.write(&batch1).expect("Writing batch 1");
3145 writer.write(&batch2).expect("Writing batch 2");
3146 writer.close().unwrap();
3147
3148 let verify_file = File::open(&data_file_path).unwrap();
3150 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3151 assert_eq!(
3152 verify_reader.metadata().num_row_groups(),
3153 2,
3154 "Should have 2 row groups"
3155 );
3156
3157 let delete_file_path = format!("{table_location}/deletes.parquet");
3159
3160 let delete_schema = Arc::new(ArrowSchema::new(vec![
3161 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3162 PARQUET_FIELD_ID_META_KEY.to_string(),
3163 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3164 )])),
3165 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3166 PARQUET_FIELD_ID_META_KEY.to_string(),
3167 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3168 )])),
3169 ]));
3170
3171 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3173 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3174 Arc::new(Int64Array::from_iter_values(vec![0i64])),
3175 ])
3176 .unwrap();
3177
3178 let delete_props = WriterProperties::builder()
3179 .set_compression(Compression::SNAPPY)
3180 .build();
3181
3182 let delete_file = File::create(&delete_file_path).unwrap();
3183 let mut delete_writer =
3184 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3185 delete_writer.write(&delete_batch).unwrap();
3186 delete_writer.close().unwrap();
3187
3188 let metadata_file = File::open(&data_file_path).unwrap();
3191 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3192 let metadata = metadata_reader.metadata();
3193
3194 let row_group_0 = metadata.row_group(0);
3195 let row_group_1 = metadata.row_group(1);
3196
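        // Row group 0 starts right after the 4-byte Parquet magic.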
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
3199 let rg1_length = row_group_1.compressed_size() as u64;
3200
3201 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3202 let reader = ArrowReaderBuilder::new(file_io).build();
3203
3204 let task = FileScanTask {
3206 file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
3207 start: rg1_start,
3208 length: rg1_length,
            record_count: Some(100),
            data_file_path: data_file_path.clone(),
3211 data_file_format: DataFileFormat::Parquet,
3212 schema: table_schema.clone(),
3213 project_field_ids: vec![1],
3214 predicate: None,
3215 deletes: vec![FileScanTaskDeleteFile {
3216 file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
3217 file_path: delete_file_path,
3218 file_type: DataContentType::PositionDeletes,
3219 partition_spec_id: 0,
3220 equality_ids: None,
3221 }],
3222 partition: None,
3223 partition_spec: None,
3224 name_mapping: None,
3225 case_sensitive: false,
3226 };
3227
3228 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3229 let result = reader
3230 .read(tasks)
3231 .unwrap()
3232 .try_collect::<Vec<RecordBatch>>()
3233 .await
3234 .unwrap();
3235
3236 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3239
3240 assert_eq!(
3241 total_rows, 100,
3242 "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
3243 If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
3244 );
3245
3246 let all_ids: Vec<i32> = result
3248 .iter()
3249 .flat_map(|batch| {
3250 batch
3251 .column(0)
3252 .as_primitive::<arrow_array::types::Int32Type>()
3253 .values()
3254 .iter()
3255 .copied()
3256 })
3257 .collect();
3258
3259 let expected_ids: Vec<i32> = (101..=200).collect();
3260 assert_eq!(
3261 all_ids, expected_ids,
3262 "Should have ids 101-200 (all of row group 1)"
3263 );
3264 }
3265
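    // Reads a Parquet file whose Arrow schema carries no PARQUET_FIELD_ID_META_KEY
    // metadata and checks that both projected columns still come back with the expected
    // values.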
3266 #[tokio::test]
3272 async fn test_read_parquet_file_without_field_ids() {
3273 let schema = Arc::new(
3274 Schema::builder()
3275 .with_schema_id(1)
3276 .with_fields(vec![
3277 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3278 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3279 ])
3280 .build()
3281 .unwrap(),
3282 );
3283
3284 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3286 Field::new("name", DataType::Utf8, false),
3287 Field::new("age", DataType::Int32, false),
3288 ]));
3289
3290 let tmp_dir = TempDir::new().unwrap();
3291 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3292 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3293
3294 let name_data = vec!["Alice", "Bob", "Charlie"];
3295 let age_data = vec![30, 25, 35];
3296
3297 use arrow_array::Int32Array;
3298 let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3299 let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3300
3301 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3302
3303 let props = WriterProperties::builder()
3304 .set_compression(Compression::SNAPPY)
3305 .build();
3306
3307 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3308 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3309
3310 writer.write(&to_write).expect("Writing batch");
3311 writer.close().unwrap();
3312
3313 let reader = ArrowReaderBuilder::new(file_io).build();
3314
3315 let tasks = Box::pin(futures::stream::iter(
3316 vec![Ok(FileScanTask {
3317 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3318 .unwrap()
3319 .len(),
3320 start: 0,
3321 length: 0,
3322 record_count: None,
3323 data_file_path: format!("{table_location}/1.parquet"),
3324 data_file_format: DataFileFormat::Parquet,
3325 schema: schema.clone(),
3326 project_field_ids: vec![1, 2],
3327 predicate: None,
3328 deletes: vec![],
3329 partition: None,
3330 partition_spec: None,
3331 name_mapping: None,
3332 case_sensitive: false,
3333 })]
3334 .into_iter(),
3335 )) as FileScanTaskStream;
3336
3337 let result = reader
3338 .read(tasks)
3339 .unwrap()
3340 .try_collect::<Vec<RecordBatch>>()
3341 .await
3342 .unwrap();
3343
3344 assert_eq!(result.len(), 1);
3345 let batch = &result[0];
3346 assert_eq!(batch.num_rows(), 3);
3347 assert_eq!(batch.num_columns(), 2);
3348
3349 let name_array = batch.column(0).as_string::<i32>();
3351 assert_eq!(name_array.value(0), "Alice");
3352 assert_eq!(name_array.value(1), "Bob");
3353 assert_eq!(name_array.value(2), "Charlie");
3354
3355 let age_array = batch
3356 .column(1)
3357 .as_primitive::<arrow_array::types::Int32Type>();
3358 assert_eq!(age_array.value(0), 30);
3359 assert_eq!(age_array.value(1), 25);
3360 assert_eq!(age_array.value(2), 35);
3361 }
3362
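    // Same field-id-less setup, but only a subset of the columns (field ids 1 and 3) is
    // projected.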
3363 #[tokio::test]
3367 async fn test_read_parquet_without_field_ids_partial_projection() {
3368 use arrow_array::Int32Array;
3369
3370 let schema = Arc::new(
3371 Schema::builder()
3372 .with_schema_id(1)
3373 .with_fields(vec![
3374 NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3375 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3376 NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3377 NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3378 ])
3379 .build()
3380 .unwrap(),
3381 );
3382
3383 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3384 Field::new("col1", DataType::Utf8, false),
3385 Field::new("col2", DataType::Int32, false),
3386 Field::new("col3", DataType::Utf8, false),
3387 Field::new("col4", DataType::Int32, false),
3388 ]));
3389
3390 let tmp_dir = TempDir::new().unwrap();
3391 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3392 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3393
3394 let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3395 let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3396 let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3397 let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3398
3399 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3400 col1_data, col2_data, col3_data, col4_data,
3401 ])
3402 .unwrap();
3403
3404 let props = WriterProperties::builder()
3405 .set_compression(Compression::SNAPPY)
3406 .build();
3407
3408 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3409 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3410
3411 writer.write(&to_write).expect("Writing batch");
3412 writer.close().unwrap();
3413
3414 let reader = ArrowReaderBuilder::new(file_io).build();
3415
3416 let tasks = Box::pin(futures::stream::iter(
3417 vec![Ok(FileScanTask {
3418 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3419 .unwrap()
3420 .len(),
3421 start: 0,
3422 length: 0,
3423 record_count: None,
3424 data_file_path: format!("{table_location}/1.parquet"),
3425 data_file_format: DataFileFormat::Parquet,
3426 schema: schema.clone(),
3427 project_field_ids: vec![1, 3],
3428 predicate: None,
3429 deletes: vec![],
3430 partition: None,
3431 partition_spec: None,
3432 name_mapping: None,
3433 case_sensitive: false,
3434 })]
3435 .into_iter(),
3436 )) as FileScanTaskStream;
3437
3438 let result = reader
3439 .read(tasks)
3440 .unwrap()
3441 .try_collect::<Vec<RecordBatch>>()
3442 .await
3443 .unwrap();
3444
3445 assert_eq!(result.len(), 1);
3446 let batch = &result[0];
3447 assert_eq!(batch.num_rows(), 2);
3448 assert_eq!(batch.num_columns(), 2);
3449
3450 let col1_array = batch.column(0).as_string::<i32>();
3451 assert_eq!(col1_array.value(0), "a");
3452 assert_eq!(col1_array.value(1), "b");
3453
3454 let col3_array = batch.column(1).as_string::<i32>();
3455 assert_eq!(col3_array.value(0), "c");
3456 assert_eq!(col3_array.value(1), "d");
3457 }
3458
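    // Field-id-less file written with two columns, read with a schema that adds an
    // optional `city` column; the new column should come back as all-null.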
3459 #[tokio::test]
3463 async fn test_read_parquet_without_field_ids_schema_evolution() {
3464 use arrow_array::{Array, Int32Array};
3465
3466 let schema = Arc::new(
3468 Schema::builder()
3469 .with_schema_id(1)
3470 .with_fields(vec![
3471 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3472 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3473 NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3474 ])
3475 .build()
3476 .unwrap(),
3477 );
3478
3479 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3480 Field::new("name", DataType::Utf8, false),
3481 Field::new("age", DataType::Int32, false),
3482 ]));
3483
3484 let tmp_dir = TempDir::new().unwrap();
3485 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3486 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3487
3488 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3489 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3490
3491 let to_write =
3492 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3493
3494 let props = WriterProperties::builder()
3495 .set_compression(Compression::SNAPPY)
3496 .build();
3497
3498 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3499 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3500
3501 writer.write(&to_write).expect("Writing batch");
3502 writer.close().unwrap();
3503
3504 let reader = ArrowReaderBuilder::new(file_io).build();
3505
3506 let tasks = Box::pin(futures::stream::iter(
3507 vec![Ok(FileScanTask {
3508 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3509 .unwrap()
3510 .len(),
3511 start: 0,
3512 length: 0,
3513 record_count: None,
3514 data_file_path: format!("{table_location}/1.parquet"),
3515 data_file_format: DataFileFormat::Parquet,
3516 schema: schema.clone(),
3517 project_field_ids: vec![1, 2, 3],
3518 predicate: None,
3519 deletes: vec![],
3520 partition: None,
3521 partition_spec: None,
3522 name_mapping: None,
3523 case_sensitive: false,
3524 })]
3525 .into_iter(),
3526 )) as FileScanTaskStream;
3527
3528 let result = reader
3529 .read(tasks)
3530 .unwrap()
3531 .try_collect::<Vec<RecordBatch>>()
3532 .await
3533 .unwrap();
3534
3535 assert_eq!(result.len(), 1);
3536 let batch = &result[0];
3537 assert_eq!(batch.num_rows(), 2);
3538 assert_eq!(batch.num_columns(), 3);
3539
3540 let name_array = batch.column(0).as_string::<i32>();
3541 assert_eq!(name_array.value(0), "Alice");
3542 assert_eq!(name_array.value(1), "Bob");
3543
3544 let age_array = batch
3545 .column(1)
3546 .as_primitive::<arrow_array::types::Int32Type>();
3547 assert_eq!(age_array.value(0), 30);
3548 assert_eq!(age_array.value(1), 25);
3549
3550 let city_array = batch.column(2).as_string::<i32>();
3552 assert_eq!(city_array.null_count(), 2);
3553 assert!(city_array.is_null(0));
3554 assert!(city_array.is_null(1));
3555 }
3556
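    // Field-id-less file written as several small row groups; all six rows should be
    // returned in order.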
3557 #[tokio::test]
3560 async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3561 use arrow_array::Int32Array;
3562
3563 let schema = Arc::new(
3564 Schema::builder()
3565 .with_schema_id(1)
3566 .with_fields(vec![
3567 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3568 NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3569 ])
3570 .build()
3571 .unwrap(),
3572 );
3573
3574 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3575 Field::new("name", DataType::Utf8, false),
3576 Field::new("value", DataType::Int32, false),
3577 ]));
3578
3579 let tmp_dir = TempDir::new().unwrap();
3580 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3581 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3582
3583 let props = WriterProperties::builder()
3585 .set_compression(Compression::SNAPPY)
3586 .set_write_batch_size(2)
3587 .set_max_row_group_size(2)
3588 .build();
3589
3590 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3591 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3592
3593 for batch_num in 0..3 {
3595 let name_data = Arc::new(StringArray::from(vec![
3596 format!("name_{}", batch_num * 2),
3597 format!("name_{}", batch_num * 2 + 1),
3598 ])) as ArrayRef;
3599 let value_data =
3600 Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3601
3602 let batch =
3603 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3604 writer.write(&batch).expect("Writing batch");
3605 }
3606 writer.close().unwrap();
3607
3608 let reader = ArrowReaderBuilder::new(file_io).build();
3609
3610 let tasks = Box::pin(futures::stream::iter(
3611 vec![Ok(FileScanTask {
3612 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3613 .unwrap()
3614 .len(),
3615 start: 0,
3616 length: 0,
3617 record_count: None,
3618 data_file_path: format!("{table_location}/1.parquet"),
3619 data_file_format: DataFileFormat::Parquet,
3620 schema: schema.clone(),
3621 project_field_ids: vec![1, 2],
3622 predicate: None,
3623 deletes: vec![],
3624 partition: None,
3625 partition_spec: None,
3626 name_mapping: None,
3627 case_sensitive: false,
3628 })]
3629 .into_iter(),
3630 )) as FileScanTaskStream;
3631
3632 let result = reader
3633 .read(tasks)
3634 .unwrap()
3635 .try_collect::<Vec<RecordBatch>>()
3636 .await
3637 .unwrap();
3638
3639 assert!(!result.is_empty());
3640
3641 let mut all_names = Vec::new();
3642 let mut all_values = Vec::new();
3643
3644 for batch in &result {
3645 let name_array = batch.column(0).as_string::<i32>();
3646 let value_array = batch
3647 .column(1)
3648 .as_primitive::<arrow_array::types::Int32Type>();
3649
3650 for i in 0..batch.num_rows() {
3651 all_names.push(name_array.value(i).to_string());
3652 all_values.push(value_array.value(i));
3653 }
3654 }
3655
3656 assert_eq!(all_names.len(), 6);
3657 assert_eq!(all_values.len(), 6);
3658
3659 for i in 0..6 {
3660 assert_eq!(all_names[i], format!("name_{i}"));
3661 assert_eq!(all_values[i], i as i32);
3662 }
3663 }
3664
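    // Field-id-less file containing a struct column; the nested `name` and `age` fields
    // should be read back correctly.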
3665 #[tokio::test]
3669 async fn test_read_parquet_without_field_ids_with_struct() {
3670 use arrow_array::{Int32Array, StructArray};
3671 use arrow_schema::Fields;
3672
3673 let schema = Arc::new(
3674 Schema::builder()
3675 .with_schema_id(1)
3676 .with_fields(vec![
3677 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3678 NestedField::required(
3679 2,
3680 "person",
3681 Type::Struct(crate::spec::StructType::new(vec![
3682 NestedField::required(
3683 3,
3684 "name",
3685 Type::Primitive(PrimitiveType::String),
3686 )
3687 .into(),
3688 NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3689 .into(),
3690 ])),
3691 )
3692 .into(),
3693 ])
3694 .build()
3695 .unwrap(),
3696 );
3697
3698 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3699 Field::new("id", DataType::Int32, false),
3700 Field::new(
3701 "person",
3702 DataType::Struct(Fields::from(vec![
3703 Field::new("name", DataType::Utf8, false),
3704 Field::new("age", DataType::Int32, false),
3705 ])),
3706 false,
3707 ),
3708 ]));
3709
3710 let tmp_dir = TempDir::new().unwrap();
3711 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3712 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3713
3714 let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3715 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3716 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3717 let person_data = Arc::new(StructArray::from(vec![
3718 (
3719 Arc::new(Field::new("name", DataType::Utf8, false)),
3720 name_data,
3721 ),
3722 (
3723 Arc::new(Field::new("age", DataType::Int32, false)),
3724 age_data,
3725 ),
3726 ])) as ArrayRef;
3727
3728 let to_write =
3729 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3730
3731 let props = WriterProperties::builder()
3732 .set_compression(Compression::SNAPPY)
3733 .build();
3734
3735 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3736 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3737
3738 writer.write(&to_write).expect("Writing batch");
3739 writer.close().unwrap();
3740
3741 let reader = ArrowReaderBuilder::new(file_io).build();
3742
3743 let tasks = Box::pin(futures::stream::iter(
3744 vec![Ok(FileScanTask {
3745 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3746 .unwrap()
3747 .len(),
3748 start: 0,
3749 length: 0,
3750 record_count: None,
3751 data_file_path: format!("{table_location}/1.parquet"),
3752 data_file_format: DataFileFormat::Parquet,
3753 schema: schema.clone(),
3754 project_field_ids: vec![1, 2],
3755 predicate: None,
3756 deletes: vec![],
3757 partition: None,
3758 partition_spec: None,
3759 name_mapping: None,
3760 case_sensitive: false,
3761 })]
3762 .into_iter(),
3763 )) as FileScanTaskStream;
3764
3765 let result = reader
3766 .read(tasks)
3767 .unwrap()
3768 .try_collect::<Vec<RecordBatch>>()
3769 .await
3770 .unwrap();
3771
3772 assert_eq!(result.len(), 1);
3773 let batch = &result[0];
3774 assert_eq!(batch.num_rows(), 2);
3775 assert_eq!(batch.num_columns(), 2);
3776
3777 let id_array = batch
3778 .column(0)
3779 .as_primitive::<arrow_array::types::Int32Type>();
3780 assert_eq!(id_array.value(0), 1);
3781 assert_eq!(id_array.value(1), 2);
3782
3783 let person_array = batch.column(1).as_struct();
3784 assert_eq!(person_array.num_columns(), 2);
3785
3786 let name_array = person_array.column(0).as_string::<i32>();
3787 assert_eq!(name_array.value(0), "Alice");
3788 assert_eq!(name_array.value(1), "Bob");
3789
3790 let age_array = person_array
3791 .column(1)
3792 .as_primitive::<arrow_array::types::Int32Type>();
3793 assert_eq!(age_array.value(0), 30);
3794 assert_eq!(age_array.value(1), 25);
3795 }
3796
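    // The table schema adds `newCol` (field id 5) between the two existing columns of a
    // field-id-less file; the new column should be all-null while the original columns
    // keep their values and order.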
3797 #[tokio::test]
3801 async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3802 use arrow_array::{Array, Int32Array};
3803
3804 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3805 Field::new("col0", DataType::Int32, true),
3806 Field::new("col1", DataType::Int32, true),
3807 ]));
3808
3809 let schema = Arc::new(
3811 Schema::builder()
3812 .with_schema_id(1)
3813 .with_fields(vec![
3814 NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3815 NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3816 NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3817 ])
3818 .build()
3819 .unwrap(),
3820 );
3821
3822 let tmp_dir = TempDir::new().unwrap();
3823 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3824 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3825
3826 let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3827 let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3828
3829 let to_write =
3830 RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3831
3832 let props = WriterProperties::builder()
3833 .set_compression(Compression::SNAPPY)
3834 .build();
3835
3836 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3837 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3838 writer.write(&to_write).expect("Writing batch");
3839 writer.close().unwrap();
3840
3841 let reader = ArrowReaderBuilder::new(file_io).build();
3842
3843 let tasks = Box::pin(futures::stream::iter(
3844 vec![Ok(FileScanTask {
3845 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3846 .unwrap()
3847 .len(),
3848 start: 0,
3849 length: 0,
3850 record_count: None,
3851 data_file_path: format!("{table_location}/1.parquet"),
3852 data_file_format: DataFileFormat::Parquet,
3853 schema: schema.clone(),
3854 project_field_ids: vec![1, 5, 2],
3855 predicate: None,
3856 deletes: vec![],
3857 partition: None,
3858 partition_spec: None,
3859 name_mapping: None,
3860 case_sensitive: false,
3861 })]
3862 .into_iter(),
3863 )) as FileScanTaskStream;
3864
3865 let result = reader
3866 .read(tasks)
3867 .unwrap()
3868 .try_collect::<Vec<RecordBatch>>()
3869 .await
3870 .unwrap();
3871
3872 assert_eq!(result.len(), 1);
3873 let batch = &result[0];
3874 assert_eq!(batch.num_rows(), 2);
3875 assert_eq!(batch.num_columns(), 3);
3876
3877 let result_col0 = batch
3878 .column(0)
3879 .as_primitive::<arrow_array::types::Int32Type>();
3880 assert_eq!(result_col0.value(0), 1);
3881 assert_eq!(result_col0.value(1), 2);
3882
3883 let result_newcol = batch
3885 .column(1)
3886 .as_primitive::<arrow_array::types::Int32Type>();
3887 assert_eq!(result_newcol.null_count(), 2);
3888 assert!(result_newcol.is_null(0));
3889 assert!(result_newcol.is_null(1));
3890
3891 let result_col1 = batch
3892 .column(2)
3893 .as_primitive::<arrow_array::types::Int32Type>();
3894 assert_eq!(result_col1.value(0), 10);
3895 assert_eq!(result_col1.value(1), 20);
3896 }
3897
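    // With row-group filtering and row selection enabled, a predicate that matches no
    // rows (`id < 5` against ids 10..12) should produce an empty result.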
3898 #[tokio::test]
3902 async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
3903 use arrow_array::{Float64Array, Int32Array};
3904
3905 let schema = Arc::new(
3907 Schema::builder()
3908 .with_schema_id(1)
3909 .with_fields(vec![
3910 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3911 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3912 NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
3913 .into(),
3914 ])
3915 .build()
3916 .unwrap(),
3917 );
3918
3919 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3920 Field::new("id", DataType::Int32, false),
3921 Field::new("name", DataType::Utf8, false),
3922 Field::new("value", DataType::Float64, false),
3923 ]));
3924
3925 let tmp_dir = TempDir::new().unwrap();
3926 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3927 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3928
3929 let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
3931 let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3932 let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
3933
3934 let to_write =
3935 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
3936 .unwrap();
3937
3938 let props = WriterProperties::builder()
3939 .set_compression(Compression::SNAPPY)
3940 .build();
3941
3942 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3943 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3944 writer.write(&to_write).expect("Writing batch");
3945 writer.close().unwrap();
3946
3947 let predicate = Reference::new("id").less_than(Datum::int(5));
3949
3950 let reader = ArrowReaderBuilder::new(file_io)
3952 .with_row_group_filtering_enabled(true)
3953 .with_row_selection_enabled(true)
3954 .build();
3955
3956 let tasks = Box::pin(futures::stream::iter(
3957 vec![Ok(FileScanTask {
3958 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3959 .unwrap()
3960 .len(),
3961 start: 0,
3962 length: 0,
3963 record_count: None,
3964 data_file_path: format!("{table_location}/1.parquet"),
3965 data_file_format: DataFileFormat::Parquet,
3966 schema: schema.clone(),
3967 project_field_ids: vec![1, 2, 3],
3968 predicate: Some(predicate.bind(schema, true).unwrap()),
3969 deletes: vec![],
3970 partition: None,
3971 partition_spec: None,
3972 name_mapping: None,
3973 case_sensitive: false,
3974 })]
3975 .into_iter(),
3976 )) as FileScanTaskStream;
3977
3978 let result = reader
3980 .read(tasks)
3981 .unwrap()
3982 .try_collect::<Vec<RecordBatch>>()
3983 .await
3984 .unwrap();
3985
3986 assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
3988 }
3989
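    // With the data-file concurrency limit set to 1, the three tasks are processed one
    // at a time and the output batches arrive in task order (file_0, file_1, file_2).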
3990 #[tokio::test]
3993 async fn test_read_with_concurrency_one() {
3994 use arrow_array::Int32Array;
3995
3996 let schema = Arc::new(
3997 Schema::builder()
3998 .with_schema_id(1)
3999 .with_fields(vec![
4000 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
4001 NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
4002 .into(),
4003 ])
4004 .build()
4005 .unwrap(),
4006 );
4007
4008 let arrow_schema = Arc::new(ArrowSchema::new(vec![
4009 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
4010 PARQUET_FIELD_ID_META_KEY.to_string(),
4011 "1".to_string(),
4012 )])),
4013 Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
4014 PARQUET_FIELD_ID_META_KEY.to_string(),
4015 "2".to_string(),
4016 )])),
4017 ]));
4018
4019 let tmp_dir = TempDir::new().unwrap();
4020 let table_location = tmp_dir.path().to_str().unwrap().to_string();
4021 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
4022
4023 let props = WriterProperties::builder()
4025 .set_compression(Compression::SNAPPY)
4026 .build();
4027
4028 for file_num in 0..3 {
4029 let id_data = Arc::new(Int32Array::from_iter_values(
4030 file_num * 10..(file_num + 1) * 10,
4031 )) as ArrayRef;
4032 let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
4033
4034 let to_write =
4035 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
4036
4037 let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
4038 let mut writer =
4039 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
4040 writer.write(&to_write).expect("Writing batch");
4041 writer.close().unwrap();
4042 }
4043
4044 let reader = ArrowReaderBuilder::new(file_io)
4046 .with_data_file_concurrency_limit(1)
4047 .build();
4048
4049 let tasks = vec![
4051 Ok(FileScanTask {
4052 file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet"))
4053 .unwrap()
4054 .len(),
4055 start: 0,
4056 length: 0,
4057 record_count: None,
4058 data_file_path: format!("{table_location}/file_0.parquet"),
4059 data_file_format: DataFileFormat::Parquet,
4060 schema: schema.clone(),
4061 project_field_ids: vec![1, 2],
4062 predicate: None,
4063 deletes: vec![],
4064 partition: None,
4065 partition_spec: None,
4066 name_mapping: None,
4067 case_sensitive: false,
4068 }),
4069 Ok(FileScanTask {
4070 file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
4071 .unwrap()
4072 .len(),
4073 start: 0,
4074 length: 0,
4075 record_count: None,
4076 data_file_path: format!("{table_location}/file_1.parquet"),
4077 data_file_format: DataFileFormat::Parquet,
4078 schema: schema.clone(),
4079 project_field_ids: vec![1, 2],
4080 predicate: None,
4081 deletes: vec![],
4082 partition: None,
4083 partition_spec: None,
4084 name_mapping: None,
4085 case_sensitive: false,
4086 }),
4087 Ok(FileScanTask {
4088 file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
4089 .unwrap()
4090 .len(),
4091 start: 0,
4092 length: 0,
4093 record_count: None,
4094 data_file_path: format!("{table_location}/file_2.parquet"),
4095 data_file_format: DataFileFormat::Parquet,
4096 schema: schema.clone(),
4097 project_field_ids: vec![1, 2],
4098 predicate: None,
4099 deletes: vec![],
4100 partition: None,
4101 partition_spec: None,
4102 name_mapping: None,
4103 case_sensitive: false,
4104 }),
4105 ];
4106
4107 let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
4108
4109 let result = reader
4110 .read(tasks_stream)
4111 .unwrap()
4112 .try_collect::<Vec<RecordBatch>>()
4113 .await
4114 .unwrap();
4115
4116 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
4118 assert_eq!(total_rows, 30, "Should have 30 total rows");
4119
4120 let mut all_ids = Vec::new();
4122 let mut all_file_nums = Vec::new();
4123
4124 for batch in &result {
4125 let id_col = batch
4126 .column(0)
4127 .as_primitive::<arrow_array::types::Int32Type>();
4128 let file_num_col = batch
4129 .column(1)
4130 .as_primitive::<arrow_array::types::Int32Type>();
4131
4132 for i in 0..batch.num_rows() {
4133 all_ids.push(id_col.value(i));
4134 all_file_nums.push(file_num_col.value(i));
4135 }
4136 }
4137
4138 assert_eq!(all_ids.len(), 30);
4139 assert_eq!(all_file_nums.len(), 30);
4140
4141 for i in 0..10 {
4146 assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
4147 assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
4148 }
4149 for i in 10..20 {
4150 assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
4151 assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
4152 }
4153 for i in 20..30 {
4154 assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
4155 assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
4156 }
4157 }
4158
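    // With bucket partitioning, the partition value only stores the bucket number, so
    // the reader must read the source column `id` from the data file itself rather than
    // substituting a constant derived from the partition metadata.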
4159 #[tokio::test]
4204 async fn test_bucket_partitioning_reads_source_column_from_file() {
4205 use arrow_array::Int32Array;
4206
4207 use crate::spec::{Literal, PartitionSpec, Struct, Transform};
4208
4209 let schema = Arc::new(
4211 Schema::builder()
4212 .with_schema_id(0)
4213 .with_fields(vec![
4214 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
4215 NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
4216 ])
4217 .build()
4218 .unwrap(),
4219 );
4220
4221 let partition_spec = Arc::new(
4223 PartitionSpec::builder(schema.clone())
4224 .with_spec_id(0)
4225 .add_partition_field("id", "id_bucket", Transform::Bucket(4))
4226 .unwrap()
4227 .build()
4228 .unwrap(),
4229 );
4230
4231 let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
4233
4234 let arrow_schema = Arc::new(ArrowSchema::new(vec![
4236 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
4237 PARQUET_FIELD_ID_META_KEY.to_string(),
4238 "1".to_string(),
4239 )])),
4240 Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
4241 PARQUET_FIELD_ID_META_KEY.to_string(),
4242 "2".to_string(),
4243 )])),
4244 ]));
4245
4246 let tmp_dir = TempDir::new().unwrap();
4248 let table_location = tmp_dir.path().to_str().unwrap().to_string();
4249 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
4250
4251 let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
4252 let name_data =
4253 Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
4254
4255 let to_write =
4256 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
4257
4258 let props = WriterProperties::builder()
4259 .set_compression(Compression::SNAPPY)
4260 .build();
4261 let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
4262 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
4263 writer.write(&to_write).expect("Writing batch");
4264 writer.close().unwrap();
4265
4266 let reader = ArrowReaderBuilder::new(file_io).build();
4268 let tasks = Box::pin(futures::stream::iter(
4269 vec![Ok(FileScanTask {
4270 file_size_in_bytes: std::fs::metadata(format!("{table_location}/data.parquet"))
4271 .unwrap()
4272 .len(),
4273 start: 0,
4274 length: 0,
4275 record_count: None,
4276 data_file_path: format!("{table_location}/data.parquet"),
4277 data_file_format: DataFileFormat::Parquet,
4278 schema: schema.clone(),
4279 project_field_ids: vec![1, 2],
4280 predicate: None,
4281 deletes: vec![],
4282 partition: Some(partition_data),
4283 partition_spec: Some(partition_spec),
4284 name_mapping: None,
4285 case_sensitive: false,
4286 })]
4287 .into_iter(),
4288 )) as FileScanTaskStream;
4289
4290 let result = reader
4291 .read(tasks)
4292 .unwrap()
4293 .try_collect::<Vec<RecordBatch>>()
4294 .await
4295 .unwrap();
4296
4297 assert_eq!(result.len(), 1);
4299 let batch = &result[0];
4300
4301 assert_eq!(batch.num_columns(), 2);
4302 assert_eq!(batch.num_rows(), 4);
4303
4304 let id_col = batch
4307 .column(0)
4308 .as_primitive::<arrow_array::types::Int32Type>();
4309 assert_eq!(id_col.value(0), 1);
4310 assert_eq!(id_col.value(1), 5);
4311 assert_eq!(id_col.value(2), 9);
4312 assert_eq!(id_col.value(3), 13);
4313
4314 let name_col = batch.column(1).as_string::<i32>();
4315 assert_eq!(name_col.value(0), "Alice");
4316 assert_eq!(name_col.value(1), "Bob");
4317 assert_eq!(name_col.value(2), "Charlie");
4318 assert_eq!(name_col.value(3), "Dave");
4319 }
4320}