use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;

use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
use arrow_cast::cast::cast;
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{
    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
};
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
use typed_builder::TypedBuilder;

use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

/// Default byte distance within which adjacent byte ranges are coalesced into
/// a single read request.
const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024;

/// Default number of coalesced byte ranges fetched concurrently per file.
const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10;

/// Default prefetch size hint, in bytes, used when loading the Parquet footer.
const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024;

/// Options controlling how Parquet bytes are fetched and which index
/// structures are preloaded alongside the file metadata.
#[derive(Clone, Copy, Debug, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub(crate) struct ParquetReadOptions {
    /// Prefetch size hint for the footer; `None` disables the hint.
    #[builder(default = Some(DEFAULT_METADATA_SIZE_HINT))]
    pub(crate) metadata_size_hint: Option<usize>,
    /// Maximum gap between byte ranges that will still be coalesced.
    #[builder(default = DEFAULT_RANGE_COALESCE_BYTES)]
    pub(crate) range_coalesce_bytes: u64,
    /// Number of coalesced ranges fetched concurrently.
    #[builder(default = DEFAULT_RANGE_FETCH_CONCURRENCY)]
    pub(crate) range_fetch_concurrency: usize,
    /// Whether to preload the column index when loading metadata.
    #[builder(default = true)]
    pub(crate) preload_column_index: bool,
    /// Whether to preload the offset index when loading metadata.
    #[builder(default = true)]
    pub(crate) preload_offset_index: bool,
    /// Whether to preload the page index when loading metadata.
    #[builder(default = false)]
    pub(crate) preload_page_index: bool,
}

impl ParquetReadOptions {
    pub(crate) fn metadata_size_hint(&self) -> Option<usize> {
        self.metadata_size_hint
    }

    pub(crate) fn range_coalesce_bytes(&self) -> u64 {
        self.range_coalesce_bytes
    }

    pub(crate) fn range_fetch_concurrency(&self) -> usize {
        self.range_fetch_concurrency
    }

    pub(crate) fn preload_column_index(&self) -> bool {
        self.preload_column_index
    }

    pub(crate) fn preload_offset_index(&self) -> bool {
        self.preload_offset_index
    }

    pub(crate) fn preload_page_index(&self) -> bool {
        self.preload_page_index
    }
}
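
// Example (sketch, not from the original source): `TypedBuilder` derives a
// `builder()` whose setters carry the `with_` prefix configured above, and any
// unset field falls back to its declared default. Note `metadata_size_hint`
// takes an `Option` since its field type is `Option<usize>`:
//
//     let opts = ParquetReadOptions::builder()
//         .with_range_coalesce_bytes(2 * 1024 * 1024)
//         .with_metadata_size_hint(Some(256 * 1024))
//         .build();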

/// Builder for [`ArrowReader`].
pub struct ArrowReaderBuilder {
    batch_size: Option<usize>,
    file_io: FileIO,
    concurrency_limit_data_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
    parquet_read_options: ParquetReadOptions,
}

impl ArrowReaderBuilder {
    /// Creates a new builder; concurrency defaults to the available parallelism.
    pub fn new(file_io: FileIO) -> Self {
        let num_cpus = available_parallelism().get();

        ArrowReaderBuilder {
            batch_size: None,
            file_io,
            concurrency_limit_data_files: num_cpus,
            row_group_filtering_enabled: true,
            row_selection_enabled: false,
            parquet_read_options: ParquetReadOptions::builder().build(),
        }
    }

    /// Sets the maximum number of data files read concurrently.
    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
        self.concurrency_limit_data_files = val;
        self
    }

    /// Sets the desired size of the record batches in the response.
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = Some(batch_size);
        self
    }

    /// Enables or disables statistics-based row group filtering.
    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
        self.row_group_filtering_enabled = row_group_filtering_enabled;
        self
    }

    /// Enables or disables page-index-based row selection.
    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
        self.row_selection_enabled = row_selection_enabled;
        self
    }

    /// Sets the prefetch size hint used when loading Parquet footer metadata.
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.parquet_read_options.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Sets the byte distance within which adjacent ranges are coalesced.
    pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self {
        self.parquet_read_options.range_coalesce_bytes = range_coalesce_bytes;
        self
    }

    /// Sets the number of coalesced byte ranges fetched concurrently.
    pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self {
        self.parquet_read_options.range_fetch_concurrency = range_fetch_concurrency;
        self
    }

    /// Builds the [`ArrowReader`].
    pub fn build(self) -> ArrowReader {
        ArrowReader {
            batch_size: self.batch_size,
            file_io: self.file_io.clone(),
            delete_file_loader: CachingDeleteFileLoader::new(
                self.file_io.clone(),
                self.concurrency_limit_data_files,
            ),
            concurrency_limit_data_files: self.concurrency_limit_data_files,
            row_group_filtering_enabled: self.row_group_filtering_enabled,
            row_selection_enabled: self.row_selection_enabled,
            parquet_read_options: self.parquet_read_options,
        }
    }
}
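
// Usage sketch (hypothetical `file_io` and `tasks` values, not from the
// original source):
//
//     let reader = ArrowReaderBuilder::new(file_io)
//         .with_batch_size(8192)
//         .with_row_selection_enabled(true)
//         .build();
//     let batches: ArrowRecordBatchStream = reader.read(tasks)?;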

/// Reads Iceberg data files as streams of Arrow record batches.
#[derive(Clone)]
pub struct ArrowReader {
    batch_size: Option<usize>,
    file_io: FileIO,
    delete_file_loader: CachingDeleteFileLoader,

    /// The maximum number of data files that can be fetched at the same time.
    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
    parquet_read_options: ParquetReadOptions,
}

impl ArrowReader {
    /// Takes a stream of `FileScanTask`s and returns a stream of Arrow
    /// `RecordBatch`es containing the data from the files.
    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
        let file_io = self.file_io.clone();
        let batch_size = self.batch_size;
        let concurrency_limit_data_files = self.concurrency_limit_data_files;
        let row_group_filtering_enabled = self.row_group_filtering_enabled;
        let row_selection_enabled = self.row_selection_enabled;
        let parquet_read_options = self.parquet_read_options;

        // With a concurrency limit of one, process tasks strictly in order;
        // otherwise buffer tasks and flatten their batch streams unordered.
        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
            Box::pin(
                tasks
                    .and_then(move |task| {
                        let file_io = file_io.clone();

                        Self::process_file_scan_task(
                            task,
                            batch_size,
                            file_io,
                            self.delete_file_loader.clone(),
                            row_group_filtering_enabled,
                            row_selection_enabled,
                            parquet_read_options,
                        )
                    })
                    .map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
                            .with_source(err)
                    })
                    .try_flatten(),
            )
        } else {
            Box::pin(
                tasks
                    .map_ok(move |task| {
                        let file_io = file_io.clone();

                        Self::process_file_scan_task(
                            task,
                            batch_size,
                            file_io,
                            self.delete_file_loader.clone(),
                            row_group_filtering_enabled,
                            row_selection_enabled,
                            parquet_read_options,
                        )
                    })
                    .map_err(|err| {
                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
                            .with_source(err)
                    })
                    .try_buffer_unordered(concurrency_limit_data_files)
                    .try_flatten_unordered(concurrency_limit_data_files),
            )
        };

        Ok(stream)
    }

    async fn process_file_scan_task(
        task: FileScanTask,
        batch_size: Option<usize>,
        file_io: FileIO,
        delete_file_loader: CachingDeleteFileLoader,
        row_group_filtering_enabled: bool,
        row_selection_enabled: bool,
        parquet_read_options: ParquetReadOptions,
    ) -> Result<ArrowRecordBatchStream> {
        // The page index is only needed for predicate-driven row selection or
        // for applying positional deletes.
        let should_load_page_index =
            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
        let mut parquet_read_options = parquet_read_options;
        parquet_read_options.preload_page_index = should_load_page_index;

        // Start loading delete files concurrently with opening the data file.
        let delete_filter_rx =
            delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));

        let (parquet_file_reader, arrow_metadata) = Self::open_parquet_file(
            &task.data_file_path,
            &file_io,
            task.file_size_in_bytes,
            parquet_read_options,
        )
        .await?;

        // Files written without Parquet field IDs need them reconstructed,
        // either from the task's name mapping or from column positions.
        let missing_field_ids = arrow_metadata
            .schema()
            .fields()
            .iter()
            .next()
            .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());

        let arrow_metadata = if missing_field_ids {
            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
                apply_name_mapping_to_arrow_schema(
                    Arc::clone(arrow_metadata.schema()),
                    name_mapping,
                )?
            } else {
                add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
            };

            let options = ArrowReaderOptions::new().with_schema(arrow_schema);
            ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
                |e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        "Failed to create ArrowReaderMetadata with field ID schema",
                    )
                    .with_source(e)
                },
            )?
        } else {
            arrow_metadata
        };

        let mut record_batch_stream_builder =
            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);

        // Metadata fields (e.g. the reserved `_file` column) are synthesized
        // later and must not be part of the Parquet projection.
        let project_field_ids_without_metadata: Vec<i32> = task
            .project_field_ids
            .iter()
            .filter(|&&id| !is_metadata_field(id))
            .copied()
            .collect();

        let projection_mask = Self::get_arrow_projection_mask(
            &project_field_ids_without_metadata,
            &task.schema,
            record_batch_stream_builder.parquet_schema(),
            record_batch_stream_builder.schema(),
            missing_field_ids,
        )?;

        record_batch_stream_builder =
            record_batch_stream_builder.with_projection(projection_mask.clone());

        let mut record_batch_transformer_builder =
            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());

        if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
            let file_datum = Datum::string(task.data_file_path.clone());
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
        }

        if let (Some(partition_spec), Some(partition_data)) =
            (task.partition_spec.clone(), task.partition.clone())
        {
            record_batch_transformer_builder =
                record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
        }

        let mut record_batch_transformer = record_batch_transformer_builder.build();

        if let Some(batch_size) = batch_size {
            record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
        }

        let delete_filter = delete_filter_rx.await.unwrap()?;
        let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;

        // Combine the scan predicate with any equality-delete predicate.
        let final_predicate = match (&task.predicate, delete_predicate) {
            (None, None) => None,
            (Some(predicate), None) => Some(predicate.clone()),
            (None, Some(ref predicate)) => Some(predicate.clone()),
            (Some(filter_predicate), Some(delete_predicate)) => {
                Some(filter_predicate.clone().and(delete_predicate))
            }
        };

        let mut selected_row_group_indices = None;
        let mut row_selection = None;

        // Restrict to the row groups covered by the task's byte range, if any.
        if task.start != 0 || task.length != 0 {
            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
                record_batch_stream_builder.metadata(),
                task.start,
                task.length,
            )?;
            selected_row_group_indices = Some(byte_range_filtered_row_groups);
        }

        if let Some(predicate) = final_predicate {
            let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                record_batch_stream_builder.parquet_schema(),
                &predicate,
            )?;

            let row_filter = Self::get_row_filter(
                &predicate,
                record_batch_stream_builder.parquet_schema(),
                &iceberg_field_ids,
                &field_id_map,
            )?;
            record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

            if row_group_filtering_enabled {
                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &field_id_map,
                    &task.schema,
                )?;

                // Intersect byte-range filtering with predicate filtering.
                selected_row_group_indices = match selected_row_group_indices {
                    Some(byte_range_filtered) => {
                        let intersection: Vec<usize> = byte_range_filtered
                            .into_iter()
                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
                            .collect();
                        Some(intersection)
                    }
                    None => Some(predicate_filtered_row_groups),
                };
            }

            if row_selection_enabled {
                row_selection = Some(Self::get_row_selection_for_filter_predicate(
                    &predicate,
                    record_batch_stream_builder.metadata(),
                    &selected_row_group_indices,
                    &field_id_map,
                    &task.schema,
                )?);
            }
        }

        let positional_delete_indexes = delete_filter.get_delete_vector(&task);

        if let Some(positional_delete_indexes) = positional_delete_indexes {
            let delete_row_selection = {
                let positional_delete_indexes = positional_delete_indexes.lock().unwrap();

                Self::build_deletes_row_selection(
                    record_batch_stream_builder.metadata().row_groups(),
                    &selected_row_group_indices,
                    &positional_delete_indexes,
                )
            }?;

            // Merge the delete-derived selection with any predicate-derived one.
            row_selection = match row_selection {
                None => Some(delete_row_selection),
                Some(filter_row_selection) => {
                    Some(filter_row_selection.intersection(&delete_row_selection))
                }
            };
        }

        if let Some(row_selection) = row_selection {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_selection(row_selection);
        }

        if let Some(selected_row_group_indices) = selected_row_group_indices {
            record_batch_stream_builder =
                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
        }

        let record_batch_stream =
            record_batch_stream_builder
                .build()?
                .map(move |batch| match batch {
                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
                    Err(err) => Err(err.into()),
                });

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }

    /// Opens the Parquet file at `data_file_path` and loads its metadata,
    /// honoring the supplied [`ParquetReadOptions`].
    pub(crate) async fn open_parquet_file(
        data_file_path: &str,
        file_io: &FileIO,
        file_size_in_bytes: u64,
        parquet_read_options: ParquetReadOptions,
    ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
        let parquet_file = file_io.new_input(data_file_path)?;
        let parquet_reader = parquet_file.reader().await?;
        let mut reader = ArrowFileReader::new(
            FileMetadata {
                size: file_size_in_bytes,
            },
            parquet_reader,
        )
        .with_parquet_read_options(parquet_read_options);

        let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
            .await
            .map_err(|e| {
                Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
            })?;

        Ok((reader, arrow_metadata))
    }

    /// Builds a `RowSelection` that skips positionally deleted rows and keeps
    /// everything else. Delete indices are file-global, so each row group's
    /// base row index is tracked while iterating; row groups outside
    /// `selected_row_groups` (when provided) are passed over entirely.
    fn build_deletes_row_selection(
        row_group_metadata_list: &[RowGroupMetaData],
        selected_row_groups: &Option<Vec<usize>>,
        positional_deletes: &DeleteVector,
    ) -> Result<RowSelection> {
        let mut results: Vec<RowSelector> = Vec::new();
        let mut selected_row_groups_idx = 0;
        let mut current_row_group_base_idx: u64 = 0;
        let mut delete_vector_iter = positional_deletes.iter();
        let mut next_deleted_row_idx_opt = delete_vector_iter.next();

        for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
            let row_group_num_rows = row_group_metadata.num_rows() as u64;
            let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;

            if let Some(selected_row_groups) = selected_row_groups {
                if selected_row_groups_idx == selected_row_groups.len() {
                    break;
                }

                if idx == selected_row_groups[selected_row_groups_idx] {
                    selected_row_groups_idx += 1;
                } else {
                    // This row group is being skipped: advance the delete
                    // iterator past it and drop any cached delete index that
                    // falls inside it.
                    delete_vector_iter.advance_to(next_row_group_base_idx);
                    if let Some(cached_idx) = next_deleted_row_idx_opt
                        && cached_idx < next_row_group_base_idx
                    {
                        next_deleted_row_idx_opt = delete_vector_iter.next();
                    }

                    current_row_group_base_idx += row_group_num_rows;
                    continue;
                }
            }

            let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
                Some(next_deleted_row_idx) => {
                    // No deletes fall inside this row group; select all its rows.
                    if next_deleted_row_idx >= next_row_group_base_idx {
                        results.push(RowSelector::select(row_group_num_rows as usize));
                        current_row_group_base_idx += row_group_num_rows;
                        continue;
                    }

                    next_deleted_row_idx
                }

                // No deletes remain; select the rest of this row group.
                _ => {
                    results.push(RowSelector::select(row_group_num_rows as usize));
                    current_row_group_base_idx += row_group_num_rows;
                    continue;
                }
            };

            let mut current_idx = current_row_group_base_idx;
            'chunks: while next_deleted_row_idx < next_row_group_base_idx {
                if current_idx < next_deleted_row_idx {
                    let run_length = next_deleted_row_idx - current_idx;
                    results.push(RowSelector::select(run_length as usize));
                    current_idx += run_length;
                }

                let mut run_length = 0;
                while next_deleted_row_idx == current_idx
                    && next_deleted_row_idx < next_row_group_base_idx
                {
                    run_length += 1;
                    current_idx += 1;

                    next_deleted_row_idx_opt = delete_vector_iter.next();
                    next_deleted_row_idx = match next_deleted_row_idx_opt {
                        Some(next_deleted_row_idx) => next_deleted_row_idx,
                        _ => {
                            results.push(RowSelector::skip(run_length));
                            break 'chunks;
                        }
                    };
                }
                if run_length > 0 {
                    results.push(RowSelector::skip(run_length));
                }
            }

            if current_idx < next_row_group_base_idx {
                results.push(RowSelector::select(
                    (next_row_group_base_idx - current_idx) as usize,
                ));
            }

            current_row_group_base_idx += row_group_num_rows;
        }

        Ok(results.into())
    }
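
    // Worked example: for a single selected row group with 10 rows and
    // positional deletes {3, 4, 8}, the loop above produces
    // [select(3), skip(2), select(3), skip(1), select(1)], i.e. rows 0-2, 5-7
    // and 9 survive. `RowSelector::select(n)` keeps the next n rows and
    // `RowSelector::skip(n)` drops them.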

    fn build_field_id_set_and_map(
        parquet_schema: &SchemaDescriptor,
        predicate: &BoundPredicate,
    ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
        let mut collector = CollectFieldIdVisitor {
            field_ids: HashSet::default(),
        };
        visit(&mut collector, predicate)?;

        let iceberg_field_ids = collector.field_ids();

        let field_id_map = match build_field_id_map(parquet_schema)? {
            Some(map) => map,
            None => build_fallback_field_id_map(parquet_schema),
        };

        Ok((iceberg_field_ids, field_id_map))
    }

    fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
        match field.field_type.as_ref() {
            Type::Primitive(_) => {
                field_ids.push(field.id);
            }
            Type::Struct(struct_type) => {
                for nested_field in struct_type.fields() {
                    Self::include_leaf_field_id(nested_field, field_ids);
                }
            }
            Type::List(list_type) => {
                Self::include_leaf_field_id(&list_type.element_field, field_ids);
            }
            Type::Map(map_type) => {
                Self::include_leaf_field_id(&map_type.key_field, field_ids);
                Self::include_leaf_field_id(&map_type.value_field, field_ids);
            }
        }
    }

    fn get_arrow_projection_mask(
        field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
        use_fallback: bool,
    ) -> Result<ProjectionMask> {
        fn type_promotion_is_valid(
            file_type: Option<&PrimitiveType>,
            projected_type: Option<&PrimitiveType>,
        ) -> bool {
            match (file_type, projected_type) {
                (Some(lhs), Some(rhs)) if lhs == rhs => true,
                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
                (
                    Some(PrimitiveType::Decimal {
                        precision: file_precision,
                        scale: file_scale,
                    }),
                    Some(PrimitiveType::Decimal {
                        precision: requested_precision,
                        scale: requested_scale,
                    }),
                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
                _ => false,
            }
        }

        if field_ids.is_empty() {
            return Ok(ProjectionMask::all());
        }

        if use_fallback {
            Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
        } else {
            // Expand each projected field into its leaf field IDs so nested
            // types are matched leaf-by-leaf.
            let mut leaf_field_ids = vec![];
            for field_id in field_ids {
                let field = iceberg_schema_of_task.field_by_id(*field_id);
                if let Some(field) = field {
                    Self::include_leaf_field_id(field, &mut leaf_field_ids);
                }
            }

            Self::get_arrow_projection_mask_with_field_ids(
                &leaf_field_ids,
                iceberg_schema_of_task,
                parquet_schema,
                arrow_schema,
                type_promotion_is_valid,
            )
        }
    }

    fn get_arrow_projection_mask_with_field_ids(
        leaf_field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
        type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
    ) -> Result<ProjectionMask> {
        let mut column_map = HashMap::new();
        let fields = arrow_schema.fields();

        // Collect the leaf fields that carry a Parquet field ID and belong to
        // the projection, so their Iceberg types can be validated below.
        let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
        let projected_arrow_schema = ArrowSchema::new_with_metadata(
            fields.filter_leaves(|_, f| {
                f.metadata()
                    .get(PARQUET_FIELD_ID_META_KEY)
                    .and_then(|field_id| i32::from_str(field_id).ok())
                    .is_some_and(|field_id| {
                        projected_fields.insert((*f).clone(), field_id);
                        leaf_field_ids.contains(&field_id)
                    })
            }),
            arrow_schema.metadata().clone(),
        );
        let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;

        fields.filter_leaves(|idx, field| {
            let Some(field_id) = projected_fields.get(field).cloned() else {
                return false;
            };

            let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
            let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);

            if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
                return false;
            }

            if !type_promotion_is_valid(
                parquet_iceberg_field
                    .unwrap()
                    .field_type
                    .as_primitive_type(),
                iceberg_field.unwrap().field_type.as_primitive_type(),
            ) {
                return false;
            }

            column_map.insert(field_id, idx);
            true
        });

        // Emit leaf indices in the order the field IDs were requested.
        let mut indices = vec![];
        for field_id in leaf_field_ids {
            if let Some(col_idx) = column_map.get(field_id) {
                indices.push(*col_idx);
            }
        }

        if indices.is_empty() {
            Ok(ProjectionMask::all())
        } else {
            Ok(ProjectionMask::leaves(parquet_schema, indices))
        }
    }

    /// Positional fallback projection: field IDs are taken to be one-based
    /// positions of the top-level Parquet columns, so field ID `n` maps to
    /// root column `n - 1`. IDs that fall outside the schema are ignored.
    fn get_arrow_projection_mask_fallback(
        field_ids: &[i32],
        parquet_schema: &SchemaDescriptor,
    ) -> Result<ProjectionMask> {
        let parquet_root_fields = parquet_schema.root_schema().get_fields();
        let mut root_indices = vec![];

        for field_id in field_ids.iter() {
            let parquet_pos = (*field_id - 1) as usize;

            if parquet_pos < parquet_root_fields.len() {
                root_indices.push(parquet_pos);
            }
        }

        if root_indices.is_empty() {
            Ok(ProjectionMask::all())
        } else {
            Ok(ProjectionMask::roots(parquet_schema, root_indices))
        }
    }
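
    // Worked example: for a file with three root columns, requesting fallback
    // field IDs [1, 3] yields root indices [0, 2].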

    fn get_row_filter(
        predicates: &BoundPredicate,
        parquet_schema: &SchemaDescriptor,
        iceberg_field_ids: &HashSet<i32>,
        field_id_map: &HashMap<i32, usize>,
    ) -> Result<RowFilter> {
        let mut column_indices = iceberg_field_ids
            .iter()
            .filter_map(|field_id| field_id_map.get(field_id).cloned())
            .collect::<Vec<_>>();
        column_indices.sort();

        let mut converter = PredicateConverter {
            parquet_schema,
            column_map: field_id_map,
            column_indices: &column_indices,
        };

        let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
        let predicate_func = visit(&mut converter, predicates)?;
        let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
        Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
    }

    fn get_selected_row_group_indices(
        predicate: &BoundPredicate,
        parquet_metadata: &Arc<ParquetMetaData>,
        field_id_map: &HashMap<i32, usize>,
        snapshot_schema: &Schema,
    ) -> Result<Vec<usize>> {
        let row_groups_metadata = parquet_metadata.row_groups();
        let mut results = Vec::with_capacity(row_groups_metadata.len());

        for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
            if RowGroupMetricsEvaluator::eval(
                predicate,
                row_group_metadata,
                field_id_map,
                snapshot_schema,
            )? {
                results.push(idx);
            }
        }

        Ok(results)
    }

    fn get_row_selection_for_filter_predicate(
        predicate: &BoundPredicate,
        parquet_metadata: &Arc<ParquetMetaData>,
        selected_row_groups: &Option<Vec<usize>>,
        field_id_map: &HashMap<i32, usize>,
        snapshot_schema: &Schema,
    ) -> Result<RowSelection> {
        let Some(column_index) = parquet_metadata.column_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain a column index",
            ));
        };

        let Some(offset_index) = parquet_metadata.offset_index() else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Parquet file metadata does not contain an offset index",
            ));
        };

        if let Some(selected_row_groups) = selected_row_groups
            && selected_row_groups.is_empty()
        {
            return Ok(RowSelection::from(Vec::new()));
        }

        let mut selected_row_groups_idx = 0;

        let page_index = column_index
            .iter()
            .enumerate()
            .zip(offset_index)
            .zip(parquet_metadata.row_groups());

        let mut results = Vec::new();
        for (((idx, column_index), offset_index), row_group_metadata) in page_index {
            if let Some(selected_row_groups) = selected_row_groups {
                if idx == selected_row_groups[selected_row_groups_idx] {
                    selected_row_groups_idx += 1;
                } else {
                    continue;
                }
            }

            let selections_for_page = PageIndexEvaluator::eval(
                predicate,
                column_index,
                offset_index,
                row_group_metadata,
                field_id_map,
                snapshot_schema,
            )?;

            results.push(selections_for_page);

            if let Some(selected_row_groups) = selected_row_groups
                && selected_row_groups_idx == selected_row_groups.len()
            {
                break;
            }
        }

        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
    }

    /// Selects the row groups whose byte ranges overlap `[start, start + length)`.
    /// Row group data begins immediately after the 4-byte `PAR1` magic at the
    /// start of the file.
    fn filter_row_groups_by_byte_range(
        parquet_metadata: &Arc<ParquetMetaData>,
        start: u64,
        length: u64,
    ) -> Result<Vec<usize>> {
        let row_groups = parquet_metadata.row_groups();
        let mut selected = Vec::new();
        let end = start + length;

        let mut current_byte_offset = 4u64;

        for (idx, row_group) in row_groups.iter().enumerate() {
            let row_group_size = row_group.compressed_size() as u64;
            let row_group_end = current_byte_offset + row_group_size;

            if current_byte_offset < end && start < row_group_end {
                selected.push(idx);
            }

            current_byte_offset = row_group_end;
        }

        Ok(selected)
    }
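
    // Worked example: with two 100-byte row groups occupying [4, 104) and
    // [104, 204), a task byte range of start = 50, length = 100 (i.e. [50, 150))
    // overlaps both row groups and selects indices [0, 1].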
}

/// Builds a map of field ID to leaf column index in the Parquet schema.
/// Returns `Ok(None)` when a column lacks a field ID, signalling that the
/// positional fallback mapping should be used instead.
fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
    let mut column_map = HashMap::new();

    for (idx, field) in parquet_schema.columns().iter().enumerate() {
        let field_type = field.self_type();
        match field_type {
            ParquetType::PrimitiveType { basic_info, .. } => {
                if !basic_info.has_id() {
                    return Ok(None);
                }
                column_map.insert(basic_info.id(), idx);
            }
            ParquetType::GroupType { .. } => {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Leaf column in schema should be primitive type but got {field_type:?}"
                    ),
                ));
            }
        };
    }

    Ok(Some(column_map))
}

/// Builds a positional fallback map: leaf column `i` is assigned field ID
/// `i + 1`.
fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
    let mut column_map = HashMap::new();

    for (idx, _field) in parquet_schema.columns().iter().enumerate() {
        let field_id = (idx + 1) as i32;
        column_map.insert(field_id, idx);
    }

    column_map
}
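
// Worked example: a Parquet schema with three leaf columns produces the map
// {1: 0, 2: 1, 3: 2}.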

/// Applies a name mapping to an Arrow schema that lacks field IDs, attaching
/// the mapped field ID to each top-level field whose name appears in the
/// mapping. Fields without a mapping entry are left unchanged.
fn apply_name_mapping_to_arrow_schema(
    arrow_schema: ArrowSchemaRef,
    name_mapping: &NameMapping,
) -> Result<Arc<ArrowSchema>> {
    debug_assert!(
        arrow_schema
            .fields()
            .iter()
            .next()
            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
        "Schema already has field IDs - name mapping should not be applied"
    );

    use arrow_schema::Field;

    let fields_with_mapped_ids: Vec<_> = arrow_schema
        .fields()
        .iter()
        .map(|field| {
            // Find the mapping entry whose names include this column's name.
            let mapped_field_opt = name_mapping
                .fields()
                .iter()
                .find(|f| f.names().contains(&field.name().to_string()));

            let mut metadata = field.metadata().clone();

            if let Some(mapped_field) = mapped_field_opt
                && let Some(field_id) = mapped_field.field_id()
            {
                metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
            }
            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    Ok(Arc::new(ArrowSchema::new_with_metadata(
        fields_with_mapped_ids,
        arrow_schema.metadata().clone(),
    )))
}

/// Adds positional fallback field IDs (`position + 1`) to an Arrow schema
/// that lacks them.
fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
    debug_assert!(
        arrow_schema
            .fields()
            .iter()
            .next()
            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
        "Schema already has field IDs"
    );

    use arrow_schema::Field;

    let fields_with_fallback_ids: Vec<_> = arrow_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(pos, field)| {
            let mut metadata = field.metadata().clone();
            let field_id = (pos + 1) as i32;
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());

            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    Arc::new(ArrowSchema::new_with_metadata(
        fields_with_fallback_ids,
        arrow_schema.metadata().clone(),
    ))
}

/// Collects the field IDs referenced by a bound predicate.
struct CollectFieldIdVisitor {
    field_ids: HashSet<i32>,
}

impl CollectFieldIdVisitor {
    fn field_ids(self) -> HashSet<i32> {
        self.field_ids
    }
}

impl BoundPredicateVisitor for CollectFieldIdVisitor {
    type T = ();

    fn always_true(&mut self) -> Result<()> {
        Ok(())
    }

    fn always_false(&mut self) -> Result<()> {
        Ok(())
    }

    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
        Ok(())
    }

    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
        Ok(())
    }

    fn not(&mut self, _inner: ()) -> Result<()> {
        Ok(())
    }

    fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn less_than(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn less_than_or_eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn greater_than(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn greater_than_or_eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_eq(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn starts_with(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_starts_with(
        &mut self,
        reference: &BoundReference,
        _literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn r#in(
        &mut self,
        reference: &BoundReference,
        _literals: &FnvHashSet<Datum>,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }

    fn not_in(
        &mut self,
        reference: &BoundReference,
        _literals: &FnvHashSet<Datum>,
        _predicate: &BoundPredicate,
    ) -> Result<()> {
        self.field_ids.insert(reference.field().id);
        Ok(())
    }
}

/// Converts a bound Iceberg predicate into a closure that evaluates Arrow
/// record batches.
struct PredicateConverter<'a> {
    /// The Parquet schema of the file being read.
    pub parquet_schema: &'a SchemaDescriptor,
    /// Maps Iceberg field IDs to leaf column indices in the Parquet schema.
    pub column_map: &'a HashMap<i32, usize>,
    /// The projected leaf column indices, in ascending order.
    pub column_indices: &'a Vec<usize>,
}

impl PredicateConverter<'_> {
    /// Resolves a bound reference to its index within the projected columns,
    /// returning `Ok(None)` when the referenced field is absent from the file.
    fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
        if let Some(column_idx) = self.column_map.get(&reference.field().id) {
            if self.parquet_schema.get_column_root(*column_idx).is_group() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Leaf column `{}` in predicates isn't a root column in Parquet schema.",
                        reference.field().name
                    ),
                ));
            }

            let index = self
                .column_indices
                .iter()
                .position(|&idx| idx == *column_idx)
                .ok_or(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Leaf column `{}` in predicates cannot be found in the required column indices.",
                        reference.field().name
                    ),
                ))?;

            Ok(Some(index))
        } else {
            Ok(None)
        }
    }

    fn build_always_true(&self) -> Result<Box<PredicateResult>> {
        Ok(Box::new(|batch| {
            Ok(BooleanArray::from(vec![true; batch.num_rows()]))
        }))
    }

    fn build_always_false(&self) -> Result<Box<PredicateResult>> {
        Ok(Box::new(|batch| {
            Ok(BooleanArray::from(vec![false; batch.num_rows()]))
        }))
    }
}

fn project_column(
    batch: &RecordBatch,
    column_idx: usize,
) -> std::result::Result<ArrayRef, ArrowError> {
    let column = batch.column(column_idx);

    match column.data_type() {
        DataType::Struct(_) => Err(ArrowError::SchemaError(
            "Does not support struct column yet.".to_string(),
        )),
        _ => Ok(column.clone()),
    }
}

type PredicateResult =
    dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;

impl BoundPredicateVisitor for PredicateConverter<'_> {
    type T = Box<PredicateResult>;

    fn always_true(&mut self) -> Result<Box<PredicateResult>> {
        self.build_always_true()
    }

    fn always_false(&mut self) -> Result<Box<PredicateResult>> {
        self.build_always_false()
    }

    fn and(
        &mut self,
        mut lhs: Box<PredicateResult>,
        mut rhs: Box<PredicateResult>,
    ) -> Result<Box<PredicateResult>> {
        Ok(Box::new(move |batch| {
            let left = lhs(batch.clone())?;
            let right = rhs(batch)?;
            and_kleene(&left, &right)
        }))
    }

    fn or(
        &mut self,
        mut lhs: Box<PredicateResult>,
        mut rhs: Box<PredicateResult>,
    ) -> Result<Box<PredicateResult>> {
        Ok(Box::new(move |batch| {
            let left = lhs(batch.clone())?;
            let right = rhs(batch)?;
            or_kleene(&left, &right)
        }))
    }

    fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
        Ok(Box::new(move |batch| {
            let pred_ret = inner(batch)?;
            not(&pred_ret)
        }))
    }

    fn is_null(
        &mut self,
        reference: &BoundReference,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            Ok(Box::new(move |batch| {
                let column = project_column(&batch, idx)?;
                is_null(&column)
            }))
        } else {
            self.build_always_true()
        }
    }

    fn not_null(
        &mut self,
        reference: &BoundReference,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            Ok(Box::new(move |batch| {
                let column = project_column(&batch, idx)?;
                is_not_null(&column)
            }))
        } else {
            self.build_always_false()
        }
    }

    fn is_nan(
        &mut self,
        reference: &BoundReference,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if self.bound_reference(reference)?.is_some() {
            self.build_always_true()
        } else {
            self.build_always_false()
        }
    }

    fn not_nan(
        &mut self,
        reference: &BoundReference,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if self.bound_reference(reference)?.is_some() {
            self.build_always_false()
        } else {
            self.build_always_true()
        }
    }

    fn less_than(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                lt(&left, literal.as_ref())
            }))
        } else {
            self.build_always_true()
        }
    }

    fn less_than_or_eq(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                lt_eq(&left, literal.as_ref())
            }))
        } else {
            self.build_always_true()
        }
    }

    fn greater_than(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                gt(&left, literal.as_ref())
            }))
        } else {
            self.build_always_false()
        }
    }

    fn greater_than_or_eq(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                gt_eq(&left, literal.as_ref())
            }))
        } else {
            self.build_always_false()
        }
    }

    fn eq(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                eq(&left, literal.as_ref())
            }))
        } else {
            self.build_always_false()
        }
    }

    fn not_eq(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                neq(&left, literal.as_ref())
            }))
        } else {
            self.build_always_false()
        }
    }

    fn starts_with(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                starts_with(&left, literal.as_ref())
            }))
        } else {
            self.build_always_false()
        }
    }

    fn not_starts_with(
        &mut self,
        reference: &BoundReference,
        literal: &Datum,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literal = get_arrow_datum(literal)?;

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let literal = try_cast_literal(&literal, left.data_type())?;
                not(&starts_with(&left, literal.as_ref())?)
            }))
        } else {
            self.build_always_true()
        }
    }

    fn r#in(
        &mut self,
        reference: &BoundReference,
        literals: &FnvHashSet<Datum>,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literals: Vec<_> = literals
                .iter()
                .map(|lit| get_arrow_datum(lit).unwrap())
                .collect();

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;

                let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
                for literal in &literals {
                    let literal = try_cast_literal(literal, left.data_type())?;
                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
                }

                Ok(acc)
            }))
        } else {
            self.build_always_false()
        }
    }

    fn not_in(
        &mut self,
        reference: &BoundReference,
        literals: &FnvHashSet<Datum>,
        _predicate: &BoundPredicate,
    ) -> Result<Box<PredicateResult>> {
        if let Some(idx) = self.bound_reference(reference)? {
            let literals: Vec<_> = literals
                .iter()
                .map(|lit| get_arrow_datum(lit).unwrap())
                .collect();

            Ok(Box::new(move |batch| {
                let left = project_column(&batch, idx)?;
                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
                for literal in &literals {
                    let literal = try_cast_literal(literal, left.data_type())?;
                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
                }

                Ok(acc)
            }))
        } else {
            self.build_always_true()
        }
    }
}

/// An [`AsyncFileReader`] backed by an iceberg [`FileRead`], with range
/// coalescing and metadata prefetching controlled by [`ParquetReadOptions`].
pub struct ArrowFileReader {
    meta: FileMetadata,
    parquet_read_options: ParquetReadOptions,
    r: Box<dyn FileRead>,
}

impl ArrowFileReader {
    /// Creates a new `ArrowFileReader` with default read options.
    pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
        Self {
            meta,
            parquet_read_options: ParquetReadOptions::builder().build(),
            r,
        }
    }

    /// Overrides the Parquet read options.
    pub(crate) fn with_parquet_read_options(mut self, options: ParquetReadOptions) -> Self {
        self.parquet_read_options = options;
        self
    }
}

impl AsyncFileReader for ArrowFileReader {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start..range.end)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
        let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes();
        let concurrency = self.parquet_read_options.range_fetch_concurrency().max(1);

        async move {
            let fetch_ranges = merge_ranges(&ranges, coalesce_bytes);
            let r = &self.r;

            // Fetch the coalesced ranges concurrently...
            let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
                .map(|range| async move {
                    r.read(range)
                        .await
                        .map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
                })
                .buffered(concurrency)
                .try_collect()
                .await?;

            // ...then slice each originally requested range back out of the
            // coalesced buffer that contains it.
            Ok(ranges
                .iter()
                .map(|range| {
                    let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
                    let fetch_range = &fetch_ranges[idx];
                    let fetch_bytes = &fetched[idx];
                    let start = (range.start - fetch_range.start) as usize;
                    let end = (range.end - fetch_range.start) as usize;
                    fetch_bytes.slice(start..end.min(fetch_bytes.len()))
                })
                .collect())
        }
        .boxed()
    }

    fn get_metadata(
        &mut self,
        _options: Option<&'_ ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new()
                .with_prefetch_hint(self.parquet_read_options.metadata_size_hint())
                .with_page_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_page_index(),
                ))
                .with_column_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_column_index(),
                ))
                .with_offset_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_offset_index(),
                ));
            let size = self.meta.size;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

/// Merges a set of byte ranges, coalescing ranges whose gap is at most
/// `coalesce` bytes, and returns the merged ranges sorted by start offset.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    if ranges.is_empty() {
        return vec![];
    }

    let mut ranges = ranges.to_vec();
    ranges.sort_unstable_by_key(|r| r.start);

    let mut merged = Vec::with_capacity(ranges.len());
    let mut start_idx = 0;
    let mut end_idx = 1;

    while start_idx != ranges.len() {
        let mut range_end = ranges[start_idx].end;

        while end_idx != ranges.len()
            && ranges[end_idx]
                .start
                .checked_sub(range_end)
                .map(|delta| delta <= coalesce)
                .unwrap_or(true)
        {
            range_end = range_end.max(ranges[end_idx].end);
            end_idx += 1;
        }

        merged.push(ranges[start_idx].start..range_end);
        start_idx = end_idx;
        end_idx += 1;
    }

    merged
}
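
// Worked example: with coalesce = 5, the ranges [0..10, 12..20, 100..110]
// merge to [0..20, 100..110] -- the 2-byte gap between the first two ranges is
// within the threshold, while the 80-byte gap before the last one is not.
// (Overlapping ranges hit the `checked_sub` underflow arm and always merge.)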

/// Casts the literal to the column's Arrow data type when the two differ, so
/// the comparison kernels above receive operands of matching types.
fn try_cast_literal(
    literal: &Arc<dyn ArrowDatum + Send + Sync>,
    column_type: &DataType,
) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
    let literal_array = literal.get().0;

    if literal_array.data_type() == column_type {
        return Ok(Arc::clone(literal));
    }

    let literal_array = cast(literal_array, column_type)?;
    Ok(Arc::new(Scalar::new(literal_array)))
}
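
// Worked example: a string `Datum` surfaces as a `Utf8` scalar; when the file
// column is `LargeUtf8` (as exercised in `test_predicate_cast_literal` below),
// the scalar is cast to `LargeUtf8` before `eq`/`lt`/etc. are applied.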

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::fs::File;
    use std::ops::Range;
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
    use futures::TryStreamExt;
    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    use parquet::arrow::{ArrowWriter, ProjectionMask};
    use parquet::basic::Compression;
    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
    use parquet::file::properties::WriterProperties;
    use parquet::schema::parser::parse_message_type;
    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
    use roaring::RoaringTreemap;
    use tempfile::TempDir;

    use crate::ErrorKind;
    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
    use crate::delete_vector::DeleteVector;
    use crate::expr::visitors::bound_predicate_visitor::visit;
    use crate::expr::{Bind, Predicate, Reference};
    use crate::io::FileIO;
    use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
    use crate::spec::{
        DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
    };

    fn table_schema_simple() -> SchemaRef {
        Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_identifier_field_ids(vec![2])
                .with_fields(vec![
                    NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
                    NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
                ])
                .build()
                .unwrap(),
        )
    }

    #[test]
    fn test_collect_field_id() {
        let schema = table_schema_simple();
        let expr = Reference::new("qux").is_null();
        let bound_expr = expr.bind(schema, true).unwrap();

        let mut visitor = CollectFieldIdVisitor {
            field_ids: HashSet::default(),
        };
        visit(&mut visitor, &bound_expr).unwrap();

        let mut expected = HashSet::default();
        expected.insert(4_i32);

        assert_eq!(visitor.field_ids, expected);
    }

    #[test]
    fn test_collect_field_id_with_and() {
        let schema = table_schema_simple();
        let expr = Reference::new("qux")
            .is_null()
            .and(Reference::new("baz").is_null());
        let bound_expr = expr.bind(schema, true).unwrap();

        let mut visitor = CollectFieldIdVisitor {
            field_ids: HashSet::default(),
        };
        visit(&mut visitor, &bound_expr).unwrap();

        let mut expected = HashSet::default();
        expected.insert(4_i32);
        expected.insert(3);

        assert_eq!(visitor.field_ids, expected);
    }

    #[test]
    fn test_collect_field_id_with_or() {
        let schema = table_schema_simple();
        let expr = Reference::new("qux")
            .is_null()
            .or(Reference::new("baz").is_null());
        let bound_expr = expr.bind(schema, true).unwrap();

        let mut visitor = CollectFieldIdVisitor {
            field_ids: HashSet::default(),
        };
        visit(&mut visitor, &bound_expr).unwrap();

        let mut expected = HashSet::default();
        expected.insert(4_i32);
        expected.insert(3);

        assert_eq!(visitor.field_ids, expected);
    }

    #[test]
    fn test_arrow_projection_mask() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_identifier_field_ids(vec![1])
                .with_fields(vec![
                    NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
                    NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(
                        3,
                        "c3",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 38,
                            scale: 3,
                        }),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
                HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
            ),
            Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "3".to_string(),
            )])),
        ]));

        let message_type = "
message schema {
  required binary c1 (STRING) = 1;
  optional int32 c2 (INTEGER(8,true)) = 2;
  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
}
    ";
        let parquet_type = parse_message_type(message_type).expect("should parse schema");
        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));

        let err = ArrowReader::get_arrow_projection_mask(
            &[1, 2, 3],
            &schema,
            &parquet_schema,
            &arrow_schema,
            false,
        )
        .unwrap_err();

        assert_eq!(err.kind(), ErrorKind::DataInvalid);
        assert_eq!(
            err.to_string(),
            "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
        );

        let err = ArrowReader::get_arrow_projection_mask(
            &[1, 3],
            &schema,
            &parquet_schema,
            &arrow_schema,
            false,
        )
        .unwrap_err();

        assert_eq!(err.kind(), ErrorKind::DataInvalid);
        assert_eq!(
            err.to_string(),
            "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
        );

        let mask = ArrowReader::get_arrow_projection_mask(
            &[1],
            &schema,
            &parquet_schema,
            &arrow_schema,
            false,
        )
        .expect("Some ProjectionMask");
        assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
    }

    #[tokio::test]
    async fn test_kleene_logic_or_behaviour() {
        let predicate = Reference::new("a")
            .is_null()
            .or(Reference::new("a").equal_to(Datum::string("foo")));

        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];

        let expected = vec![None, Some("foo".to_string())];

        let (file_io, schema, table_location, _temp_dir) =
            setup_kleene_logic(data_for_col_a, DataType::Utf8);
        let reader = ArrowReaderBuilder::new(file_io).build();

        let result_data = test_perform_read(predicate, schema, table_location, reader).await;

        assert_eq!(result_data, expected);
    }

    #[tokio::test]
    async fn test_kleene_logic_and_behaviour() {
        let predicate = Reference::new("a")
            .is_not_null()
            .and(Reference::new("a").not_equal_to(Datum::string("foo")));

        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];

        let expected = vec![Some("bar".to_string())];

        let (file_io, schema, table_location, _temp_dir) =
            setup_kleene_logic(data_for_col_a, DataType::Utf8);
        let reader = ArrowReaderBuilder::new(file_io).build();

        let result_data = test_perform_read(predicate, schema, table_location, reader).await;

        assert_eq!(result_data, expected);
    }

    #[tokio::test]
    async fn test_predicate_cast_literal() {
        let predicates = vec![
            (Reference::new("a").equal_to(Datum::string("foo")), vec![
                Some("foo".to_string()),
            ]),
            (
                Reference::new("a").not_equal_to(Datum::string("foo")),
                vec![Some("bar".to_string())],
            ),
            (Reference::new("a").starts_with(Datum::string("f")), vec![
                Some("foo".to_string()),
            ]),
            (
                Reference::new("a").not_starts_with(Datum::string("f")),
                vec![Some("bar".to_string())],
            ),
            (Reference::new("a").less_than(Datum::string("foo")), vec![
                Some("bar".to_string()),
            ]),
            (
                Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
                vec![Some("foo".to_string()), Some("bar".to_string())],
            ),
            (
                Reference::new("a").greater_than(Datum::string("bar")),
                vec![Some("foo".to_string())],
            ),
            (
                Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
                vec![Some("foo".to_string())],
            ),
            (
                Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
                vec![Some("foo".to_string())],
            ),
            (
                Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
                vec![Some("bar".to_string())],
            ),
        ];

        let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];

        let (file_io, schema, table_location, _temp_dir) =
            setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
        let reader = ArrowReaderBuilder::new(file_io).build();

        for (predicate, expected) in predicates {
            println!("testing predicate {predicate}");
            let result_data = test_perform_read(
                predicate.clone(),
                schema.clone(),
                table_location.clone(),
                reader.clone(),
            )
            .await;

            assert_eq!(result_data, expected, "predicate={predicate}");
        }
    }

    async fn test_perform_read(
        predicate: Predicate,
        schema: SchemaRef,
        table_location: String,
        reader: ArrowReader,
    ) -> Vec<Option<String>> {
        let tasks = Box::pin(futures::stream::iter(
            vec![Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/1.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1],
                predicate: Some(predicate.bind(schema, true).unwrap()),
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            })]
            .into_iter(),
        )) as FileScanTaskStream;

        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        result[0].columns()[0]
            .as_string_opt::<i32>()
            .unwrap()
            .iter()
            .map(|v| v.map(ToOwned::to_owned))
            .collect::<Vec<_>>()
    }

    fn setup_kleene_logic(
        data_for_col_a: Vec<Option<String>>,
        col_a_type: DataType,
    ) -> (FileIO, SchemaRef, String, TempDir) {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
        ]));

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();

        let file_io = FileIO::new_with_fs();

        let col = match col_a_type {
            DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
            DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
            _ => panic!("unexpected col_a_type"),
        };

        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();

        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let file = File::create(format!("{table_location}/1.parquet")).unwrap();
        let mut writer =
            ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();

        writer.write(&to_write).expect("Writing batch");

        writer.close().unwrap();

        (file_io, schema, table_location, tmp_dir)
2360 }
2361
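// Converts positional deletes into a parquet RowSelection, first with only row
// groups 1 and 3 selected, then with no row-group selection (all groups).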
2362 #[test]
2363 fn test_build_deletes_row_selection() {
2364 let schema_descr = get_test_schema_descr();
2365
2366 let mut columns = vec![];
2367 for ptr in schema_descr.columns() {
2368 let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
2369 columns.push(column);
2370 }
2371
2372 let row_groups_metadata = vec![
2373 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
2374 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
2375 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
2376 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
2377 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
2378 ];
2379
2380 let selected_row_groups = Some(vec![1, 3]);
2381
2382 let positional_deletes = RoaringTreemap::from_iter(&[
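// Row groups hold 1000, 500, 500, 1000 and 500 rows, so positions 0..1000
// fall in row group 0, 1000..1500 in 1, 1500..2000 in 2, 2000..3000 in 3,
// and 3000..3500 in 4. Only groups 1 and 3 are selected above.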
1, 3, 4, 5, 998, 999, // row group 0 (skipped)
1000, 1010, 1011, 1012, 1498, 1499, // row group 1 (selected)
1500, 1501, 1600, 1999, // row group 2 (skipped)
2000, 2001, 2100, 2200, 2201, 2202, 2999, // row group 3 (selected)
3000, // row group 4 (skipped)
]);
2407
2408 let positional_deletes = DeleteVector::new(positional_deletes);
2409
2410 let result = ArrowReader::build_deletes_row_selection(
2412 &row_groups_metadata,
2413 &selected_row_groups,
2414 &positional_deletes,
2415 )
2416 .unwrap();
2417
2418 let expected = RowSelection::from(vec![
2419 RowSelector::skip(1),
2420 RowSelector::select(9),
2421 RowSelector::skip(3),
2422 RowSelector::select(485),
2423 RowSelector::skip(4),
2424 RowSelector::select(98),
2425 RowSelector::skip(1),
2426 RowSelector::select(99),
2427 RowSelector::skip(3),
2428 RowSelector::select(796),
2429 RowSelector::skip(1),
2430 ]);
2431
2432 assert_eq!(result, expected);
2433
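// Same deletes, but with no row-group selection: every group contributes.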
2434 let result = ArrowReader::build_deletes_row_selection(
2436 &row_groups_metadata,
2437 &None,
2438 &positional_deletes,
2439 )
2440 .unwrap();
2441
2442 let expected = RowSelection::from(vec![
2443 RowSelector::select(1),
2444 RowSelector::skip(1),
2445 RowSelector::select(1),
2446 RowSelector::skip(3),
2447 RowSelector::select(992),
2448 RowSelector::skip(3),
2449 RowSelector::select(9),
2450 RowSelector::skip(3),
2451 RowSelector::select(485),
2452 RowSelector::skip(4),
2453 RowSelector::select(98),
2454 RowSelector::skip(1),
2455 RowSelector::select(398),
2456 RowSelector::skip(3),
2457 RowSelector::select(98),
2458 RowSelector::skip(1),
2459 RowSelector::select(99),
2460 RowSelector::skip(3),
2461 RowSelector::select(796),
2462 RowSelector::skip(2),
2463 RowSelector::select(499),
2464 ]);
2465
2466 assert_eq!(result, expected);
2467 }
2468
2469 fn build_test_row_group_meta(
2470 schema_descr: SchemaDescPtr,
2471 columns: Vec<ColumnChunkMetaData>,
2472 num_rows: i64,
2473 ordinal: i16,
2474 ) -> RowGroupMetaData {
2475 RowGroupMetaData::builder(schema_descr.clone())
2476 .set_num_rows(num_rows)
2477 .set_total_byte_size(2000)
2478 .set_column_metadata(columns)
2479 .set_ordinal(ordinal)
2480 .build()
2481 .unwrap()
2482 }
2483
2484 fn get_test_schema_descr() -> SchemaDescPtr {
2485 use parquet::schema::types::Type as SchemaType;
2486
2487 let schema = SchemaType::group_type_builder("schema")
2488 .with_fields(vec![
2489 Arc::new(
2490 SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2491 .build()
2492 .unwrap(),
2493 ),
2494 Arc::new(
2495 SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2496 .build()
2497 .unwrap(),
2498 ),
2499 ])
2500 .build()
2501 .unwrap();
2502
2503 Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2504 }
2505
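// Writes one file with three 100-row row groups, then scans it with two tasks
// whose byte ranges cover row group 0 and row groups 1+2 respectively; each
// task must return only the rows of the row groups inside its range.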
2506 #[tokio::test]
2508 async fn test_file_splits_respect_byte_ranges() {
2509 use arrow_array::Int32Array;
2510 use parquet::file::reader::{FileReader, SerializedFileReader};
2511
2512 let schema = Arc::new(
2513 Schema::builder()
2514 .with_schema_id(1)
2515 .with_fields(vec![
2516 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2517 ])
2518 .build()
2519 .unwrap(),
2520 );
2521
2522 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2523 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2524 PARQUET_FIELD_ID_META_KEY.to_string(),
2525 "1".to_string(),
2526 )])),
2527 ]));
2528
2529 let tmp_dir = TempDir::new().unwrap();
2530 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2531 let file_path = format!("{table_location}/multi_row_group.parquet");
2532
2533 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2535 (0..100).collect::<Vec<i32>>(),
2536 ))])
2537 .unwrap();
2538 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2539 (100..200).collect::<Vec<i32>>(),
2540 ))])
2541 .unwrap();
2542 let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2543 (200..300).collect::<Vec<i32>>(),
2544 ))])
2545 .unwrap();
2546
2547 let props = WriterProperties::builder()
2548 .set_compression(Compression::SNAPPY)
2549 .set_max_row_group_size(100)
2550 .build();
2551
2552 let file = File::create(&file_path).unwrap();
2553 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2554 writer.write(&batch1).expect("Writing batch 1");
2555 writer.write(&batch2).expect("Writing batch 2");
2556 writer.write(&batch3).expect("Writing batch 3");
2557 writer.close().unwrap();
2558
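// Read the footer back to locate each row group's byte range.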
2559 let file = File::open(&file_path).unwrap();
2561 let reader = SerializedFileReader::new(file).unwrap();
2562 let metadata = reader.metadata();
2563
2564 println!("File has {} row groups", metadata.num_row_groups());
2565 assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2566
2567 let row_group_0 = metadata.row_group(0);
2569 let row_group_1 = metadata.row_group(1);
2570 let row_group_2 = metadata.row_group(2);
2571
// The first row group starts right after the 4-byte "PAR1" magic at the head of the file.
let rg0_start = 4u64;
let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2574 let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
2575 let file_end = rg2_start + row_group_2.compressed_size() as u64;
2576
2577 println!(
2578 "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2579 row_group_0.num_rows(),
2580 rg0_start,
2581 row_group_0.compressed_size()
2582 );
2583 println!(
2584 "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2585 row_group_1.num_rows(),
2586 rg1_start,
2587 row_group_1.compressed_size()
2588 );
2589 println!(
2590 "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2591 row_group_2.num_rows(),
2592 rg2_start,
2593 row_group_2.compressed_size()
2594 );
2595
2596 let file_io = FileIO::new_with_fs();
2597 let reader = ArrowReaderBuilder::new(file_io).build();
2598
2599 let task1 = FileScanTask {
2601 file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
2602 start: rg0_start,
2603 length: row_group_0.compressed_size() as u64,
2604 record_count: Some(100),
2605 data_file_path: file_path.clone(),
2606 data_file_format: DataFileFormat::Parquet,
2607 schema: schema.clone(),
2608 project_field_ids: vec![1],
2609 predicate: None,
2610 deletes: vec![],
2611 partition: None,
2612 partition_spec: None,
2613 name_mapping: None,
2614 case_sensitive: false,
2615 };
2616
2617 let task2 = FileScanTask {
2619 file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
2620 start: rg1_start,
2621 length: file_end - rg1_start,
2622 record_count: Some(200),
2623 data_file_path: file_path.clone(),
2624 data_file_format: DataFileFormat::Parquet,
2625 schema: schema.clone(),
2626 project_field_ids: vec![1],
2627 predicate: None,
2628 deletes: vec![],
2629 partition: None,
2630 partition_spec: None,
2631 name_mapping: None,
2632 case_sensitive: false,
2633 };
2634
2635 let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2636 let result1 = reader
2637 .clone()
2638 .read(tasks1)
2639 .unwrap()
2640 .try_collect::<Vec<RecordBatch>>()
2641 .await
2642 .unwrap();
2643
2644 let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2645 println!(
2646 "Task 1 (bytes {}-{}) returned {} rows",
2647 rg0_start,
2648 rg0_start + row_group_0.compressed_size() as u64,
2649 total_rows_task1
2650 );
2651
2652 let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2653 let result2 = reader
2654 .read(tasks2)
2655 .unwrap()
2656 .try_collect::<Vec<RecordBatch>>()
2657 .await
2658 .unwrap();
2659
2660 let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2661 println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2662
2663 assert_eq!(
2664 total_rows_task1, 100,
2665 "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2666 );
2667
2668 assert_eq!(
2669 total_rows_task2, 200,
2670 "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2671 );
2672
2673 if total_rows_task1 > 0 {
2675 let first_batch = &result1[0];
2676 let id_col = first_batch
2677 .column(0)
2678 .as_primitive::<arrow_array::types::Int32Type>();
2679 let first_val = id_col.value(0);
2680 let last_val = id_col.value(id_col.len() - 1);
2681 println!("Task 1 data range: {first_val} to {last_val}");
2682
2683 assert_eq!(first_val, 0, "Task 1 should start with id=0");
2684 assert_eq!(last_val, 99, "Task 1 should end with id=99");
2685 }
2686
2687 if total_rows_task2 > 0 {
2688 let first_batch = &result2[0];
2689 let id_col = first_batch
2690 .column(0)
2691 .as_primitive::<arrow_array::types::Int32Type>();
2692 let first_val = id_col.value(0);
2693 println!("Task 2 first value: {first_val}");
2694
2695 assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2696 }
2697 }
2698
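// Schema evolution: the file only contains field 1 ("a"), while the scan
// schema also has the newer optional field 2 ("b"), which must read as all-NULL.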
2699 #[tokio::test]
2705 async fn test_schema_evolution_add_column() {
2706 use arrow_array::{Array, Int32Array};
2707
2708 let new_schema = Arc::new(
2710 Schema::builder()
2711 .with_schema_id(2)
2712 .with_fields(vec![
2713 NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2714 NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2715 ])
2716 .build()
2717 .unwrap(),
2718 );
2719
2720 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2722 Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2723 PARQUET_FIELD_ID_META_KEY.to_string(),
2724 "1".to_string(),
2725 )])),
2726 ]));
2727
2728 let tmp_dir = TempDir::new().unwrap();
2730 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2731 let file_io = FileIO::new_with_fs();
2732
2733 let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2734 let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2735
2736 let props = WriterProperties::builder()
2737 .set_compression(Compression::SNAPPY)
2738 .build();
2739 let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
2740 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2741 writer.write(&to_write).expect("Writing batch");
2742 writer.close().unwrap();
2743
2744 let reader = ArrowReaderBuilder::new(file_io).build();
2746 let tasks = Box::pin(futures::stream::iter(
2747 vec![Ok(FileScanTask {
2748 file_size_in_bytes: std::fs::metadata(format!("{table_location}/old_file.parquet"))
2749 .unwrap()
2750 .len(),
2751 start: 0,
2752 length: 0,
2753 record_count: None,
2754 data_file_path: format!("{table_location}/old_file.parquet"),
2755 data_file_format: DataFileFormat::Parquet,
2756 schema: new_schema.clone(),
project_field_ids: vec![1, 2], // field 2 ("b") does not exist in the file
predicate: None,
2759 deletes: vec![],
2760 partition: None,
2761 partition_spec: None,
2762 name_mapping: None,
2763 case_sensitive: false,
2764 })]
2765 .into_iter(),
2766 )) as FileScanTaskStream;
2767
2768 let result = reader
2769 .read(tasks)
2770 .unwrap()
2771 .try_collect::<Vec<RecordBatch>>()
2772 .await
2773 .unwrap();
2774
2775 assert_eq!(result.len(), 1);
2777 let batch = &result[0];
2778
2779 assert_eq!(batch.num_columns(), 2);
2781 assert_eq!(batch.num_rows(), 3);
2782
2783 let col_a = batch
2785 .column(0)
2786 .as_primitive::<arrow_array::types::Int32Type>();
2787 assert_eq!(col_a.values(), &[1, 2, 3]);
2788
2789 let col_b = batch
2791 .column(1)
2792 .as_primitive::<arrow_array::types::Int32Type>();
2793 assert_eq!(col_b.null_count(), 3);
2794 assert!(col_b.is_null(0));
2795 assert!(col_b.is_null(1));
2796 assert!(col_b.is_null(2));
2797 }
2798
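// Regression test: a position delete pointing into the second row group
// (position 199, id = 200) must still apply when the whole file is scanned.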
2799 #[tokio::test]
2817 async fn test_position_delete_across_multiple_row_groups() {
2818 use arrow_array::{Int32Array, Int64Array};
2819 use parquet::file::reader::{FileReader, SerializedFileReader};
2820
2821 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2823 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2824
2825 let tmp_dir = TempDir::new().unwrap();
2826 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2827
2828 let table_schema = Arc::new(
2830 Schema::builder()
2831 .with_schema_id(1)
2832 .with_fields(vec![
2833 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2834 ])
2835 .build()
2836 .unwrap(),
2837 );
2838
2839 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2840 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2841 PARQUET_FIELD_ID_META_KEY.to_string(),
2842 "1".to_string(),
2843 )])),
2844 ]));
2845
2846 let data_file_path = format!("{table_location}/data.parquet");
2850
2851 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2852 Int32Array::from_iter_values(1..=100),
2853 )])
2854 .unwrap();
2855
2856 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2857 Int32Array::from_iter_values(101..=200),
2858 )])
2859 .unwrap();
2860
2861 let props = WriterProperties::builder()
2863 .set_compression(Compression::SNAPPY)
2864 .set_max_row_group_size(100)
2865 .build();
2866
2867 let file = File::create(&data_file_path).unwrap();
2868 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2869 writer.write(&batch1).expect("Writing batch 1");
2870 writer.write(&batch2).expect("Writing batch 2");
2871 writer.close().unwrap();
2872
2873 let verify_file = File::open(&data_file_path).unwrap();
2875 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2876 assert_eq!(
2877 verify_reader.metadata().num_row_groups(),
2878 2,
2879 "Should have 2 row groups"
2880 );
2881
2882 let delete_file_path = format!("{table_location}/deletes.parquet");
2884
2885 let delete_schema = Arc::new(ArrowSchema::new(vec![
2886 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2887 PARQUET_FIELD_ID_META_KEY.to_string(),
2888 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2889 )])),
2890 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2891 PARQUET_FIELD_ID_META_KEY.to_string(),
2892 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2893 )])),
2894 ]));
2895
2896 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2898 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2899 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2900 ])
2901 .unwrap();
2902
2903 let delete_props = WriterProperties::builder()
2904 .set_compression(Compression::SNAPPY)
2905 .build();
2906
2907 let delete_file = File::create(&delete_file_path).unwrap();
2908 let mut delete_writer =
2909 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2910 delete_writer.write(&delete_batch).unwrap();
2911 delete_writer.close().unwrap();
2912
2913 let file_io = FileIO::new_with_fs();
2915 let reader = ArrowReaderBuilder::new(file_io).build();
2916
2917 let task = FileScanTask {
2918 file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
2919 start: 0,
2920 length: 0,
2921 record_count: Some(200),
2922 data_file_path: data_file_path.clone(),
2923 data_file_format: DataFileFormat::Parquet,
2924 schema: table_schema.clone(),
2925 project_field_ids: vec![1],
2926 predicate: None,
2927 deletes: vec![FileScanTaskDeleteFile {
2928 file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
2929 file_path: delete_file_path,
2930 file_type: DataContentType::PositionDeletes,
2931 partition_spec_id: 0,
2932 equality_ids: None,
2933 }],
2934 partition: None,
2935 partition_spec: None,
2936 name_mapping: None,
2937 case_sensitive: false,
2938 };
2939
2940 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2941 let result = reader
2942 .read(tasks)
2943 .unwrap()
2944 .try_collect::<Vec<RecordBatch>>()
2945 .await
2946 .unwrap();
2947
2948 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2950
2951 println!("Total rows read: {total_rows}");
2952 println!("Expected: 199 rows (deleted row 199 which had id=200)");
2953
2954 assert_eq!(
2956 total_rows, 199,
2957 "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
2958 The bug causes position deletes in later row groups to be ignored."
2959 );
2960
2961 let all_ids: Vec<i32> = result
2963 .iter()
2964 .flat_map(|batch| {
2965 batch
2966 .column(0)
2967 .as_primitive::<arrow_array::types::Int32Type>()
2968 .values()
2969 .iter()
2970 .copied()
2971 })
2972 .collect();
2973
2974 assert!(
2975 !all_ids.contains(&200),
2976 "Row with id=200 should be deleted but was found in results"
2977 );
2978
2979 let expected_ids: Vec<i32> = (1..=199).collect();
2981 assert_eq!(
2982 all_ids, expected_ids,
2983 "Should have ids 1-199 but got different values"
2984 );
2985 }
2986
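// Regression test: with a byte range selecting only row group 1, a delete at
// position 199 (inside that group) must still take effect.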
2987 #[tokio::test]
3013 async fn test_position_delete_with_row_group_selection() {
3014 use arrow_array::{Int32Array, Int64Array};
3015 use parquet::file::reader::{FileReader, SerializedFileReader};
3016
3017 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3019 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3020
3021 let tmp_dir = TempDir::new().unwrap();
3022 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3023
3024 let table_schema = Arc::new(
3026 Schema::builder()
3027 .with_schema_id(1)
3028 .with_fields(vec![
3029 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3030 ])
3031 .build()
3032 .unwrap(),
3033 );
3034
3035 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3036 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3037 PARQUET_FIELD_ID_META_KEY.to_string(),
3038 "1".to_string(),
3039 )])),
3040 ]));
3041
3042 let data_file_path = format!("{table_location}/data.parquet");
3046
3047 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3048 Int32Array::from_iter_values(1..=100),
3049 )])
3050 .unwrap();
3051
3052 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3053 Int32Array::from_iter_values(101..=200),
3054 )])
3055 .unwrap();
3056
3057 let props = WriterProperties::builder()
3059 .set_compression(Compression::SNAPPY)
3060 .set_max_row_group_size(100)
3061 .build();
3062
3063 let file = File::create(&data_file_path).unwrap();
3064 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3065 writer.write(&batch1).expect("Writing batch 1");
3066 writer.write(&batch2).expect("Writing batch 2");
3067 writer.close().unwrap();
3068
3069 let verify_file = File::open(&data_file_path).unwrap();
3071 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3072 assert_eq!(
3073 verify_reader.metadata().num_row_groups(),
3074 2,
3075 "Should have 2 row groups"
3076 );
3077
3078 let delete_file_path = format!("{table_location}/deletes.parquet");
3080
3081 let delete_schema = Arc::new(ArrowSchema::new(vec![
3082 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3083 PARQUET_FIELD_ID_META_KEY.to_string(),
3084 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3085 )])),
3086 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3087 PARQUET_FIELD_ID_META_KEY.to_string(),
3088 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3089 )])),
3090 ]));
3091
3092 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3094 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3095 Arc::new(Int64Array::from_iter_values(vec![199i64])),
3096 ])
3097 .unwrap();
3098
3099 let delete_props = WriterProperties::builder()
3100 .set_compression(Compression::SNAPPY)
3101 .build();
3102
3103 let delete_file = File::create(&delete_file_path).unwrap();
3104 let mut delete_writer =
3105 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3106 delete_writer.write(&delete_batch).unwrap();
3107 delete_writer.close().unwrap();
3108
3109 let metadata_file = File::open(&data_file_path).unwrap();
3112 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3113 let metadata = metadata_reader.metadata();
3114
3115 let row_group_0 = metadata.row_group(0);
3116 let row_group_1 = metadata.row_group(1);
3117
// Row group 0 starts right after the 4-byte "PAR1" magic header.
let rg0_start = 4u64;
let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
3120 let rg1_length = row_group_1.compressed_size() as u64;
3121
3122 println!(
3123 "Row group 0: starts at byte {}, {} bytes compressed",
3124 rg0_start,
3125 row_group_0.compressed_size()
3126 );
3127 println!(
3128 "Row group 1: starts at byte {}, {} bytes compressed",
3129 rg1_start,
3130 row_group_1.compressed_size()
3131 );
3132
3133 let file_io = FileIO::new_with_fs();
3134 let reader = ArrowReaderBuilder::new(file_io).build();
3135
3136 let task = FileScanTask {
3138 file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
3139 start: rg1_start,
3140 length: rg1_length,
record_count: Some(100), // only row group 1 is in range
data_file_path: data_file_path.clone(),
3143 data_file_format: DataFileFormat::Parquet,
3144 schema: table_schema.clone(),
3145 project_field_ids: vec![1],
3146 predicate: None,
3147 deletes: vec![FileScanTaskDeleteFile {
3148 file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
3149 file_path: delete_file_path,
3150 file_type: DataContentType::PositionDeletes,
3151 partition_spec_id: 0,
3152 equality_ids: None,
3153 }],
3154 partition: None,
3155 partition_spec: None,
3156 name_mapping: None,
3157 case_sensitive: false,
3158 };
3159
3160 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3161 let result = reader
3162 .read(tasks)
3163 .unwrap()
3164 .try_collect::<Vec<RecordBatch>>()
3165 .await
3166 .unwrap();
3167
3168 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3171
3172 println!("Total rows read from row group 1: {total_rows}");
3173 println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
3174
3175 assert_eq!(
3177 total_rows, 99,
3178 "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
3179 The bug causes position deletes to be lost when advance_to() is followed by next() \
while skipping unselected row groups."
3181 );
3182
3183 let all_ids: Vec<i32> = result
3185 .iter()
3186 .flat_map(|batch| {
3187 batch
3188 .column(0)
3189 .as_primitive::<arrow_array::types::Int32Type>()
3190 .values()
3191 .iter()
3192 .copied()
3193 })
3194 .collect();
3195
3196 assert!(
3197 !all_ids.contains(&200),
3198 "Row with id=200 should be deleted but was found in results"
3199 );
3200
3201 let expected_ids: Vec<i32> = (101..=199).collect();
3203 assert_eq!(
3204 all_ids, expected_ids,
3205 "Should have ids 101-199 but got different values"
3206 );
3207 }
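
// Regression test: the only delete (position 0) falls in row group 0, which the
// byte range skips; row group 1 must come back complete, without hanging.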
3208 #[tokio::test]
3237 async fn test_position_delete_in_skipped_row_group() {
3238 use arrow_array::{Int32Array, Int64Array};
3239 use parquet::file::reader::{FileReader, SerializedFileReader};
3240
3241 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3243 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3244
3245 let tmp_dir = TempDir::new().unwrap();
3246 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3247
3248 let table_schema = Arc::new(
3250 Schema::builder()
3251 .with_schema_id(1)
3252 .with_fields(vec![
3253 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3254 ])
3255 .build()
3256 .unwrap(),
3257 );
3258
3259 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3260 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3261 PARQUET_FIELD_ID_META_KEY.to_string(),
3262 "1".to_string(),
3263 )])),
3264 ]));
3265
3266 let data_file_path = format!("{table_location}/data.parquet");
3270
3271 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3272 Int32Array::from_iter_values(1..=100),
3273 )])
3274 .unwrap();
3275
3276 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3277 Int32Array::from_iter_values(101..=200),
3278 )])
3279 .unwrap();
3280
3281 let props = WriterProperties::builder()
3283 .set_compression(Compression::SNAPPY)
3284 .set_max_row_group_size(100)
3285 .build();
3286
3287 let file = File::create(&data_file_path).unwrap();
3288 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3289 writer.write(&batch1).expect("Writing batch 1");
3290 writer.write(&batch2).expect("Writing batch 2");
3291 writer.close().unwrap();
3292
3293 let verify_file = File::open(&data_file_path).unwrap();
3295 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3296 assert_eq!(
3297 verify_reader.metadata().num_row_groups(),
3298 2,
3299 "Should have 2 row groups"
3300 );
3301
3302 let delete_file_path = format!("{table_location}/deletes.parquet");
3304
3305 let delete_schema = Arc::new(ArrowSchema::new(vec![
3306 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3307 PARQUET_FIELD_ID_META_KEY.to_string(),
3308 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3309 )])),
3310 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3311 PARQUET_FIELD_ID_META_KEY.to_string(),
3312 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3313 )])),
3314 ]));
3315
3316 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3318 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3319 Arc::new(Int64Array::from_iter_values(vec![0i64])),
3320 ])
3321 .unwrap();
3322
3323 let delete_props = WriterProperties::builder()
3324 .set_compression(Compression::SNAPPY)
3325 .build();
3326
3327 let delete_file = File::create(&delete_file_path).unwrap();
3328 let mut delete_writer =
3329 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3330 delete_writer.write(&delete_batch).unwrap();
3331 delete_writer.close().unwrap();
3332
3333 let metadata_file = File::open(&data_file_path).unwrap();
3336 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3337 let metadata = metadata_reader.metadata();
3338
3339 let row_group_0 = metadata.row_group(0);
3340 let row_group_1 = metadata.row_group(1);
3341
// Row group 0 starts right after the 4-byte "PAR1" magic header.
let rg0_start = 4u64;
let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
3344 let rg1_length = row_group_1.compressed_size() as u64;
3345
3346 let file_io = FileIO::new_with_fs();
3347 let reader = ArrowReaderBuilder::new(file_io).build();
3348
3349 let task = FileScanTask {
3351 file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(),
3352 start: rg1_start,
3353 length: rg1_length,
record_count: Some(100), // only row group 1 is in range
data_file_path: data_file_path.clone(),
3356 data_file_format: DataFileFormat::Parquet,
3357 schema: table_schema.clone(),
3358 project_field_ids: vec![1],
3359 predicate: None,
3360 deletes: vec![FileScanTaskDeleteFile {
3361 file_size_in_bytes: std::fs::metadata(&delete_file_path).unwrap().len(),
3362 file_path: delete_file_path,
3363 file_type: DataContentType::PositionDeletes,
3364 partition_spec_id: 0,
3365 equality_ids: None,
3366 }],
3367 partition: None,
3368 partition_spec: None,
3369 name_mapping: None,
3370 case_sensitive: false,
3371 };
3372
3373 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3374 let result = reader
3375 .read(tasks)
3376 .unwrap()
3377 .try_collect::<Vec<RecordBatch>>()
3378 .await
3379 .unwrap();
3380
3381 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3384
3385 assert_eq!(
3386 total_rows, 100,
3387 "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
3388 If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
3389 );
3390
3391 let all_ids: Vec<i32> = result
3393 .iter()
3394 .flat_map(|batch| {
3395 batch
3396 .column(0)
3397 .as_primitive::<arrow_array::types::Int32Type>()
3398 .values()
3399 .iter()
3400 .copied()
3401 })
3402 .collect();
3403
3404 let expected_ids: Vec<i32> = (101..=200).collect();
3405 assert_eq!(
3406 all_ids, expected_ids,
3407 "Should have ids 101-200 (all of row group 1)"
3408 );
3409 }
3410
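// Files written without PARQUET:field_id metadata: with no name mapping
// supplied, columns are matched to the Iceberg schema positionally.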
3411 #[tokio::test]
3417 async fn test_read_parquet_file_without_field_ids() {
3418 let schema = Arc::new(
3419 Schema::builder()
3420 .with_schema_id(1)
3421 .with_fields(vec![
3422 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3423 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3424 ])
3425 .build()
3426 .unwrap(),
3427 );
3428
3429 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3431 Field::new("name", DataType::Utf8, false),
3432 Field::new("age", DataType::Int32, false),
3433 ]));
3434
3435 let tmp_dir = TempDir::new().unwrap();
3436 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3437 let file_io = FileIO::new_with_fs();
3438
3439 let name_data = vec!["Alice", "Bob", "Charlie"];
3440 let age_data = vec![30, 25, 35];
3441
3442 use arrow_array::Int32Array;
3443 let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3444 let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3445
3446 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3447
3448 let props = WriterProperties::builder()
3449 .set_compression(Compression::SNAPPY)
3450 .build();
3451
3452 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3453 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3454
3455 writer.write(&to_write).expect("Writing batch");
3456 writer.close().unwrap();
3457
3458 let reader = ArrowReaderBuilder::new(file_io).build();
3459
3460 let tasks = Box::pin(futures::stream::iter(
3461 vec![Ok(FileScanTask {
3462 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3463 .unwrap()
3464 .len(),
3465 start: 0,
3466 length: 0,
3467 record_count: None,
3468 data_file_path: format!("{table_location}/1.parquet"),
3469 data_file_format: DataFileFormat::Parquet,
3470 schema: schema.clone(),
3471 project_field_ids: vec![1, 2],
3472 predicate: None,
3473 deletes: vec![],
3474 partition: None,
3475 partition_spec: None,
3476 name_mapping: None,
3477 case_sensitive: false,
3478 })]
3479 .into_iter(),
3480 )) as FileScanTaskStream;
3481
3482 let result = reader
3483 .read(tasks)
3484 .unwrap()
3485 .try_collect::<Vec<RecordBatch>>()
3486 .await
3487 .unwrap();
3488
3489 assert_eq!(result.len(), 1);
3490 let batch = &result[0];
3491 assert_eq!(batch.num_rows(), 3);
3492 assert_eq!(batch.num_columns(), 2);
3493
3494 let name_array = batch.column(0).as_string::<i32>();
3496 assert_eq!(name_array.value(0), "Alice");
3497 assert_eq!(name_array.value(1), "Bob");
3498 assert_eq!(name_array.value(2), "Charlie");
3499
3500 let age_array = batch
3501 .column(1)
3502 .as_primitive::<arrow_array::types::Int32Type>();
3503 assert_eq!(age_array.value(0), 30);
3504 assert_eq!(age_array.value(1), 25);
3505 assert_eq!(age_array.value(2), 35);
3506 }
3507
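// Same field-id-less layout, but only fields 1 and 3 are projected, so just
// col1 and col3 should come back.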
3508 #[tokio::test]
3512 async fn test_read_parquet_without_field_ids_partial_projection() {
3513 use arrow_array::Int32Array;
3514
3515 let schema = Arc::new(
3516 Schema::builder()
3517 .with_schema_id(1)
3518 .with_fields(vec![
3519 NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3520 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3521 NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3522 NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3523 ])
3524 .build()
3525 .unwrap(),
3526 );
3527
3528 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3529 Field::new("col1", DataType::Utf8, false),
3530 Field::new("col2", DataType::Int32, false),
3531 Field::new("col3", DataType::Utf8, false),
3532 Field::new("col4", DataType::Int32, false),
3533 ]));
3534
3535 let tmp_dir = TempDir::new().unwrap();
3536 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3537 let file_io = FileIO::new_with_fs();
3538
3539 let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3540 let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3541 let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3542 let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3543
3544 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3545 col1_data, col2_data, col3_data, col4_data,
3546 ])
3547 .unwrap();
3548
3549 let props = WriterProperties::builder()
3550 .set_compression(Compression::SNAPPY)
3551 .build();
3552
3553 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3554 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3555
3556 writer.write(&to_write).expect("Writing batch");
3557 writer.close().unwrap();
3558
3559 let reader = ArrowReaderBuilder::new(file_io).build();
3560
3561 let tasks = Box::pin(futures::stream::iter(
3562 vec![Ok(FileScanTask {
3563 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3564 .unwrap()
3565 .len(),
3566 start: 0,
3567 length: 0,
3568 record_count: None,
3569 data_file_path: format!("{table_location}/1.parquet"),
3570 data_file_format: DataFileFormat::Parquet,
3571 schema: schema.clone(),
3572 project_field_ids: vec![1, 3],
3573 predicate: None,
3574 deletes: vec![],
3575 partition: None,
3576 partition_spec: None,
3577 name_mapping: None,
3578 case_sensitive: false,
3579 })]
3580 .into_iter(),
3581 )) as FileScanTaskStream;
3582
3583 let result = reader
3584 .read(tasks)
3585 .unwrap()
3586 .try_collect::<Vec<RecordBatch>>()
3587 .await
3588 .unwrap();
3589
3590 assert_eq!(result.len(), 1);
3591 let batch = &result[0];
3592 assert_eq!(batch.num_rows(), 2);
3593 assert_eq!(batch.num_columns(), 2);
3594
3595 let col1_array = batch.column(0).as_string::<i32>();
3596 assert_eq!(col1_array.value(0), "a");
3597 assert_eq!(col1_array.value(1), "b");
3598
3599 let col3_array = batch.column(1).as_string::<i32>();
3600 assert_eq!(col3_array.value(0), "c");
3601 assert_eq!(col3_array.value(1), "d");
3602 }
3603
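// Field-id-less file with two columns read against a three-field schema: the
// extra optional "city" column must materialize as all-NULL.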
3604 #[tokio::test]
3608 async fn test_read_parquet_without_field_ids_schema_evolution() {
3609 use arrow_array::{Array, Int32Array};
3610
3611 let schema = Arc::new(
3613 Schema::builder()
3614 .with_schema_id(1)
3615 .with_fields(vec![
3616 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3617 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3618 NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3619 ])
3620 .build()
3621 .unwrap(),
3622 );
3623
3624 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3625 Field::new("name", DataType::Utf8, false),
3626 Field::new("age", DataType::Int32, false),
3627 ]));
3628
3629 let tmp_dir = TempDir::new().unwrap();
3630 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3631 let file_io = FileIO::new_with_fs();
3632
3633 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3634 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3635
3636 let to_write =
3637 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3638
3639 let props = WriterProperties::builder()
3640 .set_compression(Compression::SNAPPY)
3641 .build();
3642
3643 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3644 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3645
3646 writer.write(&to_write).expect("Writing batch");
3647 writer.close().unwrap();
3648
3649 let reader = ArrowReaderBuilder::new(file_io).build();
3650
3651 let tasks = Box::pin(futures::stream::iter(
3652 vec![Ok(FileScanTask {
3653 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3654 .unwrap()
3655 .len(),
3656 start: 0,
3657 length: 0,
3658 record_count: None,
3659 data_file_path: format!("{table_location}/1.parquet"),
3660 data_file_format: DataFileFormat::Parquet,
3661 schema: schema.clone(),
3662 project_field_ids: vec![1, 2, 3],
3663 predicate: None,
3664 deletes: vec![],
3665 partition: None,
3666 partition_spec: None,
3667 name_mapping: None,
3668 case_sensitive: false,
3669 })]
3670 .into_iter(),
3671 )) as FileScanTaskStream;
3672
3673 let result = reader
3674 .read(tasks)
3675 .unwrap()
3676 .try_collect::<Vec<RecordBatch>>()
3677 .await
3678 .unwrap();
3679
3680 assert_eq!(result.len(), 1);
3681 let batch = &result[0];
3682 assert_eq!(batch.num_rows(), 2);
3683 assert_eq!(batch.num_columns(), 3);
3684
3685 let name_array = batch.column(0).as_string::<i32>();
3686 assert_eq!(name_array.value(0), "Alice");
3687 assert_eq!(name_array.value(1), "Bob");
3688
3689 let age_array = batch
3690 .column(1)
3691 .as_primitive::<arrow_array::types::Int32Type>();
3692 assert_eq!(age_array.value(0), 30);
3693 assert_eq!(age_array.value(1), 25);
3694
3695 let city_array = batch.column(2).as_string::<i32>();
3697 assert_eq!(city_array.null_count(), 2);
3698 assert!(city_array.is_null(0));
3699 assert!(city_array.is_null(1));
3700 }
3701
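// Field-id-less file spread over three two-row row groups; all six rows should
// come back, in order.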
3702 #[tokio::test]
3705 async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3706 use arrow_array::Int32Array;
3707
3708 let schema = Arc::new(
3709 Schema::builder()
3710 .with_schema_id(1)
3711 .with_fields(vec![
3712 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3713 NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3714 ])
3715 .build()
3716 .unwrap(),
3717 );
3718
3719 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3720 Field::new("name", DataType::Utf8, false),
3721 Field::new("value", DataType::Int32, false),
3722 ]));
3723
3724 let tmp_dir = TempDir::new().unwrap();
3725 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3726 let file_io = FileIO::new_with_fs();
3727
3728 let props = WriterProperties::builder()
3730 .set_compression(Compression::SNAPPY)
3731 .set_write_batch_size(2)
3732 .set_max_row_group_size(2)
3733 .build();
3734
3735 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3736 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3737
3738 for batch_num in 0..3 {
3740 let name_data = Arc::new(StringArray::from(vec![
3741 format!("name_{}", batch_num * 2),
3742 format!("name_{}", batch_num * 2 + 1),
3743 ])) as ArrayRef;
3744 let value_data =
3745 Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3746
3747 let batch =
3748 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3749 writer.write(&batch).expect("Writing batch");
3750 }
3751 writer.close().unwrap();
3752
3753 let reader = ArrowReaderBuilder::new(file_io).build();
3754
3755 let tasks = Box::pin(futures::stream::iter(
3756 vec![Ok(FileScanTask {
3757 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3758 .unwrap()
3759 .len(),
3760 start: 0,
3761 length: 0,
3762 record_count: None,
3763 data_file_path: format!("{table_location}/1.parquet"),
3764 data_file_format: DataFileFormat::Parquet,
3765 schema: schema.clone(),
3766 project_field_ids: vec![1, 2],
3767 predicate: None,
3768 deletes: vec![],
3769 partition: None,
3770 partition_spec: None,
3771 name_mapping: None,
3772 case_sensitive: false,
3773 })]
3774 .into_iter(),
3775 )) as FileScanTaskStream;
3776
3777 let result = reader
3778 .read(tasks)
3779 .unwrap()
3780 .try_collect::<Vec<RecordBatch>>()
3781 .await
3782 .unwrap();
3783
3784 assert!(!result.is_empty());
3785
3786 let mut all_names = Vec::new();
3787 let mut all_values = Vec::new();
3788
3789 for batch in &result {
3790 let name_array = batch.column(0).as_string::<i32>();
3791 let value_array = batch
3792 .column(1)
3793 .as_primitive::<arrow_array::types::Int32Type>();
3794
3795 for i in 0..batch.num_rows() {
3796 all_names.push(name_array.value(i).to_string());
3797 all_values.push(value_array.value(i));
3798 }
3799 }
3800
3801 assert_eq!(all_names.len(), 6);
3802 assert_eq!(all_values.len(), 6);
3803
3804 for i in 0..6 {
3805 assert_eq!(all_names[i], format!("name_{i}"));
3806 assert_eq!(all_values[i], i as i32);
3807 }
3808 }
3809
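// Field-id-less file with a nested struct column; nested fields must resolve
// without field-id metadata too.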
3810 #[tokio::test]
3814 async fn test_read_parquet_without_field_ids_with_struct() {
3815 use arrow_array::{Int32Array, StructArray};
3816 use arrow_schema::Fields;
3817
3818 let schema = Arc::new(
3819 Schema::builder()
3820 .with_schema_id(1)
3821 .with_fields(vec![
3822 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3823 NestedField::required(
3824 2,
3825 "person",
3826 Type::Struct(crate::spec::StructType::new(vec![
3827 NestedField::required(
3828 3,
3829 "name",
3830 Type::Primitive(PrimitiveType::String),
3831 )
3832 .into(),
3833 NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3834 .into(),
3835 ])),
3836 )
3837 .into(),
3838 ])
3839 .build()
3840 .unwrap(),
3841 );
3842
3843 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3844 Field::new("id", DataType::Int32, false),
3845 Field::new(
3846 "person",
3847 DataType::Struct(Fields::from(vec![
3848 Field::new("name", DataType::Utf8, false),
3849 Field::new("age", DataType::Int32, false),
3850 ])),
3851 false,
3852 ),
3853 ]));
3854
3855 let tmp_dir = TempDir::new().unwrap();
3856 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3857 let file_io = FileIO::new_with_fs();
3858
3859 let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3860 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3861 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3862 let person_data = Arc::new(StructArray::from(vec![
3863 (
3864 Arc::new(Field::new("name", DataType::Utf8, false)),
3865 name_data,
3866 ),
3867 (
3868 Arc::new(Field::new("age", DataType::Int32, false)),
3869 age_data,
3870 ),
3871 ])) as ArrayRef;
3872
3873 let to_write =
3874 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3875
3876 let props = WriterProperties::builder()
3877 .set_compression(Compression::SNAPPY)
3878 .build();
3879
3880 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3881 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3882
3883 writer.write(&to_write).expect("Writing batch");
3884 writer.close().unwrap();
3885
3886 let reader = ArrowReaderBuilder::new(file_io).build();
3887
3888 let tasks = Box::pin(futures::stream::iter(
3889 vec![Ok(FileScanTask {
3890 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3891 .unwrap()
3892 .len(),
3893 start: 0,
3894 length: 0,
3895 record_count: None,
3896 data_file_path: format!("{table_location}/1.parquet"),
3897 data_file_format: DataFileFormat::Parquet,
3898 schema: schema.clone(),
3899 project_field_ids: vec![1, 2],
3900 predicate: None,
3901 deletes: vec![],
3902 partition: None,
3903 partition_spec: None,
3904 name_mapping: None,
3905 case_sensitive: false,
3906 })]
3907 .into_iter(),
3908 )) as FileScanTaskStream;
3909
3910 let result = reader
3911 .read(tasks)
3912 .unwrap()
3913 .try_collect::<Vec<RecordBatch>>()
3914 .await
3915 .unwrap();
3916
3917 assert_eq!(result.len(), 1);
3918 let batch = &result[0];
3919 assert_eq!(batch.num_rows(), 2);
3920 assert_eq!(batch.num_columns(), 2);
3921
3922 let id_array = batch
3923 .column(0)
3924 .as_primitive::<arrow_array::types::Int32Type>();
3925 assert_eq!(id_array.value(0), 1);
3926 assert_eq!(id_array.value(1), 2);
3927
3928 let person_array = batch.column(1).as_struct();
3929 assert_eq!(person_array.num_columns(), 2);
3930
3931 let name_array = person_array.column(0).as_string::<i32>();
3932 assert_eq!(name_array.value(0), "Alice");
3933 assert_eq!(name_array.value(1), "Bob");
3934
3935 let age_array = person_array
3936 .column(1)
3937 .as_primitive::<arrow_array::types::Int32Type>();
3938 assert_eq!(age_array.value(0), 30);
3939 assert_eq!(age_array.value(1), 25);
3940 }
3941
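// A new column (field 5, "newCol") is added between the two original columns;
// the batch must follow the scan schema's order, with the new column all-NULL.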
3942 #[tokio::test]
3946 async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3947 use arrow_array::{Array, Int32Array};
3948
3949 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3950 Field::new("col0", DataType::Int32, true),
3951 Field::new("col1", DataType::Int32, true),
3952 ]));
3953
3954 let schema = Arc::new(
3956 Schema::builder()
3957 .with_schema_id(1)
3958 .with_fields(vec![
3959 NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3960 NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3961 NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3962 ])
3963 .build()
3964 .unwrap(),
3965 );
3966
3967 let tmp_dir = TempDir::new().unwrap();
3968 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3969 let file_io = FileIO::new_with_fs();
3970
3971 let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3972 let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3973
3974 let to_write =
3975 RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3976
3977 let props = WriterProperties::builder()
3978 .set_compression(Compression::SNAPPY)
3979 .build();
3980
3981 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3982 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3983 writer.write(&to_write).expect("Writing batch");
3984 writer.close().unwrap();
3985
3986 let reader = ArrowReaderBuilder::new(file_io).build();
3987
3988 let tasks = Box::pin(futures::stream::iter(
3989 vec![Ok(FileScanTask {
3990 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
3991 .unwrap()
3992 .len(),
3993 start: 0,
3994 length: 0,
3995 record_count: None,
3996 data_file_path: format!("{table_location}/1.parquet"),
3997 data_file_format: DataFileFormat::Parquet,
3998 schema: schema.clone(),
3999 project_field_ids: vec![1, 5, 2],
4000 predicate: None,
4001 deletes: vec![],
4002 partition: None,
4003 partition_spec: None,
4004 name_mapping: None,
4005 case_sensitive: false,
4006 })]
4007 .into_iter(),
4008 )) as FileScanTaskStream;
4009
4010 let result = reader
4011 .read(tasks)
4012 .unwrap()
4013 .try_collect::<Vec<RecordBatch>>()
4014 .await
4015 .unwrap();
4016
4017 assert_eq!(result.len(), 1);
4018 let batch = &result[0];
4019 assert_eq!(batch.num_rows(), 2);
4020 assert_eq!(batch.num_columns(), 3);
4021
4022 let result_col0 = batch
4023 .column(0)
4024 .as_primitive::<arrow_array::types::Int32Type>();
4025 assert_eq!(result_col0.value(0), 1);
4026 assert_eq!(result_col0.value(1), 2);
4027
4028 let result_newcol = batch
4030 .column(1)
4031 .as_primitive::<arrow_array::types::Int32Type>();
4032 assert_eq!(result_newcol.null_count(), 2);
4033 assert!(result_newcol.is_null(0));
4034 assert!(result_newcol.is_null(1));
4035
4036 let result_col1 = batch
4037 .column(2)
4038 .as_primitive::<arrow_array::types::Int32Type>();
4039 assert_eq!(result_col1.value(0), 10);
4040 assert_eq!(result_col1.value(1), 20);
4041 }
4042
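// With row-group filtering and row selection enabled, a predicate that no row
// can satisfy (`id < 5` against ids 10..=12) must produce an empty result, not
// an error.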
4043 #[tokio::test]
4047 async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
4048 use arrow_array::{Float64Array, Int32Array};
4049
4050 let schema = Arc::new(
4052 Schema::builder()
4053 .with_schema_id(1)
4054 .with_fields(vec![
4055 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
4056 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
4057 NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
4058 .into(),
4059 ])
4060 .build()
4061 .unwrap(),
4062 );
4063
4064 let arrow_schema = Arc::new(ArrowSchema::new(vec![
4065 Field::new("id", DataType::Int32, false),
4066 Field::new("name", DataType::Utf8, false),
4067 Field::new("value", DataType::Float64, false),
4068 ]));
4069
4070 let tmp_dir = TempDir::new().unwrap();
4071 let table_location = tmp_dir.path().to_str().unwrap().to_string();
4072 let file_io = FileIO::new_with_fs();
4073
4074 let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
4076 let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
4077 let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
4078
4079 let to_write =
4080 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
4081 .unwrap();
4082
4083 let props = WriterProperties::builder()
4084 .set_compression(Compression::SNAPPY)
4085 .build();
4086
4087 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
4088 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
4089 writer.write(&to_write).expect("Writing batch");
4090 writer.close().unwrap();
4091
4092 let predicate = Reference::new("id").less_than(Datum::int(5));
4094
4095 let reader = ArrowReaderBuilder::new(file_io)
4097 .with_row_group_filtering_enabled(true)
4098 .with_row_selection_enabled(true)
4099 .build();
4100
4101 let tasks = Box::pin(futures::stream::iter(
4102 vec![Ok(FileScanTask {
4103 file_size_in_bytes: std::fs::metadata(format!("{table_location}/1.parquet"))
4104 .unwrap()
4105 .len(),
4106 start: 0,
4107 length: 0,
4108 record_count: None,
4109 data_file_path: format!("{table_location}/1.parquet"),
4110 data_file_format: DataFileFormat::Parquet,
4111 schema: schema.clone(),
4112 project_field_ids: vec![1, 2, 3],
4113 predicate: Some(predicate.bind(schema, true).unwrap()),
4114 deletes: vec![],
4115 partition: None,
4116 partition_spec: None,
4117 name_mapping: None,
4118 case_sensitive: false,
4119 })]
4120 .into_iter(),
4121 )) as FileScanTaskStream;
4122
4123 let result = reader
4125 .read(tasks)
4126 .unwrap()
4127 .try_collect::<Vec<RecordBatch>>()
4128 .await
4129 .unwrap();
4130
4131 assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
4133 }
4134
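// With the data-file concurrency limit set to 1, the three files are read
// sequentially, so rows must arrive grouped by file, in task order.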
    #[tokio::test]
    async fn test_read_with_concurrency_one() {
        use arrow_array::Int32Array;

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
                        .into(),
                ])
                .build()
                .unwrap(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "2".to_string(),
            )])),
        ]));

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_io = FileIO::new_with_fs();

        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        // Write three files of ten rows each: file N holds ids N*10..N*10+10
        // and a constant file_num column equal to N.
        for file_num in 0..3 {
            let id_data = Arc::new(Int32Array::from_iter_values(
                file_num * 10..(file_num + 1) * 10,
            )) as ArrayRef;
            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;

            let to_write =
                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();

            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
            let mut writer =
                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
            writer.write(&to_write).expect("Writing batch");
            writer.close().unwrap();
        }

        let reader = ArrowReaderBuilder::new(file_io)
            .with_data_file_concurrency_limit(1)
            .build();

        let tasks = vec![
            Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_0.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_0.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
            Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_1.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
            Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/file_2.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: None,
                partition_spec: None,
                name_mapping: None,
                case_sensitive: false,
            }),
        ];

        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;

        let result = reader
            .read(tasks_stream)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 30, "Should have 30 total rows");

        let mut all_ids = Vec::new();
        let mut all_file_nums = Vec::new();

        for batch in &result {
            let id_col = batch
                .column(0)
                .as_primitive::<arrow_array::types::Int32Type>();
            let file_num_col = batch
                .column(1)
                .as_primitive::<arrow_array::types::Int32Type>();

            for i in 0..batch.num_rows() {
                all_ids.push(id_col.value(i));
                all_file_nums.push(file_num_col.value(i));
            }
        }

        assert_eq!(all_ids.len(), 30);
        assert_eq!(all_file_nums.len(), 30);

        // With the concurrency limit at one, tasks must be processed strictly
        // in order, so rows come back grouped by file: file_0, file_1, file_2.
        for i in 0..10 {
            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
        }
        for i in 10..20 {
            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
        }
        for i in 20..30 {
            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
        }
    }

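    // Verifies that with a bucket partition spec the transform's source
    // column ("id") is read from the data file rather than constant-filled
    // from the partition tuple, since a bucket value cannot be inverted back
    // into the original source values.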
    #[tokio::test]
    async fn test_bucket_partitioning_reads_source_column_from_file() {
        use arrow_array::Int32Array;

        use crate::spec::{Literal, PartitionSpec, Struct, Transform};

        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );

        let partition_spec = Arc::new(
            PartitionSpec::builder(schema.clone())
                .with_spec_id(0)
                .add_partition_field("id", "id_bucket", Transform::Bucket(4))
                .unwrap()
                .build()
                .unwrap(),
        );

        // The partition tuple carries only the bucket number, not the source
        // values, so "id" must still come from the file itself.
        let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "2".to_string(),
            )])),
        ]));

        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
        let file_io = FileIO::new_with_fs();

        let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
        let name_data =
            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;

        let to_write =
            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();

        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();
        let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
        writer.write(&to_write).expect("Writing batch");
        writer.close().unwrap();

        let reader = ArrowReaderBuilder::new(file_io).build();
        let tasks = Box::pin(futures::stream::iter(
            vec![Ok(FileScanTask {
                file_size_in_bytes: std::fs::metadata(format!("{table_location}/data.parquet"))
                    .unwrap()
                    .len(),
                start: 0,
                length: 0,
                record_count: None,
                data_file_path: format!("{table_location}/data.parquet"),
                data_file_format: DataFileFormat::Parquet,
                schema: schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
                deletes: vec![],
                partition: Some(partition_data),
                partition_spec: Some(partition_spec),
                name_mapping: None,
                case_sensitive: false,
            })]
            .into_iter(),
        )) as FileScanTaskStream;

        let result = reader
            .read(tasks)
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        assert_eq!(result.len(), 1);
        let batch = &result[0];

        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        // The "id" values must be the actual per-row values from the file,
        // not a constant derived from the partition metadata.
        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 5);
        assert_eq!(id_col.value(2), 9);
        assert_eq!(id_col.value(3), 13);

        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert_eq!(name_col.value(3), "Dave");
    }

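    // Unit tests for merge_ranges, which coalesces nearby byte ranges into
    // larger reads. An empty input must produce an empty output.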
    #[test]
    fn test_merge_ranges_empty() {
        assert_eq!(super::merge_ranges(&[], 1024), Vec::<Range<u64>>::new());
    }

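    // Ranges separated by more than the coalesce threshold stay separate.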
    #[test]
    fn test_merge_ranges_no_coalesce() {
        let ranges = vec![0..100, 1_000_000..1_000_100];
        let merged = super::merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
    }

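    // Ranges whose gaps fit within the threshold collapse into one range.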
    #[test]
    fn test_merge_ranges_coalesce() {
        let ranges = vec![0..100, 200..300, 500..600];
        let merged = super::merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

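    // Overlapping ranges merge even with a zero coalesce threshold.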
    #[test]
    fn test_merge_ranges_overlapping() {
        let ranges = vec![0..200, 100..300];
        let merged = super::merge_ranges(&ranges, 0);
        assert_eq!(merged, vec![0..300]);
    }

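    // Unsorted input still merges correctly into a single ordered range.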
    #[test]
    fn test_merge_ranges_unsorted() {
        let ranges = vec![500..600, 0..100, 200..300];
        let merged = super::merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

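    // In-memory FileRead implementation used to exercise the byte-range
    // fetching logic of ArrowFileReader without touching the filesystem.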
    struct MockFileRead {
        data: bytes::Bytes,
    }

    impl MockFileRead {
        fn new(size: usize) -> Self {
            // Deterministic byte pattern (i % 256) so any returned slice can
            // be checked against the expected bytes.
            let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
            Self {
                data: bytes::Bytes::from(data),
            }
        }
    }

    #[async_trait::async_trait]
    impl crate::io::FileRead for MockFileRead {
        async fn read(&self, range: Range<u64>) -> crate::Result<bytes::Bytes> {
            Ok(self.data.slice(range.start as usize..range.end as usize))
        }
    }

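    // With coalescing disabled, each requested range is fetched and returned
    // individually.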
    #[tokio::test]
    async fn test_get_byte_ranges_no_coalesce() {
        use parquet::arrow::async_reader::AsyncFileReader;

        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader =
            super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
                .with_parquet_read_options(
                    super::ParquetReadOptions::builder()
                        .with_range_coalesce_bytes(0)
                        .build(),
                );

        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

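    // Nearby ranges may be coalesced into a single underlying fetch, but the
    // caller must still get one buffer per originally requested range.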
    #[tokio::test]
    async fn test_get_byte_ranges_with_coalesce() {
        use parquet::arrow::async_reader::AsyncFileReader;

        let mock = MockFileRead::new(1024);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(200..300);
        let expected_2 = mock.data.slice(500..600);

        let mut reader =
            super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock))
                .with_parquet_read_options(
                    super::ParquetReadOptions::builder()
                        .with_range_coalesce_bytes(1024)
                        .build(),
                );

        let result = reader
            .get_byte_ranges(vec![0..100, 200..300, 500..600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }

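    // An empty list of requested ranges yields an empty result.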
    #[tokio::test]
    async fn test_get_byte_ranges_empty() {
        use parquet::arrow::async_reader::AsyncFileReader;

        let mock = MockFileRead::new(1024);
        let mut reader =
            super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock));

        let result = reader.get_byte_ranges(vec![]).await.unwrap();
        assert!(result.is_empty());
    }

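    // A maximal coalesce threshold must not break the mapping from the
    // merged fetch back to the individually requested ranges.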
    #[tokio::test]
    async fn test_get_byte_ranges_coalesce_max() {
        use parquet::arrow::async_reader::AsyncFileReader;

        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader =
            super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
                .with_parquet_read_options(
                    super::ParquetReadOptions::builder()
                        .with_range_coalesce_bytes(u64::MAX)
                        .build(),
                );

        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

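    // A fetch concurrency of zero must not hang or error; both ranges are
    // still returned correctly.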
    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_zero() {
        use parquet::arrow::async_reader::AsyncFileReader;

        let mock = MockFileRead::new(1024);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(200..300);

        let mut reader =
            super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock))
                .with_parquet_read_options(
                    super::ParquetReadOptions::builder()
                        .with_range_fetch_concurrency(0)
                        .build(),
                );

        let result = reader
            .get_byte_ranges(vec![0..100, 200..300])
            .await
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

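    // With coalescing off and concurrency capped at one, ranges are fetched
    // one at a time and still returned in request order.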
    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_one() {
        use parquet::arrow::async_reader::AsyncFileReader;

        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(500..600);
        let expected_2 = mock.data.slice(1500..1600);

        let mut reader =
            super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
                .with_parquet_read_options(
                    super::ParquetReadOptions::builder()
                        .with_range_coalesce_bytes(0)
                        .with_range_fetch_concurrency(1)
                        .build(),
                );

        let result = reader
            .get_byte_ranges(vec![0..100, 500..600, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }
}