use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;

use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
use arrow_cast::cast::cast;
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{
    ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
};
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

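/// Builder for [`ArrowReader`].
///
/// A minimal usage sketch (assuming a configured `FileIO` and a stream of
/// `FileScanTask`s named `tasks` are already in scope; only builder methods
/// defined below are used):
///
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .with_data_file_concurrency_limit(4)
///     .with_row_group_filtering_enabled(true)
///     .with_row_selection_enabled(true)
///     .build();
/// let batches = reader.read(tasks)?;
/// ```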
pub struct ArrowReaderBuilder {
    batch_size: Option<usize>,
    file_io: FileIO,
    concurrency_limit_data_files: usize,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}
71
72impl ArrowReaderBuilder {
73 pub fn new(file_io: FileIO) -> Self {
75 let num_cpus = available_parallelism().get();
76
77 ArrowReaderBuilder {
78 batch_size: None,
79 file_io,
80 concurrency_limit_data_files: num_cpus,
81 row_group_filtering_enabled: true,
82 row_selection_enabled: false,
83 }
84 }
85
86 pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
88 self.concurrency_limit_data_files = val;
89 self
90 }
91
92 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
95 self.batch_size = Some(batch_size);
96 self
97 }
98
99 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
101 self.row_group_filtering_enabled = row_group_filtering_enabled;
102 self
103 }
104
105 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
107 self.row_selection_enabled = row_selection_enabled;
108 self
109 }
110
111 pub fn build(self) -> ArrowReader {
113 ArrowReader {
114 batch_size: self.batch_size,
115 file_io: self.file_io.clone(),
116 delete_file_loader: CachingDeleteFileLoader::new(
117 self.file_io.clone(),
118 self.concurrency_limit_data_files,
119 ),
120 concurrency_limit_data_files: self.concurrency_limit_data_files,
121 row_group_filtering_enabled: self.row_group_filtering_enabled,
122 row_selection_enabled: self.row_selection_enabled,
123 }
124 }
125}
126
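/// Reads data files referenced by [`FileScanTask`]s and yields streams of
/// Arrow [`RecordBatch`]es, applying projections, filter predicates, and
/// delete files along the way.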
#[derive(Clone)]
pub struct ArrowReader {
    batch_size: Option<usize>,
    file_io: FileIO,
    delete_file_loader: CachingDeleteFileLoader,

    concurrency_limit_data_files: usize,

    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
}
140
141impl ArrowReader {
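    /// Takes a stream of `FileScanTask`s and reads all of the referenced files,
    /// returning a stream of Arrow `RecordBatch`es.
    ///
    /// When the data-file concurrency limit is 1, tasks are processed strictly
    /// sequentially; otherwise they are buffered and flattened unordered up to
    /// the configured limit.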
142 pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
145 let file_io = self.file_io.clone();
146 let batch_size = self.batch_size;
147 let concurrency_limit_data_files = self.concurrency_limit_data_files;
148 let row_group_filtering_enabled = self.row_group_filtering_enabled;
149 let row_selection_enabled = self.row_selection_enabled;
150
151 let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
153 Box::pin(
154 tasks
155 .and_then(move |task| {
156 let file_io = file_io.clone();
157
158 Self::process_file_scan_task(
159 task,
160 batch_size,
161 file_io,
162 self.delete_file_loader.clone(),
163 row_group_filtering_enabled,
164 row_selection_enabled,
165 )
166 })
167 .map_err(|err| {
168 Error::new(ErrorKind::Unexpected, "file scan task generate failed")
169 .with_source(err)
170 })
171 .try_flatten(),
172 )
173 } else {
174 Box::pin(
175 tasks
176 .map_ok(move |task| {
177 let file_io = file_io.clone();
178
179 Self::process_file_scan_task(
180 task,
181 batch_size,
182 file_io,
183 self.delete_file_loader.clone(),
184 row_group_filtering_enabled,
185 row_selection_enabled,
186 )
187 })
188 .map_err(|err| {
189 Error::new(ErrorKind::Unexpected, "file scan task generate failed")
190 .with_source(err)
191 })
192 .try_buffer_unordered(concurrency_limit_data_files)
193 .try_flatten_unordered(concurrency_limit_data_files),
194 )
195 };
196
197 Ok(stream)
198 }
199
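    /// Processes a single `FileScanTask`: kicks off loading of any delete files,
    /// opens the Parquet file (re-opening it with a name-mapped or fallback
    /// schema when the file lacks embedded field IDs), then applies the
    /// projection, the combined filter/delete predicate, row-group pruning,
    /// row selection, and the record batch transformer before returning the
    /// resulting stream.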
200 #[allow(clippy::too_many_arguments)]
201 async fn process_file_scan_task(
202 task: FileScanTask,
203 batch_size: Option<usize>,
204 file_io: FileIO,
205 delete_file_loader: CachingDeleteFileLoader,
206 row_group_filtering_enabled: bool,
207 row_selection_enabled: bool,
208 ) -> Result<ArrowRecordBatchStream> {
209 let should_load_page_index =
210 (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
211
212 let delete_filter_rx =
213 delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
214
215 let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
218 &task.data_file_path,
219 file_io.clone(),
220 should_load_page_index,
221 None,
222 )
223 .await?;
224
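        // Detect Parquet files written without Iceberg field IDs by checking
        // whether the first top-level Arrow field carries the
        // PARQUET_FIELD_ID_META_KEY metadata entry.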
225 let missing_field_ids = initial_stream_builder
229 .schema()
230 .fields()
231 .iter()
232 .next()
233 .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
234
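        // Files without field IDs (e.g. migrated or externally written data)
        // need IDs assigned before projection: prefer the task's name mapping
        // when present, otherwise fall back to positional IDs, and re-open the
        // stream builder with the adjusted schema.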
235 let mut record_batch_stream_builder = if missing_field_ids {
251 let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
253 apply_name_mapping_to_arrow_schema(
258 Arc::clone(initial_stream_builder.schema()),
259 name_mapping,
260 )?
261 } else {
262 add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
265 };
266
267 let options = ArrowReaderOptions::new().with_schema(arrow_schema);
268
269 Self::create_parquet_record_batch_stream_builder(
270 &task.data_file_path,
271 file_io.clone(),
272 should_load_page_index,
273 Some(options),
274 )
275 .await?
276 } else {
277 initial_stream_builder
279 };
280
281 let project_field_ids_without_metadata: Vec<i32> = task
283 .project_field_ids
284 .iter()
285 .filter(|&&id| !is_metadata_field(id))
286 .copied()
287 .collect();
288
        let projection_mask = Self::get_arrow_projection_mask(
            &project_field_ids_without_metadata,
            &task.schema,
            record_batch_stream_builder.parquet_schema(),
            record_batch_stream_builder.schema(),
            missing_field_ids,
        )?;
300
301 record_batch_stream_builder =
302 record_batch_stream_builder.with_projection(projection_mask.clone());
303
304 let mut record_batch_transformer_builder =
308 RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
309
310 if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
312 let file_datum = Datum::string(task.data_file_path.clone());
313 record_batch_transformer_builder =
314 record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
315 }
316
317 if let (Some(partition_spec), Some(partition_data)) =
318 (task.partition_spec.clone(), task.partition.clone())
319 {
320 record_batch_transformer_builder =
321 record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
322 }
323
324 let mut record_batch_transformer = record_batch_transformer_builder.build();
325
326 if let Some(batch_size) = batch_size {
327 record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
328 }
329
330 let delete_filter = delete_filter_rx.await.unwrap()?;
331 let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
332
333 let final_predicate = match (&task.predicate, delete_predicate) {
338 (None, None) => None,
339 (Some(predicate), None) => Some(predicate.clone()),
340 (None, Some(ref predicate)) => Some(predicate.clone()),
341 (Some(filter_predicate), Some(delete_predicate)) => {
342 Some(filter_predicate.clone().and(delete_predicate))
343 }
344 };
345
346 let mut selected_row_group_indices = None;
362 let mut row_selection = None;
363
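        // A task may cover only a byte-range split of the file; restrict the
        // scan to the row groups whose byte ranges overlap [start, start + length).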
364 if task.start != 0 || task.length != 0 {
367 let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
368 record_batch_stream_builder.metadata(),
369 task.start,
370 task.length,
371 )?;
372 selected_row_group_indices = Some(byte_range_filtered_row_groups);
373 }
374
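        // When there is a filter and/or equality-delete predicate, push it down
        // as an Arrow row filter, optionally prune row groups via their
        // statistics, and optionally derive a page-level row selection.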
375 if let Some(predicate) = final_predicate {
376 let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
377 record_batch_stream_builder.parquet_schema(),
378 &predicate,
379 )?;
380
381 let row_filter = Self::get_row_filter(
382 &predicate,
383 record_batch_stream_builder.parquet_schema(),
384 &iceberg_field_ids,
385 &field_id_map,
386 )?;
387 record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
388
389 if row_group_filtering_enabled {
390 let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
391 &predicate,
392 record_batch_stream_builder.metadata(),
393 &field_id_map,
394 &task.schema,
395 )?;
396
397 selected_row_group_indices = match selected_row_group_indices {
400 Some(byte_range_filtered) => {
401 let intersection: Vec<usize> = byte_range_filtered
403 .into_iter()
404 .filter(|idx| predicate_filtered_row_groups.contains(idx))
405 .collect();
406 Some(intersection)
407 }
408 None => Some(predicate_filtered_row_groups),
409 };
410 }
411
412 if row_selection_enabled {
413 row_selection = Some(Self::get_row_selection_for_filter_predicate(
414 &predicate,
415 record_batch_stream_builder.metadata(),
416 &selected_row_group_indices,
417 &field_id_map,
418 &task.schema,
419 )?);
420 }
421 }
422
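        // Positional deletes are converted into a row selection and intersected
        // with any selection derived from the filter predicate.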
423 let positional_delete_indexes = delete_filter.get_delete_vector(&task);
424
425 if let Some(positional_delete_indexes) = positional_delete_indexes {
426 let delete_row_selection = {
427 let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
428
429 Self::build_deletes_row_selection(
430 record_batch_stream_builder.metadata().row_groups(),
431 &selected_row_group_indices,
432 &positional_delete_indexes,
433 )
434 }?;
435
436 row_selection = match row_selection {
439 None => Some(delete_row_selection),
440 Some(filter_row_selection) => {
441 Some(filter_row_selection.intersection(&delete_row_selection))
442 }
443 };
444 }
445
446 if let Some(row_selection) = row_selection {
447 record_batch_stream_builder =
448 record_batch_stream_builder.with_row_selection(row_selection);
449 }
450
451 if let Some(selected_row_group_indices) = selected_row_group_indices {
452 record_batch_stream_builder =
453 record_batch_stream_builder.with_row_groups(selected_row_group_indices);
454 }
455
456 let record_batch_stream =
459 record_batch_stream_builder
460 .build()?
461 .map(move |batch| match batch {
462 Ok(batch) => {
463 record_batch_transformer.process_record_batch(batch)
465 }
466 Err(err) => Err(err.into()),
467 });
468
469 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
470 }
471
472 pub(crate) async fn create_parquet_record_batch_stream_builder(
473 data_file_path: &str,
474 file_io: FileIO,
475 should_load_page_index: bool,
476 arrow_reader_options: Option<ArrowReaderOptions>,
477 ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
478 let parquet_file = file_io.new_input(data_file_path)?;
481 let (parquet_metadata, parquet_reader) =
482 try_join!(parquet_file.metadata(), parquet_file.reader())?;
483 let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
484 .with_preload_column_index(true)
485 .with_preload_offset_index(true)
486 .with_preload_page_index(should_load_page_index);
487
488 let options = arrow_reader_options.unwrap_or_default();
490 let record_batch_stream_builder =
491 ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
492 Ok(record_batch_stream_builder)
493 }
494
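    /// Computes a `RowSelection` from positional delete indices by walking the
    /// row groups in order and emitting alternating select/skip runs, skipping
    /// over row groups that are not in `selected_row_groups`.
    ///
    /// For example, a single 10-row row group with deletes at positions 3 and 4
    /// yields `select(3), skip(2), select(5)`.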
495 fn build_deletes_row_selection(
501 row_group_metadata_list: &[RowGroupMetaData],
502 selected_row_groups: &Option<Vec<usize>>,
503 positional_deletes: &DeleteVector,
504 ) -> Result<RowSelection> {
505 let mut results: Vec<RowSelector> = Vec::new();
506 let mut selected_row_groups_idx = 0;
507 let mut current_row_group_base_idx: u64 = 0;
508 let mut delete_vector_iter = positional_deletes.iter();
509 let mut next_deleted_row_idx_opt = delete_vector_iter.next();
510
511 for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
512 let row_group_num_rows = row_group_metadata.num_rows() as u64;
513 let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
514
515 if let Some(selected_row_groups) = selected_row_groups {
517 if selected_row_groups_idx == selected_row_groups.len() {
519 break;
520 }
521
522 if idx == selected_row_groups[selected_row_groups_idx] {
523 selected_row_groups_idx += 1;
527 } else {
528 delete_vector_iter.advance_to(next_row_group_base_idx);
533 if let Some(cached_idx) = next_deleted_row_idx_opt
535 && cached_idx < next_row_group_base_idx
536 {
537 next_deleted_row_idx_opt = delete_vector_iter.next();
538 }
539
540 current_row_group_base_idx += row_group_num_rows;
543 continue;
544 }
545 }
546
547 let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
548 Some(next_deleted_row_idx) => {
549 if next_deleted_row_idx >= next_row_group_base_idx {
552 results.push(RowSelector::select(row_group_num_rows as usize));
553 current_row_group_base_idx += row_group_num_rows;
554 continue;
555 }
556
557 next_deleted_row_idx
558 }
559
560 _ => {
562 results.push(RowSelector::select(row_group_num_rows as usize));
563 current_row_group_base_idx += row_group_num_rows;
564 continue;
565 }
566 };
567
568 let mut current_idx = current_row_group_base_idx;
569 'chunks: while next_deleted_row_idx < next_row_group_base_idx {
570 if current_idx < next_deleted_row_idx {
572 let run_length = next_deleted_row_idx - current_idx;
573 results.push(RowSelector::select(run_length as usize));
574 current_idx += run_length;
575 }
576
577 let mut run_length = 0;
579 while next_deleted_row_idx == current_idx
580 && next_deleted_row_idx < next_row_group_base_idx
581 {
582 run_length += 1;
583 current_idx += 1;
584
585 next_deleted_row_idx_opt = delete_vector_iter.next();
586 next_deleted_row_idx = match next_deleted_row_idx_opt {
587 Some(next_deleted_row_idx) => next_deleted_row_idx,
588 _ => {
589 results.push(RowSelector::skip(run_length));
593 break 'chunks;
594 }
595 };
596 }
597 if run_length > 0 {
598 results.push(RowSelector::skip(run_length));
599 }
600 }
601
602 if current_idx < next_row_group_base_idx {
603 results.push(RowSelector::select(
604 (next_row_group_base_idx - current_idx) as usize,
605 ));
606 }
607
608 current_row_group_base_idx += row_group_num_rows;
609 }
610
611 Ok(results.into())
612 }
613
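    /// Collects the Iceberg field IDs referenced by `predicate` and builds a
    /// map from field ID to Parquet leaf column index, falling back to a
    /// positional mapping when the file has no embedded field IDs.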
614 fn build_field_id_set_and_map(
615 parquet_schema: &SchemaDescriptor,
616 predicate: &BoundPredicate,
617 ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
618 let mut collector = CollectFieldIdVisitor {
620 field_ids: HashSet::default(),
621 };
622 visit(&mut collector, predicate)?;
623
624 let iceberg_field_ids = collector.field_ids();
625
626 let field_id_map = match build_field_id_map(parquet_schema)? {
628 Some(map) => map,
629 None => build_fallback_field_id_map(parquet_schema),
630 };
631
632 Ok((iceberg_field_ids, field_id_map))
633 }
634
635 fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
638 match field.field_type.as_ref() {
639 Type::Primitive(_) => {
640 field_ids.push(field.id);
641 }
642 Type::Struct(struct_type) => {
643 for nested_field in struct_type.fields() {
644 Self::include_leaf_field_id(nested_field, field_ids);
645 }
646 }
647 Type::List(list_type) => {
648 Self::include_leaf_field_id(&list_type.element_field, field_ids);
649 }
650 Type::Map(map_type) => {
651 Self::include_leaf_field_id(&map_type.key_field, field_ids);
652 Self::include_leaf_field_id(&map_type.value_field, field_ids);
653 }
654 }
655 }
656
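    /// Builds the Parquet `ProjectionMask` for the requested field IDs. The
    /// nested helper `type_promotion_is_valid` admits a limited set of
    /// file-to-task promotions: int -> long, float -> double, decimal
    /// precision widening at the same scale, and fixed(16) -> uuid.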
    fn get_arrow_projection_mask(
        field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
        use_fallback: bool,
    ) -> Result<ProjectionMask> {
664 fn type_promotion_is_valid(
665 file_type: Option<&PrimitiveType>,
666 projected_type: Option<&PrimitiveType>,
667 ) -> bool {
668 match (file_type, projected_type) {
669 (Some(lhs), Some(rhs)) if lhs == rhs => true,
670 (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
671 (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
672 (
673 Some(PrimitiveType::Decimal {
674 precision: file_precision,
675 scale: file_scale,
676 }),
677 Some(PrimitiveType::Decimal {
678 precision: requested_precision,
679 scale: requested_scale,
680 }),
681 ) if requested_precision >= file_precision && file_scale == requested_scale => true,
682 (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
684 _ => false,
685 }
686 }
687
688 if field_ids.is_empty() {
689 return Ok(ProjectionMask::all());
690 }
691
692 if use_fallback {
693 Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
695 } else {
696 let mut leaf_field_ids = vec![];
700 for field_id in field_ids {
701 let field = iceberg_schema_of_task.field_by_id(*field_id);
702 if let Some(field) = field {
703 Self::include_leaf_field_id(field, &mut leaf_field_ids);
704 }
705 }
706
707 Self::get_arrow_projection_mask_with_field_ids(
708 &leaf_field_ids,
709 iceberg_schema_of_task,
710 parquet_schema,
711 arrow_schema,
712 type_promotion_is_valid,
713 )
714 }
715 }
716
717 fn get_arrow_projection_mask_with_field_ids(
720 leaf_field_ids: &[i32],
721 iceberg_schema_of_task: &Schema,
722 parquet_schema: &SchemaDescriptor,
723 arrow_schema: &ArrowSchemaRef,
724 type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
725 ) -> Result<ProjectionMask> {
726 let mut column_map = HashMap::new();
727 let fields = arrow_schema.fields();
728
729 let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
732 let projected_arrow_schema = ArrowSchema::new_with_metadata(
733 fields.filter_leaves(|_, f| {
734 f.metadata()
735 .get(PARQUET_FIELD_ID_META_KEY)
736 .and_then(|field_id| i32::from_str(field_id).ok())
737 .is_some_and(|field_id| {
738 projected_fields.insert((*f).clone(), field_id);
739 leaf_field_ids.contains(&field_id)
740 })
741 }),
742 arrow_schema.metadata().clone(),
743 );
744 let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
745
746 fields.filter_leaves(|idx, field| {
747 let Some(field_id) = projected_fields.get(field).cloned() else {
748 return false;
749 };
750
751 let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
752 let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
753
754 if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
755 return false;
756 }
757
758 if !type_promotion_is_valid(
759 parquet_iceberg_field
760 .unwrap()
761 .field_type
762 .as_primitive_type(),
763 iceberg_field.unwrap().field_type.as_primitive_type(),
764 ) {
765 return false;
766 }
767
768 column_map.insert(field_id, idx);
769 true
770 });
771
772 let mut indices = vec![];
775 for field_id in leaf_field_ids {
776 if let Some(col_idx) = column_map.get(field_id) {
777 indices.push(*col_idx);
778 }
779 }
780
781 if indices.is_empty() {
782 Ok(ProjectionMask::all())
785 } else {
786 Ok(ProjectionMask::leaves(parquet_schema, indices))
787 }
788 }
789
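    /// Projection for files without embedded field IDs: fallback IDs are
    /// assigned positionally (ID = root column position + 1), so the mask is
    /// built from root column indices rather than leaf indices.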
790 fn get_arrow_projection_mask_fallback(
794 field_ids: &[i32],
795 parquet_schema: &SchemaDescriptor,
796 ) -> Result<ProjectionMask> {
797 let parquet_root_fields = parquet_schema.root_schema().get_fields();
799 let mut root_indices = vec![];
800
801 for field_id in field_ids.iter() {
802 let parquet_pos = (*field_id - 1) as usize;
803
804 if parquet_pos < parquet_root_fields.len() {
805 root_indices.push(parquet_pos);
806 }
807 }
809
810 if root_indices.is_empty() {
811 Ok(ProjectionMask::all())
812 } else {
813 Ok(ProjectionMask::roots(parquet_schema, root_indices))
814 }
815 }
816
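    /// Converts the bound predicate into an Arrow `RowFilter` that is evaluated
    /// against only the leaf columns the predicate references.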
817 fn get_row_filter(
818 predicates: &BoundPredicate,
819 parquet_schema: &SchemaDescriptor,
820 iceberg_field_ids: &HashSet<i32>,
821 field_id_map: &HashMap<i32, usize>,
822 ) -> Result<RowFilter> {
823 let mut column_indices = iceberg_field_ids
826 .iter()
827 .filter_map(|field_id| field_id_map.get(field_id).cloned())
828 .collect::<Vec<_>>();
829 column_indices.sort();
830
831 let mut converter = PredicateConverter {
833 parquet_schema,
834 column_map: field_id_map,
835 column_indices: &column_indices,
836 };
837
838 let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
841 let predicate_func = visit(&mut converter, predicates)?;
842 let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
843 Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
844 }
845
846 fn get_selected_row_group_indices(
847 predicate: &BoundPredicate,
848 parquet_metadata: &Arc<ParquetMetaData>,
849 field_id_map: &HashMap<i32, usize>,
850 snapshot_schema: &Schema,
851 ) -> Result<Vec<usize>> {
852 let row_groups_metadata = parquet_metadata.row_groups();
853 let mut results = Vec::with_capacity(row_groups_metadata.len());
854
855 for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
856 if RowGroupMetricsEvaluator::eval(
857 predicate,
858 row_group_metadata,
859 field_id_map,
860 snapshot_schema,
861 )? {
862 results.push(idx);
863 }
864 }
865
866 Ok(results)
867 }
868
869 fn get_row_selection_for_filter_predicate(
870 predicate: &BoundPredicate,
871 parquet_metadata: &Arc<ParquetMetaData>,
872 selected_row_groups: &Option<Vec<usize>>,
873 field_id_map: &HashMap<i32, usize>,
874 snapshot_schema: &Schema,
875 ) -> Result<RowSelection> {
876 let Some(column_index) = parquet_metadata.column_index() else {
877 return Err(Error::new(
878 ErrorKind::Unexpected,
879 "Parquet file metadata does not contain a column index",
880 ));
881 };
882
883 let Some(offset_index) = parquet_metadata.offset_index() else {
884 return Err(Error::new(
885 ErrorKind::Unexpected,
886 "Parquet file metadata does not contain an offset index",
887 ));
888 };
889
890 if let Some(selected_row_groups) = selected_row_groups
892 && selected_row_groups.is_empty()
893 {
894 return Ok(RowSelection::from(Vec::new()));
895 }
896
897 let mut selected_row_groups_idx = 0;
898
899 let page_index = column_index
900 .iter()
901 .enumerate()
902 .zip(offset_index)
903 .zip(parquet_metadata.row_groups());
904
905 let mut results = Vec::new();
906 for (((idx, column_index), offset_index), row_group_metadata) in page_index {
907 if let Some(selected_row_groups) = selected_row_groups {
908 if idx == selected_row_groups[selected_row_groups_idx] {
910 selected_row_groups_idx += 1;
911 } else {
912 continue;
913 }
914 }
915
916 let selections_for_page = PageIndexEvaluator::eval(
917 predicate,
918 column_index,
919 offset_index,
920 row_group_metadata,
921 field_id_map,
922 snapshot_schema,
923 )?;
924
925 results.push(selections_for_page);
926
927 if let Some(selected_row_groups) = selected_row_groups
928 && selected_row_groups_idx == selected_row_groups.len()
929 {
930 break;
931 }
932 }
933
934 Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
935 }
936
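    /// Selects the row groups whose compressed byte ranges overlap the task's
    /// `[start, start + length)` split. Offsets are reconstructed by summing
    /// compressed row-group sizes, starting after the 4-byte Parquet magic.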
937 fn filter_row_groups_by_byte_range(
942 parquet_metadata: &Arc<ParquetMetaData>,
943 start: u64,
944 length: u64,
945 ) -> Result<Vec<usize>> {
946 let row_groups = parquet_metadata.row_groups();
947 let mut selected = Vec::new();
948 let end = start + length;
949
950 let mut current_byte_offset = 4u64;
952
953 for (idx, row_group) in row_groups.iter().enumerate() {
954 let row_group_size = row_group.compressed_size() as u64;
955 let row_group_end = current_byte_offset + row_group_size;
956
957 if current_byte_offset < end && start < row_group_end {
958 selected.push(idx);
959 }
960
961 current_byte_offset = row_group_end;
962 }
963
964 Ok(selected)
965 }
966}
967
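/// Builds a map of Iceberg field ID to Parquet leaf column index, returning
/// `Ok(None)` if any leaf column lacks an embedded field ID.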
968fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
971 let mut column_map = HashMap::new();
972
973 for (idx, field) in parquet_schema.columns().iter().enumerate() {
974 let field_type = field.self_type();
975 match field_type {
976 ParquetType::PrimitiveType { basic_info, .. } => {
977 if !basic_info.has_id() {
978 return Ok(None);
979 }
980 column_map.insert(basic_info.id(), idx);
981 }
982 ParquetType::GroupType { .. } => {
983 return Err(Error::new(
984 ErrorKind::DataInvalid,
985 format!(
986 "Leave column in schema should be primitive type but got {field_type:?}"
987 ),
988 ));
989 }
990 };
991 }
992
993 Ok(Some(column_map))
994}
995
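/// Positional fallback mapping for files without field IDs: leaf column `idx`
/// is assigned field ID `idx + 1`.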
996fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
999 let mut column_map = HashMap::new();
1000
1001 for (idx, _field) in parquet_schema.columns().iter().enumerate() {
1003 let field_id = (idx + 1) as i32;
1004 column_map.insert(field_id, idx);
1005 }
1006
1007 column_map
1008}
1009
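/// Assigns field IDs to the top-level Arrow fields by looking up each field's
/// name in the table's `NameMapping`; fields without a mapping entry are left
/// without an ID.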
1010fn apply_name_mapping_to_arrow_schema(
1029 arrow_schema: ArrowSchemaRef,
1030 name_mapping: &NameMapping,
1031) -> Result<Arc<ArrowSchema>> {
1032 debug_assert!(
1033 arrow_schema
1034 .fields()
1035 .iter()
1036 .next()
1037 .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1038 "Schema already has field IDs - name mapping should not be applied"
1039 );
1040
1041 use arrow_schema::Field;
1042
1043 let fields_with_mapped_ids: Vec<_> = arrow_schema
1044 .fields()
1045 .iter()
1046 .map(|field| {
1047 let mapped_field_opt = name_mapping
1055 .fields()
1056 .iter()
1057 .find(|f| f.names().contains(&field.name().to_string()));
1058
1059 let mut metadata = field.metadata().clone();
1060
1061 if let Some(mapped_field) = mapped_field_opt
1062 && let Some(field_id) = mapped_field.field_id()
1063 {
1064 metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1066 }
1067 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1070 .with_metadata(metadata)
1071 })
1072 .collect();
1073
1074 Ok(Arc::new(ArrowSchema::new_with_metadata(
1075 fields_with_mapped_ids,
1076 arrow_schema.metadata().clone(),
1077 )))
1078}
1079
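/// Assigns 1-based positional fallback field IDs to the top-level Arrow fields
/// of a schema that has no embedded field IDs.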
1080fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
1087 debug_assert!(
1088 arrow_schema
1089 .fields()
1090 .iter()
1091 .next()
1092 .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1093 "Schema already has field IDs"
1094 );
1095
1096 use arrow_schema::Field;
1097
1098 let fields_with_fallback_ids: Vec<_> = arrow_schema
1099 .fields()
1100 .iter()
1101 .enumerate()
1102 .map(|(pos, field)| {
            let mut metadata = field.metadata().clone();
            let field_id = (pos + 1) as i32;
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1106
1107 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1108 .with_metadata(metadata)
1109 })
1110 .collect();
1111
1112 Arc::new(ArrowSchema::new_with_metadata(
1113 fields_with_fallback_ids,
1114 arrow_schema.metadata().clone(),
1115 ))
1116}
1117
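/// Predicate visitor that records the ID of every field referenced by a bound
/// predicate.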
1118struct CollectFieldIdVisitor {
1120 field_ids: HashSet<i32>,
1121}
1122
1123impl CollectFieldIdVisitor {
1124 fn field_ids(self) -> HashSet<i32> {
1125 self.field_ids
1126 }
1127}
1128
1129impl BoundPredicateVisitor for CollectFieldIdVisitor {
1130 type T = ();
1131
1132 fn always_true(&mut self) -> Result<()> {
1133 Ok(())
1134 }
1135
1136 fn always_false(&mut self) -> Result<()> {
1137 Ok(())
1138 }
1139
1140 fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1141 Ok(())
1142 }
1143
1144 fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1145 Ok(())
1146 }
1147
1148 fn not(&mut self, _inner: ()) -> Result<()> {
1149 Ok(())
1150 }
1151
1152 fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1153 self.field_ids.insert(reference.field().id);
1154 Ok(())
1155 }
1156
1157 fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1158 self.field_ids.insert(reference.field().id);
1159 Ok(())
1160 }
1161
1162 fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1163 self.field_ids.insert(reference.field().id);
1164 Ok(())
1165 }
1166
1167 fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1168 self.field_ids.insert(reference.field().id);
1169 Ok(())
1170 }
1171
1172 fn less_than(
1173 &mut self,
1174 reference: &BoundReference,
1175 _literal: &Datum,
1176 _predicate: &BoundPredicate,
1177 ) -> Result<()> {
1178 self.field_ids.insert(reference.field().id);
1179 Ok(())
1180 }
1181
1182 fn less_than_or_eq(
1183 &mut self,
1184 reference: &BoundReference,
1185 _literal: &Datum,
1186 _predicate: &BoundPredicate,
1187 ) -> Result<()> {
1188 self.field_ids.insert(reference.field().id);
1189 Ok(())
1190 }
1191
1192 fn greater_than(
1193 &mut self,
1194 reference: &BoundReference,
1195 _literal: &Datum,
1196 _predicate: &BoundPredicate,
1197 ) -> Result<()> {
1198 self.field_ids.insert(reference.field().id);
1199 Ok(())
1200 }
1201
1202 fn greater_than_or_eq(
1203 &mut self,
1204 reference: &BoundReference,
1205 _literal: &Datum,
1206 _predicate: &BoundPredicate,
1207 ) -> Result<()> {
1208 self.field_ids.insert(reference.field().id);
1209 Ok(())
1210 }
1211
1212 fn eq(
1213 &mut self,
1214 reference: &BoundReference,
1215 _literal: &Datum,
1216 _predicate: &BoundPredicate,
1217 ) -> Result<()> {
1218 self.field_ids.insert(reference.field().id);
1219 Ok(())
1220 }
1221
1222 fn not_eq(
1223 &mut self,
1224 reference: &BoundReference,
1225 _literal: &Datum,
1226 _predicate: &BoundPredicate,
1227 ) -> Result<()> {
1228 self.field_ids.insert(reference.field().id);
1229 Ok(())
1230 }
1231
1232 fn starts_with(
1233 &mut self,
1234 reference: &BoundReference,
1235 _literal: &Datum,
1236 _predicate: &BoundPredicate,
1237 ) -> Result<()> {
1238 self.field_ids.insert(reference.field().id);
1239 Ok(())
1240 }
1241
1242 fn not_starts_with(
1243 &mut self,
1244 reference: &BoundReference,
1245 _literal: &Datum,
1246 _predicate: &BoundPredicate,
1247 ) -> Result<()> {
1248 self.field_ids.insert(reference.field().id);
1249 Ok(())
1250 }
1251
1252 fn r#in(
1253 &mut self,
1254 reference: &BoundReference,
1255 _literals: &FnvHashSet<Datum>,
1256 _predicate: &BoundPredicate,
1257 ) -> Result<()> {
1258 self.field_ids.insert(reference.field().id);
1259 Ok(())
1260 }
1261
1262 fn not_in(
1263 &mut self,
1264 reference: &BoundReference,
1265 _literals: &FnvHashSet<Datum>,
1266 _predicate: &BoundPredicate,
1267 ) -> Result<()> {
1268 self.field_ids.insert(reference.field().id);
1269 Ok(())
1270 }
1271}
1272
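/// Converts a `BoundPredicate` into a closure over Arrow `RecordBatch`es that
/// produces the boolean filter mask, using Kleene logic for `and`/`or`.
/// References to columns that are missing from the file collapse to constant
/// true/false arrays, depending on the operator.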
1273struct PredicateConverter<'a> {
1275 pub parquet_schema: &'a SchemaDescriptor,
1277 pub column_map: &'a HashMap<i32, usize>,
1279 pub column_indices: &'a Vec<usize>,
1281}
1282
1283impl PredicateConverter<'_> {
1284 fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1289 if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1291 if self.parquet_schema.get_column_root(*column_idx).is_group() {
1292 return Err(Error::new(
1293 ErrorKind::DataInvalid,
1294 format!(
1295 "Leave column `{}` in predicates isn't a root column in Parquet schema.",
1296 reference.field().name
1297 ),
1298 ));
1299 }
1300
1301 let index = self
1303 .column_indices
1304 .iter()
1305 .position(|&idx| idx == *column_idx)
1306 .ok_or(Error::new(
1307 ErrorKind::DataInvalid,
1308 format!(
1309 "Leave column `{}` in predicates cannot be found in the required column indices.",
1310 reference.field().name
1311 ),
1312 ))?;
1313
1314 Ok(Some(index))
1315 } else {
1316 Ok(None)
1317 }
1318 }
1319
1320 fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1322 Ok(Box::new(|batch| {
1323 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1324 }))
1325 }
1326
1327 fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1329 Ok(Box::new(|batch| {
1330 Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1331 }))
1332 }
1333}
1334
1335fn project_column(
1338 batch: &RecordBatch,
1339 column_idx: usize,
1340) -> std::result::Result<ArrayRef, ArrowError> {
1341 let column = batch.column(column_idx);
1342
1343 match column.data_type() {
1344 DataType::Struct(_) => Err(ArrowError::SchemaError(
1345 "Does not support struct column yet.".to_string(),
1346 )),
1347 _ => Ok(column.clone()),
1348 }
1349}
1350
1351type PredicateResult =
1352 dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1353
1354impl BoundPredicateVisitor for PredicateConverter<'_> {
1355 type T = Box<PredicateResult>;
1356
1357 fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1358 self.build_always_true()
1359 }
1360
1361 fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1362 self.build_always_false()
1363 }
1364
1365 fn and(
1366 &mut self,
1367 mut lhs: Box<PredicateResult>,
1368 mut rhs: Box<PredicateResult>,
1369 ) -> Result<Box<PredicateResult>> {
1370 Ok(Box::new(move |batch| {
1371 let left = lhs(batch.clone())?;
1372 let right = rhs(batch)?;
1373 and_kleene(&left, &right)
1374 }))
1375 }
1376
1377 fn or(
1378 &mut self,
1379 mut lhs: Box<PredicateResult>,
1380 mut rhs: Box<PredicateResult>,
1381 ) -> Result<Box<PredicateResult>> {
1382 Ok(Box::new(move |batch| {
1383 let left = lhs(batch.clone())?;
1384 let right = rhs(batch)?;
1385 or_kleene(&left, &right)
1386 }))
1387 }
1388
1389 fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1390 Ok(Box::new(move |batch| {
1391 let pred_ret = inner(batch)?;
1392 not(&pred_ret)
1393 }))
1394 }
1395
1396 fn is_null(
1397 &mut self,
1398 reference: &BoundReference,
1399 _predicate: &BoundPredicate,
1400 ) -> Result<Box<PredicateResult>> {
1401 if let Some(idx) = self.bound_reference(reference)? {
1402 Ok(Box::new(move |batch| {
1403 let column = project_column(&batch, idx)?;
1404 is_null(&column)
1405 }))
1406 } else {
1407 self.build_always_true()
1409 }
1410 }
1411
1412 fn not_null(
1413 &mut self,
1414 reference: &BoundReference,
1415 _predicate: &BoundPredicate,
1416 ) -> Result<Box<PredicateResult>> {
1417 if let Some(idx) = self.bound_reference(reference)? {
1418 Ok(Box::new(move |batch| {
1419 let column = project_column(&batch, idx)?;
1420 is_not_null(&column)
1421 }))
1422 } else {
1423 self.build_always_false()
1425 }
1426 }
1427
1428 fn is_nan(
1429 &mut self,
1430 reference: &BoundReference,
1431 _predicate: &BoundPredicate,
1432 ) -> Result<Box<PredicateResult>> {
1433 if self.bound_reference(reference)?.is_some() {
1434 self.build_always_true()
1435 } else {
1436 self.build_always_false()
1438 }
1439 }
1440
1441 fn not_nan(
1442 &mut self,
1443 reference: &BoundReference,
1444 _predicate: &BoundPredicate,
1445 ) -> Result<Box<PredicateResult>> {
1446 if self.bound_reference(reference)?.is_some() {
1447 self.build_always_false()
1448 } else {
1449 self.build_always_true()
1451 }
1452 }
1453
1454 fn less_than(
1455 &mut self,
1456 reference: &BoundReference,
1457 literal: &Datum,
1458 _predicate: &BoundPredicate,
1459 ) -> Result<Box<PredicateResult>> {
1460 if let Some(idx) = self.bound_reference(reference)? {
1461 let literal = get_arrow_datum(literal)?;
1462
1463 Ok(Box::new(move |batch| {
1464 let left = project_column(&batch, idx)?;
1465 let literal = try_cast_literal(&literal, left.data_type())?;
1466 lt(&left, literal.as_ref())
1467 }))
1468 } else {
1469 self.build_always_true()
1471 }
1472 }
1473
1474 fn less_than_or_eq(
1475 &mut self,
1476 reference: &BoundReference,
1477 literal: &Datum,
1478 _predicate: &BoundPredicate,
1479 ) -> Result<Box<PredicateResult>> {
1480 if let Some(idx) = self.bound_reference(reference)? {
1481 let literal = get_arrow_datum(literal)?;
1482
1483 Ok(Box::new(move |batch| {
1484 let left = project_column(&batch, idx)?;
1485 let literal = try_cast_literal(&literal, left.data_type())?;
1486 lt_eq(&left, literal.as_ref())
1487 }))
1488 } else {
1489 self.build_always_true()
1491 }
1492 }
1493
1494 fn greater_than(
1495 &mut self,
1496 reference: &BoundReference,
1497 literal: &Datum,
1498 _predicate: &BoundPredicate,
1499 ) -> Result<Box<PredicateResult>> {
1500 if let Some(idx) = self.bound_reference(reference)? {
1501 let literal = get_arrow_datum(literal)?;
1502
1503 Ok(Box::new(move |batch| {
1504 let left = project_column(&batch, idx)?;
1505 let literal = try_cast_literal(&literal, left.data_type())?;
1506 gt(&left, literal.as_ref())
1507 }))
1508 } else {
1509 self.build_always_false()
1511 }
1512 }
1513
1514 fn greater_than_or_eq(
1515 &mut self,
1516 reference: &BoundReference,
1517 literal: &Datum,
1518 _predicate: &BoundPredicate,
1519 ) -> Result<Box<PredicateResult>> {
1520 if let Some(idx) = self.bound_reference(reference)? {
1521 let literal = get_arrow_datum(literal)?;
1522
1523 Ok(Box::new(move |batch| {
1524 let left = project_column(&batch, idx)?;
1525 let literal = try_cast_literal(&literal, left.data_type())?;
1526 gt_eq(&left, literal.as_ref())
1527 }))
1528 } else {
1529 self.build_always_false()
1531 }
1532 }
1533
1534 fn eq(
1535 &mut self,
1536 reference: &BoundReference,
1537 literal: &Datum,
1538 _predicate: &BoundPredicate,
1539 ) -> Result<Box<PredicateResult>> {
1540 if let Some(idx) = self.bound_reference(reference)? {
1541 let literal = get_arrow_datum(literal)?;
1542
1543 Ok(Box::new(move |batch| {
1544 let left = project_column(&batch, idx)?;
1545 let literal = try_cast_literal(&literal, left.data_type())?;
1546 eq(&left, literal.as_ref())
1547 }))
1548 } else {
1549 self.build_always_false()
1551 }
1552 }
1553
1554 fn not_eq(
1555 &mut self,
1556 reference: &BoundReference,
1557 literal: &Datum,
1558 _predicate: &BoundPredicate,
1559 ) -> Result<Box<PredicateResult>> {
1560 if let Some(idx) = self.bound_reference(reference)? {
1561 let literal = get_arrow_datum(literal)?;
1562
1563 Ok(Box::new(move |batch| {
1564 let left = project_column(&batch, idx)?;
1565 let literal = try_cast_literal(&literal, left.data_type())?;
1566 neq(&left, literal.as_ref())
1567 }))
1568 } else {
1569 self.build_always_false()
1571 }
1572 }
1573
1574 fn starts_with(
1575 &mut self,
1576 reference: &BoundReference,
1577 literal: &Datum,
1578 _predicate: &BoundPredicate,
1579 ) -> Result<Box<PredicateResult>> {
1580 if let Some(idx) = self.bound_reference(reference)? {
1581 let literal = get_arrow_datum(literal)?;
1582
1583 Ok(Box::new(move |batch| {
1584 let left = project_column(&batch, idx)?;
1585 let literal = try_cast_literal(&literal, left.data_type())?;
1586 starts_with(&left, literal.as_ref())
1587 }))
1588 } else {
1589 self.build_always_false()
1591 }
1592 }
1593
1594 fn not_starts_with(
1595 &mut self,
1596 reference: &BoundReference,
1597 literal: &Datum,
1598 _predicate: &BoundPredicate,
1599 ) -> Result<Box<PredicateResult>> {
1600 if let Some(idx) = self.bound_reference(reference)? {
1601 let literal = get_arrow_datum(literal)?;
1602
1603 Ok(Box::new(move |batch| {
1604 let left = project_column(&batch, idx)?;
1605 let literal = try_cast_literal(&literal, left.data_type())?;
1606 not(&starts_with(&left, literal.as_ref())?)
1608 }))
1609 } else {
1610 self.build_always_true()
1612 }
1613 }
1614
1615 fn r#in(
1616 &mut self,
1617 reference: &BoundReference,
1618 literals: &FnvHashSet<Datum>,
1619 _predicate: &BoundPredicate,
1620 ) -> Result<Box<PredicateResult>> {
1621 if let Some(idx) = self.bound_reference(reference)? {
1622 let literals: Vec<_> = literals
1623 .iter()
1624 .map(|lit| get_arrow_datum(lit).unwrap())
1625 .collect();
1626
1627 Ok(Box::new(move |batch| {
1628 let left = project_column(&batch, idx)?;
1630
1631 let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1632 for literal in &literals {
1633 let literal = try_cast_literal(literal, left.data_type())?;
1634 acc = or(&acc, &eq(&left, literal.as_ref())?)?
1635 }
1636
1637 Ok(acc)
1638 }))
1639 } else {
1640 self.build_always_false()
1642 }
1643 }
1644
1645 fn not_in(
1646 &mut self,
1647 reference: &BoundReference,
1648 literals: &FnvHashSet<Datum>,
1649 _predicate: &BoundPredicate,
1650 ) -> Result<Box<PredicateResult>> {
1651 if let Some(idx) = self.bound_reference(reference)? {
1652 let literals: Vec<_> = literals
1653 .iter()
1654 .map(|lit| get_arrow_datum(lit).unwrap())
1655 .collect();
1656
1657 Ok(Box::new(move |batch| {
1658 let left = project_column(&batch, idx)?;
1660 let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1661 for literal in &literals {
1662 let literal = try_cast_literal(literal, left.data_type())?;
1663 acc = and(&acc, &neq(&left, literal.as_ref())?)?
1664 }
1665
1666 Ok(acc)
1667 }))
1668 } else {
1669 self.build_always_true()
1671 }
1672 }
1673}
1674
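/// `AsyncFileReader` implementation backed by a `FileRead`, with optional
/// preloading of the column, offset, and page indexes when the Parquet
/// metadata is loaded.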
1675pub struct ArrowFileReader {
1677 meta: FileMetadata,
1678 preload_column_index: bool,
1679 preload_offset_index: bool,
1680 preload_page_index: bool,
1681 metadata_size_hint: Option<usize>,
1682 r: Box<dyn FileRead>,
1683}
1684
1685impl ArrowFileReader {
1686 pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
1688 Self {
1689 meta,
1690 preload_column_index: false,
1691 preload_offset_index: false,
1692 preload_page_index: false,
1693 metadata_size_hint: None,
1694 r,
1695 }
1696 }
1697
1698 pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1700 self.preload_column_index = preload;
1701 self
1702 }
1703
1704 pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1706 self.preload_offset_index = preload;
1707 self
1708 }
1709
1710 pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1712 self.preload_page_index = preload;
1713 self
1714 }
1715
1716 pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1721 self.metadata_size_hint = Some(hint);
1722 self
1723 }
1724}
1725
1726impl AsyncFileReader for ArrowFileReader {
1727 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
1728 Box::pin(
1729 self.r
1730 .read(range.start..range.end)
1731 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
1732 )
1733 }
1734
1735 fn get_metadata(
1738 &mut self,
1739 _options: Option<&'_ ArrowReaderOptions>,
1740 ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
1741 async move {
1742 let reader = ParquetMetaDataReader::new()
1743 .with_prefetch_hint(self.metadata_size_hint)
1744 .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
1746 .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
1747 .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
1748 let size = self.meta.size;
1749 let meta = reader.load_and_finish(self, size).await?;
1750
1751 Ok(Arc::new(meta))
1752 }
1753 .boxed()
1754 }
1755}
1756
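/// Casts the literal to the column's Arrow data type when the two differ, so
/// that comparisons such as a `Utf8` literal against a `LargeUtf8` column do
/// not fail on a type mismatch.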
1757fn try_cast_literal(
1764 literal: &Arc<dyn ArrowDatum + Send + Sync>,
1765 column_type: &DataType,
1766) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1767 let literal_array = literal.get().0;
1768
1769 if literal_array.data_type() == column_type {
1771 return Ok(Arc::clone(literal));
1772 }
1773
1774 let literal_array = cast(literal_array, column_type)?;
1775 Ok(Arc::new(Scalar::new(literal_array)))
1776}
1777
1778#[cfg(test)]
1779mod tests {
1780 use std::collections::{HashMap, HashSet};
1781 use std::fs::File;
1782 use std::sync::Arc;
1783
1784 use arrow_array::cast::AsArray;
1785 use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1786 use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1787 use futures::TryStreamExt;
1788 use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1789 use parquet::arrow::{ArrowWriter, ProjectionMask};
1790 use parquet::basic::Compression;
1791 use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1792 use parquet::file::properties::WriterProperties;
1793 use parquet::schema::parser::parse_message_type;
1794 use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1795 use roaring::RoaringTreemap;
1796 use tempfile::TempDir;
1797
1798 use crate::ErrorKind;
1799 use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1800 use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1801 use crate::delete_vector::DeleteVector;
1802 use crate::expr::visitors::bound_predicate_visitor::visit;
1803 use crate::expr::{Bind, Predicate, Reference};
1804 use crate::io::FileIO;
1805 use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1806 use crate::spec::{
1807 DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1808 };
1809
1810 fn table_schema_simple() -> SchemaRef {
1811 Arc::new(
1812 Schema::builder()
1813 .with_schema_id(1)
1814 .with_identifier_field_ids(vec![2])
1815 .with_fields(vec![
1816 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1817 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1818 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1819 NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1820 ])
1821 .build()
1822 .unwrap(),
1823 )
1824 }
1825
1826 #[test]
1827 fn test_collect_field_id() {
1828 let schema = table_schema_simple();
1829 let expr = Reference::new("qux").is_null();
1830 let bound_expr = expr.bind(schema, true).unwrap();
1831
1832 let mut visitor = CollectFieldIdVisitor {
1833 field_ids: HashSet::default(),
1834 };
1835 visit(&mut visitor, &bound_expr).unwrap();
1836
1837 let mut expected = HashSet::default();
1838 expected.insert(4_i32);
1839
1840 assert_eq!(visitor.field_ids, expected);
1841 }
1842
1843 #[test]
1844 fn test_collect_field_id_with_and() {
1845 let schema = table_schema_simple();
1846 let expr = Reference::new("qux")
1847 .is_null()
1848 .and(Reference::new("baz").is_null());
1849 let bound_expr = expr.bind(schema, true).unwrap();
1850
1851 let mut visitor = CollectFieldIdVisitor {
1852 field_ids: HashSet::default(),
1853 };
1854 visit(&mut visitor, &bound_expr).unwrap();
1855
1856 let mut expected = HashSet::default();
1857 expected.insert(4_i32);
1858 expected.insert(3);
1859
1860 assert_eq!(visitor.field_ids, expected);
1861 }
1862
1863 #[test]
1864 fn test_collect_field_id_with_or() {
1865 let schema = table_schema_simple();
1866 let expr = Reference::new("qux")
1867 .is_null()
1868 .or(Reference::new("baz").is_null());
1869 let bound_expr = expr.bind(schema, true).unwrap();
1870
1871 let mut visitor = CollectFieldIdVisitor {
1872 field_ids: HashSet::default(),
1873 };
1874 visit(&mut visitor, &bound_expr).unwrap();
1875
1876 let mut expected = HashSet::default();
1877 expected.insert(4_i32);
1878 expected.insert(3);
1879
1880 assert_eq!(visitor.field_ids, expected);
1881 }
1882
1883 #[test]
1884 fn test_arrow_projection_mask() {
1885 let schema = Arc::new(
1886 Schema::builder()
1887 .with_schema_id(1)
1888 .with_identifier_field_ids(vec![1])
1889 .with_fields(vec![
1890 NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
1891 NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
1892 NestedField::optional(
1893 3,
1894 "c3",
1895 Type::Primitive(PrimitiveType::Decimal {
1896 precision: 38,
1897 scale: 3,
1898 }),
1899 )
1900 .into(),
1901 ])
1902 .build()
1903 .unwrap(),
1904 );
1905 let arrow_schema = Arc::new(ArrowSchema::new(vec![
1906 Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
1907 PARQUET_FIELD_ID_META_KEY.to_string(),
1908 "1".to_string(),
1909 )])),
1910 Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
1912 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1913 ),
1914 Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
1916 PARQUET_FIELD_ID_META_KEY.to_string(),
1917 "3".to_string(),
1918 )])),
1919 ]));
1920
        let message_type = "
message schema {
    required binary c1 (STRING) = 1;
    optional int32 c2 (INTEGER(8,true)) = 2;
    optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
}
        ";
1928 let parquet_type = parse_message_type(message_type).expect("should parse schema");
1929 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
1930
1931 let err = ArrowReader::get_arrow_projection_mask(
1933 &[1, 2, 3],
1934 &schema,
1935 &parquet_schema,
1936 &arrow_schema,
1937 false,
1938 )
1939 .unwrap_err();
1940
1941 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1942 assert_eq!(
1943 err.to_string(),
1944 "DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
1945 );
1946
1947 let err = ArrowReader::get_arrow_projection_mask(
1949 &[1, 3],
1950 &schema,
1951 &parquet_schema,
1952 &arrow_schema,
1953 false,
1954 )
1955 .unwrap_err();
1956
1957 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1958 assert_eq!(
1959 err.to_string(),
1960 "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
1961 );
1962
1963 let mask = ArrowReader::get_arrow_projection_mask(
1965 &[1],
1966 &schema,
1967 &parquet_schema,
1968 &arrow_schema,
1969 false,
1970 )
1971 .expect("Some ProjectionMask");
1972 assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
1973 }
1974
1975 #[tokio::test]
1976 async fn test_kleene_logic_or_behaviour() {
1977 let predicate = Reference::new("a")
1979 .is_null()
1980 .or(Reference::new("a").equal_to(Datum::string("foo")));
1981
1982 let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1984
1985 let expected = vec![None, Some("foo".to_string())];
1987
1988 let (file_io, schema, table_location, _temp_dir) =
1989 setup_kleene_logic(data_for_col_a, DataType::Utf8);
1990 let reader = ArrowReaderBuilder::new(file_io).build();
1991
1992 let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1993
1994 assert_eq!(result_data, expected);
1995 }
1996
1997 #[tokio::test]
1998 async fn test_kleene_logic_and_behaviour() {
1999 let predicate = Reference::new("a")
2001 .is_not_null()
2002 .and(Reference::new("a").not_equal_to(Datum::string("foo")));
2003
2004 let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
2006
2007 let expected = vec![Some("bar".to_string())];
2009
2010 let (file_io, schema, table_location, _temp_dir) =
2011 setup_kleene_logic(data_for_col_a, DataType::Utf8);
2012 let reader = ArrowReaderBuilder::new(file_io).build();
2013
2014 let result_data = test_perform_read(predicate, schema, table_location, reader).await;
2015
2016 assert_eq!(result_data, expected);
2017 }
2018
2019 #[tokio::test]
2020 async fn test_predicate_cast_literal() {
2021 let predicates = vec![
2022 (Reference::new("a").equal_to(Datum::string("foo")), vec![
2024 Some("foo".to_string()),
2025 ]),
2026 (
2028 Reference::new("a").not_equal_to(Datum::string("foo")),
2029 vec![Some("bar".to_string())],
2030 ),
2031 (Reference::new("a").starts_with(Datum::string("f")), vec![
2033 Some("foo".to_string()),
2034 ]),
2035 (
2037 Reference::new("a").not_starts_with(Datum::string("f")),
2038 vec![Some("bar".to_string())],
2039 ),
2040 (Reference::new("a").less_than(Datum::string("foo")), vec![
2042 Some("bar".to_string()),
2043 ]),
2044 (
2046 Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
2047 vec![Some("foo".to_string()), Some("bar".to_string())],
2048 ),
2049 (
2051 Reference::new("a").greater_than(Datum::string("bar")),
2052 vec![Some("foo".to_string())],
2053 ),
2054 (
2056 Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
2057 vec![Some("foo".to_string())],
2058 ),
2059 (
2061 Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
2062 vec![Some("foo".to_string())],
2063 ),
2064 (
2066 Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
2067 vec![Some("bar".to_string())],
2068 ),
2069 ];
2070
2071 let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
2073
2074 let (file_io, schema, table_location, _temp_dir) =
2075 setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
2076 let reader = ArrowReaderBuilder::new(file_io).build();
2077
2078 for (predicate, expected) in predicates {
2079 println!("testing predicate {predicate}");
2080 let result_data = test_perform_read(
2081 predicate.clone(),
2082 schema.clone(),
2083 table_location.clone(),
2084 reader.clone(),
2085 )
2086 .await;
2087
2088 assert_eq!(result_data, expected, "predicate={predicate}");
2089 }
2090 }
2091
2092 async fn test_perform_read(
2093 predicate: Predicate,
2094 schema: SchemaRef,
2095 table_location: String,
2096 reader: ArrowReader,
2097 ) -> Vec<Option<String>> {
2098 let tasks = Box::pin(futures::stream::iter(
2099 vec![Ok(FileScanTask {
2100 start: 0,
2101 length: 0,
2102 record_count: None,
2103 data_file_path: format!("{table_location}/1.parquet"),
2104 data_file_format: DataFileFormat::Parquet,
2105 schema: schema.clone(),
2106 project_field_ids: vec![1],
2107 predicate: Some(predicate.bind(schema, true).unwrap()),
2108 deletes: vec![],
2109 partition: None,
2110 partition_spec: None,
2111 name_mapping: None,
2112 case_sensitive: false,
2113 })]
2114 .into_iter(),
2115 )) as FileScanTaskStream;
2116
2117 let result = reader
2118 .read(tasks)
2119 .unwrap()
2120 .try_collect::<Vec<RecordBatch>>()
2121 .await
2122 .unwrap();
2123
2124 result[0].columns()[0]
2125 .as_string_opt::<i32>()
2126 .unwrap()
2127 .iter()
2128 .map(|v| v.map(ToOwned::to_owned))
2129 .collect::<Vec<_>>()
2130 }
2131
2132 fn setup_kleene_logic(
2133 data_for_col_a: Vec<Option<String>>,
2134 col_a_type: DataType,
2135 ) -> (FileIO, SchemaRef, String, TempDir) {
2136 let schema = Arc::new(
2137 Schema::builder()
2138 .with_schema_id(1)
2139 .with_fields(vec![
2140 NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2141 ])
2142 .build()
2143 .unwrap(),
2144 );
2145
2146 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2147 Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2148 PARQUET_FIELD_ID_META_KEY.to_string(),
2149 "1".to_string(),
2150 )])),
2151 ]));
2152
2153 let tmp_dir = TempDir::new().unwrap();
2154 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2155
2156 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2157
2158 let col = match col_a_type {
2159 DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2160 DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2161 _ => panic!("unexpected col_a_type"),
2162 };
2163
2164 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2165
2166 let props = WriterProperties::builder()
2168 .set_compression(Compression::SNAPPY)
2169 .build();
2170
2171 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
2172 let mut writer =
2173 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2174
2175 writer.write(&to_write).expect("Writing batch");
2176
2177 writer.close().unwrap();
2179
2180 (file_io, schema, table_location, tmp_dir)
2181 }
2182
2183 #[test]
2184 fn test_build_deletes_row_selection() {
2185 let schema_descr = get_test_schema_descr();
2186
2187 let mut columns = vec![];
2188 for ptr in schema_descr.columns() {
2189 let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
2190 columns.push(column);
2191 }
2192
2193 let row_groups_metadata = vec![
2194 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
2195 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
2196 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
2197 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
2198 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
2199 ];
2200
2201 let selected_row_groups = Some(vec![1, 3]);
2202
        let positional_deletes = RoaringTreemap::from_iter(&[
            1, 3, 4, 5, 998, 999, // row group 0 (unselected, rows 0..1000)
            1000, 1010, 1011, 1012, 1498, 1499, // row group 1 (selected, rows 1000..1500)
            1500, 1501, 1600, 1999, // row group 2 (unselected, rows 1500..2000)
            2000, 2001, 2100, 2200, 2201, 2202, 2999, // row group 3 (selected, rows 2000..3000)
            3000, // row group 4 (unselected, rows 3000..3500)
        ]);
2228
2229 let positional_deletes = DeleteVector::new(positional_deletes);
2230
2231 let result = ArrowReader::build_deletes_row_selection(
2233 &row_groups_metadata,
2234 &selected_row_groups,
2235 &positional_deletes,
2236 )
2237 .unwrap();
2238
2239 let expected = RowSelection::from(vec![
2240 RowSelector::skip(1),
2241 RowSelector::select(9),
2242 RowSelector::skip(3),
2243 RowSelector::select(485),
2244 RowSelector::skip(4),
2245 RowSelector::select(98),
2246 RowSelector::skip(1),
2247 RowSelector::select(99),
2248 RowSelector::skip(3),
2249 RowSelector::select(796),
2250 RowSelector::skip(1),
2251 ]);
2252
2253 assert_eq!(result, expected);
2254
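        // Repeat with no explicit row-group selection (None): the resulting selection
        // must now span every row group in the file.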
2255 let result = ArrowReader::build_deletes_row_selection(
2257 &row_groups_metadata,
2258 &None,
2259 &positional_deletes,
2260 )
2261 .unwrap();
2262
2263 let expected = RowSelection::from(vec![
2264 RowSelector::select(1),
2265 RowSelector::skip(1),
2266 RowSelector::select(1),
2267 RowSelector::skip(3),
2268 RowSelector::select(992),
2269 RowSelector::skip(3),
2270 RowSelector::select(9),
2271 RowSelector::skip(3),
2272 RowSelector::select(485),
2273 RowSelector::skip(4),
2274 RowSelector::select(98),
2275 RowSelector::skip(1),
2276 RowSelector::select(398),
2277 RowSelector::skip(3),
2278 RowSelector::select(98),
2279 RowSelector::skip(1),
2280 RowSelector::select(99),
2281 RowSelector::skip(3),
2282 RowSelector::select(796),
2283 RowSelector::skip(2),
2284 RowSelector::select(499),
2285 ]);
2286
2287 assert_eq!(result, expected);
2288 }
2289
2290 fn build_test_row_group_meta(
2291 schema_descr: SchemaDescPtr,
2292 columns: Vec<ColumnChunkMetaData>,
2293 num_rows: i64,
2294 ordinal: i16,
2295 ) -> RowGroupMetaData {
2296 RowGroupMetaData::builder(schema_descr.clone())
2297 .set_num_rows(num_rows)
2298 .set_total_byte_size(2000)
2299 .set_column_metadata(columns)
2300 .set_ordinal(ordinal)
2301 .build()
2302 .unwrap()
2303 }
2304
2305 fn get_test_schema_descr() -> SchemaDescPtr {
2306 use parquet::schema::types::Type as SchemaType;
2307
2308 let schema = SchemaType::group_type_builder("schema")
2309 .with_fields(vec![
2310 Arc::new(
2311 SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2312 .build()
2313 .unwrap(),
2314 ),
2315 Arc::new(
2316 SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2317 .build()
2318 .unwrap(),
2319 ),
2320 ])
2321 .build()
2322 .unwrap();
2323
2324 Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2325 }
2326
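    // Verifies that a FileScanTask's byte range (start/length) restricts reading to the
    // row groups that fall within that range: task 1 covers only row group 0 (ids 0..=99),
    // task 2 covers row groups 1 and 2 (ids 100..=299).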
2327 #[tokio::test]
2329 async fn test_file_splits_respect_byte_ranges() {
2330 use arrow_array::Int32Array;
2331 use parquet::file::reader::{FileReader, SerializedFileReader};
2332
2333 let schema = Arc::new(
2334 Schema::builder()
2335 .with_schema_id(1)
2336 .with_fields(vec![
2337 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2338 ])
2339 .build()
2340 .unwrap(),
2341 );
2342
2343 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2344 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2345 PARQUET_FIELD_ID_META_KEY.to_string(),
2346 "1".to_string(),
2347 )])),
2348 ]));
2349
2350 let tmp_dir = TempDir::new().unwrap();
2351 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2352 let file_path = format!("{table_location}/multi_row_group.parquet");
2353
2354 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2356 (0..100).collect::<Vec<i32>>(),
2357 ))])
2358 .unwrap();
2359 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2360 (100..200).collect::<Vec<i32>>(),
2361 ))])
2362 .unwrap();
2363 let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2364 (200..300).collect::<Vec<i32>>(),
2365 ))])
2366 .unwrap();
2367
2368 let props = WriterProperties::builder()
2369 .set_compression(Compression::SNAPPY)
2370 .set_max_row_group_size(100)
2371 .build();
2372
2373 let file = File::create(&file_path).unwrap();
2374 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2375 writer.write(&batch1).expect("Writing batch 1");
2376 writer.write(&batch2).expect("Writing batch 2");
2377 writer.write(&batch3).expect("Writing batch 3");
2378 writer.close().unwrap();
2379
2380 let file = File::open(&file_path).unwrap();
2382 let reader = SerializedFileReader::new(file).unwrap();
2383 let metadata = reader.metadata();
2384
2385 println!("File has {} row groups", metadata.num_row_groups());
2386 assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2387
2388 let row_group_0 = metadata.row_group(0);
2390 let row_group_1 = metadata.row_group(1);
2391 let row_group_2 = metadata.row_group(2);
2392
        // Row group 0 starts immediately after the 4-byte "PAR1" magic at the start of the file.
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2395 let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
2396 let file_end = rg2_start + row_group_2.compressed_size() as u64;
2397
2398 println!(
2399 "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2400 row_group_0.num_rows(),
2401 rg0_start,
2402 row_group_0.compressed_size()
2403 );
2404 println!(
2405 "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2406 row_group_1.num_rows(),
2407 rg1_start,
2408 row_group_1.compressed_size()
2409 );
2410 println!(
2411 "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2412 row_group_2.num_rows(),
2413 rg2_start,
2414 row_group_2.compressed_size()
2415 );
2416
2417 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2418 let reader = ArrowReaderBuilder::new(file_io).build();
2419
2420 let task1 = FileScanTask {
2422 start: rg0_start,
2423 length: row_group_0.compressed_size() as u64,
2424 record_count: Some(100),
2425 data_file_path: file_path.clone(),
2426 data_file_format: DataFileFormat::Parquet,
2427 schema: schema.clone(),
2428 project_field_ids: vec![1],
2429 predicate: None,
2430 deletes: vec![],
2431 partition: None,
2432 partition_spec: None,
2433 name_mapping: None,
2434 case_sensitive: false,
2435 };
2436
2437 let task2 = FileScanTask {
2439 start: rg1_start,
2440 length: file_end - rg1_start,
2441 record_count: Some(200),
2442 data_file_path: file_path.clone(),
2443 data_file_format: DataFileFormat::Parquet,
2444 schema: schema.clone(),
2445 project_field_ids: vec![1],
2446 predicate: None,
2447 deletes: vec![],
2448 partition: None,
2449 partition_spec: None,
2450 name_mapping: None,
2451 case_sensitive: false,
2452 };
2453
2454 let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2455 let result1 = reader
2456 .clone()
2457 .read(tasks1)
2458 .unwrap()
2459 .try_collect::<Vec<RecordBatch>>()
2460 .await
2461 .unwrap();
2462
2463 let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2464 println!(
2465 "Task 1 (bytes {}-{}) returned {} rows",
2466 rg0_start,
2467 rg0_start + row_group_0.compressed_size() as u64,
2468 total_rows_task1
2469 );
2470
2471 let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2472 let result2 = reader
2473 .read(tasks2)
2474 .unwrap()
2475 .try_collect::<Vec<RecordBatch>>()
2476 .await
2477 .unwrap();
2478
2479 let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2480 println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2481
2482 assert_eq!(
2483 total_rows_task1, 100,
2484 "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2485 );
2486
2487 assert_eq!(
2488 total_rows_task2, 200,
2489 "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2490 );
2491
2492 if total_rows_task1 > 0 {
2494 let first_batch = &result1[0];
2495 let id_col = first_batch
2496 .column(0)
2497 .as_primitive::<arrow_array::types::Int32Type>();
2498 let first_val = id_col.value(0);
2499 let last_val = id_col.value(id_col.len() - 1);
2500 println!("Task 1 data range: {first_val} to {last_val}");
2501
2502 assert_eq!(first_val, 0, "Task 1 should start with id=0");
2503 assert_eq!(last_val, 99, "Task 1 should end with id=99");
2504 }
2505
2506 if total_rows_task2 > 0 {
2507 let first_batch = &result2[0];
2508 let id_col = first_batch
2509 .column(0)
2510 .as_primitive::<arrow_array::types::Int32Type>();
2511 let first_val = id_col.value(0);
2512 println!("Task 2 first value: {first_val}");
2513
2514 assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2515 }
2516 }
2517
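    // Schema evolution: reading an old file that lacks column "b" through a newer schema
    // should materialize "b" as a column of nulls.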
2518 #[tokio::test]
2524 async fn test_schema_evolution_add_column() {
2525 use arrow_array::{Array, Int32Array};
2526
2527 let new_schema = Arc::new(
2529 Schema::builder()
2530 .with_schema_id(2)
2531 .with_fields(vec![
2532 NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2533 NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2534 ])
2535 .build()
2536 .unwrap(),
2537 );
2538
2539 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2541 Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2542 PARQUET_FIELD_ID_META_KEY.to_string(),
2543 "1".to_string(),
2544 )])),
2545 ]));
2546
2547 let tmp_dir = TempDir::new().unwrap();
2549 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2550 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2551
2552 let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2553 let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2554
2555 let props = WriterProperties::builder()
2556 .set_compression(Compression::SNAPPY)
2557 .build();
2558 let file = File::create(format!("{table_location}/old_file.parquet")).unwrap();
2559 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2560 writer.write(&to_write).expect("Writing batch");
2561 writer.close().unwrap();
2562
2563 let reader = ArrowReaderBuilder::new(file_io).build();
2565 let tasks = Box::pin(futures::stream::iter(
2566 vec![Ok(FileScanTask {
2567 start: 0,
2568 length: 0,
2569 record_count: None,
2570 data_file_path: format!("{table_location}/old_file.parquet"),
2571 data_file_format: DataFileFormat::Parquet,
2572 schema: new_schema.clone(),
                project_field_ids: vec![1, 2],
                predicate: None,
2575 deletes: vec![],
2576 partition: None,
2577 partition_spec: None,
2578 name_mapping: None,
2579 case_sensitive: false,
2580 })]
2581 .into_iter(),
2582 )) as FileScanTaskStream;
2583
2584 let result = reader
2585 .read(tasks)
2586 .unwrap()
2587 .try_collect::<Vec<RecordBatch>>()
2588 .await
2589 .unwrap();
2590
2591 assert_eq!(result.len(), 1);
2593 let batch = &result[0];
2594
2595 assert_eq!(batch.num_columns(), 2);
2597 assert_eq!(batch.num_rows(), 3);
2598
2599 let col_a = batch
2601 .column(0)
2602 .as_primitive::<arrow_array::types::Int32Type>();
2603 assert_eq!(col_a.values(), &[1, 2, 3]);
2604
2605 let col_b = batch
2607 .column(1)
2608 .as_primitive::<arrow_array::types::Int32Type>();
2609 assert_eq!(col_b.null_count(), 3);
2610 assert!(col_b.is_null(0));
2611 assert!(col_b.is_null(1));
2612 assert!(col_b.is_null(2));
2613 }
2614
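    // Regression test: a position delete targeting a row in the second row group
    // (position 199, id=200) must still be applied when the whole file is scanned.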
2615 #[tokio::test]
2633 async fn test_position_delete_across_multiple_row_groups() {
2634 use arrow_array::{Int32Array, Int64Array};
2635 use parquet::file::reader::{FileReader, SerializedFileReader};
2636
2637 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2639 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2640
2641 let tmp_dir = TempDir::new().unwrap();
2642 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2643
2644 let table_schema = Arc::new(
2646 Schema::builder()
2647 .with_schema_id(1)
2648 .with_fields(vec![
2649 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2650 ])
2651 .build()
2652 .unwrap(),
2653 );
2654
2655 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2656 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2657 PARQUET_FIELD_ID_META_KEY.to_string(),
2658 "1".to_string(),
2659 )])),
2660 ]));
2661
2662 let data_file_path = format!("{table_location}/data.parquet");
2666
2667 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2668 Int32Array::from_iter_values(1..=100),
2669 )])
2670 .unwrap();
2671
2672 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2673 Int32Array::from_iter_values(101..=200),
2674 )])
2675 .unwrap();
2676
2677 let props = WriterProperties::builder()
2679 .set_compression(Compression::SNAPPY)
2680 .set_max_row_group_size(100)
2681 .build();
2682
2683 let file = File::create(&data_file_path).unwrap();
2684 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2685 writer.write(&batch1).expect("Writing batch 1");
2686 writer.write(&batch2).expect("Writing batch 2");
2687 writer.close().unwrap();
2688
2689 let verify_file = File::open(&data_file_path).unwrap();
2691 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2692 assert_eq!(
2693 verify_reader.metadata().num_row_groups(),
2694 2,
2695 "Should have 2 row groups"
2696 );
2697
2698 let delete_file_path = format!("{table_location}/deletes.parquet");
2700
2701 let delete_schema = Arc::new(ArrowSchema::new(vec![
2702 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2703 PARQUET_FIELD_ID_META_KEY.to_string(),
2704 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2705 )])),
2706 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2707 PARQUET_FIELD_ID_META_KEY.to_string(),
2708 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2709 )])),
2710 ]));
2711
2712 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2714 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2715 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2716 ])
2717 .unwrap();
2718
2719 let delete_props = WriterProperties::builder()
2720 .set_compression(Compression::SNAPPY)
2721 .build();
2722
2723 let delete_file = File::create(&delete_file_path).unwrap();
2724 let mut delete_writer =
2725 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2726 delete_writer.write(&delete_batch).unwrap();
2727 delete_writer.close().unwrap();
2728
2729 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2731 let reader = ArrowReaderBuilder::new(file_io).build();
2732
2733 let task = FileScanTask {
2734 start: 0,
2735 length: 0,
2736 record_count: Some(200),
2737 data_file_path: data_file_path.clone(),
2738 data_file_format: DataFileFormat::Parquet,
2739 schema: table_schema.clone(),
2740 project_field_ids: vec![1],
2741 predicate: None,
2742 deletes: vec![FileScanTaskDeleteFile {
2743 file_path: delete_file_path,
2744 file_type: DataContentType::PositionDeletes,
2745 partition_spec_id: 0,
2746 equality_ids: None,
2747 }],
2748 partition: None,
2749 partition_spec: None,
2750 name_mapping: None,
2751 case_sensitive: false,
2752 };
2753
2754 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2755 let result = reader
2756 .read(tasks)
2757 .unwrap()
2758 .try_collect::<Vec<RecordBatch>>()
2759 .await
2760 .unwrap();
2761
2762 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2764
2765 println!("Total rows read: {total_rows}");
2766 println!("Expected: 199 rows (deleted row 199 which had id=200)");
2767
2768 assert_eq!(
2770 total_rows, 199,
2771 "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
2772 The bug causes position deletes in later row groups to be ignored."
2773 );
2774
2775 let all_ids: Vec<i32> = result
2777 .iter()
2778 .flat_map(|batch| {
2779 batch
2780 .column(0)
2781 .as_primitive::<arrow_array::types::Int32Type>()
2782 .values()
2783 .iter()
2784 .copied()
2785 })
2786 .collect();
2787
2788 assert!(
2789 !all_ids.contains(&200),
2790 "Row with id=200 should be deleted but was found in results"
2791 );
2792
2793 let expected_ids: Vec<i32> = (1..=199).collect();
2795 assert_eq!(
2796 all_ids, expected_ids,
2797 "Should have ids 1-199 but got different values"
2798 );
2799 }
2800
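    // Regression test: when the task's byte range covers only row group 1, a position
    // delete inside that row group (position 199) must still be applied.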
2801 #[tokio::test]
2827 async fn test_position_delete_with_row_group_selection() {
2828 use arrow_array::{Int32Array, Int64Array};
2829 use parquet::file::reader::{FileReader, SerializedFileReader};
2830
2831 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2833 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2834
2835 let tmp_dir = TempDir::new().unwrap();
2836 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2837
2838 let table_schema = Arc::new(
2840 Schema::builder()
2841 .with_schema_id(1)
2842 .with_fields(vec![
2843 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2844 ])
2845 .build()
2846 .unwrap(),
2847 );
2848
2849 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2850 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2851 PARQUET_FIELD_ID_META_KEY.to_string(),
2852 "1".to_string(),
2853 )])),
2854 ]));
2855
2856 let data_file_path = format!("{table_location}/data.parquet");
2860
2861 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2862 Int32Array::from_iter_values(1..=100),
2863 )])
2864 .unwrap();
2865
2866 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2867 Int32Array::from_iter_values(101..=200),
2868 )])
2869 .unwrap();
2870
2871 let props = WriterProperties::builder()
2873 .set_compression(Compression::SNAPPY)
2874 .set_max_row_group_size(100)
2875 .build();
2876
2877 let file = File::create(&data_file_path).unwrap();
2878 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2879 writer.write(&batch1).expect("Writing batch 1");
2880 writer.write(&batch2).expect("Writing batch 2");
2881 writer.close().unwrap();
2882
2883 let verify_file = File::open(&data_file_path).unwrap();
2885 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2886 assert_eq!(
2887 verify_reader.metadata().num_row_groups(),
2888 2,
2889 "Should have 2 row groups"
2890 );
2891
2892 let delete_file_path = format!("{table_location}/deletes.parquet");
2894
2895 let delete_schema = Arc::new(ArrowSchema::new(vec![
2896 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2897 PARQUET_FIELD_ID_META_KEY.to_string(),
2898 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2899 )])),
2900 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2901 PARQUET_FIELD_ID_META_KEY.to_string(),
2902 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2903 )])),
2904 ]));
2905
2906 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2908 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2909 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2910 ])
2911 .unwrap();
2912
2913 let delete_props = WriterProperties::builder()
2914 .set_compression(Compression::SNAPPY)
2915 .build();
2916
2917 let delete_file = File::create(&delete_file_path).unwrap();
2918 let mut delete_writer =
2919 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2920 delete_writer.write(&delete_batch).unwrap();
2921 delete_writer.close().unwrap();
2922
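        // Inspect the Parquet footer to find each row group's byte offset and compressed size.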
2923 let metadata_file = File::open(&data_file_path).unwrap();
2926 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
2927 let metadata = metadata_reader.metadata();
2928
2929 let row_group_0 = metadata.row_group(0);
2930 let row_group_1 = metadata.row_group(1);
2931
        // Row group 0 starts immediately after the 4-byte "PAR1" magic at the start of the file.
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2934 let rg1_length = row_group_1.compressed_size() as u64;
2935
2936 println!(
2937 "Row group 0: starts at byte {}, {} bytes compressed",
2938 rg0_start,
2939 row_group_0.compressed_size()
2940 );
2941 println!(
2942 "Row group 1: starts at byte {}, {} bytes compressed",
2943 rg1_start,
2944 row_group_1.compressed_size()
2945 );
2946
2947 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2948 let reader = ArrowReaderBuilder::new(file_io).build();
2949
2950 let task = FileScanTask {
2952 start: rg1_start,
2953 length: rg1_length,
            record_count: Some(100),
            data_file_path: data_file_path.clone(),
2956 data_file_format: DataFileFormat::Parquet,
2957 schema: table_schema.clone(),
2958 project_field_ids: vec![1],
2959 predicate: None,
2960 deletes: vec![FileScanTaskDeleteFile {
2961 file_path: delete_file_path,
2962 file_type: DataContentType::PositionDeletes,
2963 partition_spec_id: 0,
2964 equality_ids: None,
2965 }],
2966 partition: None,
2967 partition_spec: None,
2968 name_mapping: None,
2969 case_sensitive: false,
2970 };
2971
2972 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2973 let result = reader
2974 .read(tasks)
2975 .unwrap()
2976 .try_collect::<Vec<RecordBatch>>()
2977 .await
2978 .unwrap();
2979
2980 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2983
2984 println!("Total rows read from row group 1: {total_rows}");
2985 println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
2986
2987 assert_eq!(
2989 total_rows, 99,
2990 "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
             The bug causes position deletes to be lost when advance_to() is followed by next() \
             while skipping unselected row groups."
2993 );
2994
2995 let all_ids: Vec<i32> = result
2997 .iter()
2998 .flat_map(|batch| {
2999 batch
3000 .column(0)
3001 .as_primitive::<arrow_array::types::Int32Type>()
3002 .values()
3003 .iter()
3004 .copied()
3005 })
3006 .collect();
3007
3008 assert!(
3009 !all_ids.contains(&200),
3010 "Row with id=200 should be deleted but was found in results"
3011 );
3012
3013 let expected_ids: Vec<i32> = (101..=199).collect();
3015 assert_eq!(
3016 all_ids, expected_ids,
3017 "Should have ids 101-199 but got different values"
3018 );
3019 }
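
    // Regression test: a position delete that falls in a skipped row group (position 0 in
    // row group 0) must not affect the rows read from the selected row group 1.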
3020 #[tokio::test]
3049 async fn test_position_delete_in_skipped_row_group() {
3050 use arrow_array::{Int32Array, Int64Array};
3051 use parquet::file::reader::{FileReader, SerializedFileReader};
3052
3053 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3055 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3056
3057 let tmp_dir = TempDir::new().unwrap();
3058 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3059
3060 let table_schema = Arc::new(
3062 Schema::builder()
3063 .with_schema_id(1)
3064 .with_fields(vec![
3065 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3066 ])
3067 .build()
3068 .unwrap(),
3069 );
3070
3071 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3072 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3073 PARQUET_FIELD_ID_META_KEY.to_string(),
3074 "1".to_string(),
3075 )])),
3076 ]));
3077
3078 let data_file_path = format!("{table_location}/data.parquet");
3082
3083 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3084 Int32Array::from_iter_values(1..=100),
3085 )])
3086 .unwrap();
3087
3088 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3089 Int32Array::from_iter_values(101..=200),
3090 )])
3091 .unwrap();
3092
3093 let props = WriterProperties::builder()
3095 .set_compression(Compression::SNAPPY)
3096 .set_max_row_group_size(100)
3097 .build();
3098
3099 let file = File::create(&data_file_path).unwrap();
3100 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3101 writer.write(&batch1).expect("Writing batch 1");
3102 writer.write(&batch2).expect("Writing batch 2");
3103 writer.close().unwrap();
3104
3105 let verify_file = File::open(&data_file_path).unwrap();
3107 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3108 assert_eq!(
3109 verify_reader.metadata().num_row_groups(),
3110 2,
3111 "Should have 2 row groups"
3112 );
3113
3114 let delete_file_path = format!("{table_location}/deletes.parquet");
3116
3117 let delete_schema = Arc::new(ArrowSchema::new(vec![
3118 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3119 PARQUET_FIELD_ID_META_KEY.to_string(),
3120 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3121 )])),
3122 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3123 PARQUET_FIELD_ID_META_KEY.to_string(),
3124 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3125 )])),
3126 ]));
3127
3128 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3130 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3131 Arc::new(Int64Array::from_iter_values(vec![0i64])),
3132 ])
3133 .unwrap();
3134
3135 let delete_props = WriterProperties::builder()
3136 .set_compression(Compression::SNAPPY)
3137 .build();
3138
3139 let delete_file = File::create(&delete_file_path).unwrap();
3140 let mut delete_writer =
3141 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3142 delete_writer.write(&delete_batch).unwrap();
3143 delete_writer.close().unwrap();
3144
3145 let metadata_file = File::open(&data_file_path).unwrap();
3148 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3149 let metadata = metadata_reader.metadata();
3150
3151 let row_group_0 = metadata.row_group(0);
3152 let row_group_1 = metadata.row_group(1);
3153
        // Row group 0 starts immediately after the 4-byte "PAR1" magic at the start of the file.
        let rg0_start = 4u64;
        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
3156 let rg1_length = row_group_1.compressed_size() as u64;
3157
3158 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3159 let reader = ArrowReaderBuilder::new(file_io).build();
3160
3161 let task = FileScanTask {
3163 start: rg1_start,
3164 length: rg1_length,
            record_count: Some(100),
            data_file_path: data_file_path.clone(),
3167 data_file_format: DataFileFormat::Parquet,
3168 schema: table_schema.clone(),
3169 project_field_ids: vec![1],
3170 predicate: None,
3171 deletes: vec![FileScanTaskDeleteFile {
3172 file_path: delete_file_path,
3173 file_type: DataContentType::PositionDeletes,
3174 partition_spec_id: 0,
3175 equality_ids: None,
3176 }],
3177 partition: None,
3178 partition_spec: None,
3179 name_mapping: None,
3180 case_sensitive: false,
3181 };
3182
3183 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3184 let result = reader
3185 .read(tasks)
3186 .unwrap()
3187 .try_collect::<Vec<RecordBatch>>()
3188 .await
3189 .unwrap();
3190
3191 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3194
3195 assert_eq!(
3196 total_rows, 100,
3197 "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
3198 If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
3199 );
3200
3201 let all_ids: Vec<i32> = result
3203 .iter()
3204 .flat_map(|batch| {
3205 batch
3206 .column(0)
3207 .as_primitive::<arrow_array::types::Int32Type>()
3208 .values()
3209 .iter()
3210 .copied()
3211 })
3212 .collect();
3213
3214 let expected_ids: Vec<i32> = (101..=200).collect();
3215 assert_eq!(
3216 all_ids, expected_ids,
3217 "Should have ids 101-200 (all of row group 1)"
3218 );
3219 }
3220
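    // Reading a Parquet file whose schema carries no field-id metadata should still work:
    // all projected columns come back with the expected values.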
3221 #[tokio::test]
3227 async fn test_read_parquet_file_without_field_ids() {
3228 let schema = Arc::new(
3229 Schema::builder()
3230 .with_schema_id(1)
3231 .with_fields(vec![
3232 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3233 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3234 ])
3235 .build()
3236 .unwrap(),
3237 );
3238
3239 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3241 Field::new("name", DataType::Utf8, false),
3242 Field::new("age", DataType::Int32, false),
3243 ]));
3244
3245 let tmp_dir = TempDir::new().unwrap();
3246 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3247 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3248
3249 let name_data = vec!["Alice", "Bob", "Charlie"];
3250 let age_data = vec![30, 25, 35];
3251
3252 use arrow_array::Int32Array;
3253 let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3254 let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3255
3256 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3257
3258 let props = WriterProperties::builder()
3259 .set_compression(Compression::SNAPPY)
3260 .build();
3261
3262 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3263 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3264
3265 writer.write(&to_write).expect("Writing batch");
3266 writer.close().unwrap();
3267
3268 let reader = ArrowReaderBuilder::new(file_io).build();
3269
3270 let tasks = Box::pin(futures::stream::iter(
3271 vec![Ok(FileScanTask {
3272 start: 0,
3273 length: 0,
3274 record_count: None,
3275 data_file_path: format!("{table_location}/1.parquet"),
3276 data_file_format: DataFileFormat::Parquet,
3277 schema: schema.clone(),
3278 project_field_ids: vec![1, 2],
3279 predicate: None,
3280 deletes: vec![],
3281 partition: None,
3282 partition_spec: None,
3283 name_mapping: None,
3284 case_sensitive: false,
3285 })]
3286 .into_iter(),
3287 )) as FileScanTaskStream;
3288
3289 let result = reader
3290 .read(tasks)
3291 .unwrap()
3292 .try_collect::<Vec<RecordBatch>>()
3293 .await
3294 .unwrap();
3295
3296 assert_eq!(result.len(), 1);
3297 let batch = &result[0];
3298 assert_eq!(batch.num_rows(), 3);
3299 assert_eq!(batch.num_columns(), 2);
3300
3301 let name_array = batch.column(0).as_string::<i32>();
3303 assert_eq!(name_array.value(0), "Alice");
3304 assert_eq!(name_array.value(1), "Bob");
3305 assert_eq!(name_array.value(2), "Charlie");
3306
3307 let age_array = batch
3308 .column(1)
3309 .as_primitive::<arrow_array::types::Int32Type>();
3310 assert_eq!(age_array.value(0), 30);
3311 assert_eq!(age_array.value(1), 25);
3312 assert_eq!(age_array.value(2), 35);
3313 }
3314
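    // Without field-id metadata, projecting a subset of columns (field ids 1 and 3,
    // i.e. "col1" and "col3") should return only those columns.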
3315 #[tokio::test]
3319 async fn test_read_parquet_without_field_ids_partial_projection() {
3320 use arrow_array::Int32Array;
3321
3322 let schema = Arc::new(
3323 Schema::builder()
3324 .with_schema_id(1)
3325 .with_fields(vec![
3326 NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3327 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3328 NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3329 NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3330 ])
3331 .build()
3332 .unwrap(),
3333 );
3334
3335 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3336 Field::new("col1", DataType::Utf8, false),
3337 Field::new("col2", DataType::Int32, false),
3338 Field::new("col3", DataType::Utf8, false),
3339 Field::new("col4", DataType::Int32, false),
3340 ]));
3341
3342 let tmp_dir = TempDir::new().unwrap();
3343 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3344 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3345
3346 let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3347 let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3348 let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3349 let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3350
3351 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3352 col1_data, col2_data, col3_data, col4_data,
3353 ])
3354 .unwrap();
3355
3356 let props = WriterProperties::builder()
3357 .set_compression(Compression::SNAPPY)
3358 .build();
3359
3360 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3361 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3362
3363 writer.write(&to_write).expect("Writing batch");
3364 writer.close().unwrap();
3365
3366 let reader = ArrowReaderBuilder::new(file_io).build();
3367
3368 let tasks = Box::pin(futures::stream::iter(
3369 vec![Ok(FileScanTask {
3370 start: 0,
3371 length: 0,
3372 record_count: None,
3373 data_file_path: format!("{table_location}/1.parquet"),
3374 data_file_format: DataFileFormat::Parquet,
3375 schema: schema.clone(),
3376 project_field_ids: vec![1, 3],
3377 predicate: None,
3378 deletes: vec![],
3379 partition: None,
3380 partition_spec: None,
3381 name_mapping: None,
3382 case_sensitive: false,
3383 })]
3384 .into_iter(),
3385 )) as FileScanTaskStream;
3386
3387 let result = reader
3388 .read(tasks)
3389 .unwrap()
3390 .try_collect::<Vec<RecordBatch>>()
3391 .await
3392 .unwrap();
3393
3394 assert_eq!(result.len(), 1);
3395 let batch = &result[0];
3396 assert_eq!(batch.num_rows(), 2);
3397 assert_eq!(batch.num_columns(), 2);
3398
3399 let col1_array = batch.column(0).as_string::<i32>();
3400 assert_eq!(col1_array.value(0), "a");
3401 assert_eq!(col1_array.value(1), "b");
3402
3403 let col3_array = batch.column(1).as_string::<i32>();
3404 assert_eq!(col3_array.value(0), "c");
3405 assert_eq!(col3_array.value(1), "d");
3406 }
3407
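    // Without field-id metadata, a column added by schema evolution ("city") that is
    // missing from the file should be filled with nulls.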
3408 #[tokio::test]
3412 async fn test_read_parquet_without_field_ids_schema_evolution() {
3413 use arrow_array::{Array, Int32Array};
3414
3415 let schema = Arc::new(
3417 Schema::builder()
3418 .with_schema_id(1)
3419 .with_fields(vec![
3420 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3421 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3422 NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3423 ])
3424 .build()
3425 .unwrap(),
3426 );
3427
3428 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3429 Field::new("name", DataType::Utf8, false),
3430 Field::new("age", DataType::Int32, false),
3431 ]));
3432
3433 let tmp_dir = TempDir::new().unwrap();
3434 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3435 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3436
3437 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3438 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3439
3440 let to_write =
3441 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3442
3443 let props = WriterProperties::builder()
3444 .set_compression(Compression::SNAPPY)
3445 .build();
3446
3447 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3448 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3449
3450 writer.write(&to_write).expect("Writing batch");
3451 writer.close().unwrap();
3452
3453 let reader = ArrowReaderBuilder::new(file_io).build();
3454
3455 let tasks = Box::pin(futures::stream::iter(
3456 vec![Ok(FileScanTask {
3457 start: 0,
3458 length: 0,
3459 record_count: None,
3460 data_file_path: format!("{table_location}/1.parquet"),
3461 data_file_format: DataFileFormat::Parquet,
3462 schema: schema.clone(),
3463 project_field_ids: vec![1, 2, 3],
3464 predicate: None,
3465 deletes: vec![],
3466 partition: None,
3467 partition_spec: None,
3468 name_mapping: None,
3469 case_sensitive: false,
3470 })]
3471 .into_iter(),
3472 )) as FileScanTaskStream;
3473
3474 let result = reader
3475 .read(tasks)
3476 .unwrap()
3477 .try_collect::<Vec<RecordBatch>>()
3478 .await
3479 .unwrap();
3480
3481 assert_eq!(result.len(), 1);
3482 let batch = &result[0];
3483 assert_eq!(batch.num_rows(), 2);
3484 assert_eq!(batch.num_columns(), 3);
3485
3486 let name_array = batch.column(0).as_string::<i32>();
3487 assert_eq!(name_array.value(0), "Alice");
3488 assert_eq!(name_array.value(1), "Bob");
3489
3490 let age_array = batch
3491 .column(1)
3492 .as_primitive::<arrow_array::types::Int32Type>();
3493 assert_eq!(age_array.value(0), 30);
3494 assert_eq!(age_array.value(1), 25);
3495
3496 let city_array = batch.column(2).as_string::<i32>();
3498 assert_eq!(city_array.null_count(), 2);
3499 assert!(city_array.is_null(0));
3500 assert!(city_array.is_null(1));
3501 }
3502
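    // Without field-id metadata, reading a file that spans several row groups should
    // return all rows in order.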
3503 #[tokio::test]
3506 async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3507 use arrow_array::Int32Array;
3508
3509 let schema = Arc::new(
3510 Schema::builder()
3511 .with_schema_id(1)
3512 .with_fields(vec![
3513 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3514 NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3515 ])
3516 .build()
3517 .unwrap(),
3518 );
3519
3520 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3521 Field::new("name", DataType::Utf8, false),
3522 Field::new("value", DataType::Int32, false),
3523 ]));
3524
3525 let tmp_dir = TempDir::new().unwrap();
3526 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3527 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3528
3529 let props = WriterProperties::builder()
3531 .set_compression(Compression::SNAPPY)
3532 .set_write_batch_size(2)
3533 .set_max_row_group_size(2)
3534 .build();
3535
3536 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3537 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3538
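        // Three 2-row batches with max_row_group_size = 2 yield three row groups.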
3539 for batch_num in 0..3 {
3541 let name_data = Arc::new(StringArray::from(vec![
3542 format!("name_{}", batch_num * 2),
3543 format!("name_{}", batch_num * 2 + 1),
3544 ])) as ArrayRef;
3545 let value_data =
3546 Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3547
3548 let batch =
3549 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3550 writer.write(&batch).expect("Writing batch");
3551 }
3552 writer.close().unwrap();
3553
3554 let reader = ArrowReaderBuilder::new(file_io).build();
3555
3556 let tasks = Box::pin(futures::stream::iter(
3557 vec![Ok(FileScanTask {
3558 start: 0,
3559 length: 0,
3560 record_count: None,
3561 data_file_path: format!("{table_location}/1.parquet"),
3562 data_file_format: DataFileFormat::Parquet,
3563 schema: schema.clone(),
3564 project_field_ids: vec![1, 2],
3565 predicate: None,
3566 deletes: vec![],
3567 partition: None,
3568 partition_spec: None,
3569 name_mapping: None,
3570 case_sensitive: false,
3571 })]
3572 .into_iter(),
3573 )) as FileScanTaskStream;
3574
3575 let result = reader
3576 .read(tasks)
3577 .unwrap()
3578 .try_collect::<Vec<RecordBatch>>()
3579 .await
3580 .unwrap();
3581
3582 assert!(!result.is_empty());
3583
3584 let mut all_names = Vec::new();
3585 let mut all_values = Vec::new();
3586
3587 for batch in &result {
3588 let name_array = batch.column(0).as_string::<i32>();
3589 let value_array = batch
3590 .column(1)
3591 .as_primitive::<arrow_array::types::Int32Type>();
3592
3593 for i in 0..batch.num_rows() {
3594 all_names.push(name_array.value(i).to_string());
3595 all_values.push(value_array.value(i));
3596 }
3597 }
3598
3599 assert_eq!(all_names.len(), 6);
3600 assert_eq!(all_values.len(), 6);
3601
3602 for i in 0..6 {
3603 assert_eq!(all_names[i], format!("name_{i}"));
3604 assert_eq!(all_values[i], i as i32);
3605 }
3606 }
3607
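    // Without field-id metadata, nested struct columns should round-trip with their
    // child fields intact.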
3608 #[tokio::test]
3612 async fn test_read_parquet_without_field_ids_with_struct() {
3613 use arrow_array::{Int32Array, StructArray};
3614 use arrow_schema::Fields;
3615
3616 let schema = Arc::new(
3617 Schema::builder()
3618 .with_schema_id(1)
3619 .with_fields(vec![
3620 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3621 NestedField::required(
3622 2,
3623 "person",
3624 Type::Struct(crate::spec::StructType::new(vec![
3625 NestedField::required(
3626 3,
3627 "name",
3628 Type::Primitive(PrimitiveType::String),
3629 )
3630 .into(),
3631 NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3632 .into(),
3633 ])),
3634 )
3635 .into(),
3636 ])
3637 .build()
3638 .unwrap(),
3639 );
3640
3641 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3642 Field::new("id", DataType::Int32, false),
3643 Field::new(
3644 "person",
3645 DataType::Struct(Fields::from(vec![
3646 Field::new("name", DataType::Utf8, false),
3647 Field::new("age", DataType::Int32, false),
3648 ])),
3649 false,
3650 ),
3651 ]));
3652
3653 let tmp_dir = TempDir::new().unwrap();
3654 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3655 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3656
3657 let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3658 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3659 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3660 let person_data = Arc::new(StructArray::from(vec![
3661 (
3662 Arc::new(Field::new("name", DataType::Utf8, false)),
3663 name_data,
3664 ),
3665 (
3666 Arc::new(Field::new("age", DataType::Int32, false)),
3667 age_data,
3668 ),
3669 ])) as ArrayRef;
3670
3671 let to_write =
3672 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3673
3674 let props = WriterProperties::builder()
3675 .set_compression(Compression::SNAPPY)
3676 .build();
3677
3678 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3679 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3680
3681 writer.write(&to_write).expect("Writing batch");
3682 writer.close().unwrap();
3683
3684 let reader = ArrowReaderBuilder::new(file_io).build();
3685
3686 let tasks = Box::pin(futures::stream::iter(
3687 vec![Ok(FileScanTask {
3688 start: 0,
3689 length: 0,
3690 record_count: None,
3691 data_file_path: format!("{table_location}/1.parquet"),
3692 data_file_format: DataFileFormat::Parquet,
3693 schema: schema.clone(),
3694 project_field_ids: vec![1, 2],
3695 predicate: None,
3696 deletes: vec![],
3697 partition: None,
3698 partition_spec: None,
3699 name_mapping: None,
3700 case_sensitive: false,
3701 })]
3702 .into_iter(),
3703 )) as FileScanTaskStream;
3704
3705 let result = reader
3706 .read(tasks)
3707 .unwrap()
3708 .try_collect::<Vec<RecordBatch>>()
3709 .await
3710 .unwrap();
3711
3712 assert_eq!(result.len(), 1);
3713 let batch = &result[0];
3714 assert_eq!(batch.num_rows(), 2);
3715 assert_eq!(batch.num_columns(), 2);
3716
3717 let id_array = batch
3718 .column(0)
3719 .as_primitive::<arrow_array::types::Int32Type>();
3720 assert_eq!(id_array.value(0), 1);
3721 assert_eq!(id_array.value(1), 2);
3722
3723 let person_array = batch.column(1).as_struct();
3724 assert_eq!(person_array.num_columns(), 2);
3725
3726 let name_array = person_array.column(0).as_string::<i32>();
3727 assert_eq!(name_array.value(0), "Alice");
3728 assert_eq!(name_array.value(1), "Bob");
3729
3730 let age_array = person_array
3731 .column(1)
3732 .as_primitive::<arrow_array::types::Int32Type>();
3733 assert_eq!(age_array.value(0), 30);
3734 assert_eq!(age_array.value(1), 25);
3735 }
3736
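    // Without field-id metadata, a column added in the middle of the schema ("newCol",
    // field id 5) should appear as a null column between the existing columns.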
3737 #[tokio::test]
3741 async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3742 use arrow_array::{Array, Int32Array};
3743
3744 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3745 Field::new("col0", DataType::Int32, true),
3746 Field::new("col1", DataType::Int32, true),
3747 ]));
3748
3749 let schema = Arc::new(
3751 Schema::builder()
3752 .with_schema_id(1)
3753 .with_fields(vec![
3754 NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3755 NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3756 NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3757 ])
3758 .build()
3759 .unwrap(),
3760 );
3761
3762 let tmp_dir = TempDir::new().unwrap();
3763 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3764 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3765
3766 let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3767 let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3768
3769 let to_write =
3770 RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3771
3772 let props = WriterProperties::builder()
3773 .set_compression(Compression::SNAPPY)
3774 .build();
3775
3776 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3777 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3778 writer.write(&to_write).expect("Writing batch");
3779 writer.close().unwrap();
3780
3781 let reader = ArrowReaderBuilder::new(file_io).build();
3782
3783 let tasks = Box::pin(futures::stream::iter(
3784 vec![Ok(FileScanTask {
3785 start: 0,
3786 length: 0,
3787 record_count: None,
3788 data_file_path: format!("{table_location}/1.parquet"),
3789 data_file_format: DataFileFormat::Parquet,
3790 schema: schema.clone(),
3791 project_field_ids: vec![1, 5, 2],
3792 predicate: None,
3793 deletes: vec![],
3794 partition: None,
3795 partition_spec: None,
3796 name_mapping: None,
3797 case_sensitive: false,
3798 })]
3799 .into_iter(),
3800 )) as FileScanTaskStream;
3801
3802 let result = reader
3803 .read(tasks)
3804 .unwrap()
3805 .try_collect::<Vec<RecordBatch>>()
3806 .await
3807 .unwrap();
3808
3809 assert_eq!(result.len(), 1);
3810 let batch = &result[0];
3811 assert_eq!(batch.num_rows(), 2);
3812 assert_eq!(batch.num_columns(), 3);
3813
3814 let result_col0 = batch
3815 .column(0)
3816 .as_primitive::<arrow_array::types::Int32Type>();
3817 assert_eq!(result_col0.value(0), 1);
3818 assert_eq!(result_col0.value(1), 2);
3819
3820 let result_newcol = batch
3822 .column(1)
3823 .as_primitive::<arrow_array::types::Int32Type>();
3824 assert_eq!(result_newcol.null_count(), 2);
3825 assert!(result_newcol.is_null(0));
3826 assert!(result_newcol.is_null(1));
3827
3828 let result_col1 = batch
3829 .column(2)
3830 .as_primitive::<arrow_array::types::Int32Type>();
3831 assert_eq!(result_col1.value(0), 10);
3832 assert_eq!(result_col1.value(1), 20);
3833 }
3834
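    // Without field-id metadata, a predicate that matches no rows (id < 5 while all ids
    // are >= 10) should yield an empty result even with row-group filtering and row
    // selection enabled.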
3835 #[tokio::test]
3839 async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
3840 use arrow_array::{Float64Array, Int32Array};
3841
3842 let schema = Arc::new(
3844 Schema::builder()
3845 .with_schema_id(1)
3846 .with_fields(vec![
3847 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3848 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3849 NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
3850 .into(),
3851 ])
3852 .build()
3853 .unwrap(),
3854 );
3855
3856 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3857 Field::new("id", DataType::Int32, false),
3858 Field::new("name", DataType::Utf8, false),
3859 Field::new("value", DataType::Float64, false),
3860 ]));
3861
3862 let tmp_dir = TempDir::new().unwrap();
3863 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3864 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3865
3866 let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
3868 let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3869 let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
3870
3871 let to_write =
3872 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
3873 .unwrap();
3874
3875 let props = WriterProperties::builder()
3876 .set_compression(Compression::SNAPPY)
3877 .build();
3878
3879 let file = File::create(format!("{table_location}/1.parquet")).unwrap();
3880 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3881 writer.write(&to_write).expect("Writing batch");
3882 writer.close().unwrap();
3883
3884 let predicate = Reference::new("id").less_than(Datum::int(5));
3886
3887 let reader = ArrowReaderBuilder::new(file_io)
3889 .with_row_group_filtering_enabled(true)
3890 .with_row_selection_enabled(true)
3891 .build();
3892
3893 let tasks = Box::pin(futures::stream::iter(
3894 vec![Ok(FileScanTask {
3895 start: 0,
3896 length: 0,
3897 record_count: None,
3898 data_file_path: format!("{table_location}/1.parquet"),
3899 data_file_format: DataFileFormat::Parquet,
3900 schema: schema.clone(),
3901 project_field_ids: vec![1, 2, 3],
3902 predicate: Some(predicate.bind(schema, true).unwrap()),
3903 deletes: vec![],
3904 partition: None,
3905 partition_spec: None,
3906 name_mapping: None,
3907 case_sensitive: false,
3908 })]
3909 .into_iter(),
3910 )) as FileScanTaskStream;
3911
3912 let result = reader
3914 .read(tasks)
3915 .unwrap()
3916 .try_collect::<Vec<RecordBatch>>()
3917 .await
3918 .unwrap();
3919
3920 assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
3922 }
3923
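    // With the data-file concurrency limit set to 1, all three files should still be read
    // completely and their batches returned in task order.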
3924 #[tokio::test]
3927 async fn test_read_with_concurrency_one() {
3928 use arrow_array::Int32Array;
3929
3930 let schema = Arc::new(
3931 Schema::builder()
3932 .with_schema_id(1)
3933 .with_fields(vec![
3934 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3935 NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
3936 .into(),
3937 ])
3938 .build()
3939 .unwrap(),
3940 );
3941
3942 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3943 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3944 PARQUET_FIELD_ID_META_KEY.to_string(),
3945 "1".to_string(),
3946 )])),
3947 Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
3948 PARQUET_FIELD_ID_META_KEY.to_string(),
3949 "2".to_string(),
3950 )])),
3951 ]));
3952
3953 let tmp_dir = TempDir::new().unwrap();
3954 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3955 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3956
3957 let props = WriterProperties::builder()
3959 .set_compression(Compression::SNAPPY)
3960 .build();
3961
3962 for file_num in 0..3 {
3963 let id_data = Arc::new(Int32Array::from_iter_values(
3964 file_num * 10..(file_num + 1) * 10,
3965 )) as ArrayRef;
3966 let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
3967
3968 let to_write =
3969 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
3970
3971 let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
3972 let mut writer =
3973 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
3974 writer.write(&to_write).expect("Writing batch");
3975 writer.close().unwrap();
3976 }
3977
3978 let reader = ArrowReaderBuilder::new(file_io)
3980 .with_data_file_concurrency_limit(1)
3981 .build();
3982
3983 let tasks = vec![
3985 Ok(FileScanTask {
3986 start: 0,
3987 length: 0,
3988 record_count: None,
3989 data_file_path: format!("{table_location}/file_0.parquet"),
3990 data_file_format: DataFileFormat::Parquet,
3991 schema: schema.clone(),
3992 project_field_ids: vec![1, 2],
3993 predicate: None,
3994 deletes: vec![],
3995 partition: None,
3996 partition_spec: None,
3997 name_mapping: None,
3998 case_sensitive: false,
3999 }),
4000 Ok(FileScanTask {
4001 start: 0,
4002 length: 0,
4003 record_count: None,
4004 data_file_path: format!("{table_location}/file_1.parquet"),
4005 data_file_format: DataFileFormat::Parquet,
4006 schema: schema.clone(),
4007 project_field_ids: vec![1, 2],
4008 predicate: None,
4009 deletes: vec![],
4010 partition: None,
4011 partition_spec: None,
4012 name_mapping: None,
4013 case_sensitive: false,
4014 }),
4015 Ok(FileScanTask {
4016 start: 0,
4017 length: 0,
4018 record_count: None,
4019 data_file_path: format!("{table_location}/file_2.parquet"),
4020 data_file_format: DataFileFormat::Parquet,
4021 schema: schema.clone(),
4022 project_field_ids: vec![1, 2],
4023 predicate: None,
4024 deletes: vec![],
4025 partition: None,
4026 partition_spec: None,
4027 name_mapping: None,
4028 case_sensitive: false,
4029 }),
4030 ];
4031
4032 let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
4033
4034 let result = reader
4035 .read(tasks_stream)
4036 .unwrap()
4037 .try_collect::<Vec<RecordBatch>>()
4038 .await
4039 .unwrap();
4040
4041 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
4043 assert_eq!(total_rows, 30, "Should have 30 total rows");
4044
4045 let mut all_ids = Vec::new();
4047 let mut all_file_nums = Vec::new();
4048
4049 for batch in &result {
4050 let id_col = batch
4051 .column(0)
4052 .as_primitive::<arrow_array::types::Int32Type>();
4053 let file_num_col = batch
4054 .column(1)
4055 .as_primitive::<arrow_array::types::Int32Type>();
4056
4057 for i in 0..batch.num_rows() {
4058 all_ids.push(id_col.value(i));
4059 all_file_nums.push(file_num_col.value(i));
4060 }
4061 }
4062
4063 assert_eq!(all_ids.len(), 30);
4064 assert_eq!(all_file_nums.len(), 30);
4065
4066 for i in 0..10 {
4071 assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
4072 assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
4073 }
4074 for i in 10..20 {
4075 assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
4076 assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
4077 }
4078 for i in 20..30 {
4079 assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
4080 assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
4081 }
4082 }
4083
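    // With bucket partitioning, the partition struct only carries the bucket value, so the
    // source column ("id") must be read from the data file rather than filled in from
    // partition metadata.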
4084 #[tokio::test]
4129 async fn test_bucket_partitioning_reads_source_column_from_file() {
4130 use arrow_array::Int32Array;
4131
4132 use crate::spec::{Literal, PartitionSpec, Struct, Transform};
4133
4134 let schema = Arc::new(
4136 Schema::builder()
4137 .with_schema_id(0)
4138 .with_fields(vec![
4139 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
4140 NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
4141 ])
4142 .build()
4143 .unwrap(),
4144 );
4145
4146 let partition_spec = Arc::new(
4148 PartitionSpec::builder(schema.clone())
4149 .with_spec_id(0)
4150 .add_partition_field("id", "id_bucket", Transform::Bucket(4))
4151 .unwrap()
4152 .build()
4153 .unwrap(),
4154 );
4155
4156 let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
4158
4159 let arrow_schema = Arc::new(ArrowSchema::new(vec![
4161 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
4162 PARQUET_FIELD_ID_META_KEY.to_string(),
4163 "1".to_string(),
4164 )])),
4165 Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
4166 PARQUET_FIELD_ID_META_KEY.to_string(),
4167 "2".to_string(),
4168 )])),
4169 ]));
4170
4171 let tmp_dir = TempDir::new().unwrap();
4173 let table_location = tmp_dir.path().to_str().unwrap().to_string();
4174 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
4175
4176 let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
4177 let name_data =
4178 Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
4179
4180 let to_write =
4181 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
4182
4183 let props = WriterProperties::builder()
4184 .set_compression(Compression::SNAPPY)
4185 .build();
4186 let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
4187 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
4188 writer.write(&to_write).expect("Writing batch");
4189 writer.close().unwrap();
4190
4191 let reader = ArrowReaderBuilder::new(file_io).build();
4193 let tasks = Box::pin(futures::stream::iter(
4194 vec![Ok(FileScanTask {
4195 start: 0,
4196 length: 0,
4197 record_count: None,
4198 data_file_path: format!("{table_location}/data.parquet"),
4199 data_file_format: DataFileFormat::Parquet,
4200 schema: schema.clone(),
4201 project_field_ids: vec![1, 2],
4202 predicate: None,
4203 deletes: vec![],
4204 partition: Some(partition_data),
4205 partition_spec: Some(partition_spec),
4206 name_mapping: None,
4207 case_sensitive: false,
4208 })]
4209 .into_iter(),
4210 )) as FileScanTaskStream;
4211
4212 let result = reader
4213 .read(tasks)
4214 .unwrap()
4215 .try_collect::<Vec<RecordBatch>>()
4216 .await
4217 .unwrap();
4218
4219 assert_eq!(result.len(), 1);
4221 let batch = &result[0];
4222
4223 assert_eq!(batch.num_columns(), 2);
4224 assert_eq!(batch.num_rows(), 4);
4225
4226 let id_col = batch
4229 .column(0)
4230 .as_primitive::<arrow_array::types::Int32Type>();
4231 assert_eq!(id_col.value(0), 1);
4232 assert_eq!(id_col.value(1), 5);
4233 assert_eq!(id_col.value(2), 9);
4234 assert_eq!(id_col.value(3), 13);
4235
4236 let name_col = batch.column(1).as_string::<i32>();
4237 assert_eq!(name_col.value(0), "Alice");
4238 assert_eq!(name_col.value(1), "Bob");
4239 assert_eq!(name_col.value(2), "Charlie");
4240 assert_eq!(name_col.value(3), "Dave");
4241 }
4242}