1use std::collections::{HashMap, HashSet};
21use std::ops::Range;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
26use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
27use arrow_cast::cast::cast;
28use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
29use arrow_schema::{
30 ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
31};
32use arrow_string::like::starts_with;
33use bytes::Bytes;
34use fnv::FnvHashSet;
35use futures::future::BoxFuture;
36use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
37use parquet::arrow::arrow_reader::{
38 ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
39};
40use parquet::arrow::async_reader::AsyncFileReader;
41use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
42use parquet::file::metadata::{
43 PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
44};
45use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
46
47use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
48use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
49use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
50use crate::delete_vector::DeleteVector;
51use crate::error::Result;
52use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
53use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
54use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
55use crate::expr::{BoundPredicate, BoundReference};
56use crate::io::{FileIO, FileMetadata, FileRead};
57use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
58use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
59use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
60use crate::utils::available_parallelism;
61use crate::{Error, ErrorKind};
62
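/// Builder to create an [`ArrowReader`].
///
/// A minimal usage sketch (assumes an already-configured `FileIO` value named `file_io`):
///
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .with_row_group_filtering_enabled(true)
///     .build();
/// ```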
63pub struct ArrowReaderBuilder {
65 batch_size: Option<usize>,
66 file_io: FileIO,
67 concurrency_limit_data_files: usize,
68 row_group_filtering_enabled: bool,
69 row_selection_enabled: bool,
70}
71
72impl ArrowReaderBuilder {
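    /// Create a new `ArrowReaderBuilder` that reads files through the given `FileIO`.
    /// The data file concurrency limit defaults to the available parallelism.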
73 pub fn new(file_io: FileIO) -> Self {
75 let num_cpus = available_parallelism().get();
76
77 ArrowReaderBuilder {
78 batch_size: None,
79 file_io,
80 concurrency_limit_data_files: num_cpus,
81 row_group_filtering_enabled: true,
82 row_selection_enabled: false,
83 }
84 }
85
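    /// Sets the maximum number of data files that are read concurrently.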
86 pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
88 self.concurrency_limit_data_files = val;
89 self
90 }
91
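    /// Sets the desired size of the Arrow record batches produced by the reader.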
92 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
95 self.batch_size = Some(batch_size);
96 self
97 }
98
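    /// Determines whether row groups are filtered using row group metadata statistics.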
99 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
101 self.row_group_filtering_enabled = row_group_filtering_enabled;
102 self
103 }
104
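    /// Determines whether row selection based on the Parquet page index is enabled.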
105 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
107 self.row_selection_enabled = row_selection_enabled;
108 self
109 }
110
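    /// Build the [`ArrowReader`].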
111 pub fn build(self) -> ArrowReader {
113 ArrowReader {
114 batch_size: self.batch_size,
115 file_io: self.file_io.clone(),
116 delete_file_loader: CachingDeleteFileLoader::new(
117 self.file_io.clone(),
118 self.concurrency_limit_data_files,
119 ),
120 concurrency_limit_data_files: self.concurrency_limit_data_files,
121 row_group_filtering_enabled: self.row_group_filtering_enabled,
122 row_selection_enabled: self.row_selection_enabled,
123 }
124 }
125}
126
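/// Reads data from Parquet data files into streams of Arrow `RecordBatch`es.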
127#[derive(Clone)]
129pub struct ArrowReader {
130 batch_size: Option<usize>,
131 file_io: FileIO,
132 delete_file_loader: CachingDeleteFileLoader,
133
134 concurrency_limit_data_files: usize,
136
137 row_group_filtering_enabled: bool,
138 row_selection_enabled: bool,
139}
140
141impl ArrowReader {
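    /// Take a stream of `FileScanTask`s and read all of the referenced files,
    /// returning a stream of Arrow `RecordBatch`es containing their data.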
142 pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
145 let file_io = self.file_io.clone();
146 let batch_size = self.batch_size;
147 let concurrency_limit_data_files = self.concurrency_limit_data_files;
148 let row_group_filtering_enabled = self.row_group_filtering_enabled;
149 let row_selection_enabled = self.row_selection_enabled;
150
151 let stream = tasks
152 .map_ok(move |task| {
153 let file_io = file_io.clone();
154
155 Self::process_file_scan_task(
156 task,
157 batch_size,
158 file_io,
159 self.delete_file_loader.clone(),
160 row_group_filtering_enabled,
161 row_selection_enabled,
162 )
163 })
164 .map_err(|err| {
                Error::new(ErrorKind::Unexpected, "file scan task generation failed")
                    .with_source(err)
166 })
167 .try_buffer_unordered(concurrency_limit_data_files)
168 .try_flatten_unordered(concurrency_limit_data_files);
169
170 Ok(Box::pin(stream) as ArrowRecordBatchStream)
171 }
172
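    /// Process a single `FileScanTask`: open the data file, apply the projection,
    /// filter predicate, delete files, and byte-range split, and return a stream of
    /// transformed `RecordBatch`es.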
173 #[allow(clippy::too_many_arguments)]
174 async fn process_file_scan_task(
175 task: FileScanTask,
176 batch_size: Option<usize>,
177 file_io: FileIO,
178 delete_file_loader: CachingDeleteFileLoader,
179 row_group_filtering_enabled: bool,
180 row_selection_enabled: bool,
181 ) -> Result<ArrowRecordBatchStream> {
182 let should_load_page_index =
183 (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
184
185 let delete_filter_rx =
186 delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
187
188 let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
191 &task.data_file_path,
192 file_io.clone(),
193 should_load_page_index,
194 None,
195 )
196 .await?;
197
198 let missing_field_ids = initial_stream_builder
202 .schema()
203 .fields()
204 .iter()
205 .next()
206 .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
207
208 let mut record_batch_stream_builder = if missing_field_ids {
224 let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
226 apply_name_mapping_to_arrow_schema(
231 Arc::clone(initial_stream_builder.schema()),
232 name_mapping,
233 )?
234 } else {
235 add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
238 };
239
240 let options = ArrowReaderOptions::new().with_schema(arrow_schema);
241
242 Self::create_parquet_record_batch_stream_builder(
243 &task.data_file_path,
244 file_io.clone(),
245 should_load_page_index,
246 Some(options),
247 )
248 .await?
249 } else {
250 initial_stream_builder
252 };
253
254 let project_field_ids_without_metadata: Vec<i32> = task
256 .project_field_ids
257 .iter()
258 .filter(|&&id| !is_metadata_field(id))
259 .copied()
260 .collect();
261
262 let projection_mask = Self::get_arrow_projection_mask(
267 &project_field_ids_without_metadata,
268 &task.schema,
269 record_batch_stream_builder.parquet_schema(),
270 record_batch_stream_builder.schema(),
            missing_field_ids,
        )?;
273
274 record_batch_stream_builder =
275 record_batch_stream_builder.with_projection(projection_mask.clone());
276
277 let mut record_batch_transformer_builder =
281 RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
282
283 if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
285 let file_datum = Datum::string(task.data_file_path.clone());
286 record_batch_transformer_builder =
287 record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
288 }
289
290 if let (Some(partition_spec), Some(partition_data)) =
291 (task.partition_spec.clone(), task.partition.clone())
292 {
293 record_batch_transformer_builder =
294 record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
295 }
296
297 let mut record_batch_transformer = record_batch_transformer_builder.build();
298
299 if let Some(batch_size) = batch_size {
300 record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
301 }
302
303 let delete_filter = delete_filter_rx.await.unwrap()?;
304 let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
305
306 let final_predicate = match (&task.predicate, delete_predicate) {
311 (None, None) => None,
312 (Some(predicate), None) => Some(predicate.clone()),
313 (None, Some(ref predicate)) => Some(predicate.clone()),
314 (Some(filter_predicate), Some(delete_predicate)) => {
315 Some(filter_predicate.clone().and(delete_predicate))
316 }
317 };
318
319 let mut selected_row_group_indices = None;
335 let mut row_selection = None;
336
337 if task.start != 0 || task.length != 0 {
340 let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
341 record_batch_stream_builder.metadata(),
342 task.start,
343 task.length,
344 )?;
345 selected_row_group_indices = Some(byte_range_filtered_row_groups);
346 }
347
348 if let Some(predicate) = final_predicate {
349 let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
350 record_batch_stream_builder.parquet_schema(),
351 &predicate,
352 )?;
353
354 let row_filter = Self::get_row_filter(
355 &predicate,
356 record_batch_stream_builder.parquet_schema(),
357 &iceberg_field_ids,
358 &field_id_map,
359 )?;
360 record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
361
362 if row_group_filtering_enabled {
363 let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
364 &predicate,
365 record_batch_stream_builder.metadata(),
366 &field_id_map,
367 &task.schema,
368 )?;
369
370 selected_row_group_indices = match selected_row_group_indices {
373 Some(byte_range_filtered) => {
374 let intersection: Vec<usize> = byte_range_filtered
376 .into_iter()
377 .filter(|idx| predicate_filtered_row_groups.contains(idx))
378 .collect();
379 Some(intersection)
380 }
381 None => Some(predicate_filtered_row_groups),
382 };
383 }
384
385 if row_selection_enabled {
386 row_selection = Some(Self::get_row_selection_for_filter_predicate(
387 &predicate,
388 record_batch_stream_builder.metadata(),
389 &selected_row_group_indices,
390 &field_id_map,
391 &task.schema,
392 )?);
393 }
394 }
395
396 let positional_delete_indexes = delete_filter.get_delete_vector(&task);
397
398 if let Some(positional_delete_indexes) = positional_delete_indexes {
399 let delete_row_selection = {
400 let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
401
402 Self::build_deletes_row_selection(
403 record_batch_stream_builder.metadata().row_groups(),
404 &selected_row_group_indices,
405 &positional_delete_indexes,
406 )
407 }?;
408
409 row_selection = match row_selection {
412 None => Some(delete_row_selection),
413 Some(filter_row_selection) => {
414 Some(filter_row_selection.intersection(&delete_row_selection))
415 }
416 };
417 }
418
419 if let Some(row_selection) = row_selection {
420 record_batch_stream_builder =
421 record_batch_stream_builder.with_row_selection(row_selection);
422 }
423
424 if let Some(selected_row_group_indices) = selected_row_group_indices {
425 record_batch_stream_builder =
426 record_batch_stream_builder.with_row_groups(selected_row_group_indices);
427 }
428
429 let record_batch_stream =
432 record_batch_stream_builder
433 .build()?
434 .map(move |batch| match batch {
435 Ok(batch) => {
436 record_batch_transformer.process_record_batch(batch)
438 }
439 Err(err) => Err(err.into()),
440 });
441
442 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
443 }
444
445 pub(crate) async fn create_parquet_record_batch_stream_builder(
446 data_file_path: &str,
447 file_io: FileIO,
448 should_load_page_index: bool,
449 arrow_reader_options: Option<ArrowReaderOptions>,
450 ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
451 let parquet_file = file_io.new_input(data_file_path)?;
454 let (parquet_metadata, parquet_reader) =
455 try_join!(parquet_file.metadata(), parquet_file.reader())?;
456 let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
457 .with_preload_column_index(true)
458 .with_preload_offset_index(true)
459 .with_preload_page_index(should_load_page_index);
460
461 let options = arrow_reader_options.unwrap_or_default();
463 let record_batch_stream_builder =
464 ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
465 Ok(record_batch_stream_builder)
466 }
467
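    /// Build a `RowSelection` from the positional delete vector of a data file, using the
    /// Parquet row group metadata to translate absolute row positions into alternating
    /// select/skip runs. Row groups not listed in `selected_row_groups` contribute no
    /// selectors, as they are excluded from the scan elsewhere.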
468 fn build_deletes_row_selection(
474 row_group_metadata_list: &[RowGroupMetaData],
475 selected_row_groups: &Option<Vec<usize>>,
476 positional_deletes: &DeleteVector,
477 ) -> Result<RowSelection> {
478 let mut results: Vec<RowSelector> = Vec::new();
479 let mut selected_row_groups_idx = 0;
480 let mut current_row_group_base_idx: u64 = 0;
481 let mut delete_vector_iter = positional_deletes.iter();
482 let mut next_deleted_row_idx_opt = delete_vector_iter.next();
483
484 for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
485 let row_group_num_rows = row_group_metadata.num_rows() as u64;
486 let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
487
488 if let Some(selected_row_groups) = selected_row_groups {
490 if selected_row_groups_idx == selected_row_groups.len() {
492 break;
493 }
494
495 if idx == selected_row_groups[selected_row_groups_idx] {
496 selected_row_groups_idx += 1;
500 } else {
501 delete_vector_iter.advance_to(next_row_group_base_idx);
506 if let Some(cached_idx) = next_deleted_row_idx_opt
508 && cached_idx < next_row_group_base_idx
509 {
510 next_deleted_row_idx_opt = delete_vector_iter.next();
511 }
512
513 current_row_group_base_idx += row_group_num_rows;
516 continue;
517 }
518 }
519
520 let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
521 Some(next_deleted_row_idx) => {
522 if next_deleted_row_idx >= next_row_group_base_idx {
525 results.push(RowSelector::select(row_group_num_rows as usize));
526 current_row_group_base_idx += row_group_num_rows;
527 continue;
528 }
529
530 next_deleted_row_idx
531 }
532
533 _ => {
535 results.push(RowSelector::select(row_group_num_rows as usize));
536 current_row_group_base_idx += row_group_num_rows;
537 continue;
538 }
539 };
540
541 let mut current_idx = current_row_group_base_idx;
542 'chunks: while next_deleted_row_idx < next_row_group_base_idx {
543 if current_idx < next_deleted_row_idx {
545 let run_length = next_deleted_row_idx - current_idx;
546 results.push(RowSelector::select(run_length as usize));
547 current_idx += run_length;
548 }
549
550 let mut run_length = 0;
552 while next_deleted_row_idx == current_idx
553 && next_deleted_row_idx < next_row_group_base_idx
554 {
555 run_length += 1;
556 current_idx += 1;
557
558 next_deleted_row_idx_opt = delete_vector_iter.next();
559 next_deleted_row_idx = match next_deleted_row_idx_opt {
560 Some(next_deleted_row_idx) => next_deleted_row_idx,
561 _ => {
562 results.push(RowSelector::skip(run_length));
566 break 'chunks;
567 }
568 };
569 }
570 if run_length > 0 {
571 results.push(RowSelector::skip(run_length));
572 }
573 }
574
575 if current_idx < next_row_group_base_idx {
576 results.push(RowSelector::select(
577 (next_row_group_base_idx - current_idx) as usize,
578 ));
579 }
580
581 current_row_group_base_idx += row_group_num_rows;
582 }
583
584 Ok(results.into())
585 }
586
587 fn build_field_id_set_and_map(
588 parquet_schema: &SchemaDescriptor,
589 predicate: &BoundPredicate,
590 ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
591 let mut collector = CollectFieldIdVisitor {
593 field_ids: HashSet::default(),
594 };
595 visit(&mut collector, predicate)?;
596
597 let iceberg_field_ids = collector.field_ids();
598
599 let field_id_map = match build_field_id_map(parquet_schema)? {
601 Some(map) => map,
602 None => build_fallback_field_id_map(parquet_schema),
603 };
604
605 Ok((iceberg_field_ids, field_id_map))
606 }
607
608 fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
611 match field.field_type.as_ref() {
612 Type::Primitive(_) => {
613 field_ids.push(field.id);
614 }
615 Type::Struct(struct_type) => {
616 for nested_field in struct_type.fields() {
617 Self::include_leaf_field_id(nested_field, field_ids);
618 }
619 }
620 Type::List(list_type) => {
621 Self::include_leaf_field_id(&list_type.element_field, field_ids);
622 }
623 Type::Map(map_type) => {
624 Self::include_leaf_field_id(&map_type.key_field, field_ids);
625 Self::include_leaf_field_id(&map_type.value_field, field_ids);
626 }
627 }
628 }
629
630 fn get_arrow_projection_mask(
631 field_ids: &[i32],
632 iceberg_schema_of_task: &Schema,
633 parquet_schema: &SchemaDescriptor,
634 arrow_schema: &ArrowSchemaRef,
        use_fallback: bool,
    ) -> Result<ProjectionMask> {
637 fn type_promotion_is_valid(
638 file_type: Option<&PrimitiveType>,
639 projected_type: Option<&PrimitiveType>,
640 ) -> bool {
641 match (file_type, projected_type) {
642 (Some(lhs), Some(rhs)) if lhs == rhs => true,
643 (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
644 (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
645 (
646 Some(PrimitiveType::Decimal {
647 precision: file_precision,
648 scale: file_scale,
649 }),
650 Some(PrimitiveType::Decimal {
651 precision: requested_precision,
652 scale: requested_scale,
653 }),
654 ) if requested_precision >= file_precision && file_scale == requested_scale => true,
655 (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
657 _ => false,
658 }
659 }
660
661 if field_ids.is_empty() {
662 return Ok(ProjectionMask::all());
663 }
664
665 if use_fallback {
666 Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
668 } else {
669 let mut leaf_field_ids = vec![];
673 for field_id in field_ids {
674 let field = iceberg_schema_of_task.field_by_id(*field_id);
675 if let Some(field) = field {
676 Self::include_leaf_field_id(field, &mut leaf_field_ids);
677 }
678 }
679
680 Self::get_arrow_projection_mask_with_field_ids(
681 &leaf_field_ids,
682 iceberg_schema_of_task,
683 parquet_schema,
684 arrow_schema,
685 type_promotion_is_valid,
686 )
687 }
688 }
689
690 fn get_arrow_projection_mask_with_field_ids(
693 leaf_field_ids: &[i32],
694 iceberg_schema_of_task: &Schema,
695 parquet_schema: &SchemaDescriptor,
696 arrow_schema: &ArrowSchemaRef,
697 type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
698 ) -> Result<ProjectionMask> {
699 let mut column_map = HashMap::new();
700 let fields = arrow_schema.fields();
701
702 let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
705 let projected_arrow_schema = ArrowSchema::new_with_metadata(
706 fields.filter_leaves(|_, f| {
707 f.metadata()
708 .get(PARQUET_FIELD_ID_META_KEY)
709 .and_then(|field_id| i32::from_str(field_id).ok())
710 .is_some_and(|field_id| {
711 projected_fields.insert((*f).clone(), field_id);
712 leaf_field_ids.contains(&field_id)
713 })
714 }),
715 arrow_schema.metadata().clone(),
716 );
717 let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
718
719 fields.filter_leaves(|idx, field| {
720 let Some(field_id) = projected_fields.get(field).cloned() else {
721 return false;
722 };
723
724 let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
725 let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
726
727 if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
728 return false;
729 }
730
731 if !type_promotion_is_valid(
732 parquet_iceberg_field
733 .unwrap()
734 .field_type
735 .as_primitive_type(),
736 iceberg_field.unwrap().field_type.as_primitive_type(),
737 ) {
738 return false;
739 }
740
741 column_map.insert(field_id, idx);
742 true
743 });
744
745 let mut indices = vec![];
748 for field_id in leaf_field_ids {
749 if let Some(col_idx) = column_map.get(field_id) {
750 indices.push(*col_idx);
751 }
752 }
753
754 if indices.is_empty() {
755 Ok(ProjectionMask::all())
758 } else {
759 Ok(ProjectionMask::leaves(parquet_schema, indices))
760 }
761 }
762
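    /// Fallback projection used when the Parquet file has no embedded field IDs:
    /// field ID `n` is assumed to correspond positionally to root column `n - 1`.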
763 fn get_arrow_projection_mask_fallback(
767 field_ids: &[i32],
768 parquet_schema: &SchemaDescriptor,
769 ) -> Result<ProjectionMask> {
770 let parquet_root_fields = parquet_schema.root_schema().get_fields();
772 let mut root_indices = vec![];
773
774 for field_id in field_ids.iter() {
775 let parquet_pos = (*field_id - 1) as usize;
776
777 if parquet_pos < parquet_root_fields.len() {
778 root_indices.push(parquet_pos);
779 }
780 }
782
783 if root_indices.is_empty() {
784 Ok(ProjectionMask::all())
785 } else {
786 Ok(ProjectionMask::roots(parquet_schema, root_indices))
787 }
788 }
789
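    /// Convert a bound predicate into a Parquet `RowFilter` evaluated over the leaf
    /// columns referenced by the predicate.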
790 fn get_row_filter(
791 predicates: &BoundPredicate,
792 parquet_schema: &SchemaDescriptor,
793 iceberg_field_ids: &HashSet<i32>,
794 field_id_map: &HashMap<i32, usize>,
795 ) -> Result<RowFilter> {
796 let mut column_indices = iceberg_field_ids
799 .iter()
800 .filter_map(|field_id| field_id_map.get(field_id).cloned())
801 .collect::<Vec<_>>();
802 column_indices.sort();
803
804 let mut converter = PredicateConverter {
806 parquet_schema,
807 column_map: field_id_map,
808 column_indices: &column_indices,
809 };
810
811 let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
814 let predicate_func = visit(&mut converter, predicates)?;
815 let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
816 Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
817 }
818
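    /// Evaluate the predicate against each row group's column metrics and return the
    /// indices of row groups that may contain matching rows.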
819 fn get_selected_row_group_indices(
820 predicate: &BoundPredicate,
821 parquet_metadata: &Arc<ParquetMetaData>,
822 field_id_map: &HashMap<i32, usize>,
823 snapshot_schema: &Schema,
824 ) -> Result<Vec<usize>> {
825 let row_groups_metadata = parquet_metadata.row_groups();
826 let mut results = Vec::with_capacity(row_groups_metadata.len());
827
828 for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
829 if RowGroupMetricsEvaluator::eval(
830 predicate,
831 row_group_metadata,
832 field_id_map,
833 snapshot_schema,
834 )? {
835 results.push(idx);
836 }
837 }
838
839 Ok(results)
840 }
841
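    /// Use the Parquet page index (column and offset indexes) to compute a
    /// `RowSelection` for the filter predicate, restricted to `selected_row_groups`
    /// when provided.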
842 fn get_row_selection_for_filter_predicate(
843 predicate: &BoundPredicate,
844 parquet_metadata: &Arc<ParquetMetaData>,
845 selected_row_groups: &Option<Vec<usize>>,
846 field_id_map: &HashMap<i32, usize>,
847 snapshot_schema: &Schema,
848 ) -> Result<RowSelection> {
849 let Some(column_index) = parquet_metadata.column_index() else {
850 return Err(Error::new(
851 ErrorKind::Unexpected,
852 "Parquet file metadata does not contain a column index",
853 ));
854 };
855
856 let Some(offset_index) = parquet_metadata.offset_index() else {
857 return Err(Error::new(
858 ErrorKind::Unexpected,
859 "Parquet file metadata does not contain an offset index",
860 ));
861 };
862
863 if let Some(selected_row_groups) = selected_row_groups
865 && selected_row_groups.is_empty()
866 {
867 return Ok(RowSelection::from(Vec::new()));
868 }
869
870 let mut selected_row_groups_idx = 0;
871
872 let page_index = column_index
873 .iter()
874 .enumerate()
875 .zip(offset_index)
876 .zip(parquet_metadata.row_groups());
877
878 let mut results = Vec::new();
879 for (((idx, column_index), offset_index), row_group_metadata) in page_index {
880 if let Some(selected_row_groups) = selected_row_groups {
881 if idx == selected_row_groups[selected_row_groups_idx] {
883 selected_row_groups_idx += 1;
884 } else {
885 continue;
886 }
887 }
888
889 let selections_for_page = PageIndexEvaluator::eval(
890 predicate,
891 column_index,
892 offset_index,
893 row_group_metadata,
894 field_id_map,
895 snapshot_schema,
896 )?;
897
898 results.push(selections_for_page);
899
900 if let Some(selected_row_groups) = selected_row_groups
901 && selected_row_groups_idx == selected_row_groups.len()
902 {
903 break;
904 }
905 }
906
907 Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
908 }
909
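    /// Return the indices of row groups whose byte ranges overlap the task's
    /// `[start, start + length)` split.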
910 fn filter_row_groups_by_byte_range(
915 parquet_metadata: &Arc<ParquetMetaData>,
916 start: u64,
917 length: u64,
918 ) -> Result<Vec<usize>> {
919 let row_groups = parquet_metadata.row_groups();
920 let mut selected = Vec::new();
921 let end = start + length;
922
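        // Row group data starts after the 4-byte Parquet magic header ("PAR1").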
923 let mut current_byte_offset = 4u64;
925
926 for (idx, row_group) in row_groups.iter().enumerate() {
927 let row_group_size = row_group.compressed_size() as u64;
928 let row_group_end = current_byte_offset + row_group_size;
929
930 if current_byte_offset < end && start < row_group_end {
931 selected.push(idx);
932 }
933
934 current_byte_offset = row_group_end;
935 }
936
937 Ok(selected)
938 }
939}
940
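/// Build a map of field ID to leaf column index in the Parquet schema.
/// Returns `None` if any leaf column lacks an embedded field ID.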
941fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
944 let mut column_map = HashMap::new();
945
946 for (idx, field) in parquet_schema.columns().iter().enumerate() {
947 let field_type = field.self_type();
948 match field_type {
949 ParquetType::PrimitiveType { basic_info, .. } => {
950 if !basic_info.has_id() {
951 return Ok(None);
952 }
953 column_map.insert(basic_info.id(), idx);
954 }
955 ParquetType::GroupType { .. } => {
956 return Err(Error::new(
957 ErrorKind::DataInvalid,
958 format!(
959 "Leave column in schema should be primitive type but got {field_type:?}"
960 ),
961 ));
962 }
963 };
964 }
965
966 Ok(Some(column_map))
967}
968
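/// Build a positional fallback map for Parquet files without field IDs:
/// leaf column `idx` is assigned field ID `idx + 1`.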
969fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
972 let mut column_map = HashMap::new();
973
974 for (idx, _field) in parquet_schema.columns().iter().enumerate() {
976 let field_id = (idx + 1) as i32;
977 column_map.insert(field_id, idx);
978 }
979
980 column_map
981}
982
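/// Apply an Iceberg [`NameMapping`] to an Arrow schema that lacks field IDs,
/// attaching the mapped field ID to each top-level field's metadata.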
983fn apply_name_mapping_to_arrow_schema(
1002 arrow_schema: ArrowSchemaRef,
1003 name_mapping: &NameMapping,
1004) -> Result<Arc<ArrowSchema>> {
1005 debug_assert!(
1006 arrow_schema
1007 .fields()
1008 .iter()
1009 .next()
1010 .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1011 "Schema already has field IDs - name mapping should not be applied"
1012 );
1013
1014 use arrow_schema::Field;
1015
1016 let fields_with_mapped_ids: Vec<_> = arrow_schema
1017 .fields()
1018 .iter()
1019 .map(|field| {
1020 let mapped_field_opt = name_mapping
1028 .fields()
1029 .iter()
1030 .find(|f| f.names().contains(&field.name().to_string()));
1031
1032 let mut metadata = field.metadata().clone();
1033
1034 if let Some(mapped_field) = mapped_field_opt
1035 && let Some(field_id) = mapped_field.field_id()
1036 {
1037 metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1039 }
1040 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1043 .with_metadata(metadata)
1044 })
1045 .collect();
1046
1047 Ok(Arc::new(ArrowSchema::new_with_metadata(
1048 fields_with_mapped_ids,
1049 arrow_schema.metadata().clone(),
1050 )))
1051}
1052
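/// Assign positional fallback field IDs (1-based) to an Arrow schema that lacks them.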
1053fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
1060 debug_assert!(
1061 arrow_schema
1062 .fields()
1063 .iter()
1064 .next()
1065 .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1066 "Schema already has field IDs"
1067 );
1068
1069 use arrow_schema::Field;
1070
1071 let fields_with_fallback_ids: Vec<_> = arrow_schema
1072 .fields()
1073 .iter()
1074 .enumerate()
1075 .map(|(pos, field)| {
1076 let mut metadata = field.metadata().clone();
            let field_id = (pos + 1) as i32;
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1079
1080 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1081 .with_metadata(metadata)
1082 })
1083 .collect();
1084
1085 Arc::new(ArrowSchema::new_with_metadata(
1086 fields_with_fallback_ids,
1087 arrow_schema.metadata().clone(),
1088 ))
1089}
1090
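/// A visitor that collects the field IDs referenced by a bound predicate.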
1091struct CollectFieldIdVisitor {
1093 field_ids: HashSet<i32>,
1094}
1095
1096impl CollectFieldIdVisitor {
1097 fn field_ids(self) -> HashSet<i32> {
1098 self.field_ids
1099 }
1100}
1101
1102impl BoundPredicateVisitor for CollectFieldIdVisitor {
1103 type T = ();
1104
1105 fn always_true(&mut self) -> Result<()> {
1106 Ok(())
1107 }
1108
1109 fn always_false(&mut self) -> Result<()> {
1110 Ok(())
1111 }
1112
1113 fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1114 Ok(())
1115 }
1116
1117 fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1118 Ok(())
1119 }
1120
1121 fn not(&mut self, _inner: ()) -> Result<()> {
1122 Ok(())
1123 }
1124
1125 fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1126 self.field_ids.insert(reference.field().id);
1127 Ok(())
1128 }
1129
1130 fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1131 self.field_ids.insert(reference.field().id);
1132 Ok(())
1133 }
1134
1135 fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1136 self.field_ids.insert(reference.field().id);
1137 Ok(())
1138 }
1139
1140 fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1141 self.field_ids.insert(reference.field().id);
1142 Ok(())
1143 }
1144
1145 fn less_than(
1146 &mut self,
1147 reference: &BoundReference,
1148 _literal: &Datum,
1149 _predicate: &BoundPredicate,
1150 ) -> Result<()> {
1151 self.field_ids.insert(reference.field().id);
1152 Ok(())
1153 }
1154
1155 fn less_than_or_eq(
1156 &mut self,
1157 reference: &BoundReference,
1158 _literal: &Datum,
1159 _predicate: &BoundPredicate,
1160 ) -> Result<()> {
1161 self.field_ids.insert(reference.field().id);
1162 Ok(())
1163 }
1164
1165 fn greater_than(
1166 &mut self,
1167 reference: &BoundReference,
1168 _literal: &Datum,
1169 _predicate: &BoundPredicate,
1170 ) -> Result<()> {
1171 self.field_ids.insert(reference.field().id);
1172 Ok(())
1173 }
1174
1175 fn greater_than_or_eq(
1176 &mut self,
1177 reference: &BoundReference,
1178 _literal: &Datum,
1179 _predicate: &BoundPredicate,
1180 ) -> Result<()> {
1181 self.field_ids.insert(reference.field().id);
1182 Ok(())
1183 }
1184
1185 fn eq(
1186 &mut self,
1187 reference: &BoundReference,
1188 _literal: &Datum,
1189 _predicate: &BoundPredicate,
1190 ) -> Result<()> {
1191 self.field_ids.insert(reference.field().id);
1192 Ok(())
1193 }
1194
1195 fn not_eq(
1196 &mut self,
1197 reference: &BoundReference,
1198 _literal: &Datum,
1199 _predicate: &BoundPredicate,
1200 ) -> Result<()> {
1201 self.field_ids.insert(reference.field().id);
1202 Ok(())
1203 }
1204
1205 fn starts_with(
1206 &mut self,
1207 reference: &BoundReference,
1208 _literal: &Datum,
1209 _predicate: &BoundPredicate,
1210 ) -> Result<()> {
1211 self.field_ids.insert(reference.field().id);
1212 Ok(())
1213 }
1214
1215 fn not_starts_with(
1216 &mut self,
1217 reference: &BoundReference,
1218 _literal: &Datum,
1219 _predicate: &BoundPredicate,
1220 ) -> Result<()> {
1221 self.field_ids.insert(reference.field().id);
1222 Ok(())
1223 }
1224
1225 fn r#in(
1226 &mut self,
1227 reference: &BoundReference,
1228 _literals: &FnvHashSet<Datum>,
1229 _predicate: &BoundPredicate,
1230 ) -> Result<()> {
1231 self.field_ids.insert(reference.field().id);
1232 Ok(())
1233 }
1234
1235 fn not_in(
1236 &mut self,
1237 reference: &BoundReference,
1238 _literals: &FnvHashSet<Datum>,
1239 _predicate: &BoundPredicate,
1240 ) -> Result<()> {
1241 self.field_ids.insert(reference.field().id);
1242 Ok(())
1243 }
1244}
1245
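/// Converts a bound predicate into an Arrow predicate function: a closure that maps a
/// `RecordBatch` of the projected columns to a `BooleanArray` of matches.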
1246struct PredicateConverter<'a> {
1248 pub parquet_schema: &'a SchemaDescriptor,
1250 pub column_map: &'a HashMap<i32, usize>,
1252 pub column_indices: &'a Vec<usize>,
1254}
1255
1256impl PredicateConverter<'_> {
1257 fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1262 if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1264 if self.parquet_schema.get_column_root(*column_idx).is_group() {
1265 return Err(Error::new(
1266 ErrorKind::DataInvalid,
1267 format!(
1268 "Leave column `{}` in predicates isn't a root column in Parquet schema.",
1269 reference.field().name
1270 ),
1271 ));
1272 }
1273
1274 let index = self
1276 .column_indices
1277 .iter()
1278 .position(|&idx| idx == *column_idx)
1279 .ok_or(Error::new(
1280 ErrorKind::DataInvalid,
1281 format!(
1282 "Leave column `{}` in predicates cannot be found in the required column indices.",
1283 reference.field().name
1284 ),
1285 ))?;
1286
1287 Ok(Some(index))
1288 } else {
1289 Ok(None)
1290 }
1291 }
1292
1293 fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1295 Ok(Box::new(|batch| {
1296 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1297 }))
1298 }
1299
1300 fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1302 Ok(Box::new(|batch| {
1303 Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1304 }))
1305 }
1306}
1307
1308fn project_column(
1311 batch: &RecordBatch,
1312 column_idx: usize,
1313) -> std::result::Result<ArrayRef, ArrowError> {
1314 let column = batch.column(column_idx);
1315
1316 match column.data_type() {
1317 DataType::Struct(_) => Err(ArrowError::SchemaError(
1318 "Does not support struct column yet.".to_string(),
1319 )),
1320 _ => Ok(column.clone()),
1321 }
1322}
1323
1324type PredicateResult =
1325 dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1326
1327impl BoundPredicateVisitor for PredicateConverter<'_> {
1328 type T = Box<PredicateResult>;
1329
1330 fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1331 self.build_always_true()
1332 }
1333
1334 fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1335 self.build_always_false()
1336 }
1337
1338 fn and(
1339 &mut self,
1340 mut lhs: Box<PredicateResult>,
1341 mut rhs: Box<PredicateResult>,
1342 ) -> Result<Box<PredicateResult>> {
1343 Ok(Box::new(move |batch| {
1344 let left = lhs(batch.clone())?;
1345 let right = rhs(batch)?;
1346 and_kleene(&left, &right)
1347 }))
1348 }
1349
1350 fn or(
1351 &mut self,
1352 mut lhs: Box<PredicateResult>,
1353 mut rhs: Box<PredicateResult>,
1354 ) -> Result<Box<PredicateResult>> {
1355 Ok(Box::new(move |batch| {
1356 let left = lhs(batch.clone())?;
1357 let right = rhs(batch)?;
1358 or_kleene(&left, &right)
1359 }))
1360 }
1361
1362 fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1363 Ok(Box::new(move |batch| {
1364 let pred_ret = inner(batch)?;
1365 not(&pred_ret)
1366 }))
1367 }
1368
1369 fn is_null(
1370 &mut self,
1371 reference: &BoundReference,
1372 _predicate: &BoundPredicate,
1373 ) -> Result<Box<PredicateResult>> {
1374 if let Some(idx) = self.bound_reference(reference)? {
1375 Ok(Box::new(move |batch| {
1376 let column = project_column(&batch, idx)?;
1377 is_null(&column)
1378 }))
1379 } else {
1380 self.build_always_true()
1382 }
1383 }
1384
1385 fn not_null(
1386 &mut self,
1387 reference: &BoundReference,
1388 _predicate: &BoundPredicate,
1389 ) -> Result<Box<PredicateResult>> {
1390 if let Some(idx) = self.bound_reference(reference)? {
1391 Ok(Box::new(move |batch| {
1392 let column = project_column(&batch, idx)?;
1393 is_not_null(&column)
1394 }))
1395 } else {
1396 self.build_always_false()
1398 }
1399 }
1400
1401 fn is_nan(
1402 &mut self,
1403 reference: &BoundReference,
1404 _predicate: &BoundPredicate,
1405 ) -> Result<Box<PredicateResult>> {
1406 if self.bound_reference(reference)?.is_some() {
1407 self.build_always_true()
1408 } else {
1409 self.build_always_false()
1411 }
1412 }
1413
1414 fn not_nan(
1415 &mut self,
1416 reference: &BoundReference,
1417 _predicate: &BoundPredicate,
1418 ) -> Result<Box<PredicateResult>> {
1419 if self.bound_reference(reference)?.is_some() {
1420 self.build_always_false()
1421 } else {
1422 self.build_always_true()
1424 }
1425 }
1426
1427 fn less_than(
1428 &mut self,
1429 reference: &BoundReference,
1430 literal: &Datum,
1431 _predicate: &BoundPredicate,
1432 ) -> Result<Box<PredicateResult>> {
1433 if let Some(idx) = self.bound_reference(reference)? {
1434 let literal = get_arrow_datum(literal)?;
1435
1436 Ok(Box::new(move |batch| {
1437 let left = project_column(&batch, idx)?;
1438 let literal = try_cast_literal(&literal, left.data_type())?;
1439 lt(&left, literal.as_ref())
1440 }))
1441 } else {
1442 self.build_always_true()
1444 }
1445 }
1446
1447 fn less_than_or_eq(
1448 &mut self,
1449 reference: &BoundReference,
1450 literal: &Datum,
1451 _predicate: &BoundPredicate,
1452 ) -> Result<Box<PredicateResult>> {
1453 if let Some(idx) = self.bound_reference(reference)? {
1454 let literal = get_arrow_datum(literal)?;
1455
1456 Ok(Box::new(move |batch| {
1457 let left = project_column(&batch, idx)?;
1458 let literal = try_cast_literal(&literal, left.data_type())?;
1459 lt_eq(&left, literal.as_ref())
1460 }))
1461 } else {
1462 self.build_always_true()
1464 }
1465 }
1466
1467 fn greater_than(
1468 &mut self,
1469 reference: &BoundReference,
1470 literal: &Datum,
1471 _predicate: &BoundPredicate,
1472 ) -> Result<Box<PredicateResult>> {
1473 if let Some(idx) = self.bound_reference(reference)? {
1474 let literal = get_arrow_datum(literal)?;
1475
1476 Ok(Box::new(move |batch| {
1477 let left = project_column(&batch, idx)?;
1478 let literal = try_cast_literal(&literal, left.data_type())?;
1479 gt(&left, literal.as_ref())
1480 }))
1481 } else {
1482 self.build_always_false()
1484 }
1485 }
1486
1487 fn greater_than_or_eq(
1488 &mut self,
1489 reference: &BoundReference,
1490 literal: &Datum,
1491 _predicate: &BoundPredicate,
1492 ) -> Result<Box<PredicateResult>> {
1493 if let Some(idx) = self.bound_reference(reference)? {
1494 let literal = get_arrow_datum(literal)?;
1495
1496 Ok(Box::new(move |batch| {
1497 let left = project_column(&batch, idx)?;
1498 let literal = try_cast_literal(&literal, left.data_type())?;
1499 gt_eq(&left, literal.as_ref())
1500 }))
1501 } else {
1502 self.build_always_false()
1504 }
1505 }
1506
1507 fn eq(
1508 &mut self,
1509 reference: &BoundReference,
1510 literal: &Datum,
1511 _predicate: &BoundPredicate,
1512 ) -> Result<Box<PredicateResult>> {
1513 if let Some(idx) = self.bound_reference(reference)? {
1514 let literal = get_arrow_datum(literal)?;
1515
1516 Ok(Box::new(move |batch| {
1517 let left = project_column(&batch, idx)?;
1518 let literal = try_cast_literal(&literal, left.data_type())?;
1519 eq(&left, literal.as_ref())
1520 }))
1521 } else {
1522 self.build_always_false()
1524 }
1525 }
1526
1527 fn not_eq(
1528 &mut self,
1529 reference: &BoundReference,
1530 literal: &Datum,
1531 _predicate: &BoundPredicate,
1532 ) -> Result<Box<PredicateResult>> {
1533 if let Some(idx) = self.bound_reference(reference)? {
1534 let literal = get_arrow_datum(literal)?;
1535
1536 Ok(Box::new(move |batch| {
1537 let left = project_column(&batch, idx)?;
1538 let literal = try_cast_literal(&literal, left.data_type())?;
1539 neq(&left, literal.as_ref())
1540 }))
1541 } else {
1542 self.build_always_false()
1544 }
1545 }
1546
1547 fn starts_with(
1548 &mut self,
1549 reference: &BoundReference,
1550 literal: &Datum,
1551 _predicate: &BoundPredicate,
1552 ) -> Result<Box<PredicateResult>> {
1553 if let Some(idx) = self.bound_reference(reference)? {
1554 let literal = get_arrow_datum(literal)?;
1555
1556 Ok(Box::new(move |batch| {
1557 let left = project_column(&batch, idx)?;
1558 let literal = try_cast_literal(&literal, left.data_type())?;
1559 starts_with(&left, literal.as_ref())
1560 }))
1561 } else {
1562 self.build_always_false()
1564 }
1565 }
1566
1567 fn not_starts_with(
1568 &mut self,
1569 reference: &BoundReference,
1570 literal: &Datum,
1571 _predicate: &BoundPredicate,
1572 ) -> Result<Box<PredicateResult>> {
1573 if let Some(idx) = self.bound_reference(reference)? {
1574 let literal = get_arrow_datum(literal)?;
1575
1576 Ok(Box::new(move |batch| {
1577 let left = project_column(&batch, idx)?;
1578 let literal = try_cast_literal(&literal, left.data_type())?;
1579 not(&starts_with(&left, literal.as_ref())?)
1581 }))
1582 } else {
1583 self.build_always_true()
1585 }
1586 }
1587
1588 fn r#in(
1589 &mut self,
1590 reference: &BoundReference,
1591 literals: &FnvHashSet<Datum>,
1592 _predicate: &BoundPredicate,
1593 ) -> Result<Box<PredicateResult>> {
1594 if let Some(idx) = self.bound_reference(reference)? {
1595 let literals: Vec<_> = literals
1596 .iter()
1597 .map(|lit| get_arrow_datum(lit).unwrap())
1598 .collect();
1599
1600 Ok(Box::new(move |batch| {
1601 let left = project_column(&batch, idx)?;
1603
1604 let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1605 for literal in &literals {
1606 let literal = try_cast_literal(literal, left.data_type())?;
1607 acc = or(&acc, &eq(&left, literal.as_ref())?)?
1608 }
1609
1610 Ok(acc)
1611 }))
1612 } else {
1613 self.build_always_false()
1615 }
1616 }
1617
1618 fn not_in(
1619 &mut self,
1620 reference: &BoundReference,
1621 literals: &FnvHashSet<Datum>,
1622 _predicate: &BoundPredicate,
1623 ) -> Result<Box<PredicateResult>> {
1624 if let Some(idx) = self.bound_reference(reference)? {
1625 let literals: Vec<_> = literals
1626 .iter()
1627 .map(|lit| get_arrow_datum(lit).unwrap())
1628 .collect();
1629
1630 Ok(Box::new(move |batch| {
1631 let left = project_column(&batch, idx)?;
1633 let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1634 for literal in &literals {
1635 let literal = try_cast_literal(literal, left.data_type())?;
1636 acc = and(&acc, &neq(&left, literal.as_ref())?)?
1637 }
1638
1639 Ok(acc)
1640 }))
1641 } else {
1642 self.build_always_true()
1644 }
1645 }
1646}
1647
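/// An [`AsyncFileReader`] implementation backed by an iceberg [`FileRead`], used to read
/// Parquet files and optionally preload their column, offset, and page indexes.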
1648pub struct ArrowFileReader<R: FileRead> {
1650 meta: FileMetadata,
1651 preload_column_index: bool,
1652 preload_offset_index: bool,
1653 preload_page_index: bool,
1654 metadata_size_hint: Option<usize>,
1655 r: R,
1656}
1657
1658impl<R: FileRead> ArrowFileReader<R> {
1659 pub fn new(meta: FileMetadata, r: R) -> Self {
1661 Self {
1662 meta,
1663 preload_column_index: false,
1664 preload_offset_index: false,
1665 preload_page_index: false,
1666 metadata_size_hint: None,
1667 r,
1668 }
1669 }
1670
1671 pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1673 self.preload_column_index = preload;
1674 self
1675 }
1676
1677 pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1679 self.preload_offset_index = preload;
1680 self
1681 }
1682
1683 pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1685 self.preload_page_index = preload;
1686 self
1687 }
1688
1689 pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1694 self.metadata_size_hint = Some(hint);
1695 self
1696 }
1697}
1698
1699impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
1700 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
1701 Box::pin(
1702 self.r
1703 .read(range.start..range.end)
1704 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
1705 )
1706 }
1707
1708 fn get_metadata(
1711 &mut self,
1712 _options: Option<&'_ ArrowReaderOptions>,
1713 ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
1714 async move {
1715 let reader = ParquetMetaDataReader::new()
1716 .with_prefetch_hint(self.metadata_size_hint)
1717 .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
1719 .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
1720 .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
1721 let size = self.meta.size;
1722 let meta = reader.load_and_finish(self, size).await?;
1723
1724 Ok(Arc::new(meta))
1725 }
1726 .boxed()
1727 }
1728}
1729
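/// Cast a literal to the column's Arrow data type when they differ, so that the
/// comparison kernels receive operands of the same type.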
1730fn try_cast_literal(
1737 literal: &Arc<dyn ArrowDatum + Send + Sync>,
1738 column_type: &DataType,
1739) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1740 let literal_array = literal.get().0;
1741
1742 if literal_array.data_type() == column_type {
1744 return Ok(Arc::clone(literal));
1745 }
1746
1747 let literal_array = cast(literal_array, column_type)?;
1748 Ok(Arc::new(Scalar::new(literal_array)))
1749}
1750
1751#[cfg(test)]
1752mod tests {
1753 use std::collections::{HashMap, HashSet};
1754 use std::fs::File;
1755 use std::sync::Arc;
1756
1757 use arrow_array::cast::AsArray;
1758 use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1759 use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1760 use futures::TryStreamExt;
1761 use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1762 use parquet::arrow::{ArrowWriter, ProjectionMask};
1763 use parquet::basic::Compression;
1764 use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1765 use parquet::file::properties::WriterProperties;
1766 use parquet::schema::parser::parse_message_type;
1767 use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1768 use roaring::RoaringTreemap;
1769 use tempfile::TempDir;
1770
1771 use crate::ErrorKind;
1772 use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1773 use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1774 use crate::delete_vector::DeleteVector;
1775 use crate::expr::visitors::bound_predicate_visitor::visit;
1776 use crate::expr::{Bind, Predicate, Reference};
1777 use crate::io::FileIO;
1778 use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1779 use crate::spec::{
1780 DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1781 };
1782
1783 fn table_schema_simple() -> SchemaRef {
1784 Arc::new(
1785 Schema::builder()
1786 .with_schema_id(1)
1787 .with_identifier_field_ids(vec![2])
1788 .with_fields(vec![
1789 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1790 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1791 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1792 NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1793 ])
1794 .build()
1795 .unwrap(),
1796 )
1797 }
1798
1799 #[test]
1800 fn test_collect_field_id() {
1801 let schema = table_schema_simple();
1802 let expr = Reference::new("qux").is_null();
1803 let bound_expr = expr.bind(schema, true).unwrap();
1804
1805 let mut visitor = CollectFieldIdVisitor {
1806 field_ids: HashSet::default(),
1807 };
1808 visit(&mut visitor, &bound_expr).unwrap();
1809
1810 let mut expected = HashSet::default();
1811 expected.insert(4_i32);
1812
1813 assert_eq!(visitor.field_ids, expected);
1814 }
1815
1816 #[test]
1817 fn test_collect_field_id_with_and() {
1818 let schema = table_schema_simple();
1819 let expr = Reference::new("qux")
1820 .is_null()
1821 .and(Reference::new("baz").is_null());
1822 let bound_expr = expr.bind(schema, true).unwrap();
1823
1824 let mut visitor = CollectFieldIdVisitor {
1825 field_ids: HashSet::default(),
1826 };
1827 visit(&mut visitor, &bound_expr).unwrap();
1828
1829 let mut expected = HashSet::default();
1830 expected.insert(4_i32);
1831 expected.insert(3);
1832
1833 assert_eq!(visitor.field_ids, expected);
1834 }
1835
1836 #[test]
1837 fn test_collect_field_id_with_or() {
1838 let schema = table_schema_simple();
1839 let expr = Reference::new("qux")
1840 .is_null()
1841 .or(Reference::new("baz").is_null());
1842 let bound_expr = expr.bind(schema, true).unwrap();
1843
1844 let mut visitor = CollectFieldIdVisitor {
1845 field_ids: HashSet::default(),
1846 };
1847 visit(&mut visitor, &bound_expr).unwrap();
1848
1849 let mut expected = HashSet::default();
1850 expected.insert(4_i32);
1851 expected.insert(3);
1852
1853 assert_eq!(visitor.field_ids, expected);
1854 }
1855
1856 #[test]
1857 fn test_arrow_projection_mask() {
1858 let schema = Arc::new(
1859 Schema::builder()
1860 .with_schema_id(1)
1861 .with_identifier_field_ids(vec![1])
1862 .with_fields(vec![
1863 NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
1864 NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
1865 NestedField::optional(
1866 3,
1867 "c3",
1868 Type::Primitive(PrimitiveType::Decimal {
1869 precision: 38,
1870 scale: 3,
1871 }),
1872 )
1873 .into(),
1874 ])
1875 .build()
1876 .unwrap(),
1877 );
1878 let arrow_schema = Arc::new(ArrowSchema::new(vec![
1879 Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
1880 PARQUET_FIELD_ID_META_KEY.to_string(),
1881 "1".to_string(),
1882 )])),
1883 Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
1885 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1886 ),
1887 Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
1889 PARQUET_FIELD_ID_META_KEY.to_string(),
1890 "3".to_string(),
1891 )])),
1892 ]));
1893
1894 let message_type = "
1895message schema {
1896 required binary c1 (STRING) = 1;
1897 optional int32 c2 (INTEGER(8,true)) = 2;
1898 optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
1899}
1900 ";
1901 let parquet_type = parse_message_type(message_type).expect("should parse schema");
1902 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
1903
1904 let err = ArrowReader::get_arrow_projection_mask(
1906 &[1, 2, 3],
1907 &schema,
1908 &parquet_schema,
1909 &arrow_schema,
1910 false,
1911 )
1912 .unwrap_err();
1913
1914 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1915 assert_eq!(
1916 err.to_string(),
1917 "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
1918 );
1919
1920 let err = ArrowReader::get_arrow_projection_mask(
1922 &[1, 3],
1923 &schema,
1924 &parquet_schema,
1925 &arrow_schema,
1926 false,
1927 )
1928 .unwrap_err();
1929
1930 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1931 assert_eq!(
1932 err.to_string(),
1933 "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
1934 );
1935
1936 let mask = ArrowReader::get_arrow_projection_mask(
1938 &[1],
1939 &schema,
1940 &parquet_schema,
1941 &arrow_schema,
1942 false,
1943 )
1944 .expect("Some ProjectionMask");
1945 assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
1946 }
1947
1948 #[tokio::test]
1949 async fn test_kleene_logic_or_behaviour() {
1950 let predicate = Reference::new("a")
1952 .is_null()
1953 .or(Reference::new("a").equal_to(Datum::string("foo")));
1954
1955 let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1957
1958 let expected = vec![None, Some("foo".to_string())];
1960
1961 let (file_io, schema, table_location, _temp_dir) =
1962 setup_kleene_logic(data_for_col_a, DataType::Utf8);
1963 let reader = ArrowReaderBuilder::new(file_io).build();
1964
1965 let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1966
1967 assert_eq!(result_data, expected);
1968 }
1969
1970 #[tokio::test]
1971 async fn test_kleene_logic_and_behaviour() {
1972 let predicate = Reference::new("a")
1974 .is_not_null()
1975 .and(Reference::new("a").not_equal_to(Datum::string("foo")));
1976
1977 let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1979
1980 let expected = vec![Some("bar".to_string())];
1982
1983 let (file_io, schema, table_location, _temp_dir) =
1984 setup_kleene_logic(data_for_col_a, DataType::Utf8);
1985 let reader = ArrowReaderBuilder::new(file_io).build();
1986
1987 let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1988
1989 assert_eq!(result_data, expected);
1990 }
1991
1992 #[tokio::test]
1993 async fn test_predicate_cast_literal() {
1994 let predicates = vec![
1995 (Reference::new("a").equal_to(Datum::string("foo")), vec![
1997 Some("foo".to_string()),
1998 ]),
1999 (
2001 Reference::new("a").not_equal_to(Datum::string("foo")),
2002 vec![Some("bar".to_string())],
2003 ),
2004 (Reference::new("a").starts_with(Datum::string("f")), vec![
2006 Some("foo".to_string()),
2007 ]),
2008 (
2010 Reference::new("a").not_starts_with(Datum::string("f")),
2011 vec![Some("bar".to_string())],
2012 ),
2013 (Reference::new("a").less_than(Datum::string("foo")), vec![
2015 Some("bar".to_string()),
2016 ]),
2017 (
2019 Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
2020 vec![Some("foo".to_string()), Some("bar".to_string())],
2021 ),
2022 (
2024 Reference::new("a").greater_than(Datum::string("bar")),
2025 vec![Some("foo".to_string())],
2026 ),
2027 (
2029 Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
2030 vec![Some("foo".to_string())],
2031 ),
2032 (
2034 Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
2035 vec![Some("foo".to_string())],
2036 ),
2037 (
2039 Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
2040 vec![Some("bar".to_string())],
2041 ),
2042 ];
2043
2044 let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
2046
2047 let (file_io, schema, table_location, _temp_dir) =
2048 setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
2049 let reader = ArrowReaderBuilder::new(file_io).build();
2050
2051 for (predicate, expected) in predicates {
2052 println!("testing predicate {predicate}");
2053 let result_data = test_perform_read(
2054 predicate.clone(),
2055 schema.clone(),
2056 table_location.clone(),
2057 reader.clone(),
2058 )
2059 .await;
2060
2061 assert_eq!(result_data, expected, "predicate={predicate}");
2062 }
2063 }
2064
2065 async fn test_perform_read(
2066 predicate: Predicate,
2067 schema: SchemaRef,
2068 table_location: String,
2069 reader: ArrowReader,
2070 ) -> Vec<Option<String>> {
2071 let tasks = Box::pin(futures::stream::iter(
2072 vec![Ok(FileScanTask {
2073 start: 0,
2074 length: 0,
2075 record_count: None,
2076 data_file_path: format!("{table_location}/1.parquet"),
2077 data_file_format: DataFileFormat::Parquet,
2078 schema: schema.clone(),
2079 project_field_ids: vec![1],
2080 predicate: Some(predicate.bind(schema, true).unwrap()),
2081 deletes: vec![],
2082 partition: None,
2083 partition_spec: None,
2084 name_mapping: None,
2085 case_sensitive: false,
2086 })]
2087 .into_iter(),
2088 )) as FileScanTaskStream;
2089
2090 let result = reader
2091 .read(tasks)
2092 .unwrap()
2093 .try_collect::<Vec<RecordBatch>>()
2094 .await
2095 .unwrap();
2096
2097 result[0].columns()[0]
2098 .as_string_opt::<i32>()
2099 .unwrap()
2100 .iter()
2101 .map(|v| v.map(ToOwned::to_owned))
2102 .collect::<Vec<_>>()
2103 }
2104
2105 fn setup_kleene_logic(
2106 data_for_col_a: Vec<Option<String>>,
2107 col_a_type: DataType,
2108 ) -> (FileIO, SchemaRef, String, TempDir) {
2109 let schema = Arc::new(
2110 Schema::builder()
2111 .with_schema_id(1)
2112 .with_fields(vec![
2113 NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2114 ])
2115 .build()
2116 .unwrap(),
2117 );
2118
2119 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2120 Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2121 PARQUET_FIELD_ID_META_KEY.to_string(),
2122 "1".to_string(),
2123 )])),
2124 ]));
2125
2126 let tmp_dir = TempDir::new().unwrap();
2127 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2128
2129 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2130
2131 let col = match col_a_type {
2132 DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2133 DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2134 _ => panic!("unexpected col_a_type"),
2135 };
2136
2137 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2138
2139 let props = WriterProperties::builder()
2141 .set_compression(Compression::SNAPPY)
2142 .build();
2143
2144 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
2145 let mut writer =
2146 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2147
2148 writer.write(&to_write).expect("Writing batch");
2149
2150 writer.close().unwrap();
2152
2153 (file_io, schema, table_location, tmp_dir)
2154 }
2155
2156 #[test]
2157 fn test_build_deletes_row_selection() {
2158 let schema_descr = get_test_schema_descr();
2159
2160 let mut columns = vec![];
2161 for ptr in schema_descr.columns() {
2162 let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
2163 columns.push(column);
2164 }
2165
2166 let row_groups_metadata = vec![
2167 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
2168 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
2169 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
2170 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
2171 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
2172 ];
2173
2174 let selected_row_groups = Some(vec![1, 3]);
2175
2176 let positional_deletes = RoaringTreemap::from_iter(&[
            // Deletes span row groups 0..=4 (1000, 500, 500, 1000, and 500 rows respectively).
            1, 3, 4, 5, 998, 999, // row group 0
            1000, 1010, 1011, 1012, 1498, 1499, // row group 1
            1500, 1501, 1600, 1999, // row group 2
            2000, 2001, 2100, 2200, 2201, 2202, 2999, // row group 3
            3000, // row group 4
        ]);
2201
2202 let positional_deletes = DeleteVector::new(positional_deletes);
2203
2204 let result = ArrowReader::build_deletes_row_selection(
2206 &row_groups_metadata,
2207 &selected_row_groups,
2208 &positional_deletes,
2209 )
2210 .unwrap();
2211
2212 let expected = RowSelection::from(vec![
2213 RowSelector::skip(1),
2214 RowSelector::select(9),
2215 RowSelector::skip(3),
2216 RowSelector::select(485),
2217 RowSelector::skip(4),
2218 RowSelector::select(98),
2219 RowSelector::skip(1),
2220 RowSelector::select(99),
2221 RowSelector::skip(3),
2222 RowSelector::select(796),
2223 RowSelector::skip(1),
2224 ]);
2225
2226 assert_eq!(result, expected);
2227
2228 let result = ArrowReader::build_deletes_row_selection(
2230 &row_groups_metadata,
2231 &None,
2232 &positional_deletes,
2233 )
2234 .unwrap();
2235
2236 let expected = RowSelection::from(vec![
2237 RowSelector::select(1),
2238 RowSelector::skip(1),
2239 RowSelector::select(1),
2240 RowSelector::skip(3),
2241 RowSelector::select(992),
2242 RowSelector::skip(3),
2243 RowSelector::select(9),
2244 RowSelector::skip(3),
2245 RowSelector::select(485),
2246 RowSelector::skip(4),
2247 RowSelector::select(98),
2248 RowSelector::skip(1),
2249 RowSelector::select(398),
2250 RowSelector::skip(3),
2251 RowSelector::select(98),
2252 RowSelector::skip(1),
2253 RowSelector::select(99),
2254 RowSelector::skip(3),
2255 RowSelector::select(796),
2256 RowSelector::skip(2),
2257 RowSelector::select(499),
2258 ]);
2259
2260 assert_eq!(result, expected);
2261 }
2262
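    // Test helper: builds RowGroupMetaData with the given row count and ordinal; the column
    // metadata and total byte size are placeholders.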
2263 fn build_test_row_group_meta(
2264 schema_descr: SchemaDescPtr,
2265 columns: Vec<ColumnChunkMetaData>,
2266 num_rows: i64,
2267 ordinal: i16,
2268 ) -> RowGroupMetaData {
2269 RowGroupMetaData::builder(schema_descr.clone())
2270 .set_num_rows(num_rows)
2271 .set_total_byte_size(2000)
2272 .set_column_metadata(columns)
2273 .set_ordinal(ordinal)
2274 .build()
2275 .unwrap()
2276 }
2277
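    // Test helper: a minimal two-column (`a`, `b`: INT32) Parquet schema descriptor used by the
    // row-group metadata helper above.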
2278 fn get_test_schema_descr() -> SchemaDescPtr {
2279 use parquet::schema::types::Type as SchemaType;
2280
2281 let schema = SchemaType::group_type_builder("schema")
2282 .with_fields(vec![
2283 Arc::new(
2284 SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2285 .build()
2286 .unwrap(),
2287 ),
2288 Arc::new(
2289 SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2290 .build()
2291 .unwrap(),
2292 ),
2293 ])
2294 .build()
2295 .unwrap();
2296
2297 Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2298 }
2299
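    // Writes a Parquet file containing three 100-row row groups, derives each row group's byte
    // offset from the footer metadata, and checks that byte-ranged FileScanTasks return rows only
    // from the row groups whose start offsets fall inside the task's range.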
2300 #[tokio::test]
2302 async fn test_file_splits_respect_byte_ranges() {
2303 use arrow_array::Int32Array;
2304 use parquet::file::reader::{FileReader, SerializedFileReader};
2305
2306 let schema = Arc::new(
2307 Schema::builder()
2308 .with_schema_id(1)
2309 .with_fields(vec![
2310 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2311 ])
2312 .build()
2313 .unwrap(),
2314 );
2315
2316 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2317 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2318 PARQUET_FIELD_ID_META_KEY.to_string(),
2319 "1".to_string(),
2320 )])),
2321 ]));
2322
2323 let tmp_dir = TempDir::new().unwrap();
2324 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2325 let file_path = format!("{table_location}/multi_row_group.parquet");
2326
2327 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2329 (0..100).collect::<Vec<i32>>(),
2330 ))])
2331 .unwrap();
2332 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2333 (100..200).collect::<Vec<i32>>(),
2334 ))])
2335 .unwrap();
2336 let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2337 (200..300).collect::<Vec<i32>>(),
2338 ))])
2339 .unwrap();
2340
2341 let props = WriterProperties::builder()
2342 .set_compression(Compression::SNAPPY)
2343 .set_max_row_group_size(100)
2344 .build();
2345
2346 let file = File::create(&file_path).unwrap();
2347 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2348 writer.write(&batch1).expect("Writing batch 1");
2349 writer.write(&batch2).expect("Writing batch 2");
2350 writer.write(&batch3).expect("Writing batch 3");
2351 writer.close().unwrap();
2352
2353 let file = File::open(&file_path).unwrap();
2355 let reader = SerializedFileReader::new(file).unwrap();
2356 let metadata = reader.metadata();
2357
2358 println!("File has {} row groups", metadata.num_row_groups());
2359 assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2360
2361 let row_group_0 = metadata.row_group(0);
2363 let row_group_1 = metadata.row_group(1);
2364 let row_group_2 = metadata.row_group(2);
2365
        // A Parquet file starts with the 4-byte "PAR1" magic, so row group 0's data begins at offset 4.
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2368 let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
2369 let file_end = rg2_start + row_group_2.compressed_size() as u64;
2370
2371 println!(
2372 "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2373 row_group_0.num_rows(),
2374 rg0_start,
2375 row_group_0.compressed_size()
2376 );
2377 println!(
2378 "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2379 row_group_1.num_rows(),
2380 rg1_start,
2381 row_group_1.compressed_size()
2382 );
2383 println!(
2384 "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2385 row_group_2.num_rows(),
2386 rg2_start,
2387 row_group_2.compressed_size()
2388 );
2389
2390 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2391 let reader = ArrowReaderBuilder::new(file_io).build();
2392
2393 let task1 = FileScanTask {
2395 start: rg0_start,
2396 length: row_group_0.compressed_size() as u64,
2397 record_count: Some(100),
2398 data_file_path: file_path.clone(),
2399 data_file_format: DataFileFormat::Parquet,
2400 schema: schema.clone(),
2401 project_field_ids: vec![1],
2402 predicate: None,
2403 deletes: vec![],
2404 partition: None,
2405 partition_spec: None,
2406 name_mapping: None,
2407 case_sensitive: false,
2408 };
2409
2410 let task2 = FileScanTask {
2412 start: rg1_start,
2413 length: file_end - rg1_start,
2414 record_count: Some(200),
2415 data_file_path: file_path.clone(),
2416 data_file_format: DataFileFormat::Parquet,
2417 schema: schema.clone(),
2418 project_field_ids: vec![1],
2419 predicate: None,
2420 deletes: vec![],
2421 partition: None,
2422 partition_spec: None,
2423 name_mapping: None,
2424 case_sensitive: false,
2425 };
2426
2427 let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2428 let result1 = reader
2429 .clone()
2430 .read(tasks1)
2431 .unwrap()
2432 .try_collect::<Vec<RecordBatch>>()
2433 .await
2434 .unwrap();
2435
2436 let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2437 println!(
2438 "Task 1 (bytes {}-{}) returned {} rows",
2439 rg0_start,
2440 rg0_start + row_group_0.compressed_size() as u64,
2441 total_rows_task1
2442 );
2443
2444 let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2445 let result2 = reader
2446 .read(tasks2)
2447 .unwrap()
2448 .try_collect::<Vec<RecordBatch>>()
2449 .await
2450 .unwrap();
2451
2452 let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2453 println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2454
2455 assert_eq!(
2456 total_rows_task1, 100,
2457 "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2458 );
2459
2460 assert_eq!(
2461 total_rows_task2, 200,
2462 "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2463 );
2464
2465 if total_rows_task1 > 0 {
2467 let first_batch = &result1[0];
2468 let id_col = first_batch
2469 .column(0)
2470 .as_primitive::<arrow_array::types::Int32Type>();
2471 let first_val = id_col.value(0);
2472 let last_val = id_col.value(id_col.len() - 1);
2473 println!("Task 1 data range: {first_val} to {last_val}");
2474
2475 assert_eq!(first_val, 0, "Task 1 should start with id=0");
2476 assert_eq!(last_val, 99, "Task 1 should end with id=99");
2477 }
2478
2479 if total_rows_task2 > 0 {
2480 let first_batch = &result2[0];
2481 let id_col = first_batch
2482 .column(0)
2483 .as_primitive::<arrow_array::types::Int32Type>();
2484 let first_val = id_col.value(0);
2485 println!("Task 2 first value: {first_val}");
2486
2487 assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2488 }
2489 }
2490
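    // Reads a file written with only column "a" (field id 1) against a newer table schema that
    // adds an optional column "b" (field id 2); the added column must be materialized as all nulls.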
2491 #[tokio::test]
2497 async fn test_schema_evolution_add_column() {
2498 use arrow_array::{Array, Int32Array};
2499
2500 let new_schema = Arc::new(
2502 Schema::builder()
2503 .with_schema_id(2)
2504 .with_fields(vec![
2505 NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2506 NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2507 ])
2508 .build()
2509 .unwrap(),
2510 );
2511
2512 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2514 Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2515 PARQUET_FIELD_ID_META_KEY.to_string(),
2516 "1".to_string(),
2517 )])),
2518 ]));
2519
2520 let tmp_dir = TempDir::new().unwrap();
2522 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2523 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2524
2525 let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2526 let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2527
2528 let props = WriterProperties::builder()
2529 .set_compression(Compression::SNAPPY)
2530 .build();
2531 let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
2532 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2533 writer.write(&to_write).expect("Writing batch");
2534 writer.close().unwrap();
2535
2536 let reader = ArrowReaderBuilder::new(file_io).build();
2538 let tasks = Box::pin(futures::stream::iter(
2539 vec![Ok(FileScanTask {
2540 start: 0,
2541 length: 0,
2542 record_count: None,
2543 data_file_path: format!("{table_location}/old_file.parquet"),
2544 data_file_format: DataFileFormat::Parquet,
2545 schema: new_schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
2548 deletes: vec![],
2549 partition: None,
2550 partition_spec: None,
2551 name_mapping: None,
2552 case_sensitive: false,
2553 })]
2554 .into_iter(),
2555 )) as FileScanTaskStream;
2556
2557 let result = reader
2558 .read(tasks)
2559 .unwrap()
2560 .try_collect::<Vec<RecordBatch>>()
2561 .await
2562 .unwrap();
2563
2564 assert_eq!(result.len(), 1);
2566 let batch = &result[0];
2567
2568 assert_eq!(batch.num_columns(), 2);
2570 assert_eq!(batch.num_rows(), 3);
2571
2572 let col_a = batch
2574 .column(0)
2575 .as_primitive::<arrow_array::types::Int32Type>();
2576 assert_eq!(col_a.values(), &[1, 2, 3]);
2577
2578 let col_b = batch
2580 .column(1)
2581 .as_primitive::<arrow_array::types::Int32Type>();
2582 assert_eq!(col_b.null_count(), 3);
2583 assert!(col_b.is_null(0));
2584 assert!(col_b.is_null(1));
2585 assert!(col_b.is_null(2));
2586 }
2587
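    // Data file with two 100-row row groups and a single positional delete at position 199
    // (the row with id = 200); the delete falls in the second row group and must still be
    // applied when the whole file is scanned, leaving ids 1..=199.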
2588 #[tokio::test]
2606 async fn test_position_delete_across_multiple_row_groups() {
2607 use arrow_array::{Int32Array, Int64Array};
2608 use parquet::file::reader::{FileReader, SerializedFileReader};
2609
2610 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2612 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2613
2614 let tmp_dir = TempDir::new().unwrap();
2615 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2616
2617 let table_schema = Arc::new(
2619 Schema::builder()
2620 .with_schema_id(1)
2621 .with_fields(vec![
2622 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2623 ])
2624 .build()
2625 .unwrap(),
2626 );
2627
2628 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2629 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2630 PARQUET_FIELD_ID_META_KEY.to_string(),
2631 "1".to_string(),
2632 )])),
2633 ]));
2634
2635 let data_file_path = format!("{table_location}/data.parquet");
2639
2640 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2641 Int32Array::from_iter_values(1..=100),
2642 )])
2643 .unwrap();
2644
2645 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2646 Int32Array::from_iter_values(101..=200),
2647 )])
2648 .unwrap();
2649
2650 let props = WriterProperties::builder()
2652 .set_compression(Compression::SNAPPY)
2653 .set_max_row_group_size(100)
2654 .build();
2655
2656 let file = File::create(&data_file_path).unwrap();
2657 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2658 writer.write(&batch1).expect("Writing batch 1");
2659 writer.write(&batch2).expect("Writing batch 2");
2660 writer.close().unwrap();
2661
2662 let verify_file = File::open(&data_file_path).unwrap();
2664 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2665 assert_eq!(
2666 verify_reader.metadata().num_row_groups(),
2667 2,
2668 "Should have 2 row groups"
2669 );
2670
2671 let delete_file_path = format!("{table_location}/deletes.parquet");
2673
2674 let delete_schema = Arc::new(ArrowSchema::new(vec![
2675 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2676 PARQUET_FIELD_ID_META_KEY.to_string(),
2677 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2678 )])),
2679 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2680 PARQUET_FIELD_ID_META_KEY.to_string(),
2681 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2682 )])),
2683 ]));
2684
2685 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2687 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2688 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2689 ])
2690 .unwrap();
2691
2692 let delete_props = WriterProperties::builder()
2693 .set_compression(Compression::SNAPPY)
2694 .build();
2695
2696 let delete_file = File::create(&delete_file_path).unwrap();
2697 let mut delete_writer =
2698 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2699 delete_writer.write(&delete_batch).unwrap();
2700 delete_writer.close().unwrap();
2701
2702 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2704 let reader = ArrowReaderBuilder::new(file_io).build();
2705
2706 let task = FileScanTask {
2707 start: 0,
2708 length: 0,
2709 record_count: Some(200),
2710 data_file_path: data_file_path.clone(),
2711 data_file_format: DataFileFormat::Parquet,
2712 schema: table_schema.clone(),
2713 project_field_ids: vec![1],
2714 predicate: None,
2715 deletes: vec![FileScanTaskDeleteFile {
2716 file_path: delete_file_path,
2717 file_type: DataContentType::PositionDeletes,
2718 partition_spec_id: 0,
2719 equality_ids: None,
2720 }],
2721 partition: None,
2722 partition_spec: None,
2723 name_mapping: None,
2724 case_sensitive: false,
2725 };
2726
2727 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2728 let result = reader
2729 .read(tasks)
2730 .unwrap()
2731 .try_collect::<Vec<RecordBatch>>()
2732 .await
2733 .unwrap();
2734
2735 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2737
2738 println!("Total rows read: {total_rows}");
2739 println!("Expected: 199 rows (deleted row 199 which had id=200)");
2740
2741 assert_eq!(
2743 total_rows, 199,
2744 "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
2745 The bug causes position deletes in later row groups to be ignored."
2746 );
2747
2748 let all_ids: Vec<i32> = result
2750 .iter()
2751 .flat_map(|batch| {
2752 batch
2753 .column(0)
2754 .as_primitive::<arrow_array::types::Int32Type>()
2755 .values()
2756 .iter()
2757 .copied()
2758 })
2759 .collect();
2760
2761 assert!(
2762 !all_ids.contains(&200),
2763 "Row with id=200 should be deleted but was found in results"
2764 );
2765
2766 let expected_ids: Vec<i32> = (1..=199).collect();
2768 assert_eq!(
2769 all_ids, expected_ids,
2770 "Should have ids 1-199 but got different values"
2771 );
2772 }
2773
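    // Same two-row-group data file, but the task's byte range covers only row group 1; the
    // positional delete at position 199 must still be applied, leaving the 99 rows with
    // ids 101..=199.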
2774 #[tokio::test]
2800 async fn test_position_delete_with_row_group_selection() {
2801 use arrow_array::{Int32Array, Int64Array};
2802 use parquet::file::reader::{FileReader, SerializedFileReader};
2803
2804 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2806 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2807
2808 let tmp_dir = TempDir::new().unwrap();
2809 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2810
2811 let table_schema = Arc::new(
2813 Schema::builder()
2814 .with_schema_id(1)
2815 .with_fields(vec![
2816 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2817 ])
2818 .build()
2819 .unwrap(),
2820 );
2821
2822 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2823 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2824 PARQUET_FIELD_ID_META_KEY.to_string(),
2825 "1".to_string(),
2826 )])),
2827 ]));
2828
2829 let data_file_path = format!("{table_location}/data.parquet");
2833
2834 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2835 Int32Array::from_iter_values(1..=100),
2836 )])
2837 .unwrap();
2838
2839 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2840 Int32Array::from_iter_values(101..=200),
2841 )])
2842 .unwrap();
2843
2844 let props = WriterProperties::builder()
2846 .set_compression(Compression::SNAPPY)
2847 .set_max_row_group_size(100)
2848 .build();
2849
2850 let file = File::create(&data_file_path).unwrap();
2851 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2852 writer.write(&batch1).expect("Writing batch 1");
2853 writer.write(&batch2).expect("Writing batch 2");
2854 writer.close().unwrap();
2855
2856 let verify_file = File::open(&data_file_path).unwrap();
2858 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2859 assert_eq!(
2860 verify_reader.metadata().num_row_groups(),
2861 2,
2862 "Should have 2 row groups"
2863 );
2864
2865 let delete_file_path = format!("{table_location}/deletes.parquet");
2867
2868 let delete_schema = Arc::new(ArrowSchema::new(vec![
2869 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2870 PARQUET_FIELD_ID_META_KEY.to_string(),
2871 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2872 )])),
2873 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2874 PARQUET_FIELD_ID_META_KEY.to_string(),
2875 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2876 )])),
2877 ]));
2878
2879 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2881 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2882 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2883 ])
2884 .unwrap();
2885
2886 let delete_props = WriterProperties::builder()
2887 .set_compression(Compression::SNAPPY)
2888 .build();
2889
2890 let delete_file = File::create(&delete_file_path).unwrap();
2891 let mut delete_writer =
2892 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2893 delete_writer.write(&delete_batch).unwrap();
2894 delete_writer.close().unwrap();
2895
2896 let metadata_file = File::open(&data_file_path).unwrap();
2899 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
2900 let metadata = metadata_reader.metadata();
2901
2902 let row_group_0 = metadata.row_group(0);
2903 let row_group_1 = metadata.row_group(1);
2904
        let rg0_start = 4u64; // row group 0 starts right after the 4-byte "PAR1" magic
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2907 let rg1_length = row_group_1.compressed_size() as u64;
2908
2909 println!(
2910 "Row group 0: starts at byte {}, {} bytes compressed",
2911 rg0_start,
2912 row_group_0.compressed_size()
2913 );
2914 println!(
2915 "Row group 1: starts at byte {}, {} bytes compressed",
2916 rg1_start,
2917 row_group_1.compressed_size()
2918 );
2919
2920 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2921 let reader = ArrowReaderBuilder::new(file_io).build();
2922
2923 let task = FileScanTask {
2925 start: rg1_start,
2926 length: rg1_length,
            record_count: Some(100),
            data_file_path: data_file_path.clone(),
2929 data_file_format: DataFileFormat::Parquet,
2930 schema: table_schema.clone(),
2931 project_field_ids: vec![1],
2932 predicate: None,
2933 deletes: vec![FileScanTaskDeleteFile {
2934 file_path: delete_file_path,
2935 file_type: DataContentType::PositionDeletes,
2936 partition_spec_id: 0,
2937 equality_ids: None,
2938 }],
2939 partition: None,
2940 partition_spec: None,
2941 name_mapping: None,
2942 case_sensitive: false,
2943 };
2944
2945 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2946 let result = reader
2947 .read(tasks)
2948 .unwrap()
2949 .try_collect::<Vec<RecordBatch>>()
2950 .await
2951 .unwrap();
2952
2953 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2956
2957 println!("Total rows read from row group 1: {total_rows}");
2958 println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
2959
2960 assert_eq!(
2962 total_rows, 99,
2963 "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
2964 The bug causes position deletes to be lost when advance_to() is followed by next() \
2965 when skipping unselected row groups."
2966 );
2967
2968 let all_ids: Vec<i32> = result
2970 .iter()
2971 .flat_map(|batch| {
2972 batch
2973 .column(0)
2974 .as_primitive::<arrow_array::types::Int32Type>()
2975 .values()
2976 .iter()
2977 .copied()
2978 })
2979 .collect();
2980
2981 assert!(
2982 !all_ids.contains(&200),
2983 "Row with id=200 should be deleted but was found in results"
2984 );
2985
2986 let expected_ids: Vec<i32> = (101..=199).collect();
2988 assert_eq!(
2989 all_ids, expected_ids,
2990 "Should have ids 101-199 but got different values"
2991 );
2992 }
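
    // The positional delete targets position 0, which lies in row group 0; this task's byte range
    // covers only row group 1, so all 100 of its rows (ids 101..=200) must come back untouched.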
2993 #[tokio::test]
3022 async fn test_position_delete_in_skipped_row_group() {
3023 use arrow_array::{Int32Array, Int64Array};
3024 use parquet::file::reader::{FileReader, SerializedFileReader};
3025
3026 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3028 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3029
3030 let tmp_dir = TempDir::new().unwrap();
3031 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3032
3033 let table_schema = Arc::new(
3035 Schema::builder()
3036 .with_schema_id(1)
3037 .with_fields(vec![
3038 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3039 ])
3040 .build()
3041 .unwrap(),
3042 );
3043
3044 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3045 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3046 PARQUET_FIELD_ID_META_KEY.to_string(),
3047 "1".to_string(),
3048 )])),
3049 ]));
3050
3051 let data_file_path = format!("{table_location}/data.parquet");
3055
3056 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3057 Int32Array::from_iter_values(1..=100),
3058 )])
3059 .unwrap();
3060
3061 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3062 Int32Array::from_iter_values(101..=200),
3063 )])
3064 .unwrap();
3065
3066 let props = WriterProperties::builder()
3068 .set_compression(Compression::SNAPPY)
3069 .set_max_row_group_size(100)
3070 .build();
3071
3072 let file = File::create(&data_file_path).unwrap();
3073 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3074 writer.write(&batch1).expect("Writing batch 1");
3075 writer.write(&batch2).expect("Writing batch 2");
3076 writer.close().unwrap();
3077
3078 let verify_file = File::open(&data_file_path).unwrap();
3080 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3081 assert_eq!(
3082 verify_reader.metadata().num_row_groups(),
3083 2,
3084 "Should have 2 row groups"
3085 );
3086
3087 let delete_file_path = format!("{table_location}/deletes.parquet");
3089
3090 let delete_schema = Arc::new(ArrowSchema::new(vec![
3091 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3092 PARQUET_FIELD_ID_META_KEY.to_string(),
3093 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3094 )])),
3095 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3096 PARQUET_FIELD_ID_META_KEY.to_string(),
3097 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3098 )])),
3099 ]));
3100
3101 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3103 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3104 Arc::new(Int64Array::from_iter_values(vec![0i64])),
3105 ])
3106 .unwrap();
3107
3108 let delete_props = WriterProperties::builder()
3109 .set_compression(Compression::SNAPPY)
3110 .build();
3111
3112 let delete_file = File::create(&delete_file_path).unwrap();
3113 let mut delete_writer =
3114 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3115 delete_writer.write(&delete_batch).unwrap();
3116 delete_writer.close().unwrap();
3117
3118 let metadata_file = File::open(&data_file_path).unwrap();
3121 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3122 let metadata = metadata_reader.metadata();
3123
3124 let row_group_0 = metadata.row_group(0);
3125 let row_group_1 = metadata.row_group(1);
3126
        let rg0_start = 4u64; // row group 0 starts right after the 4-byte "PAR1" magic
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
3129 let rg1_length = row_group_1.compressed_size() as u64;
3130
3131 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3132 let reader = ArrowReaderBuilder::new(file_io).build();
3133
3134 let task = FileScanTask {
3136 start: rg1_start,
3137 length: rg1_length,
            record_count: Some(100),
            data_file_path: data_file_path.clone(),
3140 data_file_format: DataFileFormat::Parquet,
3141 schema: table_schema.clone(),
3142 project_field_ids: vec![1],
3143 predicate: None,
3144 deletes: vec![FileScanTaskDeleteFile {
3145 file_path: delete_file_path,
3146 file_type: DataContentType::PositionDeletes,
3147 partition_spec_id: 0,
3148 equality_ids: None,
3149 }],
3150 partition: None,
3151 partition_spec: None,
3152 name_mapping: None,
3153 case_sensitive: false,
3154 };
3155
3156 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3157 let result = reader
3158 .read(tasks)
3159 .unwrap()
3160 .try_collect::<Vec<RecordBatch>>()
3161 .await
3162 .unwrap();
3163
3164 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3167
3168 assert_eq!(
3169 total_rows, 100,
3170 "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
3171 If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
3172 );
3173
3174 let all_ids: Vec<i32> = result
3176 .iter()
3177 .flat_map(|batch| {
3178 batch
3179 .column(0)
3180 .as_primitive::<arrow_array::types::Int32Type>()
3181 .values()
3182 .iter()
3183 .copied()
3184 })
3185 .collect();
3186
3187 let expected_ids: Vec<i32> = (101..=200).collect();
3188 assert_eq!(
3189 all_ids, expected_ids,
3190 "Should have ids 101-200 (all of row group 1)"
3191 );
3192 }
3193
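    // Reads a Parquet file whose Arrow schema carries no PARQUET_FIELD_ID_META_KEY metadata;
    // both projected columns must still be resolved against the Iceberg schema and return the
    // written values.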
3194 #[tokio::test]
3200 async fn test_read_parquet_file_without_field_ids() {
3201 let schema = Arc::new(
3202 Schema::builder()
3203 .with_schema_id(1)
3204 .with_fields(vec![
3205 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3206 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3207 ])
3208 .build()
3209 .unwrap(),
3210 );
3211
3212 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3214 Field::new("name", DataType::Utf8, false),
3215 Field::new("age", DataType::Int32, false),
3216 ]));
3217
3218 let tmp_dir = TempDir::new().unwrap();
3219 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3220 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3221
3222 let name_data = vec!["Alice", "Bob", "Charlie"];
3223 let age_data = vec![30, 25, 35];
3224
3225 use arrow_array::Int32Array;
3226 let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3227 let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3228
3229 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3230
3231 let props = WriterProperties::builder()
3232 .set_compression(Compression::SNAPPY)
3233 .build();
3234
3235 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3236 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3237
3238 writer.write(&to_write).expect("Writing batch");
3239 writer.close().unwrap();
3240
3241 let reader = ArrowReaderBuilder::new(file_io).build();
3242
3243 let tasks = Box::pin(futures::stream::iter(
3244 vec![Ok(FileScanTask {
3245 start: 0,
3246 length: 0,
3247 record_count: None,
3248 data_file_path: format!("{table_location}/1.parquet"),
3249 data_file_format: DataFileFormat::Parquet,
3250 schema: schema.clone(),
3251 project_field_ids: vec![1, 2],
3252 predicate: None,
3253 deletes: vec![],
3254 partition: None,
3255 partition_spec: None,
3256 name_mapping: None,
3257 case_sensitive: false,
3258 })]
3259 .into_iter(),
3260 )) as FileScanTaskStream;
3261
3262 let result = reader
3263 .read(tasks)
3264 .unwrap()
3265 .try_collect::<Vec<RecordBatch>>()
3266 .await
3267 .unwrap();
3268
3269 assert_eq!(result.len(), 1);
3270 let batch = &result[0];
3271 assert_eq!(batch.num_rows(), 3);
3272 assert_eq!(batch.num_columns(), 2);
3273
3274 let name_array = batch.column(0).as_string::<i32>();
3276 assert_eq!(name_array.value(0), "Alice");
3277 assert_eq!(name_array.value(1), "Bob");
3278 assert_eq!(name_array.value(2), "Charlie");
3279
3280 let age_array = batch
3281 .column(1)
3282 .as_primitive::<arrow_array::types::Int32Type>();
3283 assert_eq!(age_array.value(0), 30);
3284 assert_eq!(age_array.value(1), 25);
3285 assert_eq!(age_array.value(2), 35);
3286 }
3287
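    // Same field-id-less setup with four columns, projecting only field ids 1 and 3 (col1 and
    // col3); only those two columns may appear in the output batch.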
3288 #[tokio::test]
3292 async fn test_read_parquet_without_field_ids_partial_projection() {
3293 use arrow_array::Int32Array;
3294
3295 let schema = Arc::new(
3296 Schema::builder()
3297 .with_schema_id(1)
3298 .with_fields(vec![
3299 NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3300 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3301 NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3302 NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3303 ])
3304 .build()
3305 .unwrap(),
3306 );
3307
3308 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3309 Field::new("col1", DataType::Utf8, false),
3310 Field::new("col2", DataType::Int32, false),
3311 Field::new("col3", DataType::Utf8, false),
3312 Field::new("col4", DataType::Int32, false),
3313 ]));
3314
3315 let tmp_dir = TempDir::new().unwrap();
3316 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3317 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3318
3319 let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3320 let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3321 let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3322 let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3323
3324 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3325 col1_data, col2_data, col3_data, col4_data,
3326 ])
3327 .unwrap();
3328
3329 let props = WriterProperties::builder()
3330 .set_compression(Compression::SNAPPY)
3331 .build();
3332
3333 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3334 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3335
3336 writer.write(&to_write).expect("Writing batch");
3337 writer.close().unwrap();
3338
3339 let reader = ArrowReaderBuilder::new(file_io).build();
3340
3341 let tasks = Box::pin(futures::stream::iter(
3342 vec![Ok(FileScanTask {
3343 start: 0,
3344 length: 0,
3345 record_count: None,
3346 data_file_path: format!("{table_location}/1.parquet"),
3347 data_file_format: DataFileFormat::Parquet,
3348 schema: schema.clone(),
3349 project_field_ids: vec![1, 3],
3350 predicate: None,
3351 deletes: vec![],
3352 partition: None,
3353 partition_spec: None,
3354 name_mapping: None,
3355 case_sensitive: false,
3356 })]
3357 .into_iter(),
3358 )) as FileScanTaskStream;
3359
3360 let result = reader
3361 .read(tasks)
3362 .unwrap()
3363 .try_collect::<Vec<RecordBatch>>()
3364 .await
3365 .unwrap();
3366
3367 assert_eq!(result.len(), 1);
3368 let batch = &result[0];
3369 assert_eq!(batch.num_rows(), 2);
3370 assert_eq!(batch.num_columns(), 2);
3371
3372 let col1_array = batch.column(0).as_string::<i32>();
3373 assert_eq!(col1_array.value(0), "a");
3374 assert_eq!(col1_array.value(1), "b");
3375
3376 let col3_array = batch.column(1).as_string::<i32>();
3377 assert_eq!(col3_array.value(0), "c");
3378 assert_eq!(col3_array.value(1), "d");
3379 }
3380
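    // File without field ids contains name/age; the table schema additionally declares an
    // optional "city" column (field id 3), which must come back as all nulls.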
3381 #[tokio::test]
3385 async fn test_read_parquet_without_field_ids_schema_evolution() {
3386 use arrow_array::{Array, Int32Array};
3387
3388 let schema = Arc::new(
3390 Schema::builder()
3391 .with_schema_id(1)
3392 .with_fields(vec![
3393 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3394 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3395 NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3396 ])
3397 .build()
3398 .unwrap(),
3399 );
3400
3401 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3402 Field::new("name", DataType::Utf8, false),
3403 Field::new("age", DataType::Int32, false),
3404 ]));
3405
3406 let tmp_dir = TempDir::new().unwrap();
3407 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3408 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3409
3410 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3411 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3412
3413 let to_write =
3414 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3415
3416 let props = WriterProperties::builder()
3417 .set_compression(Compression::SNAPPY)
3418 .build();
3419
3420 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3421 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3422
3423 writer.write(&to_write).expect("Writing batch");
3424 writer.close().unwrap();
3425
3426 let reader = ArrowReaderBuilder::new(file_io).build();
3427
3428 let tasks = Box::pin(futures::stream::iter(
3429 vec![Ok(FileScanTask {
3430 start: 0,
3431 length: 0,
3432 record_count: None,
3433 data_file_path: format!("{table_location}/1.parquet"),
3434 data_file_format: DataFileFormat::Parquet,
3435 schema: schema.clone(),
3436 project_field_ids: vec![1, 2, 3],
3437 predicate: None,
3438 deletes: vec![],
3439 partition: None,
3440 partition_spec: None,
3441 name_mapping: None,
3442 case_sensitive: false,
3443 })]
3444 .into_iter(),
3445 )) as FileScanTaskStream;
3446
3447 let result = reader
3448 .read(tasks)
3449 .unwrap()
3450 .try_collect::<Vec<RecordBatch>>()
3451 .await
3452 .unwrap();
3453
3454 assert_eq!(result.len(), 1);
3455 let batch = &result[0];
3456 assert_eq!(batch.num_rows(), 2);
3457 assert_eq!(batch.num_columns(), 3);
3458
3459 let name_array = batch.column(0).as_string::<i32>();
3460 assert_eq!(name_array.value(0), "Alice");
3461 assert_eq!(name_array.value(1), "Bob");
3462
3463 let age_array = batch
3464 .column(1)
3465 .as_primitive::<arrow_array::types::Int32Type>();
3466 assert_eq!(age_array.value(0), 30);
3467 assert_eq!(age_array.value(1), 25);
3468
3469 let city_array = batch.column(2).as_string::<i32>();
3471 assert_eq!(city_array.null_count(), 2);
3472 assert!(city_array.is_null(0));
3473 assert!(city_array.is_null(1));
3474 }
3475
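    // File without field ids written with a maximum row group size of 2, so the six rows span
    // three row groups; all rows must come back in order across the returned batches.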
3476 #[tokio::test]
3479 async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3480 use arrow_array::Int32Array;
3481
3482 let schema = Arc::new(
3483 Schema::builder()
3484 .with_schema_id(1)
3485 .with_fields(vec![
3486 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3487 NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3488 ])
3489 .build()
3490 .unwrap(),
3491 );
3492
3493 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3494 Field::new("name", DataType::Utf8, false),
3495 Field::new("value", DataType::Int32, false),
3496 ]));
3497
3498 let tmp_dir = TempDir::new().unwrap();
3499 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3500 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3501
3502 let props = WriterProperties::builder()
3504 .set_compression(Compression::SNAPPY)
3505 .set_write_batch_size(2)
3506 .set_max_row_group_size(2)
3507 .build();
3508
3509 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3510 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3511
3512 for batch_num in 0..3 {
3514 let name_data = Arc::new(StringArray::from(vec![
3515 format!("name_{}", batch_num * 2),
3516 format!("name_{}", batch_num * 2 + 1),
3517 ])) as ArrayRef;
3518 let value_data =
3519 Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3520
3521 let batch =
3522 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3523 writer.write(&batch).expect("Writing batch");
3524 }
3525 writer.close().unwrap();
3526
3527 let reader = ArrowReaderBuilder::new(file_io).build();
3528
3529 let tasks = Box::pin(futures::stream::iter(
3530 vec![Ok(FileScanTask {
3531 start: 0,
3532 length: 0,
3533 record_count: None,
3534 data_file_path: format!("{table_location}/1.parquet"),
3535 data_file_format: DataFileFormat::Parquet,
3536 schema: schema.clone(),
3537 project_field_ids: vec![1, 2],
3538 predicate: None,
3539 deletes: vec![],
3540 partition: None,
3541 partition_spec: None,
3542 name_mapping: None,
3543 case_sensitive: false,
3544 })]
3545 .into_iter(),
3546 )) as FileScanTaskStream;
3547
3548 let result = reader
3549 .read(tasks)
3550 .unwrap()
3551 .try_collect::<Vec<RecordBatch>>()
3552 .await
3553 .unwrap();
3554
3555 assert!(!result.is_empty());
3556
3557 let mut all_names = Vec::new();
3558 let mut all_values = Vec::new();
3559
3560 for batch in &result {
3561 let name_array = batch.column(0).as_string::<i32>();
3562 let value_array = batch
3563 .column(1)
3564 .as_primitive::<arrow_array::types::Int32Type>();
3565
3566 for i in 0..batch.num_rows() {
3567 all_names.push(name_array.value(i).to_string());
3568 all_values.push(value_array.value(i));
3569 }
3570 }
3571
3572 assert_eq!(all_names.len(), 6);
3573 assert_eq!(all_values.len(), 6);
3574
3575 for i in 0..6 {
3576 assert_eq!(all_names[i], format!("name_{i}"));
3577 assert_eq!(all_values[i], i as i32);
3578 }
3579 }
3580
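    // File without field ids containing a nested struct column "person"; the nested name/age
    // fields must be read back correctly alongside the top-level id column.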
3581 #[tokio::test]
3585 async fn test_read_parquet_without_field_ids_with_struct() {
3586 use arrow_array::{Int32Array, StructArray};
3587 use arrow_schema::Fields;
3588
3589 let schema = Arc::new(
3590 Schema::builder()
3591 .with_schema_id(1)
3592 .with_fields(vec![
3593 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3594 NestedField::required(
3595 2,
3596 "person",
3597 Type::Struct(crate::spec::StructType::new(vec![
3598 NestedField::required(
3599 3,
3600 "name",
3601 Type::Primitive(PrimitiveType::String),
3602 )
3603 .into(),
3604 NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3605 .into(),
3606 ])),
3607 )
3608 .into(),
3609 ])
3610 .build()
3611 .unwrap(),
3612 );
3613
3614 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3615 Field::new("id", DataType::Int32, false),
3616 Field::new(
3617 "person",
3618 DataType::Struct(Fields::from(vec![
3619 Field::new("name", DataType::Utf8, false),
3620 Field::new("age", DataType::Int32, false),
3621 ])),
3622 false,
3623 ),
3624 ]));
3625
3626 let tmp_dir = TempDir::new().unwrap();
3627 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3628 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3629
3630 let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3631 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3632 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3633 let person_data = Arc::new(StructArray::from(vec![
3634 (
3635 Arc::new(Field::new("name", DataType::Utf8, false)),
3636 name_data,
3637 ),
3638 (
3639 Arc::new(Field::new("age", DataType::Int32, false)),
3640 age_data,
3641 ),
3642 ])) as ArrayRef;
3643
3644 let to_write =
3645 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3646
3647 let props = WriterProperties::builder()
3648 .set_compression(Compression::SNAPPY)
3649 .build();
3650
3651 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3652 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3653
3654 writer.write(&to_write).expect("Writing batch");
3655 writer.close().unwrap();
3656
3657 let reader = ArrowReaderBuilder::new(file_io).build();
3658
3659 let tasks = Box::pin(futures::stream::iter(
3660 vec![Ok(FileScanTask {
3661 start: 0,
3662 length: 0,
3663 record_count: None,
3664 data_file_path: format!("{table_location}/1.parquet"),
3665 data_file_format: DataFileFormat::Parquet,
3666 schema: schema.clone(),
3667 project_field_ids: vec![1, 2],
3668 predicate: None,
3669 deletes: vec![],
3670 partition: None,
3671 partition_spec: None,
3672 name_mapping: None,
3673 case_sensitive: false,
3674 })]
3675 .into_iter(),
3676 )) as FileScanTaskStream;
3677
3678 let result = reader
3679 .read(tasks)
3680 .unwrap()
3681 .try_collect::<Vec<RecordBatch>>()
3682 .await
3683 .unwrap();
3684
3685 assert_eq!(result.len(), 1);
3686 let batch = &result[0];
3687 assert_eq!(batch.num_rows(), 2);
3688 assert_eq!(batch.num_columns(), 2);
3689
3690 let id_array = batch
3691 .column(0)
3692 .as_primitive::<arrow_array::types::Int32Type>();
3693 assert_eq!(id_array.value(0), 1);
3694 assert_eq!(id_array.value(1), 2);
3695
3696 let person_array = batch.column(1).as_struct();
3697 assert_eq!(person_array.num_columns(), 2);
3698
3699 let name_array = person_array.column(0).as_string::<i32>();
3700 assert_eq!(name_array.value(0), "Alice");
3701 assert_eq!(name_array.value(1), "Bob");
3702
3703 let age_array = person_array
3704 .column(1)
3705 .as_primitive::<arrow_array::types::Int32Type>();
3706 assert_eq!(age_array.value(0), 30);
3707 assert_eq!(age_array.value(1), 25);
3708 }
3709
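    // The data file (without field ids) has col0 and col1; the table schema inserts "newCol"
    // (field id 5) between them. The new column must be all nulls while col0 and col1 keep
    // their original values and positions.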
3710 #[tokio::test]
3714 async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3715 use arrow_array::{Array, Int32Array};
3716
3717 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3718 Field::new("col0", DataType::Int32, true),
3719 Field::new("col1", DataType::Int32, true),
3720 ]));
3721
3722 let schema = Arc::new(
3724 Schema::builder()
3725 .with_schema_id(1)
3726 .with_fields(vec![
3727 NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3728 NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3729 NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3730 ])
3731 .build()
3732 .unwrap(),
3733 );
3734
3735 let tmp_dir = TempDir::new().unwrap();
3736 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3737 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3738
3739 let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3740 let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3741
3742 let to_write =
3743 RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3744
3745 let props = WriterProperties::builder()
3746 .set_compression(Compression::SNAPPY)
3747 .build();
3748
3749 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3750 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3751 writer.write(&to_write).expect("Writing batch");
3752 writer.close().unwrap();
3753
3754 let reader = ArrowReaderBuilder::new(file_io).build();
3755
3756 let tasks = Box::pin(futures::stream::iter(
3757 vec![Ok(FileScanTask {
3758 start: 0,
3759 length: 0,
3760 record_count: None,
3761 data_file_path: format!("{table_location}/1.parquet"),
3762 data_file_format: DataFileFormat::Parquet,
3763 schema: schema.clone(),
3764 project_field_ids: vec![1, 5, 2],
3765 predicate: None,
3766 deletes: vec![],
3767 partition: None,
3768 partition_spec: None,
3769 name_mapping: None,
3770 case_sensitive: false,
3771 })]
3772 .into_iter(),
3773 )) as FileScanTaskStream;
3774
3775 let result = reader
3776 .read(tasks)
3777 .unwrap()
3778 .try_collect::<Vec<RecordBatch>>()
3779 .await
3780 .unwrap();
3781
3782 assert_eq!(result.len(), 1);
3783 let batch = &result[0];
3784 assert_eq!(batch.num_rows(), 2);
3785 assert_eq!(batch.num_columns(), 3);
3786
3787 let result_col0 = batch
3788 .column(0)
3789 .as_primitive::<arrow_array::types::Int32Type>();
3790 assert_eq!(result_col0.value(0), 1);
3791 assert_eq!(result_col0.value(1), 2);
3792
3793 let result_newcol = batch
3795 .column(1)
3796 .as_primitive::<arrow_array::types::Int32Type>();
3797 assert_eq!(result_newcol.null_count(), 2);
3798 assert!(result_newcol.is_null(0));
3799 assert!(result_newcol.is_null(1));
3800
3801 let result_col1 = batch
3802 .column(2)
3803 .as_primitive::<arrow_array::types::Int32Type>();
3804 assert_eq!(result_col1.value(0), 10);
3805 assert_eq!(result_col1.value(1), 20);
3806 }
3807
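    // With row-group filtering and row selection enabled, the predicate `id < 5` matches none
    // of the written ids (10, 11, 12), so the scan must yield no rows at all.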
3808 #[tokio::test]
3812 async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
3813 use arrow_array::{Float64Array, Int32Array};
3814
3815 let schema = Arc::new(
3817 Schema::builder()
3818 .with_schema_id(1)
3819 .with_fields(vec![
3820 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3821 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3822 NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
3823 .into(),
3824 ])
3825 .build()
3826 .unwrap(),
3827 );
3828
3829 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3830 Field::new("id", DataType::Int32, false),
3831 Field::new("name", DataType::Utf8, false),
3832 Field::new("value", DataType::Float64, false),
3833 ]));
3834
3835 let tmp_dir = TempDir::new().unwrap();
3836 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3837 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3838
3839 let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
3841 let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3842 let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
3843
3844 let to_write =
3845 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
3846 .unwrap();
3847
3848 let props = WriterProperties::builder()
3849 .set_compression(Compression::SNAPPY)
3850 .build();
3851
3852 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3853 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3854 writer.write(&to_write).expect("Writing batch");
3855 writer.close().unwrap();
3856
3857 let predicate = Reference::new("id").less_than(Datum::int(5));
3859
3860 let reader = ArrowReaderBuilder::new(file_io)
3862 .with_row_group_filtering_enabled(true)
3863 .with_row_selection_enabled(true)
3864 .build();
3865
3866 let tasks = Box::pin(futures::stream::iter(
3867 vec![Ok(FileScanTask {
3868 start: 0,
3869 length: 0,
3870 record_count: None,
3871 data_file_path: format!("{table_location}/1.parquet"),
3872 data_file_format: DataFileFormat::Parquet,
3873 schema: schema.clone(),
3874 project_field_ids: vec![1, 2, 3],
3875 predicate: Some(predicate.bind(schema, true).unwrap()),
3876 deletes: vec![],
3877 partition: None,
3878 partition_spec: None,
3879 name_mapping: None,
3880 case_sensitive: false,
3881 })]
3882 .into_iter(),
3883 )) as FileScanTaskStream;
3884
3885 let result = reader
3887 .read(tasks)
3888 .unwrap()
3889 .try_collect::<Vec<RecordBatch>>()
3890 .await
3891 .unwrap();
3892
3893 assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
3895 }
3896
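    // With a bucket(4) partition spec on "id" and a partition value of id_bucket = 1, the source
    // column must still be read from the data file rather than reconstructed from the partition
    // metadata; the written ids (1, 5, 9, 13) and names must come back unchanged.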
3897 #[tokio::test]
3942 async fn test_bucket_partitioning_reads_source_column_from_file() {
3943 use arrow_array::Int32Array;
3944
3945 use crate::spec::{Literal, PartitionSpec, Struct, Transform};
3946
3947 let schema = Arc::new(
3949 Schema::builder()
3950 .with_schema_id(0)
3951 .with_fields(vec![
3952 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3953 NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3954 ])
3955 .build()
3956 .unwrap(),
3957 );
3958
3959 let partition_spec = Arc::new(
3961 PartitionSpec::builder(schema.clone())
3962 .with_spec_id(0)
3963 .add_partition_field("id", "id_bucket", Transform::Bucket(4))
3964 .unwrap()
3965 .build()
3966 .unwrap(),
3967 );
3968
3969 let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
3971
3972 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3974 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3975 PARQUET_FIELD_ID_META_KEY.to_string(),
3976 "1".to_string(),
3977 )])),
3978 Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
3979 PARQUET_FIELD_ID_META_KEY.to_string(),
3980 "2".to_string(),
3981 )])),
3982 ]));
3983
3984 let tmp_dir = TempDir::new().unwrap();
3986 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3987 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3988
3989 let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
3990 let name_data =
3991 Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
3992
3993 let to_write =
3994 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
3995
3996 let props = WriterProperties::builder()
3997 .set_compression(Compression::SNAPPY)
3998 .build();
3999 let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
4000 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
4001 writer.write(&to_write).expect("Writing batch");
4002 writer.close().unwrap();
4003
4004 let reader = ArrowReaderBuilder::new(file_io).build();
4006 let tasks = Box::pin(futures::stream::iter(
4007 vec![Ok(FileScanTask {
4008 start: 0,
4009 length: 0,
4010 record_count: None,
4011 data_file_path: format!("{table_location}/data.parquet"),
4012 data_file_format: DataFileFormat::Parquet,
4013 schema: schema.clone(),
4014 project_field_ids: vec![1, 2],
4015 predicate: None,
4016 deletes: vec![],
4017 partition: Some(partition_data),
4018 partition_spec: Some(partition_spec),
4019 name_mapping: None,
4020 case_sensitive: false,
4021 })]
4022 .into_iter(),
4023 )) as FileScanTaskStream;
4024
4025 let result = reader
4026 .read(tasks)
4027 .unwrap()
4028 .try_collect::<Vec<RecordBatch>>()
4029 .await
4030 .unwrap();
4031
4032 assert_eq!(result.len(), 1);
4034 let batch = &result[0];
4035
4036 assert_eq!(batch.num_columns(), 2);
4037 assert_eq!(batch.num_rows(), 4);
4038
4039 let id_col = batch
4042 .column(0)
4043 .as_primitive::<arrow_array::types::Int32Type>();
4044 assert_eq!(id_col.value(0), 1);
4045 assert_eq!(id_col.value(1), 5);
4046 assert_eq!(id_col.value(2), 9);
4047 assert_eq!(id_col.value(3), 13);
4048
4049 let name_col = batch.column(1).as_string::<i32>();
4050 assert_eq!(name_col.value(0), "Alice");
4051 assert_eq!(name_col.value(1), "Bob");
4052 assert_eq!(name_col.value(2), "Charlie");
4053 assert_eq!(name_col.value(3), "Dave");
4054 }
4055}