1use std::collections::{HashMap, HashSet};
21use std::ops::Range;
22use std::str::FromStr;
23use std::sync::Arc;
24
25use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
26use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
27use arrow_cast::cast::cast;
28use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
29use arrow_schema::{
30 ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
31};
32use arrow_string::like::starts_with;
33use bytes::Bytes;
34use fnv::FnvHashSet;
35use futures::future::BoxFuture;
36use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
37use parquet::arrow::arrow_reader::{
38 ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
39};
40use parquet::arrow::async_reader::AsyncFileReader;
41use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
42use parquet::file::metadata::{
43 PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
44};
45use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
46
47use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
48use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
49use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
50use crate::delete_vector::DeleteVector;
51use crate::error::Result;
52use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
53use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
54use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
55use crate::expr::{BoundPredicate, BoundReference};
56use crate::io::{FileIO, FileMetadata, FileRead};
57use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
58use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
59use crate::utils::available_parallelism;
60use crate::{Error, ErrorKind};
61
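/// Builder for [`ArrowReader`], which reads Iceberg data files into streams of Arrow
/// record batches.
///
/// A minimal usage sketch (assuming a `FileIO` instance and a `FileScanTaskStream`
/// named `file_io` and `tasks` are already in scope):
///
/// ```ignore
/// let reader = ArrowReaderBuilder::new(file_io)
///     .with_batch_size(1024)
///     .with_row_group_filtering_enabled(true)
///     .build();
/// let record_batches = reader.read(tasks)?;
/// ```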
62pub struct ArrowReaderBuilder {
64 batch_size: Option<usize>,
65 file_io: FileIO,
66 concurrency_limit_data_files: usize,
67 row_group_filtering_enabled: bool,
68 row_selection_enabled: bool,
69}
70
71impl ArrowReaderBuilder {
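    /// Creates a new `ArrowReaderBuilder` with defaults: no explicit batch size,
    /// data-file concurrency equal to the available parallelism, row-group filtering
    /// enabled, and row selection disabled.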
72 pub fn new(file_io: FileIO) -> Self {
74 let num_cpus = available_parallelism().get();
75
76 ArrowReaderBuilder {
77 batch_size: None,
78 file_io,
79 concurrency_limit_data_files: num_cpus,
80 row_group_filtering_enabled: true,
81 row_selection_enabled: false,
82 }
83 }
84
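    /// Sets the maximum number of data files that are read concurrently.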
85 pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
87 self.concurrency_limit_data_files = val;
88 self
89 }
90
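    /// Sets the number of rows per Arrow `RecordBatch` produced by the reader.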
91 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
94 self.batch_size = Some(batch_size);
95 self
96 }
97
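    /// Enables or disables row-group filtering, which skips row groups whose
    /// statistics show that the scan predicate cannot match.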
98 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
100 self.row_group_filtering_enabled = row_group_filtering_enabled;
101 self
102 }
103
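    /// Enables or disables row selection, which uses the Parquet page index to skip
    /// ranges of rows within the selected row groups.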
104 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
106 self.row_selection_enabled = row_selection_enabled;
107 self
108 }
109
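    /// Builds the configured [`ArrowReader`].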
110 pub fn build(self) -> ArrowReader {
112 ArrowReader {
113 batch_size: self.batch_size,
114 file_io: self.file_io.clone(),
115 delete_file_loader: CachingDeleteFileLoader::new(
116 self.file_io.clone(),
117 self.concurrency_limit_data_files,
118 ),
119 concurrency_limit_data_files: self.concurrency_limit_data_files,
120 row_group_filtering_enabled: self.row_group_filtering_enabled,
121 row_selection_enabled: self.row_selection_enabled,
122 }
123 }
124}
125
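/// Reads Iceberg data files as streams of Arrow record batches, applying the
/// projections, filter predicates, and delete files described by each file scan task.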
126#[derive(Clone)]
128pub struct ArrowReader {
129 batch_size: Option<usize>,
130 file_io: FileIO,
131 delete_file_loader: CachingDeleteFileLoader,
132
133 concurrency_limit_data_files: usize,
135
136 row_group_filtering_enabled: bool,
137 row_selection_enabled: bool,
138}
139
140impl ArrowReader {
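    /// Takes a stream of `FileScanTask`s and reads all the files, returning a stream
    /// of Arrow `RecordBatch`es containing the data from the files.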
141 pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
144 let file_io = self.file_io.clone();
145 let batch_size = self.batch_size;
146 let concurrency_limit_data_files = self.concurrency_limit_data_files;
147 let row_group_filtering_enabled = self.row_group_filtering_enabled;
148 let row_selection_enabled = self.row_selection_enabled;
149
150 let stream = tasks
151 .map_ok(move |task| {
152 let file_io = file_io.clone();
153
154 Self::process_file_scan_task(
155 task,
156 batch_size,
157 file_io,
158 self.delete_file_loader.clone(),
159 row_group_filtering_enabled,
160 row_selection_enabled,
161 )
162 })
163 .map_err(|err| {
                Error::new(ErrorKind::Unexpected, "file scan task generation failed")
                    .with_source(err)
165 })
166 .try_buffer_unordered(concurrency_limit_data_files)
167 .try_flatten_unordered(concurrency_limit_data_files);
168
169 Ok(Box::pin(stream) as ArrowRecordBatchStream)
170 }
171
172 #[allow(clippy::too_many_arguments)]
173 async fn process_file_scan_task(
174 task: FileScanTask,
175 batch_size: Option<usize>,
176 file_io: FileIO,
177 delete_file_loader: CachingDeleteFileLoader,
178 row_group_filtering_enabled: bool,
179 row_selection_enabled: bool,
180 ) -> Result<ArrowRecordBatchStream> {
181 let should_load_page_index =
182 (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
183
184 let delete_filter_rx =
185 delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
186
187 let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
190 &task.data_file_path,
191 file_io.clone(),
192 should_load_page_index,
193 None,
194 )
195 .await?;
196
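        // Parquet files written without Iceberg field IDs (for example, files imported
        // from other table formats) are detected by inspecting the first field's metadata.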
197 let missing_field_ids = initial_stream_builder
201 .schema()
202 .fields()
203 .iter()
204 .next()
205 .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
206
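        // When field IDs are missing, rebuild the stream builder with a schema that
        // carries them: IDs derived from the task's name mapping if one is provided,
        // otherwise positional fallback IDs.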
207 let mut record_batch_stream_builder = if missing_field_ids {
223 let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
225 apply_name_mapping_to_arrow_schema(
230 Arc::clone(initial_stream_builder.schema()),
231 name_mapping,
232 )?
233 } else {
234 add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
237 };
238
239 let options = ArrowReaderOptions::new().with_schema(arrow_schema);
240
241 Self::create_parquet_record_batch_stream_builder(
242 &task.data_file_path,
243 file_io.clone(),
244 should_load_page_index,
245 Some(options),
246 )
247 .await?
248 } else {
249 initial_stream_builder
251 };
252
253 let projection_mask = Self::get_arrow_projection_mask(
258 &task.project_field_ids,
259 &task.schema,
260 record_batch_stream_builder.parquet_schema(),
261 record_batch_stream_builder.schema(),
            missing_field_ids,
        )?;
264
265 record_batch_stream_builder =
266 record_batch_stream_builder.with_projection(projection_mask.clone());
267
268 let mut record_batch_transformer_builder =
272 RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
273
274 if let (Some(partition_spec), Some(partition_data)) =
275 (task.partition_spec.clone(), task.partition.clone())
276 {
277 record_batch_transformer_builder =
278 record_batch_transformer_builder.with_partition(partition_spec, partition_data);
279 }
280
281 let mut record_batch_transformer = record_batch_transformer_builder.build();
282
283 if let Some(batch_size) = batch_size {
284 record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
285 }
286
287 let delete_filter = delete_filter_rx.await.unwrap()?;
288 let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
289
290 let final_predicate = match (&task.predicate, delete_predicate) {
295 (None, None) => None,
296 (Some(predicate), None) => Some(predicate.clone()),
297 (None, Some(ref predicate)) => Some(predicate.clone()),
298 (Some(filter_predicate), Some(delete_predicate)) => {
299 Some(filter_predicate.clone().and(delete_predicate))
300 }
301 };
302
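        // Row-group indices (from the task's byte range and, optionally, predicate
        // metrics) and a row selection (from the page index and/or positional deletes)
        // are accumulated below and applied to the stream builder before building it.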
303 let mut selected_row_group_indices = None;
319 let mut row_selection = None;
320
321 if task.start != 0 || task.length != 0 {
324 let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
325 record_batch_stream_builder.metadata(),
326 task.start,
327 task.length,
328 )?;
329 selected_row_group_indices = Some(byte_range_filtered_row_groups);
330 }
331
332 if let Some(predicate) = final_predicate {
333 let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
334 record_batch_stream_builder.parquet_schema(),
335 &predicate,
336 )?;
337
338 let row_filter = Self::get_row_filter(
339 &predicate,
340 record_batch_stream_builder.parquet_schema(),
341 &iceberg_field_ids,
342 &field_id_map,
343 )?;
344 record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
345
346 if row_group_filtering_enabled {
347 let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
348 &predicate,
349 record_batch_stream_builder.metadata(),
350 &field_id_map,
351 &task.schema,
352 )?;
353
354 selected_row_group_indices = match selected_row_group_indices {
357 Some(byte_range_filtered) => {
358 let intersection: Vec<usize> = byte_range_filtered
360 .into_iter()
361 .filter(|idx| predicate_filtered_row_groups.contains(idx))
362 .collect();
363 Some(intersection)
364 }
365 None => Some(predicate_filtered_row_groups),
366 };
367 }
368
369 if row_selection_enabled {
370 row_selection = Some(Self::get_row_selection_for_filter_predicate(
371 &predicate,
372 record_batch_stream_builder.metadata(),
373 &selected_row_group_indices,
374 &field_id_map,
375 &task.schema,
376 )?);
377 }
378 }
379
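        // Positional deletes are folded into the row selection so that deleted rows are
        // skipped by the Parquet reader rather than filtered out afterwards.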
380 let positional_delete_indexes = delete_filter.get_delete_vector(&task);
381
382 if let Some(positional_delete_indexes) = positional_delete_indexes {
383 let delete_row_selection = {
384 let positional_delete_indexes = positional_delete_indexes.lock().unwrap();
385
386 Self::build_deletes_row_selection(
387 record_batch_stream_builder.metadata().row_groups(),
388 &selected_row_group_indices,
389 &positional_delete_indexes,
390 )
391 }?;
392
393 row_selection = match row_selection {
396 None => Some(delete_row_selection),
397 Some(filter_row_selection) => {
398 Some(filter_row_selection.intersection(&delete_row_selection))
399 }
400 };
401 }
402
403 if let Some(row_selection) = row_selection {
404 record_batch_stream_builder =
405 record_batch_stream_builder.with_row_selection(row_selection);
406 }
407
408 if let Some(selected_row_group_indices) = selected_row_group_indices {
409 record_batch_stream_builder =
410 record_batch_stream_builder.with_row_groups(selected_row_group_indices);
411 }
412
413 let record_batch_stream =
416 record_batch_stream_builder
417 .build()?
418 .map(move |batch| match batch {
419 Ok(batch) => record_batch_transformer.process_record_batch(batch),
420 Err(err) => Err(err.into()),
421 });
422
423 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
424 }
425
426 pub(crate) async fn create_parquet_record_batch_stream_builder(
427 data_file_path: &str,
428 file_io: FileIO,
429 should_load_page_index: bool,
430 arrow_reader_options: Option<ArrowReaderOptions>,
431 ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
432 let parquet_file = file_io.new_input(data_file_path)?;
435 let (parquet_metadata, parquet_reader) =
436 try_join!(parquet_file.metadata(), parquet_file.reader())?;
437 let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader)
438 .with_preload_column_index(true)
439 .with_preload_offset_index(true)
440 .with_preload_page_index(should_load_page_index);
441
442 let options = arrow_reader_options.unwrap_or_default();
444 let record_batch_stream_builder =
445 ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
446 Ok(record_batch_stream_builder)
447 }
448
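    /// Computes a `RowSelection` from a `DeleteVector` of positional deletes: within
    /// each selected row group, runs of retained rows become `select` entries and runs
    /// of deleted rows become `skip` entries, while unselected row groups are passed
    /// over entirely.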
449 fn build_deletes_row_selection(
455 row_group_metadata_list: &[RowGroupMetaData],
456 selected_row_groups: &Option<Vec<usize>>,
457 positional_deletes: &DeleteVector,
458 ) -> Result<RowSelection> {
459 let mut results: Vec<RowSelector> = Vec::new();
460 let mut selected_row_groups_idx = 0;
461 let mut current_row_group_base_idx: u64 = 0;
462 let mut delete_vector_iter = positional_deletes.iter();
463 let mut next_deleted_row_idx_opt = delete_vector_iter.next();
464
465 for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
466 let row_group_num_rows = row_group_metadata.num_rows() as u64;
467 let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
468
469 if let Some(selected_row_groups) = selected_row_groups {
471 if selected_row_groups_idx == selected_row_groups.len() {
473 break;
474 }
475
476 if idx == selected_row_groups[selected_row_groups_idx] {
477 selected_row_groups_idx += 1;
481 } else {
482 delete_vector_iter.advance_to(next_row_group_base_idx);
487 if let Some(cached_idx) = next_deleted_row_idx_opt {
489 if cached_idx < next_row_group_base_idx {
490 next_deleted_row_idx_opt = delete_vector_iter.next();
491 }
492 }
493
494 current_row_group_base_idx += row_group_num_rows;
497 continue;
498 }
499 }
500
501 let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
502 Some(next_deleted_row_idx) => {
503 if next_deleted_row_idx >= next_row_group_base_idx {
506 results.push(RowSelector::select(row_group_num_rows as usize));
507 current_row_group_base_idx += row_group_num_rows;
508 continue;
509 }
510
511 next_deleted_row_idx
512 }
513
514 _ => {
516 results.push(RowSelector::select(row_group_num_rows as usize));
517 current_row_group_base_idx += row_group_num_rows;
518 continue;
519 }
520 };
521
522 let mut current_idx = current_row_group_base_idx;
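            // Walk all deletes that fall within this row group, emitting alternating
            // select runs (rows to keep) and skip runs (deleted rows).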
523 'chunks: while next_deleted_row_idx < next_row_group_base_idx {
524 if current_idx < next_deleted_row_idx {
526 let run_length = next_deleted_row_idx - current_idx;
527 results.push(RowSelector::select(run_length as usize));
528 current_idx += run_length;
529 }
530
531 let mut run_length = 0;
533 while next_deleted_row_idx == current_idx
534 && next_deleted_row_idx < next_row_group_base_idx
535 {
536 run_length += 1;
537 current_idx += 1;
538
539 next_deleted_row_idx_opt = delete_vector_iter.next();
540 next_deleted_row_idx = match next_deleted_row_idx_opt {
541 Some(next_deleted_row_idx) => next_deleted_row_idx,
542 _ => {
543 results.push(RowSelector::skip(run_length));
547 break 'chunks;
548 }
549 };
550 }
551 if run_length > 0 {
552 results.push(RowSelector::skip(run_length));
553 }
554 }
555
556 if current_idx < next_row_group_base_idx {
557 results.push(RowSelector::select(
558 (next_row_group_base_idx - current_idx) as usize,
559 ));
560 }
561
562 current_row_group_base_idx += row_group_num_rows;
563 }
564
565 Ok(results.into())
566 }
567
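    /// Collects the Iceberg field IDs referenced by the predicate and builds a map from
    /// field ID to leaf column index in the Parquet schema, falling back to a positional
    /// mapping when the file has no embedded field IDs.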
568 fn build_field_id_set_and_map(
569 parquet_schema: &SchemaDescriptor,
570 predicate: &BoundPredicate,
571 ) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
572 let mut collector = CollectFieldIdVisitor {
574 field_ids: HashSet::default(),
575 };
576 visit(&mut collector, predicate)?;
577
578 let iceberg_field_ids = collector.field_ids();
579
580 let field_id_map = match build_field_id_map(parquet_schema)? {
582 Some(map) => map,
583 None => build_fallback_field_id_map(parquet_schema),
584 };
585
586 Ok((iceberg_field_ids, field_id_map))
587 }
588
589 fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
592 match field.field_type.as_ref() {
593 Type::Primitive(_) => {
594 field_ids.push(field.id);
595 }
596 Type::Struct(struct_type) => {
597 for nested_field in struct_type.fields() {
598 Self::include_leaf_field_id(nested_field, field_ids);
599 }
600 }
601 Type::List(list_type) => {
602 Self::include_leaf_field_id(&list_type.element_field, field_ids);
603 }
604 Type::Map(map_type) => {
605 Self::include_leaf_field_id(&map_type.key_field, field_ids);
606 Self::include_leaf_field_id(&map_type.value_field, field_ids);
607 }
608 }
609 }
610
611 fn get_arrow_projection_mask(
612 field_ids: &[i32],
613 iceberg_schema_of_task: &Schema,
614 parquet_schema: &SchemaDescriptor,
615 arrow_schema: &ArrowSchemaRef,
        use_fallback: bool,
    ) -> Result<ProjectionMask> {
618 fn type_promotion_is_valid(
619 file_type: Option<&PrimitiveType>,
620 projected_type: Option<&PrimitiveType>,
621 ) -> bool {
622 match (file_type, projected_type) {
623 (Some(lhs), Some(rhs)) if lhs == rhs => true,
624 (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
625 (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
626 (
627 Some(PrimitiveType::Decimal {
628 precision: file_precision,
629 scale: file_scale,
630 }),
631 Some(PrimitiveType::Decimal {
632 precision: requested_precision,
633 scale: requested_scale,
634 }),
635 ) if requested_precision >= file_precision && file_scale == requested_scale => true,
636 (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
638 _ => false,
639 }
640 }
641
642 if field_ids.is_empty() {
643 return Ok(ProjectionMask::all());
644 }
645
646 if use_fallback {
647 Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
649 } else {
650 let mut leaf_field_ids = vec![];
654 for field_id in field_ids {
655 let field = iceberg_schema_of_task.field_by_id(*field_id);
656 if let Some(field) = field {
657 Self::include_leaf_field_id(field, &mut leaf_field_ids);
658 }
659 }
660
661 Self::get_arrow_projection_mask_with_field_ids(
662 &leaf_field_ids,
663 iceberg_schema_of_task,
664 parquet_schema,
665 arrow_schema,
666 type_promotion_is_valid,
667 )
668 }
669 }
670
671 fn get_arrow_projection_mask_with_field_ids(
674 leaf_field_ids: &[i32],
675 iceberg_schema_of_task: &Schema,
676 parquet_schema: &SchemaDescriptor,
677 arrow_schema: &ArrowSchemaRef,
678 type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool,
679 ) -> Result<ProjectionMask> {
680 let mut column_map = HashMap::new();
681 let fields = arrow_schema.fields();
682
683 let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
686 let projected_arrow_schema = ArrowSchema::new_with_metadata(
687 fields.filter_leaves(|_, f| {
688 f.metadata()
689 .get(PARQUET_FIELD_ID_META_KEY)
690 .and_then(|field_id| i32::from_str(field_id).ok())
691 .is_some_and(|field_id| {
692 projected_fields.insert((*f).clone(), field_id);
693 leaf_field_ids.contains(&field_id)
694 })
695 }),
696 arrow_schema.metadata().clone(),
697 );
698 let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
699
700 fields.filter_leaves(|idx, field| {
701 let Some(field_id) = projected_fields.get(field).cloned() else {
702 return false;
703 };
704
705 let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
706 let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
707
708 if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
709 return false;
710 }
711
712 if !type_promotion_is_valid(
713 parquet_iceberg_field
714 .unwrap()
715 .field_type
716 .as_primitive_type(),
717 iceberg_field.unwrap().field_type.as_primitive_type(),
718 ) {
719 return false;
720 }
721
722 column_map.insert(field_id, idx);
723 true
724 });
725
726 let mut indices = vec![];
729 for field_id in leaf_field_ids {
730 if let Some(col_idx) = column_map.get(field_id) {
731 indices.push(*col_idx);
732 }
733 }
734
735 if indices.is_empty() {
736 Ok(ProjectionMask::all())
739 } else {
740 Ok(ProjectionMask::leaves(parquet_schema, indices))
741 }
742 }
743
744 fn get_arrow_projection_mask_fallback(
748 field_ids: &[i32],
749 parquet_schema: &SchemaDescriptor,
750 ) -> Result<ProjectionMask> {
751 let parquet_root_fields = parquet_schema.root_schema().get_fields();
753 let mut root_indices = vec![];
754
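        // Fallback field IDs are positional and 1-based, so field ID N corresponds to
        // top-level Parquet column N - 1.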
755 for field_id in field_ids.iter() {
756 let parquet_pos = (*field_id - 1) as usize;
757
758 if parquet_pos < parquet_root_fields.len() {
759 root_indices.push(parquet_pos);
760 }
761 }
763
764 if root_indices.is_empty() {
765 Ok(ProjectionMask::all())
766 } else {
767 Ok(ProjectionMask::roots(parquet_schema, root_indices))
768 }
769 }
770
771 fn get_row_filter(
772 predicates: &BoundPredicate,
773 parquet_schema: &SchemaDescriptor,
774 iceberg_field_ids: &HashSet<i32>,
775 field_id_map: &HashMap<i32, usize>,
776 ) -> Result<RowFilter> {
777 let mut column_indices = iceberg_field_ids
780 .iter()
781 .filter_map(|field_id| field_id_map.get(field_id).cloned())
782 .collect::<Vec<_>>();
783 column_indices.sort();
784
785 let mut converter = PredicateConverter {
787 parquet_schema,
788 column_map: field_id_map,
789 column_indices: &column_indices,
790 };
791
792 let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
795 let predicate_func = visit(&mut converter, predicates)?;
796 let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
797 Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
798 }
799
800 fn get_selected_row_group_indices(
801 predicate: &BoundPredicate,
802 parquet_metadata: &Arc<ParquetMetaData>,
803 field_id_map: &HashMap<i32, usize>,
804 snapshot_schema: &Schema,
805 ) -> Result<Vec<usize>> {
806 let row_groups_metadata = parquet_metadata.row_groups();
807 let mut results = Vec::with_capacity(row_groups_metadata.len());
808
809 for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
810 if RowGroupMetricsEvaluator::eval(
811 predicate,
812 row_group_metadata,
813 field_id_map,
814 snapshot_schema,
815 )? {
816 results.push(idx);
817 }
818 }
819
820 Ok(results)
821 }
822
823 fn get_row_selection_for_filter_predicate(
824 predicate: &BoundPredicate,
825 parquet_metadata: &Arc<ParquetMetaData>,
826 selected_row_groups: &Option<Vec<usize>>,
827 field_id_map: &HashMap<i32, usize>,
828 snapshot_schema: &Schema,
829 ) -> Result<RowSelection> {
830 let Some(column_index) = parquet_metadata.column_index() else {
831 return Err(Error::new(
832 ErrorKind::Unexpected,
833 "Parquet file metadata does not contain a column index",
834 ));
835 };
836
837 let Some(offset_index) = parquet_metadata.offset_index() else {
838 return Err(Error::new(
839 ErrorKind::Unexpected,
840 "Parquet file metadata does not contain an offset index",
841 ));
842 };
843
844 if let Some(selected_row_groups) = selected_row_groups {
846 if selected_row_groups.is_empty() {
847 return Ok(RowSelection::from(Vec::new()));
848 }
849 }
850
851 let mut selected_row_groups_idx = 0;
852
853 let page_index = column_index
854 .iter()
855 .enumerate()
856 .zip(offset_index)
857 .zip(parquet_metadata.row_groups());
858
859 let mut results = Vec::new();
860 for (((idx, column_index), offset_index), row_group_metadata) in page_index {
861 if let Some(selected_row_groups) = selected_row_groups {
862 if idx == selected_row_groups[selected_row_groups_idx] {
864 selected_row_groups_idx += 1;
865 } else {
866 continue;
867 }
868 }
869
870 let selections_for_page = PageIndexEvaluator::eval(
871 predicate,
872 column_index,
873 offset_index,
874 row_group_metadata,
875 field_id_map,
876 snapshot_schema,
877 )?;
878
879 results.push(selections_for_page);
880
881 if let Some(selected_row_groups) = selected_row_groups {
882 if selected_row_groups_idx == selected_row_groups.len() {
883 break;
884 }
885 }
886 }
887
888 Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
889 }
890
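    /// Returns the indices of the row groups that overlap the byte range
    /// `[start, start + length)` of a file split. The first row group is assumed to
    /// start right after the 4-byte Parquet magic ("PAR1"), with each subsequent row
    /// group following the previous one's compressed data.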
891 fn filter_row_groups_by_byte_range(
896 parquet_metadata: &Arc<ParquetMetaData>,
897 start: u64,
898 length: u64,
899 ) -> Result<Vec<usize>> {
900 let row_groups = parquet_metadata.row_groups();
901 let mut selected = Vec::new();
902 let end = start + length;
903
904 let mut current_byte_offset = 4u64;
906
907 for (idx, row_group) in row_groups.iter().enumerate() {
908 let row_group_size = row_group.compressed_size() as u64;
909 let row_group_end = current_byte_offset + row_group_size;
910
911 if current_byte_offset < end && start < row_group_end {
912 selected.push(idx);
913 }
914
915 current_byte_offset = row_group_end;
916 }
917
918 Ok(selected)
919 }
920}
921
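/// Builds a map of Parquet leaf column index by Iceberg field ID. Returns `Ok(None)` if
/// any leaf column lacks an embedded field ID, signalling that the positional fallback
/// mapping should be used instead.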
922fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
925 let mut column_map = HashMap::new();
926
927 for (idx, field) in parquet_schema.columns().iter().enumerate() {
928 let field_type = field.self_type();
929 match field_type {
930 ParquetType::PrimitiveType { basic_info, .. } => {
931 if !basic_info.has_id() {
932 return Ok(None);
933 }
934 column_map.insert(basic_info.id(), idx);
935 }
936 ParquetType::GroupType { .. } => {
937 return Err(Error::new(
938 ErrorKind::DataInvalid,
939 format!(
                        "Leaf column in schema should be a primitive type but got {field_type:?}"
941 ),
942 ));
943 }
944 };
945 }
946
947 Ok(Some(column_map))
948}
949
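/// Builds a positional field-ID map for files without embedded field IDs: leaf column
/// `idx` (0-based) is assigned field ID `idx + 1`.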
950fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
953 let mut column_map = HashMap::new();
954
955 for (idx, _field) in parquet_schema.columns().iter().enumerate() {
957 let field_id = (idx + 1) as i32;
958 column_map.insert(field_id, idx);
959 }
960
961 column_map
962}
963
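/// Applies an Iceberg name mapping to an Arrow schema that lacks field IDs, attaching
/// the mapped field ID to the metadata of each top-level field whose name appears in
/// the mapping; unmapped fields are left without an ID.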
964fn apply_name_mapping_to_arrow_schema(
983 arrow_schema: ArrowSchemaRef,
984 name_mapping: &NameMapping,
985) -> Result<Arc<ArrowSchema>> {
986 debug_assert!(
987 arrow_schema
988 .fields()
989 .iter()
990 .next()
991 .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
992 "Schema already has field IDs - name mapping should not be applied"
993 );
994
995 use arrow_schema::Field;
996
997 let fields_with_mapped_ids: Vec<_> = arrow_schema
998 .fields()
999 .iter()
1000 .map(|field| {
1001 let mapped_field_opt = name_mapping
1009 .fields()
1010 .iter()
1011 .find(|f| f.names().contains(&field.name().to_string()));
1012
1013 let mut metadata = field.metadata().clone();
1014
1015 if let Some(mapped_field) = mapped_field_opt {
1016 if let Some(field_id) = mapped_field.field_id() {
1017 metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1019 }
1020 }
1022 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1025 .with_metadata(metadata)
1026 })
1027 .collect();
1028
1029 Ok(Arc::new(ArrowSchema::new_with_metadata(
1030 fields_with_mapped_ids,
1031 arrow_schema.metadata().clone(),
1032 )))
1033}
1034
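/// Adds positional fallback field IDs (1-based) to the top-level fields of an Arrow
/// schema that has none, matching the mapping produced by `build_fallback_field_id_map`.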
1035fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
1042 debug_assert!(
1043 arrow_schema
1044 .fields()
1045 .iter()
1046 .next()
1047 .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
1048 "Schema already has field IDs"
1049 );
1050
1051 use arrow_schema::Field;
1052
1053 let fields_with_fallback_ids: Vec<_> = arrow_schema
1054 .fields()
1055 .iter()
1056 .enumerate()
1057 .map(|(pos, field)| {
1058 let mut metadata = field.metadata().clone();
            let field_id = (pos + 1) as i32;
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
1061
1062 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1063 .with_metadata(metadata)
1064 })
1065 .collect();
1066
1067 Arc::new(ArrowSchema::new_with_metadata(
1068 fields_with_fallback_ids,
1069 arrow_schema.metadata().clone(),
1070 ))
1071}
1072
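/// A bound-predicate visitor that collects every field ID referenced by the predicate.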
1073struct CollectFieldIdVisitor {
1075 field_ids: HashSet<i32>,
1076}
1077
1078impl CollectFieldIdVisitor {
1079 fn field_ids(self) -> HashSet<i32> {
1080 self.field_ids
1081 }
1082}
1083
1084impl BoundPredicateVisitor for CollectFieldIdVisitor {
1085 type T = ();
1086
1087 fn always_true(&mut self) -> Result<()> {
1088 Ok(())
1089 }
1090
1091 fn always_false(&mut self) -> Result<()> {
1092 Ok(())
1093 }
1094
1095 fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1096 Ok(())
1097 }
1098
1099 fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
1100 Ok(())
1101 }
1102
1103 fn not(&mut self, _inner: ()) -> Result<()> {
1104 Ok(())
1105 }
1106
1107 fn is_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1108 self.field_ids.insert(reference.field().id);
1109 Ok(())
1110 }
1111
1112 fn not_null(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1113 self.field_ids.insert(reference.field().id);
1114 Ok(())
1115 }
1116
1117 fn is_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1118 self.field_ids.insert(reference.field().id);
1119 Ok(())
1120 }
1121
1122 fn not_nan(&mut self, reference: &BoundReference, _predicate: &BoundPredicate) -> Result<()> {
1123 self.field_ids.insert(reference.field().id);
1124 Ok(())
1125 }
1126
1127 fn less_than(
1128 &mut self,
1129 reference: &BoundReference,
1130 _literal: &Datum,
1131 _predicate: &BoundPredicate,
1132 ) -> Result<()> {
1133 self.field_ids.insert(reference.field().id);
1134 Ok(())
1135 }
1136
1137 fn less_than_or_eq(
1138 &mut self,
1139 reference: &BoundReference,
1140 _literal: &Datum,
1141 _predicate: &BoundPredicate,
1142 ) -> Result<()> {
1143 self.field_ids.insert(reference.field().id);
1144 Ok(())
1145 }
1146
1147 fn greater_than(
1148 &mut self,
1149 reference: &BoundReference,
1150 _literal: &Datum,
1151 _predicate: &BoundPredicate,
1152 ) -> Result<()> {
1153 self.field_ids.insert(reference.field().id);
1154 Ok(())
1155 }
1156
1157 fn greater_than_or_eq(
1158 &mut self,
1159 reference: &BoundReference,
1160 _literal: &Datum,
1161 _predicate: &BoundPredicate,
1162 ) -> Result<()> {
1163 self.field_ids.insert(reference.field().id);
1164 Ok(())
1165 }
1166
1167 fn eq(
1168 &mut self,
1169 reference: &BoundReference,
1170 _literal: &Datum,
1171 _predicate: &BoundPredicate,
1172 ) -> Result<()> {
1173 self.field_ids.insert(reference.field().id);
1174 Ok(())
1175 }
1176
1177 fn not_eq(
1178 &mut self,
1179 reference: &BoundReference,
1180 _literal: &Datum,
1181 _predicate: &BoundPredicate,
1182 ) -> Result<()> {
1183 self.field_ids.insert(reference.field().id);
1184 Ok(())
1185 }
1186
1187 fn starts_with(
1188 &mut self,
1189 reference: &BoundReference,
1190 _literal: &Datum,
1191 _predicate: &BoundPredicate,
1192 ) -> Result<()> {
1193 self.field_ids.insert(reference.field().id);
1194 Ok(())
1195 }
1196
1197 fn not_starts_with(
1198 &mut self,
1199 reference: &BoundReference,
1200 _literal: &Datum,
1201 _predicate: &BoundPredicate,
1202 ) -> Result<()> {
1203 self.field_ids.insert(reference.field().id);
1204 Ok(())
1205 }
1206
1207 fn r#in(
1208 &mut self,
1209 reference: &BoundReference,
1210 _literals: &FnvHashSet<Datum>,
1211 _predicate: &BoundPredicate,
1212 ) -> Result<()> {
1213 self.field_ids.insert(reference.field().id);
1214 Ok(())
1215 }
1216
1217 fn not_in(
1218 &mut self,
1219 reference: &BoundReference,
1220 _literals: &FnvHashSet<Datum>,
1221 _predicate: &BoundPredicate,
1222 ) -> Result<()> {
1223 self.field_ids.insert(reference.field().id);
1224 Ok(())
1225 }
1226}
1227
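/// A bound-predicate visitor that converts an Iceberg predicate into an Arrow predicate
/// function evaluated against the projected columns of each record batch.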
1228struct PredicateConverter<'a> {
1230 pub parquet_schema: &'a SchemaDescriptor,
1232 pub column_map: &'a HashMap<i32, usize>,
1234 pub column_indices: &'a Vec<usize>,
1236}
1237
1238impl PredicateConverter<'_> {
1239 fn bound_reference(&mut self, reference: &BoundReference) -> Result<Option<usize>> {
1244 if let Some(column_idx) = self.column_map.get(&reference.field().id) {
1246 if self.parquet_schema.get_column_root(*column_idx).is_group() {
1247 return Err(Error::new(
1248 ErrorKind::DataInvalid,
1249 format!(
                        "Leaf column `{}` in predicates isn't a root column in the Parquet schema.",
1251 reference.field().name
1252 ),
1253 ));
1254 }
1255
1256 let index = self
1258 .column_indices
1259 .iter()
1260 .position(|&idx| idx == *column_idx)
1261 .ok_or(Error::new(
1262 ErrorKind::DataInvalid,
1263 format!(
                        "Leaf column `{}` in predicates cannot be found in the required column indices.",
1265 reference.field().name
1266 ),
1267 ))?;
1268
1269 Ok(Some(index))
1270 } else {
1271 Ok(None)
1272 }
1273 }
1274
1275 fn build_always_true(&self) -> Result<Box<PredicateResult>> {
1277 Ok(Box::new(|batch| {
1278 Ok(BooleanArray::from(vec![true; batch.num_rows()]))
1279 }))
1280 }
1281
1282 fn build_always_false(&self) -> Result<Box<PredicateResult>> {
1284 Ok(Box::new(|batch| {
1285 Ok(BooleanArray::from(vec![false; batch.num_rows()]))
1286 }))
1287 }
1288}
1289
1290fn project_column(
1293 batch: &RecordBatch,
1294 column_idx: usize,
1295) -> std::result::Result<ArrayRef, ArrowError> {
1296 let column = batch.column(column_idx);
1297
1298 match column.data_type() {
1299 DataType::Struct(_) => Err(ArrowError::SchemaError(
            "Struct columns are not yet supported.".to_string(),
1301 )),
1302 _ => Ok(column.clone()),
1303 }
1304}
1305
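/// A boxed row-filter closure that maps a `RecordBatch` to a `BooleanArray` mask.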
1306type PredicateResult =
1307 dyn FnMut(RecordBatch) -> std::result::Result<BooleanArray, ArrowError> + Send + 'static;
1308
1309impl BoundPredicateVisitor for PredicateConverter<'_> {
1310 type T = Box<PredicateResult>;
1311
1312 fn always_true(&mut self) -> Result<Box<PredicateResult>> {
1313 self.build_always_true()
1314 }
1315
1316 fn always_false(&mut self) -> Result<Box<PredicateResult>> {
1317 self.build_always_false()
1318 }
1319
1320 fn and(
1321 &mut self,
1322 mut lhs: Box<PredicateResult>,
1323 mut rhs: Box<PredicateResult>,
1324 ) -> Result<Box<PredicateResult>> {
1325 Ok(Box::new(move |batch| {
1326 let left = lhs(batch.clone())?;
1327 let right = rhs(batch)?;
1328 and_kleene(&left, &right)
1329 }))
1330 }
1331
1332 fn or(
1333 &mut self,
1334 mut lhs: Box<PredicateResult>,
1335 mut rhs: Box<PredicateResult>,
1336 ) -> Result<Box<PredicateResult>> {
1337 Ok(Box::new(move |batch| {
1338 let left = lhs(batch.clone())?;
1339 let right = rhs(batch)?;
1340 or_kleene(&left, &right)
1341 }))
1342 }
1343
1344 fn not(&mut self, mut inner: Box<PredicateResult>) -> Result<Box<PredicateResult>> {
1345 Ok(Box::new(move |batch| {
1346 let pred_ret = inner(batch)?;
1347 not(&pred_ret)
1348 }))
1349 }
1350
1351 fn is_null(
1352 &mut self,
1353 reference: &BoundReference,
1354 _predicate: &BoundPredicate,
1355 ) -> Result<Box<PredicateResult>> {
1356 if let Some(idx) = self.bound_reference(reference)? {
1357 Ok(Box::new(move |batch| {
1358 let column = project_column(&batch, idx)?;
1359 is_null(&column)
1360 }))
1361 } else {
1362 self.build_always_true()
1364 }
1365 }
1366
1367 fn not_null(
1368 &mut self,
1369 reference: &BoundReference,
1370 _predicate: &BoundPredicate,
1371 ) -> Result<Box<PredicateResult>> {
1372 if let Some(idx) = self.bound_reference(reference)? {
1373 Ok(Box::new(move |batch| {
1374 let column = project_column(&batch, idx)?;
1375 is_not_null(&column)
1376 }))
1377 } else {
1378 self.build_always_false()
1380 }
1381 }
1382
1383 fn is_nan(
1384 &mut self,
1385 reference: &BoundReference,
1386 _predicate: &BoundPredicate,
1387 ) -> Result<Box<PredicateResult>> {
1388 if self.bound_reference(reference)?.is_some() {
1389 self.build_always_true()
1390 } else {
1391 self.build_always_false()
1393 }
1394 }
1395
1396 fn not_nan(
1397 &mut self,
1398 reference: &BoundReference,
1399 _predicate: &BoundPredicate,
1400 ) -> Result<Box<PredicateResult>> {
1401 if self.bound_reference(reference)?.is_some() {
1402 self.build_always_false()
1403 } else {
1404 self.build_always_true()
1406 }
1407 }
1408
1409 fn less_than(
1410 &mut self,
1411 reference: &BoundReference,
1412 literal: &Datum,
1413 _predicate: &BoundPredicate,
1414 ) -> Result<Box<PredicateResult>> {
1415 if let Some(idx) = self.bound_reference(reference)? {
1416 let literal = get_arrow_datum(literal)?;
1417
1418 Ok(Box::new(move |batch| {
1419 let left = project_column(&batch, idx)?;
1420 let literal = try_cast_literal(&literal, left.data_type())?;
1421 lt(&left, literal.as_ref())
1422 }))
1423 } else {
1424 self.build_always_true()
1426 }
1427 }
1428
1429 fn less_than_or_eq(
1430 &mut self,
1431 reference: &BoundReference,
1432 literal: &Datum,
1433 _predicate: &BoundPredicate,
1434 ) -> Result<Box<PredicateResult>> {
1435 if let Some(idx) = self.bound_reference(reference)? {
1436 let literal = get_arrow_datum(literal)?;
1437
1438 Ok(Box::new(move |batch| {
1439 let left = project_column(&batch, idx)?;
1440 let literal = try_cast_literal(&literal, left.data_type())?;
1441 lt_eq(&left, literal.as_ref())
1442 }))
1443 } else {
1444 self.build_always_true()
1446 }
1447 }
1448
1449 fn greater_than(
1450 &mut self,
1451 reference: &BoundReference,
1452 literal: &Datum,
1453 _predicate: &BoundPredicate,
1454 ) -> Result<Box<PredicateResult>> {
1455 if let Some(idx) = self.bound_reference(reference)? {
1456 let literal = get_arrow_datum(literal)?;
1457
1458 Ok(Box::new(move |batch| {
1459 let left = project_column(&batch, idx)?;
1460 let literal = try_cast_literal(&literal, left.data_type())?;
1461 gt(&left, literal.as_ref())
1462 }))
1463 } else {
1464 self.build_always_false()
1466 }
1467 }
1468
1469 fn greater_than_or_eq(
1470 &mut self,
1471 reference: &BoundReference,
1472 literal: &Datum,
1473 _predicate: &BoundPredicate,
1474 ) -> Result<Box<PredicateResult>> {
1475 if let Some(idx) = self.bound_reference(reference)? {
1476 let literal = get_arrow_datum(literal)?;
1477
1478 Ok(Box::new(move |batch| {
1479 let left = project_column(&batch, idx)?;
1480 let literal = try_cast_literal(&literal, left.data_type())?;
1481 gt_eq(&left, literal.as_ref())
1482 }))
1483 } else {
1484 self.build_always_false()
1486 }
1487 }
1488
1489 fn eq(
1490 &mut self,
1491 reference: &BoundReference,
1492 literal: &Datum,
1493 _predicate: &BoundPredicate,
1494 ) -> Result<Box<PredicateResult>> {
1495 if let Some(idx) = self.bound_reference(reference)? {
1496 let literal = get_arrow_datum(literal)?;
1497
1498 Ok(Box::new(move |batch| {
1499 let left = project_column(&batch, idx)?;
1500 let literal = try_cast_literal(&literal, left.data_type())?;
1501 eq(&left, literal.as_ref())
1502 }))
1503 } else {
1504 self.build_always_false()
1506 }
1507 }
1508
1509 fn not_eq(
1510 &mut self,
1511 reference: &BoundReference,
1512 literal: &Datum,
1513 _predicate: &BoundPredicate,
1514 ) -> Result<Box<PredicateResult>> {
1515 if let Some(idx) = self.bound_reference(reference)? {
1516 let literal = get_arrow_datum(literal)?;
1517
1518 Ok(Box::new(move |batch| {
1519 let left = project_column(&batch, idx)?;
1520 let literal = try_cast_literal(&literal, left.data_type())?;
1521 neq(&left, literal.as_ref())
1522 }))
1523 } else {
1524 self.build_always_false()
1526 }
1527 }
1528
1529 fn starts_with(
1530 &mut self,
1531 reference: &BoundReference,
1532 literal: &Datum,
1533 _predicate: &BoundPredicate,
1534 ) -> Result<Box<PredicateResult>> {
1535 if let Some(idx) = self.bound_reference(reference)? {
1536 let literal = get_arrow_datum(literal)?;
1537
1538 Ok(Box::new(move |batch| {
1539 let left = project_column(&batch, idx)?;
1540 let literal = try_cast_literal(&literal, left.data_type())?;
1541 starts_with(&left, literal.as_ref())
1542 }))
1543 } else {
1544 self.build_always_false()
1546 }
1547 }
1548
1549 fn not_starts_with(
1550 &mut self,
1551 reference: &BoundReference,
1552 literal: &Datum,
1553 _predicate: &BoundPredicate,
1554 ) -> Result<Box<PredicateResult>> {
1555 if let Some(idx) = self.bound_reference(reference)? {
1556 let literal = get_arrow_datum(literal)?;
1557
1558 Ok(Box::new(move |batch| {
1559 let left = project_column(&batch, idx)?;
1560 let literal = try_cast_literal(&literal, left.data_type())?;
1561 not(&starts_with(&left, literal.as_ref())?)
1563 }))
1564 } else {
1565 self.build_always_true()
1567 }
1568 }
1569
1570 fn r#in(
1571 &mut self,
1572 reference: &BoundReference,
1573 literals: &FnvHashSet<Datum>,
1574 _predicate: &BoundPredicate,
1575 ) -> Result<Box<PredicateResult>> {
1576 if let Some(idx) = self.bound_reference(reference)? {
1577 let literals: Vec<_> = literals
1578 .iter()
1579 .map(|lit| get_arrow_datum(lit).unwrap())
1580 .collect();
1581
1582 Ok(Box::new(move |batch| {
1583 let left = project_column(&batch, idx)?;
1585
1586 let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
1587 for literal in &literals {
1588 let literal = try_cast_literal(literal, left.data_type())?;
1589 acc = or(&acc, &eq(&left, literal.as_ref())?)?
1590 }
1591
1592 Ok(acc)
1593 }))
1594 } else {
1595 self.build_always_false()
1597 }
1598 }
1599
1600 fn not_in(
1601 &mut self,
1602 reference: &BoundReference,
1603 literals: &FnvHashSet<Datum>,
1604 _predicate: &BoundPredicate,
1605 ) -> Result<Box<PredicateResult>> {
1606 if let Some(idx) = self.bound_reference(reference)? {
1607 let literals: Vec<_> = literals
1608 .iter()
1609 .map(|lit| get_arrow_datum(lit).unwrap())
1610 .collect();
1611
1612 Ok(Box::new(move |batch| {
1613 let left = project_column(&batch, idx)?;
1615 let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
1616 for literal in &literals {
1617 let literal = try_cast_literal(literal, left.data_type())?;
1618 acc = and(&acc, &neq(&left, literal.as_ref())?)?
1619 }
1620
1621 Ok(acc)
1622 }))
1623 } else {
1624 self.build_always_true()
1626 }
1627 }
1628}
1629
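/// An [`AsyncFileReader`] adapter that serves Parquet reads through Iceberg's `FileIO`
/// abstraction and can preload the column, offset, and page indexes.
///
/// A minimal construction sketch (assuming `file_metadata` and `file_read` were obtained
/// from a `FileIO` input, as in `create_parquet_record_batch_stream_builder` above):
///
/// ```ignore
/// let reader = ArrowFileReader::new(file_metadata, file_read)
///     .with_preload_column_index(true)
///     .with_preload_offset_index(true);
/// ```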
1630pub struct ArrowFileReader<R: FileRead> {
1632 meta: FileMetadata,
1633 preload_column_index: bool,
1634 preload_offset_index: bool,
1635 preload_page_index: bool,
1636 metadata_size_hint: Option<usize>,
1637 r: R,
1638}
1639
1640impl<R: FileRead> ArrowFileReader<R> {
1641 pub fn new(meta: FileMetadata, r: R) -> Self {
1643 Self {
1644 meta,
1645 preload_column_index: false,
1646 preload_offset_index: false,
1647 preload_page_index: false,
1648 metadata_size_hint: None,
1649 r,
1650 }
1651 }
1652
1653 pub fn with_preload_column_index(mut self, preload: bool) -> Self {
1655 self.preload_column_index = preload;
1656 self
1657 }
1658
1659 pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
1661 self.preload_offset_index = preload;
1662 self
1663 }
1664
1665 pub fn with_preload_page_index(mut self, preload: bool) -> Self {
1667 self.preload_page_index = preload;
1668 self
1669 }
1670
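    /// Provides a size hint for how many bytes to prefetch from the end of the file
    /// when reading the footer metadata; a good hint can avoid an extra range request.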
1671 pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
1676 self.metadata_size_hint = Some(hint);
1677 self
1678 }
1679}
1680
1681impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
1682 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
1683 Box::pin(
1684 self.r
1685 .read(range.start..range.end)
1686 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
1687 )
1688 }
1689
1690 fn get_metadata(
1693 &mut self,
1694 _options: Option<&'_ ArrowReaderOptions>,
1695 ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
1696 async move {
1697 let reader = ParquetMetaDataReader::new()
1698 .with_prefetch_hint(self.metadata_size_hint)
1699 .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
1701 .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
1702 .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
1703 let size = self.meta.size;
1704 let meta = reader.load_and_finish(self, size).await?;
1705
1706 Ok(Arc::new(meta))
1707 }
1708 .boxed()
1709 }
1710}
1711
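/// Casts the literal datum to the column's Arrow data type when the two differ (for
/// example, a `Utf8` literal compared against a `LargeUtf8` column), so the comparison
/// kernels receive operands of matching types.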
1712fn try_cast_literal(
1719 literal: &Arc<dyn ArrowDatum + Send + Sync>,
1720 column_type: &DataType,
1721) -> std::result::Result<Arc<dyn ArrowDatum + Send + Sync>, ArrowError> {
1722 let literal_array = literal.get().0;
1723
1724 if literal_array.data_type() == column_type {
1726 return Ok(Arc::clone(literal));
1727 }
1728
1729 let literal_array = cast(literal_array, column_type)?;
1730 Ok(Arc::new(Scalar::new(literal_array)))
1731}
1732
1733#[cfg(test)]
1734mod tests {
1735 use std::collections::{HashMap, HashSet};
1736 use std::fs::File;
1737 use std::sync::Arc;
1738
1739 use arrow_array::cast::AsArray;
1740 use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
1741 use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1742 use futures::TryStreamExt;
1743 use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1744 use parquet::arrow::{ArrowWriter, ProjectionMask};
1745 use parquet::basic::Compression;
1746 use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
1747 use parquet::file::properties::WriterProperties;
1748 use parquet::schema::parser::parse_message_type;
1749 use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1750 use roaring::RoaringTreemap;
1751 use tempfile::TempDir;
1752
1753 use crate::ErrorKind;
1754 use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1755 use crate::arrow::{ArrowReader, ArrowReaderBuilder};
1756 use crate::delete_vector::DeleteVector;
1757 use crate::expr::visitors::bound_predicate_visitor::visit;
1758 use crate::expr::{Bind, Predicate, Reference};
1759 use crate::io::FileIO;
1760 use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream};
1761 use crate::spec::{
1762 DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
1763 };
1764
1765 fn table_schema_simple() -> SchemaRef {
1766 Arc::new(
1767 Schema::builder()
1768 .with_schema_id(1)
1769 .with_identifier_field_ids(vec![2])
1770 .with_fields(vec![
1771 NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1772 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1773 NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1774 NestedField::optional(4, "qux", Type::Primitive(PrimitiveType::Float)).into(),
1775 ])
1776 .build()
1777 .unwrap(),
1778 )
1779 }
1780
1781 #[test]
1782 fn test_collect_field_id() {
1783 let schema = table_schema_simple();
1784 let expr = Reference::new("qux").is_null();
1785 let bound_expr = expr.bind(schema, true).unwrap();
1786
1787 let mut visitor = CollectFieldIdVisitor {
1788 field_ids: HashSet::default(),
1789 };
1790 visit(&mut visitor, &bound_expr).unwrap();
1791
1792 let mut expected = HashSet::default();
1793 expected.insert(4_i32);
1794
1795 assert_eq!(visitor.field_ids, expected);
1796 }
1797
1798 #[test]
1799 fn test_collect_field_id_with_and() {
1800 let schema = table_schema_simple();
1801 let expr = Reference::new("qux")
1802 .is_null()
1803 .and(Reference::new("baz").is_null());
1804 let bound_expr = expr.bind(schema, true).unwrap();
1805
1806 let mut visitor = CollectFieldIdVisitor {
1807 field_ids: HashSet::default(),
1808 };
1809 visit(&mut visitor, &bound_expr).unwrap();
1810
1811 let mut expected = HashSet::default();
1812 expected.insert(4_i32);
1813 expected.insert(3);
1814
1815 assert_eq!(visitor.field_ids, expected);
1816 }
1817
1818 #[test]
1819 fn test_collect_field_id_with_or() {
1820 let schema = table_schema_simple();
1821 let expr = Reference::new("qux")
1822 .is_null()
1823 .or(Reference::new("baz").is_null());
1824 let bound_expr = expr.bind(schema, true).unwrap();
1825
1826 let mut visitor = CollectFieldIdVisitor {
1827 field_ids: HashSet::default(),
1828 };
1829 visit(&mut visitor, &bound_expr).unwrap();
1830
1831 let mut expected = HashSet::default();
1832 expected.insert(4_i32);
1833 expected.insert(3);
1834
1835 assert_eq!(visitor.field_ids, expected);
1836 }
1837
1838 #[test]
1839 fn test_arrow_projection_mask() {
1840 let schema = Arc::new(
1841 Schema::builder()
1842 .with_schema_id(1)
1843 .with_identifier_field_ids(vec![1])
1844 .with_fields(vec![
1845 NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
1846 NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
1847 NestedField::optional(
1848 3,
1849 "c3",
1850 Type::Primitive(PrimitiveType::Decimal {
1851 precision: 38,
1852 scale: 3,
1853 }),
1854 )
1855 .into(),
1856 ])
1857 .build()
1858 .unwrap(),
1859 );
1860 let arrow_schema = Arc::new(ArrowSchema::new(vec![
1861 Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
1862 PARQUET_FIELD_ID_META_KEY.to_string(),
1863 "1".to_string(),
1864 )])),
1865 Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
1867 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
1868 ),
1869 Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
1871 PARQUET_FIELD_ID_META_KEY.to_string(),
1872 "3".to_string(),
1873 )])),
1874 ]));
1875
1876 let message_type = "
1877message schema {
1878 required binary c1 (STRING) = 1;
1879 optional int32 c2 (INTEGER(8,true)) = 2;
1880 optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
1881}
1882 ";
1883 let parquet_type = parse_message_type(message_type).expect("should parse schema");
1884 let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type));
1885
1886 let err = ArrowReader::get_arrow_projection_mask(
1888 &[1, 2, 3],
1889 &schema,
1890 &parquet_schema,
1891 &arrow_schema,
1892 false,
1893 )
1894 .unwrap_err();
1895
1896 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1897 assert_eq!(
1898 err.to_string(),
1899 "DataInvalid => Unsupported Arrow data type: Duration(Microsecond)".to_string()
1900 );
1901
1902 let err = ArrowReader::get_arrow_projection_mask(
1904 &[1, 3],
1905 &schema,
1906 &parquet_schema,
1907 &arrow_schema,
1908 false,
1909 )
1910 .unwrap_err();
1911
1912 assert_eq!(err.kind(), ErrorKind::DataInvalid);
1913 assert_eq!(
1914 err.to_string(),
1915 "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
1916 );
1917
1918 let mask = ArrowReader::get_arrow_projection_mask(
1920 &[1],
1921 &schema,
1922 &parquet_schema,
1923 &arrow_schema,
1924 false,
1925 )
1926 .expect("Some ProjectionMask");
1927 assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
1928 }
1929
1930 #[tokio::test]
1931 async fn test_kleene_logic_or_behaviour() {
1932 let predicate = Reference::new("a")
1934 .is_null()
1935 .or(Reference::new("a").equal_to(Datum::string("foo")));
1936
1937 let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1939
1940 let expected = vec![None, Some("foo".to_string())];
1942
1943 let (file_io, schema, table_location, _temp_dir) =
1944 setup_kleene_logic(data_for_col_a, DataType::Utf8);
1945 let reader = ArrowReaderBuilder::new(file_io).build();
1946
1947 let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1948
1949 assert_eq!(result_data, expected);
1950 }
1951
1952 #[tokio::test]
1953 async fn test_kleene_logic_and_behaviour() {
1954 let predicate = Reference::new("a")
1956 .is_not_null()
1957 .and(Reference::new("a").not_equal_to(Datum::string("foo")));
1958
1959 let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
1961
1962 let expected = vec![Some("bar".to_string())];
1964
1965 let (file_io, schema, table_location, _temp_dir) =
1966 setup_kleene_logic(data_for_col_a, DataType::Utf8);
1967 let reader = ArrowReaderBuilder::new(file_io).build();
1968
1969 let result_data = test_perform_read(predicate, schema, table_location, reader).await;
1970
1971 assert_eq!(result_data, expected);
1972 }
1973
1974 #[tokio::test]
1975 async fn test_predicate_cast_literal() {
1976 let predicates = vec![
1977 (Reference::new("a").equal_to(Datum::string("foo")), vec![
1979 Some("foo".to_string()),
1980 ]),
1981 (
1983 Reference::new("a").not_equal_to(Datum::string("foo")),
1984 vec![Some("bar".to_string())],
1985 ),
1986 (Reference::new("a").starts_with(Datum::string("f")), vec![
1988 Some("foo".to_string()),
1989 ]),
1990 (
1992 Reference::new("a").not_starts_with(Datum::string("f")),
1993 vec![Some("bar".to_string())],
1994 ),
1995 (Reference::new("a").less_than(Datum::string("foo")), vec![
1997 Some("bar".to_string()),
1998 ]),
1999 (
2001 Reference::new("a").less_than_or_equal_to(Datum::string("foo")),
2002 vec![Some("foo".to_string()), Some("bar".to_string())],
2003 ),
2004 (
2006 Reference::new("a").greater_than(Datum::string("bar")),
2007 vec![Some("foo".to_string())],
2008 ),
2009 (
2011 Reference::new("a").greater_than_or_equal_to(Datum::string("foo")),
2012 vec![Some("foo".to_string())],
2013 ),
2014 (
2016 Reference::new("a").is_in([Datum::string("foo"), Datum::string("baz")]),
2017 vec![Some("foo".to_string())],
2018 ),
2019 (
2021 Reference::new("a").is_not_in([Datum::string("foo"), Datum::string("baz")]),
2022 vec![Some("bar".to_string())],
2023 ),
2024 ];
2025
2026 let data_for_col_a = vec![Some("foo".to_string()), Some("bar".to_string())];
2028
2029 let (file_io, schema, table_location, _temp_dir) =
2030 setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
2031 let reader = ArrowReaderBuilder::new(file_io).build();
2032
2033 for (predicate, expected) in predicates {
2034 println!("testing predicate {predicate}");
2035 let result_data = test_perform_read(
2036 predicate.clone(),
2037 schema.clone(),
2038 table_location.clone(),
2039 reader.clone(),
2040 )
2041 .await;
2042
2043 assert_eq!(result_data, expected, "predicate={predicate}");
2044 }
2045 }
2046
2047 async fn test_perform_read(
2048 predicate: Predicate,
2049 schema: SchemaRef,
2050 table_location: String,
2051 reader: ArrowReader,
2052 ) -> Vec<Option<String>> {
2053 let tasks = Box::pin(futures::stream::iter(
2054 vec![Ok(FileScanTask {
2055 start: 0,
2056 length: 0,
2057 record_count: None,
2058 data_file_path: format!("{table_location}/1.parquet"),
2059 data_file_format: DataFileFormat::Parquet,
2060 schema: schema.clone(),
2061 project_field_ids: vec![1],
2062 predicate: Some(predicate.bind(schema, true).unwrap()),
2063 deletes: vec![],
2064 partition: None,
2065 partition_spec: None,
2066 name_mapping: None,
2067 })]
2068 .into_iter(),
2069 )) as FileScanTaskStream;
2070
2071 let result = reader
2072 .read(tasks)
2073 .unwrap()
2074 .try_collect::<Vec<RecordBatch>>()
2075 .await
2076 .unwrap();
2077
2078 result[0].columns()[0]
2079 .as_string_opt::<i32>()
2080 .unwrap()
2081 .iter()
2082 .map(|v| v.map(ToOwned::to_owned))
2083 .collect::<Vec<_>>()
2084 }
2085
2086 fn setup_kleene_logic(
2087 data_for_col_a: Vec<Option<String>>,
2088 col_a_type: DataType,
2089 ) -> (FileIO, SchemaRef, String, TempDir) {
2090 let schema = Arc::new(
2091 Schema::builder()
2092 .with_schema_id(1)
2093 .with_fields(vec![
2094 NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
2095 ])
2096 .build()
2097 .unwrap(),
2098 );
2099
2100 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2101 Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
2102 PARQUET_FIELD_ID_META_KEY.to_string(),
2103 "1".to_string(),
2104 )])),
2105 ]));
2106
2107 let tmp_dir = TempDir::new().unwrap();
2108 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2109
2110 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2111
2112 let col = match col_a_type {
2113 DataType::Utf8 => Arc::new(StringArray::from(data_for_col_a)) as ArrayRef,
2114 DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data_for_col_a)) as ArrayRef,
2115 _ => panic!("unexpected col_a_type"),
2116 };
2117
2118 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
2119
2120 let props = WriterProperties::builder()
2122 .set_compression(Compression::SNAPPY)
2123 .build();
2124
2125 let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
2126 let mut writer =
2127 ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
2128
2129 writer.write(&to_write).expect("Writing batch");
2130
2131 writer.close().unwrap();
2133
2134 (file_io, schema, table_location, tmp_dir)
2135 }
2136
2137 #[test]
2138 fn test_build_deletes_row_selection() {
2139 let schema_descr = get_test_schema_descr();
2140
2141 let mut columns = vec![];
2142 for ptr in schema_descr.columns() {
2143 let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
2144 columns.push(column);
2145 }
2146
2147 let row_groups_metadata = vec![
2148 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
2149 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
2150 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
2151 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
2152 build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
2153 ];
2154
2155 let selected_row_groups = Some(vec![1, 3]);
2156
2157 let positional_deletes = RoaringTreemap::from_iter(&[
            1, 3, 4, 5, 998, 999, 1000, 1010, 1011, 1012, 1498, 1499, 1500, 1501, 1600, 1999,
            2000, 2001, 2100, 2200, 2201, 2202, 2999, 3000,
        ]);
2182
2183 let positional_deletes = DeleteVector::new(positional_deletes);
2184
2185 let result = ArrowReader::build_deletes_row_selection(
2187 &row_groups_metadata,
2188 &selected_row_groups,
2189 &positional_deletes,
2190 )
2191 .unwrap();
2192
2193 let expected = RowSelection::from(vec![
2194 RowSelector::skip(1),
2195 RowSelector::select(9),
2196 RowSelector::skip(3),
2197 RowSelector::select(485),
2198 RowSelector::skip(4),
2199 RowSelector::select(98),
2200 RowSelector::skip(1),
2201 RowSelector::select(99),
2202 RowSelector::skip(3),
2203 RowSelector::select(796),
2204 RowSelector::skip(1),
2205 ]);
2206
2207 assert_eq!(result, expected);
2208
2209 let result = ArrowReader::build_deletes_row_selection(
2211 &row_groups_metadata,
2212 &None,
2213 &positional_deletes,
2214 )
2215 .unwrap();
2216
2217 let expected = RowSelection::from(vec![
2218 RowSelector::select(1),
2219 RowSelector::skip(1),
2220 RowSelector::select(1),
2221 RowSelector::skip(3),
2222 RowSelector::select(992),
2223 RowSelector::skip(3),
2224 RowSelector::select(9),
2225 RowSelector::skip(3),
2226 RowSelector::select(485),
2227 RowSelector::skip(4),
2228 RowSelector::select(98),
2229 RowSelector::skip(1),
2230 RowSelector::select(398),
2231 RowSelector::skip(3),
2232 RowSelector::select(98),
2233 RowSelector::skip(1),
2234 RowSelector::select(99),
2235 RowSelector::skip(3),
2236 RowSelector::select(796),
2237 RowSelector::skip(2),
2238 RowSelector::select(499),
2239 ]);
2240
2241 assert_eq!(result, expected);
2242 }
2243
2244 fn build_test_row_group_meta(
2245 schema_descr: SchemaDescPtr,
2246 columns: Vec<ColumnChunkMetaData>,
2247 num_rows: i64,
2248 ordinal: i16,
2249 ) -> RowGroupMetaData {
2250 RowGroupMetaData::builder(schema_descr.clone())
2251 .set_num_rows(num_rows)
2252 .set_total_byte_size(2000)
2253 .set_column_metadata(columns)
2254 .set_ordinal(ordinal)
2255 .build()
2256 .unwrap()
2257 }
2258
2259 fn get_test_schema_descr() -> SchemaDescPtr {
2260 use parquet::schema::types::Type as SchemaType;
2261
2262 let schema = SchemaType::group_type_builder("schema")
2263 .with_fields(vec![
2264 Arc::new(
2265 SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
2266 .build()
2267 .unwrap(),
2268 ),
2269 Arc::new(
2270 SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
2271 .build()
2272 .unwrap(),
2273 ),
2274 ])
2275 .build()
2276 .unwrap();
2277
2278 Arc::new(SchemaDescriptor::new(Arc::new(schema)))
2279 }
2280
2281 #[tokio::test]
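// Writes a single file containing three 100-row row groups, then issues two FileScanTasks
// whose byte ranges cover row group 0 and row groups 1-2 respectively, asserting that each
// task returns only the rows inside its range.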
2283 async fn test_file_splits_respect_byte_ranges() {
2284 use arrow_array::Int32Array;
2285 use parquet::file::reader::{FileReader, SerializedFileReader};
2286
2287 let schema = Arc::new(
2288 Schema::builder()
2289 .with_schema_id(1)
2290 .with_fields(vec![
2291 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2292 ])
2293 .build()
2294 .unwrap(),
2295 );
2296
2297 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2298 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2299 PARQUET_FIELD_ID_META_KEY.to_string(),
2300 "1".to_string(),
2301 )])),
2302 ]));
2303
2304 let tmp_dir = TempDir::new().unwrap();
2305 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2306 let file_path = format!("{}/multi_row_group.parquet", &table_location);
2307
2308 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2310 (0..100).collect::<Vec<i32>>(),
2311 ))])
2312 .unwrap();
2313 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2314 (100..200).collect::<Vec<i32>>(),
2315 ))])
2316 .unwrap();
2317 let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
2318 (200..300).collect::<Vec<i32>>(),
2319 ))])
2320 .unwrap();
2321
2322 let props = WriterProperties::builder()
2323 .set_compression(Compression::SNAPPY)
2324 .set_max_row_group_size(100)
2325 .build();
2326
2327 let file = File::create(&file_path).unwrap();
2328 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2329 writer.write(&batch1).expect("Writing batch 1");
2330 writer.write(&batch2).expect("Writing batch 2");
2331 writer.write(&batch3).expect("Writing batch 3");
2332 writer.close().unwrap();
2333
2334 let file = File::open(&file_path).unwrap();
2336 let reader = SerializedFileReader::new(file).unwrap();
2337 let metadata = reader.metadata();
2338
2339 println!("File has {} row groups", metadata.num_row_groups());
2340 assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
2341
2342 let row_group_0 = metadata.row_group(0);
2344 let row_group_1 = metadata.row_group(1);
2345 let row_group_2 = metadata.row_group(2);
2346
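// A Parquet file begins with the 4-byte "PAR1" magic, so row group 0's data starts at
// offset 4; each later row group's start is approximated by adding the preceding groups'
// compressed sizes.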
let rg0_start = 4u64;
let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2349 let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
2350 let file_end = rg2_start + row_group_2.compressed_size() as u64;
2351
2352 println!(
2353 "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
2354 row_group_0.num_rows(),
2355 rg0_start,
2356 row_group_0.compressed_size()
2357 );
2358 println!(
2359 "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
2360 row_group_1.num_rows(),
2361 rg1_start,
2362 row_group_1.compressed_size()
2363 );
2364 println!(
2365 "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
2366 row_group_2.num_rows(),
2367 rg2_start,
2368 row_group_2.compressed_size()
2369 );
2370
2371 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2372 let reader = ArrowReaderBuilder::new(file_io).build();
2373
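// Task 1: byte range covering only row group 0.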
2374 let task1 = FileScanTask {
2376 start: rg0_start,
2377 length: row_group_0.compressed_size() as u64,
2378 record_count: Some(100),
2379 data_file_path: file_path.clone(),
2380 data_file_format: DataFileFormat::Parquet,
2381 schema: schema.clone(),
2382 project_field_ids: vec![1],
2383 predicate: None,
2384 deletes: vec![],
2385 partition: None,
2386 partition_spec: None,
2387 name_mapping: None,
2388 };
2389
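// Task 2: byte range covering row groups 1 and 2, from rg1_start to the end of row group 2.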
2390 let task2 = FileScanTask {
2392 start: rg1_start,
2393 length: file_end - rg1_start,
2394 record_count: Some(200),
2395 data_file_path: file_path.clone(),
2396 data_file_format: DataFileFormat::Parquet,
2397 schema: schema.clone(),
2398 project_field_ids: vec![1],
2399 predicate: None,
2400 deletes: vec![],
2401 partition: None,
2402 partition_spec: None,
2403 name_mapping: None,
2404 };
2405
2406 let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
2407 let result1 = reader
2408 .clone()
2409 .read(tasks1)
2410 .unwrap()
2411 .try_collect::<Vec<RecordBatch>>()
2412 .await
2413 .unwrap();
2414
2415 let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
2416 println!(
2417 "Task 1 (bytes {}-{}) returned {} rows",
2418 rg0_start,
2419 rg0_start + row_group_0.compressed_size() as u64,
2420 total_rows_task1
2421 );
2422
2423 let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
2424 let result2 = reader
2425 .read(tasks2)
2426 .unwrap()
2427 .try_collect::<Vec<RecordBatch>>()
2428 .await
2429 .unwrap();
2430
2431 let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
2432 println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
2433
2434 assert_eq!(
2435 total_rows_task1, 100,
2436 "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
2437 );
2438
2439 assert_eq!(
2440 total_rows_task2, 200,
2441 "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
2442 );
2443
2444 if total_rows_task1 > 0 {
2446 let first_batch = &result1[0];
2447 let id_col = first_batch
2448 .column(0)
2449 .as_primitive::<arrow_array::types::Int32Type>();
2450 let first_val = id_col.value(0);
2451 let last_val = id_col.value(id_col.len() - 1);
2452 println!("Task 1 data range: {first_val} to {last_val}");
2453
2454 assert_eq!(first_val, 0, "Task 1 should start with id=0");
2455 assert_eq!(last_val, 99, "Task 1 should end with id=99");
2456 }
2457
2458 if total_rows_task2 > 0 {
2459 let first_batch = &result2[0];
2460 let id_col = first_batch
2461 .column(0)
2462 .as_primitive::<arrow_array::types::Int32Type>();
2463 let first_val = id_col.value(0);
2464 println!("Task 2 first value: {first_val}");
2465
2466 assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
2467 }
2468 }
2469
2470 #[tokio::test]
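// The data file only contains column "a" (field id 1); reading it with a newer schema that
// adds optional column "b" (field id 2) should materialize "b" as an all-null column.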
2476 async fn test_schema_evolution_add_column() {
2477 use arrow_array::{Array, Int32Array};
2478
2479 let new_schema = Arc::new(
2481 Schema::builder()
2482 .with_schema_id(2)
2483 .with_fields(vec![
2484 NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
2485 NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
2486 ])
2487 .build()
2488 .unwrap(),
2489 );
2490
2491 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
2493 Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
2494 PARQUET_FIELD_ID_META_KEY.to_string(),
2495 "1".to_string(),
2496 )])),
2497 ]));
2498
2499 let tmp_dir = TempDir::new().unwrap();
2501 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2502 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2503
2504 let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
2505 let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
2506
2507 let props = WriterProperties::builder()
2508 .set_compression(Compression::SNAPPY)
2509 .build();
2510 let file = File::create(format!("{}/old_file.parquet", &table_location)).unwrap();
2511 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
2512 writer.write(&to_write).expect("Writing batch");
2513 writer.close().unwrap();
2514
2515 let reader = ArrowReaderBuilder::new(file_io).build();
2517 let tasks = Box::pin(futures::stream::iter(
2518 vec![Ok(FileScanTask {
2519 start: 0,
2520 length: 0,
2521 record_count: None,
2522 data_file_path: format!("{table_location}/old_file.parquet"),
2523 data_file_format: DataFileFormat::Parquet,
2524 schema: new_schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
2527 deletes: vec![],
2528 partition: None,
2529 partition_spec: None,
2530 name_mapping: None,
2531 })]
2532 .into_iter(),
2533 )) as FileScanTaskStream;
2534
2535 let result = reader
2536 .read(tasks)
2537 .unwrap()
2538 .try_collect::<Vec<RecordBatch>>()
2539 .await
2540 .unwrap();
2541
2542 assert_eq!(result.len(), 1);
2544 let batch = &result[0];
2545
2546 assert_eq!(batch.num_columns(), 2);
2548 assert_eq!(batch.num_rows(), 3);
2549
2550 let col_a = batch
2552 .column(0)
2553 .as_primitive::<arrow_array::types::Int32Type>();
2554 assert_eq!(col_a.values(), &[1, 2, 3]);
2555
2556 let col_b = batch
2558 .column(1)
2559 .as_primitive::<arrow_array::types::Int32Type>();
2560 assert_eq!(col_b.null_count(), 3);
2561 assert!(col_b.is_null(0));
2562 assert!(col_b.is_null(1));
2563 assert!(col_b.is_null(2));
2564 }
2565
2566 #[tokio::test]
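// Writes 200 rows split across two 100-row row groups plus a positional delete for
// position 199 (the last row, id=200); the scan should return 199 rows with id=200 removed.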
2584 async fn test_position_delete_across_multiple_row_groups() {
2585 use arrow_array::{Int32Array, Int64Array};
2586 use parquet::file::reader::{FileReader, SerializedFileReader};
2587
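// Reserved Iceberg field ids for the file_path and pos columns of positional delete files.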
2588 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2590 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2591
2592 let tmp_dir = TempDir::new().unwrap();
2593 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2594
2595 let table_schema = Arc::new(
2597 Schema::builder()
2598 .with_schema_id(1)
2599 .with_fields(vec![
2600 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2601 ])
2602 .build()
2603 .unwrap(),
2604 );
2605
2606 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2607 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2608 PARQUET_FIELD_ID_META_KEY.to_string(),
2609 "1".to_string(),
2610 )])),
2611 ]));
2612
2613 let data_file_path = format!("{}/data.parquet", &table_location);
2617
2618 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2619 Int32Array::from_iter_values(1..=100),
2620 )])
2621 .unwrap();
2622
2623 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2624 Int32Array::from_iter_values(101..=200),
2625 )])
2626 .unwrap();
2627
2628 let props = WriterProperties::builder()
2630 .set_compression(Compression::SNAPPY)
2631 .set_max_row_group_size(100)
2632 .build();
2633
2634 let file = File::create(&data_file_path).unwrap();
2635 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2636 writer.write(&batch1).expect("Writing batch 1");
2637 writer.write(&batch2).expect("Writing batch 2");
2638 writer.close().unwrap();
2639
2640 let verify_file = File::open(&data_file_path).unwrap();
2642 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2643 assert_eq!(
2644 verify_reader.metadata().num_row_groups(),
2645 2,
2646 "Should have 2 row groups"
2647 );
2648
2649 let delete_file_path = format!("{}/deletes.parquet", &table_location);
2651
2652 let delete_schema = Arc::new(ArrowSchema::new(vec![
2653 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2654 PARQUET_FIELD_ID_META_KEY.to_string(),
2655 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2656 )])),
2657 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2658 PARQUET_FIELD_ID_META_KEY.to_string(),
2659 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2660 )])),
2661 ]));
2662
2663 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2665 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2666 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2667 ])
2668 .unwrap();
2669
2670 let delete_props = WriterProperties::builder()
2671 .set_compression(Compression::SNAPPY)
2672 .build();
2673
2674 let delete_file = File::create(&delete_file_path).unwrap();
2675 let mut delete_writer =
2676 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2677 delete_writer.write(&delete_batch).unwrap();
2678 delete_writer.close().unwrap();
2679
2680 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2682 let reader = ArrowReaderBuilder::new(file_io).build();
2683
2684 let task = FileScanTask {
2685 start: 0,
2686 length: 0,
2687 record_count: Some(200),
2688 data_file_path: data_file_path.clone(),
2689 data_file_format: DataFileFormat::Parquet,
2690 schema: table_schema.clone(),
2691 project_field_ids: vec![1],
2692 predicate: None,
2693 deletes: vec![FileScanTaskDeleteFile {
2694 file_path: delete_file_path,
2695 file_type: DataContentType::PositionDeletes,
2696 partition_spec_id: 0,
2697 equality_ids: None,
2698 }],
2699 partition: None,
2700 partition_spec: None,
2701 name_mapping: None,
2702 };
2703
2704 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2705 let result = reader
2706 .read(tasks)
2707 .unwrap()
2708 .try_collect::<Vec<RecordBatch>>()
2709 .await
2710 .unwrap();
2711
2712 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2714
2715 println!("Total rows read: {}", total_rows);
2716 println!("Expected: 199 rows (deleted row 199 which had id=200)");
2717
2718 assert_eq!(
2720 total_rows, 199,
2721 "Expected 199 rows after deleting row 199, but got {} rows. \
2722 The bug causes position deletes in later row groups to be ignored.",
2723 total_rows
2724 );
2725
2726 let all_ids: Vec<i32> = result
2728 .iter()
2729 .flat_map(|batch| {
2730 batch
2731 .column(0)
2732 .as_primitive::<arrow_array::types::Int32Type>()
2733 .values()
2734 .iter()
2735 .copied()
2736 })
2737 .collect();
2738
2739 assert!(
2740 !all_ids.contains(&200),
2741 "Row with id=200 should be deleted but was found in results"
2742 );
2743
2744 let expected_ids: Vec<i32> = (1..=199).collect();
2746 assert_eq!(
2747 all_ids, expected_ids,
2748 "Should have ids 1-199 but got different values"
2749 );
2750 }
2751
2752 #[tokio::test]
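// The task's byte range selects only the second row group, while the positional delete
// targets position 199 inside that row group; the scan should return 99 rows (ids 101-199).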
2778 async fn test_position_delete_with_row_group_selection() {
2779 use arrow_array::{Int32Array, Int64Array};
2780 use parquet::file::reader::{FileReader, SerializedFileReader};
2781
2782 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
2784 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
2785
2786 let tmp_dir = TempDir::new().unwrap();
2787 let table_location = tmp_dir.path().to_str().unwrap().to_string();
2788
2789 let table_schema = Arc::new(
2791 Schema::builder()
2792 .with_schema_id(1)
2793 .with_fields(vec![
2794 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
2795 ])
2796 .build()
2797 .unwrap(),
2798 );
2799
2800 let arrow_schema = Arc::new(ArrowSchema::new(vec![
2801 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
2802 PARQUET_FIELD_ID_META_KEY.to_string(),
2803 "1".to_string(),
2804 )])),
2805 ]));
2806
2807 let data_file_path = format!("{}/data.parquet", &table_location);
2811
2812 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2813 Int32Array::from_iter_values(1..=100),
2814 )])
2815 .unwrap();
2816
2817 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
2818 Int32Array::from_iter_values(101..=200),
2819 )])
2820 .unwrap();
2821
2822 let props = WriterProperties::builder()
2824 .set_compression(Compression::SNAPPY)
2825 .set_max_row_group_size(100)
2826 .build();
2827
2828 let file = File::create(&data_file_path).unwrap();
2829 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
2830 writer.write(&batch1).expect("Writing batch 1");
2831 writer.write(&batch2).expect("Writing batch 2");
2832 writer.close().unwrap();
2833
2834 let verify_file = File::open(&data_file_path).unwrap();
2836 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
2837 assert_eq!(
2838 verify_reader.metadata().num_row_groups(),
2839 2,
2840 "Should have 2 row groups"
2841 );
2842
2843 let delete_file_path = format!("{}/deletes.parquet", &table_location);
2845
2846 let delete_schema = Arc::new(ArrowSchema::new(vec![
2847 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
2848 PARQUET_FIELD_ID_META_KEY.to_string(),
2849 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
2850 )])),
2851 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
2852 PARQUET_FIELD_ID_META_KEY.to_string(),
2853 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
2854 )])),
2855 ]));
2856
2857 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
2859 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
2860 Arc::new(Int64Array::from_iter_values(vec![199i64])),
2861 ])
2862 .unwrap();
2863
2864 let delete_props = WriterProperties::builder()
2865 .set_compression(Compression::SNAPPY)
2866 .build();
2867
2868 let delete_file = File::create(&delete_file_path).unwrap();
2869 let mut delete_writer =
2870 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
2871 delete_writer.write(&delete_batch).unwrap();
2872 delete_writer.close().unwrap();
2873
2874 let metadata_file = File::open(&data_file_path).unwrap();
2877 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
2878 let metadata = metadata_reader.metadata();
2879
2880 let row_group_0 = metadata.row_group(0);
2881 let row_group_1 = metadata.row_group(1);
2882
let rg0_start = 4u64;
let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
2885 let rg1_length = row_group_1.compressed_size() as u64;
2886
2887 println!(
2888 "Row group 0: starts at byte {}, {} bytes compressed",
2889 rg0_start,
2890 row_group_0.compressed_size()
2891 );
2892 println!(
2893 "Row group 1: starts at byte {}, {} bytes compressed",
2894 rg1_start,
2895 row_group_1.compressed_size()
2896 );
2897
2898 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
2899 let reader = ArrowReaderBuilder::new(file_io).build();
2900
2901 let task = FileScanTask {
2903 start: rg1_start,
2904 length: rg1_length,
record_count: Some(100),
data_file_path: data_file_path.clone(),
2907 data_file_format: DataFileFormat::Parquet,
2908 schema: table_schema.clone(),
2909 project_field_ids: vec![1],
2910 predicate: None,
2911 deletes: vec![FileScanTaskDeleteFile {
2912 file_path: delete_file_path,
2913 file_type: DataContentType::PositionDeletes,
2914 partition_spec_id: 0,
2915 equality_ids: None,
2916 }],
2917 partition: None,
2918 partition_spec: None,
2919 name_mapping: None,
2920 };
2921
2922 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
2923 let result = reader
2924 .read(tasks)
2925 .unwrap()
2926 .try_collect::<Vec<RecordBatch>>()
2927 .await
2928 .unwrap();
2929
2930 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
2933
2934 println!("Total rows read from row group 1: {}", total_rows);
2935 println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");
2936
2937 assert_eq!(
2939 total_rows, 99,
2940 "Expected 99 rows from row group 1 after deleting position 199, but got {} rows. \
2941 The bug causes position deletes to be lost when advance_to() is followed by next() \
2942 when skipping unselected row groups.",
2943 total_rows
2944 );
2945
2946 let all_ids: Vec<i32> = result
2948 .iter()
2949 .flat_map(|batch| {
2950 batch
2951 .column(0)
2952 .as_primitive::<arrow_array::types::Int32Type>()
2953 .values()
2954 .iter()
2955 .copied()
2956 })
2957 .collect();
2958
2959 assert!(
2960 !all_ids.contains(&200),
2961 "Row with id=200 should be deleted but was found in results"
2962 );
2963
2964 let expected_ids: Vec<i32> = (101..=199).collect();
2966 assert_eq!(
2967 all_ids, expected_ids,
2968 "Should have ids 101-199 but got different values"
2969 );
2970 }

#[tokio::test]
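// The task's byte range selects only the second row group, while the positional delete
// targets position 0 in the skipped first row group; all 100 rows (ids 101-200) should be returned.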
3000 async fn test_position_delete_in_skipped_row_group() {
3001 use arrow_array::{Int32Array, Int64Array};
3002 use parquet::file::reader::{FileReader, SerializedFileReader};
3003
3004 const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
3006 const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
3007
3008 let tmp_dir = TempDir::new().unwrap();
3009 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3010
3011 let table_schema = Arc::new(
3013 Schema::builder()
3014 .with_schema_id(1)
3015 .with_fields(vec![
3016 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3017 ])
3018 .build()
3019 .unwrap(),
3020 );
3021
3022 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3023 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3024 PARQUET_FIELD_ID_META_KEY.to_string(),
3025 "1".to_string(),
3026 )])),
3027 ]));
3028
3029 let data_file_path = format!("{}/data.parquet", &table_location);
3033
3034 let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3035 Int32Array::from_iter_values(1..=100),
3036 )])
3037 .unwrap();
3038
3039 let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
3040 Int32Array::from_iter_values(101..=200),
3041 )])
3042 .unwrap();
3043
3044 let props = WriterProperties::builder()
3046 .set_compression(Compression::SNAPPY)
3047 .set_max_row_group_size(100)
3048 .build();
3049
3050 let file = File::create(&data_file_path).unwrap();
3051 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3052 writer.write(&batch1).expect("Writing batch 1");
3053 writer.write(&batch2).expect("Writing batch 2");
3054 writer.close().unwrap();
3055
3056 let verify_file = File::open(&data_file_path).unwrap();
3058 let verify_reader = SerializedFileReader::new(verify_file).unwrap();
3059 assert_eq!(
3060 verify_reader.metadata().num_row_groups(),
3061 2,
3062 "Should have 2 row groups"
3063 );
3064
3065 let delete_file_path = format!("{}/deletes.parquet", &table_location);
3067
3068 let delete_schema = Arc::new(ArrowSchema::new(vec![
3069 Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
3070 PARQUET_FIELD_ID_META_KEY.to_string(),
3071 FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
3072 )])),
3073 Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
3074 PARQUET_FIELD_ID_META_KEY.to_string(),
3075 FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
3076 )])),
3077 ]));
3078
3079 let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![
3081 Arc::new(StringArray::from_iter_values(vec![data_file_path.clone()])),
3082 Arc::new(Int64Array::from_iter_values(vec![0i64])),
3083 ])
3084 .unwrap();
3085
3086 let delete_props = WriterProperties::builder()
3087 .set_compression(Compression::SNAPPY)
3088 .build();
3089
3090 let delete_file = File::create(&delete_file_path).unwrap();
3091 let mut delete_writer =
3092 ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
3093 delete_writer.write(&delete_batch).unwrap();
3094 delete_writer.close().unwrap();
3095
3096 let metadata_file = File::open(&data_file_path).unwrap();
3099 let metadata_reader = SerializedFileReader::new(metadata_file).unwrap();
3100 let metadata = metadata_reader.metadata();
3101
3102 let row_group_0 = metadata.row_group(0);
3103 let row_group_1 = metadata.row_group(1);
3104
let rg0_start = 4u64;
let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
3107 let rg1_length = row_group_1.compressed_size() as u64;
3108
3109 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3110 let reader = ArrowReaderBuilder::new(file_io).build();
3111
3112 let task = FileScanTask {
3114 start: rg1_start,
3115 length: rg1_length,
record_count: Some(100),
data_file_path: data_file_path.clone(),
3118 data_file_format: DataFileFormat::Parquet,
3119 schema: table_schema.clone(),
3120 project_field_ids: vec![1],
3121 predicate: None,
3122 deletes: vec![FileScanTaskDeleteFile {
3123 file_path: delete_file_path,
3124 file_type: DataContentType::PositionDeletes,
3125 partition_spec_id: 0,
3126 equality_ids: None,
3127 }],
3128 partition: None,
3129 partition_spec: None,
3130 name_mapping: None,
3131 };
3132
3133 let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
3134 let result = reader
3135 .read(tasks)
3136 .unwrap()
3137 .try_collect::<Vec<RecordBatch>>()
3138 .await
3139 .unwrap();
3140
3141 let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
3144
3145 assert_eq!(
3146 total_rows, 100,
3147 "Expected 100 rows from row group 1 (delete at position 0 is in skipped row group 0). \
3148 If this hangs or fails, it indicates the cached delete index was not updated after advance_to()."
3149 );
3150
3151 let all_ids: Vec<i32> = result
3153 .iter()
3154 .flat_map(|batch| {
3155 batch
3156 .column(0)
3157 .as_primitive::<arrow_array::types::Int32Type>()
3158 .values()
3159 .iter()
3160 .copied()
3161 })
3162 .collect();
3163
3164 let expected_ids: Vec<i32> = (101..=200).collect();
3165 assert_eq!(
3166 all_ids, expected_ids,
3167 "Should have ids 101-200 (all of row group 1)"
3168 );
3169 }
3170
3171 #[tokio::test]
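// Reads a Parquet file written without field-id metadata and checks that both projected
// columns still come back with the expected values.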
3177 async fn test_read_parquet_file_without_field_ids() {
3178 let schema = Arc::new(
3179 Schema::builder()
3180 .with_schema_id(1)
3181 .with_fields(vec![
3182 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3183 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3184 ])
3185 .build()
3186 .unwrap(),
3187 );
3188
3189 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3191 Field::new("name", DataType::Utf8, false),
3192 Field::new("age", DataType::Int32, false),
3193 ]));
3194
3195 let tmp_dir = TempDir::new().unwrap();
3196 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3197 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3198
3199 let name_data = vec!["Alice", "Bob", "Charlie"];
3200 let age_data = vec![30, 25, 35];
3201
3202 use arrow_array::Int32Array;
3203 let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef;
3204 let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef;
3205
3206 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap();
3207
3208 let props = WriterProperties::builder()
3209 .set_compression(Compression::SNAPPY)
3210 .build();
3211
3212 let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3213 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3214
3215 writer.write(&to_write).expect("Writing batch");
3216 writer.close().unwrap();
3217
3218 let reader = ArrowReaderBuilder::new(file_io).build();
3219
3220 let tasks = Box::pin(futures::stream::iter(
3221 vec![Ok(FileScanTask {
3222 start: 0,
3223 length: 0,
3224 record_count: None,
3225 data_file_path: format!("{}/1.parquet", table_location),
3226 data_file_format: DataFileFormat::Parquet,
3227 schema: schema.clone(),
3228 project_field_ids: vec![1, 2],
3229 predicate: None,
3230 deletes: vec![],
3231 partition: None,
3232 partition_spec: None,
3233 name_mapping: None,
3234 })]
3235 .into_iter(),
3236 )) as FileScanTaskStream;
3237
3238 let result = reader
3239 .read(tasks)
3240 .unwrap()
3241 .try_collect::<Vec<RecordBatch>>()
3242 .await
3243 .unwrap();
3244
3245 assert_eq!(result.len(), 1);
3246 let batch = &result[0];
3247 assert_eq!(batch.num_rows(), 3);
3248 assert_eq!(batch.num_columns(), 2);
3249
3250 let name_array = batch.column(0).as_string::<i32>();
3252 assert_eq!(name_array.value(0), "Alice");
3253 assert_eq!(name_array.value(1), "Bob");
3254 assert_eq!(name_array.value(2), "Charlie");
3255
3256 let age_array = batch
3257 .column(1)
3258 .as_primitive::<arrow_array::types::Int32Type>();
3259 assert_eq!(age_array.value(0), 30);
3260 assert_eq!(age_array.value(1), 25);
3261 assert_eq!(age_array.value(2), 35);
3262 }
3263
3264 #[tokio::test]
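// Same field-id-less layout with four columns, but only field ids 1 and 3 (col1 and col3)
// are projected; the result should contain exactly those two string columns.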
3268 async fn test_read_parquet_without_field_ids_partial_projection() {
3269 use arrow_array::Int32Array;
3270
3271 let schema = Arc::new(
3272 Schema::builder()
3273 .with_schema_id(1)
3274 .with_fields(vec![
3275 NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(),
3276 NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
3277 NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(),
3278 NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(),
3279 ])
3280 .build()
3281 .unwrap(),
3282 );
3283
3284 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3285 Field::new("col1", DataType::Utf8, false),
3286 Field::new("col2", DataType::Int32, false),
3287 Field::new("col3", DataType::Utf8, false),
3288 Field::new("col4", DataType::Int32, false),
3289 ]));
3290
3291 let tmp_dir = TempDir::new().unwrap();
3292 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3293 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3294
3295 let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef;
3296 let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3297 let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef;
3298 let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
3299
3300 let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![
3301 col1_data, col2_data, col3_data, col4_data,
3302 ])
3303 .unwrap();
3304
3305 let props = WriterProperties::builder()
3306 .set_compression(Compression::SNAPPY)
3307 .build();
3308
3309 let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3310 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3311
3312 writer.write(&to_write).expect("Writing batch");
3313 writer.close().unwrap();
3314
3315 let reader = ArrowReaderBuilder::new(file_io).build();
3316
3317 let tasks = Box::pin(futures::stream::iter(
3318 vec![Ok(FileScanTask {
3319 start: 0,
3320 length: 0,
3321 record_count: None,
3322 data_file_path: format!("{}/1.parquet", table_location),
3323 data_file_format: DataFileFormat::Parquet,
3324 schema: schema.clone(),
3325 project_field_ids: vec![1, 3],
3326 predicate: None,
3327 deletes: vec![],
3328 partition: None,
3329 partition_spec: None,
3330 name_mapping: None,
3331 })]
3332 .into_iter(),
3333 )) as FileScanTaskStream;
3334
3335 let result = reader
3336 .read(tasks)
3337 .unwrap()
3338 .try_collect::<Vec<RecordBatch>>()
3339 .await
3340 .unwrap();
3341
3342 assert_eq!(result.len(), 1);
3343 let batch = &result[0];
3344 assert_eq!(batch.num_rows(), 2);
3345 assert_eq!(batch.num_columns(), 2);
3346
3347 let col1_array = batch.column(0).as_string::<i32>();
3348 assert_eq!(col1_array.value(0), "a");
3349 assert_eq!(col1_array.value(1), "b");
3350
3351 let col3_array = batch.column(1).as_string::<i32>();
3352 assert_eq!(col3_array.value(0), "c");
3353 assert_eq!(col3_array.value(1), "d");
3354 }
3355
3356 #[tokio::test]
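// The field-id-less file only contains "name" and "age"; the table schema adds an optional
// "city" column (field id 3), which should come back as an all-null string column.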
3360 async fn test_read_parquet_without_field_ids_schema_evolution() {
3361 use arrow_array::{Array, Int32Array};
3362
3363 let schema = Arc::new(
3365 Schema::builder()
3366 .with_schema_id(1)
3367 .with_fields(vec![
3368 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3369 NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(),
3370 NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(),
3371 ])
3372 .build()
3373 .unwrap(),
3374 );
3375
3376 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3377 Field::new("name", DataType::Utf8, false),
3378 Field::new("age", DataType::Int32, false),
3379 ]));
3380
3381 let tmp_dir = TempDir::new().unwrap();
3382 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3383 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3384
3385 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3386 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3387
3388 let to_write =
3389 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap();
3390
3391 let props = WriterProperties::builder()
3392 .set_compression(Compression::SNAPPY)
3393 .build();
3394
3395 let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3396 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3397
3398 writer.write(&to_write).expect("Writing batch");
3399 writer.close().unwrap();
3400
3401 let reader = ArrowReaderBuilder::new(file_io).build();
3402
3403 let tasks = Box::pin(futures::stream::iter(
3404 vec![Ok(FileScanTask {
3405 start: 0,
3406 length: 0,
3407 record_count: None,
3408 data_file_path: format!("{}/1.parquet", table_location),
3409 data_file_format: DataFileFormat::Parquet,
3410 schema: schema.clone(),
3411 project_field_ids: vec![1, 2, 3],
3412 predicate: None,
3413 deletes: vec![],
3414 partition: None,
3415 partition_spec: None,
3416 name_mapping: None,
3417 })]
3418 .into_iter(),
3419 )) as FileScanTaskStream;
3420
3421 let result = reader
3422 .read(tasks)
3423 .unwrap()
3424 .try_collect::<Vec<RecordBatch>>()
3425 .await
3426 .unwrap();
3427
3428 assert_eq!(result.len(), 1);
3429 let batch = &result[0];
3430 assert_eq!(batch.num_rows(), 2);
3431 assert_eq!(batch.num_columns(), 3);
3432
3433 let name_array = batch.column(0).as_string::<i32>();
3434 assert_eq!(name_array.value(0), "Alice");
3435 assert_eq!(name_array.value(1), "Bob");
3436
3437 let age_array = batch
3438 .column(1)
3439 .as_primitive::<arrow_array::types::Int32Type>();
3440 assert_eq!(age_array.value(0), 30);
3441 assert_eq!(age_array.value(1), 25);
3442
3443 let city_array = batch.column(2).as_string::<i32>();
3445 assert_eq!(city_array.null_count(), 2);
3446 assert!(city_array.is_null(0));
3447 assert!(city_array.is_null(1));
3448 }
3449
3450 #[tokio::test]
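// Writes six rows in three 2-row batches with a 2-row row-group limit (no field ids) and
// checks that all rows are read back in order across the resulting row groups.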
3453 async fn test_read_parquet_without_field_ids_multiple_row_groups() {
3454 use arrow_array::Int32Array;
3455
3456 let schema = Arc::new(
3457 Schema::builder()
3458 .with_schema_id(1)
3459 .with_fields(vec![
3460 NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
3461 NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(),
3462 ])
3463 .build()
3464 .unwrap(),
3465 );
3466
3467 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3468 Field::new("name", DataType::Utf8, false),
3469 Field::new("value", DataType::Int32, false),
3470 ]));
3471
3472 let tmp_dir = TempDir::new().unwrap();
3473 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3474 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3475
3476 let props = WriterProperties::builder()
3478 .set_compression(Compression::SNAPPY)
3479 .set_write_batch_size(2)
3480 .set_max_row_group_size(2)
3481 .build();
3482
3483 let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3484 let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
3485
3486 for batch_num in 0..3 {
3488 let name_data = Arc::new(StringArray::from(vec![
3489 format!("name_{}", batch_num * 2),
3490 format!("name_{}", batch_num * 2 + 1),
3491 ])) as ArrayRef;
3492 let value_data =
3493 Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef;
3494
3495 let batch =
3496 RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap();
3497 writer.write(&batch).expect("Writing batch");
3498 }
3499 writer.close().unwrap();
3500
3501 let reader = ArrowReaderBuilder::new(file_io).build();
3502
3503 let tasks = Box::pin(futures::stream::iter(
3504 vec![Ok(FileScanTask {
3505 start: 0,
3506 length: 0,
3507 record_count: None,
3508 data_file_path: format!("{}/1.parquet", table_location),
3509 data_file_format: DataFileFormat::Parquet,
3510 schema: schema.clone(),
3511 project_field_ids: vec![1, 2],
3512 predicate: None,
3513 deletes: vec![],
3514 partition: None,
3515 partition_spec: None,
3516 name_mapping: None,
3517 })]
3518 .into_iter(),
3519 )) as FileScanTaskStream;
3520
3521 let result = reader
3522 .read(tasks)
3523 .unwrap()
3524 .try_collect::<Vec<RecordBatch>>()
3525 .await
3526 .unwrap();
3527
3528 assert!(!result.is_empty());
3529
3530 let mut all_names = Vec::new();
3531 let mut all_values = Vec::new();
3532
3533 for batch in &result {
3534 let name_array = batch.column(0).as_string::<i32>();
3535 let value_array = batch
3536 .column(1)
3537 .as_primitive::<arrow_array::types::Int32Type>();
3538
3539 for i in 0..batch.num_rows() {
3540 all_names.push(name_array.value(i).to_string());
3541 all_values.push(value_array.value(i));
3542 }
3543 }
3544
3545 assert_eq!(all_names.len(), 6);
3546 assert_eq!(all_values.len(), 6);
3547
3548 for i in 0..6 {
3549 assert_eq!(all_names[i], format!("name_{}", i));
3550 assert_eq!(all_values[i], i as i32);
3551 }
3552 }
3553
3554 #[tokio::test]
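// A field-id-less file with a nested struct column; verifies that person.name and
// person.age survive the read alongside the top-level id column.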
3558 async fn test_read_parquet_without_field_ids_with_struct() {
3559 use arrow_array::{Int32Array, StructArray};
3560 use arrow_schema::Fields;
3561
3562 let schema = Arc::new(
3563 Schema::builder()
3564 .with_schema_id(1)
3565 .with_fields(vec![
3566 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3567 NestedField::required(
3568 2,
3569 "person",
3570 Type::Struct(crate::spec::StructType::new(vec![
3571 NestedField::required(
3572 3,
3573 "name",
3574 Type::Primitive(PrimitiveType::String),
3575 )
3576 .into(),
3577 NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int))
3578 .into(),
3579 ])),
3580 )
3581 .into(),
3582 ])
3583 .build()
3584 .unwrap(),
3585 );
3586
3587 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3588 Field::new("id", DataType::Int32, false),
3589 Field::new(
3590 "person",
3591 DataType::Struct(Fields::from(vec![
3592 Field::new("name", DataType::Utf8, false),
3593 Field::new("age", DataType::Int32, false),
3594 ])),
3595 false,
3596 ),
3597 ]));
3598
3599 let tmp_dir = TempDir::new().unwrap();
3600 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3601 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3602
3603 let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3604 let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef;
3605 let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef;
3606 let person_data = Arc::new(StructArray::from(vec![
3607 (
3608 Arc::new(Field::new("name", DataType::Utf8, false)),
3609 name_data,
3610 ),
3611 (
3612 Arc::new(Field::new("age", DataType::Int32, false)),
3613 age_data,
3614 ),
3615 ])) as ArrayRef;
3616
3617 let to_write =
3618 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap();
3619
3620 let props = WriterProperties::builder()
3621 .set_compression(Compression::SNAPPY)
3622 .build();
3623
3624 let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3625 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3626
3627 writer.write(&to_write).expect("Writing batch");
3628 writer.close().unwrap();
3629
3630 let reader = ArrowReaderBuilder::new(file_io).build();
3631
3632 let tasks = Box::pin(futures::stream::iter(
3633 vec![Ok(FileScanTask {
3634 start: 0,
3635 length: 0,
3636 record_count: None,
3637 data_file_path: format!("{}/1.parquet", table_location),
3638 data_file_format: DataFileFormat::Parquet,
3639 schema: schema.clone(),
3640 project_field_ids: vec![1, 2],
3641 predicate: None,
3642 deletes: vec![],
3643 partition: None,
3644 partition_spec: None,
3645 name_mapping: None,
3646 })]
3647 .into_iter(),
3648 )) as FileScanTaskStream;
3649
3650 let result = reader
3651 .read(tasks)
3652 .unwrap()
3653 .try_collect::<Vec<RecordBatch>>()
3654 .await
3655 .unwrap();
3656
3657 assert_eq!(result.len(), 1);
3658 let batch = &result[0];
3659 assert_eq!(batch.num_rows(), 2);
3660 assert_eq!(batch.num_columns(), 2);
3661
3662 let id_array = batch
3663 .column(0)
3664 .as_primitive::<arrow_array::types::Int32Type>();
3665 assert_eq!(id_array.value(0), 1);
3666 assert_eq!(id_array.value(1), 2);
3667
3668 let person_array = batch.column(1).as_struct();
3669 assert_eq!(person_array.num_columns(), 2);
3670
3671 let name_array = person_array.column(0).as_string::<i32>();
3672 assert_eq!(name_array.value(0), "Alice");
3673 assert_eq!(name_array.value(1), "Bob");
3674
3675 let age_array = person_array
3676 .column(1)
3677 .as_primitive::<arrow_array::types::Int32Type>();
3678 assert_eq!(age_array.value(0), 30);
3679 assert_eq!(age_array.value(1), 25);
3680 }
3681
3682 #[tokio::test]
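// The table schema inserts a new optional column (field id 5, "newCol") between the file's
// two existing columns; the reader should project it as an all-null column in the middle position.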
3686 async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() {
3687 use arrow_array::{Array, Int32Array};
3688
3689 let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
3690 Field::new("col0", DataType::Int32, true),
3691 Field::new("col1", DataType::Int32, true),
3692 ]));
3693
3694 let schema = Arc::new(
3696 Schema::builder()
3697 .with_schema_id(1)
3698 .with_fields(vec![
3699 NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(),
3700 NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(),
3701 NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(),
3702 ])
3703 .build()
3704 .unwrap(),
3705 );
3706
3707 let tmp_dir = TempDir::new().unwrap();
3708 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3709 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3710
3711 let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
3712 let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
3713
3714 let to_write =
3715 RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap();
3716
3717 let props = WriterProperties::builder()
3718 .set_compression(Compression::SNAPPY)
3719 .build();
3720
3721 let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3722 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3723 writer.write(&to_write).expect("Writing batch");
3724 writer.close().unwrap();
3725
3726 let reader = ArrowReaderBuilder::new(file_io).build();
3727
3728 let tasks = Box::pin(futures::stream::iter(
3729 vec![Ok(FileScanTask {
3730 start: 0,
3731 length: 0,
3732 record_count: None,
3733 data_file_path: format!("{}/1.parquet", table_location),
3734 data_file_format: DataFileFormat::Parquet,
3735 schema: schema.clone(),
3736 project_field_ids: vec![1, 5, 2],
3737 predicate: None,
3738 deletes: vec![],
3739 partition: None,
3740 partition_spec: None,
3741 name_mapping: None,
3742 })]
3743 .into_iter(),
3744 )) as FileScanTaskStream;
3745
3746 let result = reader
3747 .read(tasks)
3748 .unwrap()
3749 .try_collect::<Vec<RecordBatch>>()
3750 .await
3751 .unwrap();
3752
3753 assert_eq!(result.len(), 1);
3754 let batch = &result[0];
3755 assert_eq!(batch.num_rows(), 2);
3756 assert_eq!(batch.num_columns(), 3);
3757
3758 let result_col0 = batch
3759 .column(0)
3760 .as_primitive::<arrow_array::types::Int32Type>();
3761 assert_eq!(result_col0.value(0), 1);
3762 assert_eq!(result_col0.value(1), 2);
3763
3764 let result_newcol = batch
3766 .column(1)
3767 .as_primitive::<arrow_array::types::Int32Type>();
3768 assert_eq!(result_newcol.null_count(), 2);
3769 assert!(result_newcol.is_null(0));
3770 assert!(result_newcol.is_null(1));
3771
3772 let result_col1 = batch
3773 .column(2)
3774 .as_primitive::<arrow_array::types::Int32Type>();
3775 assert_eq!(result_col1.value(0), 10);
3776 assert_eq!(result_col1.value(1), 20);
3777 }
3778
3779 #[tokio::test]
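// With row-group filtering and row selection enabled, a predicate (id < 5) that matches no
// rows in the file should produce an empty result.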
3783 async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() {
3784 use arrow_array::{Float64Array, Int32Array};
3785
3786 let schema = Arc::new(
3788 Schema::builder()
3789 .with_schema_id(1)
3790 .with_fields(vec![
3791 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3792 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3793 NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double))
3794 .into(),
3795 ])
3796 .build()
3797 .unwrap(),
3798 );
3799
3800 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3801 Field::new("id", DataType::Int32, false),
3802 Field::new("name", DataType::Utf8, false),
3803 Field::new("value", DataType::Float64, false),
3804 ]));
3805
3806 let tmp_dir = TempDir::new().unwrap();
3807 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3808 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3809
3810 let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef;
3812 let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3813 let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef;
3814
3815 let to_write =
3816 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data])
3817 .unwrap();
3818
3819 let props = WriterProperties::builder()
3820 .set_compression(Compression::SNAPPY)
3821 .build();
3822
3823 let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
3824 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3825 writer.write(&to_write).expect("Writing batch");
3826 writer.close().unwrap();
3827
3828 let predicate = Reference::new("id").less_than(Datum::int(5));
3830
3831 let reader = ArrowReaderBuilder::new(file_io)
3833 .with_row_group_filtering_enabled(true)
3834 .with_row_selection_enabled(true)
3835 .build();
3836
3837 let tasks = Box::pin(futures::stream::iter(
3838 vec![Ok(FileScanTask {
3839 start: 0,
3840 length: 0,
3841 record_count: None,
3842 data_file_path: format!("{}/1.parquet", table_location),
3843 data_file_format: DataFileFormat::Parquet,
3844 schema: schema.clone(),
3845 project_field_ids: vec![1, 2, 3],
3846 predicate: Some(predicate.bind(schema, true).unwrap()),
3847 deletes: vec![],
3848 partition: None,
3849 partition_spec: None,
3850 name_mapping: None,
3851 })]
3852 .into_iter(),
3853 )) as FileScanTaskStream;
3854
3855 let result = reader
3857 .read(tasks)
3858 .unwrap()
3859 .try_collect::<Vec<RecordBatch>>()
3860 .await
3861 .unwrap();
3862
3863 assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
3865 }
3866
3867 #[tokio::test]
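// With a bucket(4) partition spec on "id" and a partition value supplied on the task, the
// source column must still be read from the data file rather than filled from the partition value.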
3912 async fn test_bucket_partitioning_reads_source_column_from_file() {
3913 use arrow_array::Int32Array;
3914
3915 use crate::spec::{Literal, PartitionSpec, Struct, Transform};
3916
3917 let schema = Arc::new(
3919 Schema::builder()
3920 .with_schema_id(0)
3921 .with_fields(vec![
3922 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
3923 NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3924 ])
3925 .build()
3926 .unwrap(),
3927 );
3928
3929 let partition_spec = Arc::new(
3931 PartitionSpec::builder(schema.clone())
3932 .with_spec_id(0)
3933 .add_partition_field("id", "id_bucket", Transform::Bucket(4))
3934 .unwrap()
3935 .build()
3936 .unwrap(),
3937 );
3938
3939 let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);
3941
3942 let arrow_schema = Arc::new(ArrowSchema::new(vec![
3944 Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
3945 PARQUET_FIELD_ID_META_KEY.to_string(),
3946 "1".to_string(),
3947 )])),
3948 Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
3949 PARQUET_FIELD_ID_META_KEY.to_string(),
3950 "2".to_string(),
3951 )])),
3952 ]));
3953
3954 let tmp_dir = TempDir::new().unwrap();
3956 let table_location = tmp_dir.path().to_str().unwrap().to_string();
3957 let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
3958
3959 let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
3960 let name_data =
3961 Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;
3962
3963 let to_write =
3964 RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
3965
3966 let props = WriterProperties::builder()
3967 .set_compression(Compression::SNAPPY)
3968 .build();
3969 let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
3970 let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
3971 writer.write(&to_write).expect("Writing batch");
3972 writer.close().unwrap();
3973
3974 let reader = ArrowReaderBuilder::new(file_io).build();
3976 let tasks = Box::pin(futures::stream::iter(
3977 vec![Ok(FileScanTask {
3978 start: 0,
3979 length: 0,
3980 record_count: None,
3981 data_file_path: format!("{}/data.parquet", table_location),
3982 data_file_format: DataFileFormat::Parquet,
3983 schema: schema.clone(),
3984 project_field_ids: vec![1, 2],
3985 predicate: None,
3986 deletes: vec![],
3987 partition: Some(partition_data),
3988 partition_spec: Some(partition_spec),
3989 name_mapping: None,
3990 })]
3991 .into_iter(),
3992 )) as FileScanTaskStream;
3993
3994 let result = reader
3995 .read(tasks)
3996 .unwrap()
3997 .try_collect::<Vec<RecordBatch>>()
3998 .await
3999 .unwrap();
4000
4001 assert_eq!(result.len(), 1);
4003 let batch = &result[0];
4004
4005 assert_eq!(batch.num_columns(), 2);
4006 assert_eq!(batch.num_rows(), 4);
4007
4008 let id_col = batch
4011 .column(0)
4012 .as_primitive::<arrow_array::types::Int32Type>();
4013 assert_eq!(id_col.value(0), 1);
4014 assert_eq!(id_col.value(1), 5);
4015 assert_eq!(id_col.value(2), 9);
4016 assert_eq!(id_col.value(3), 13);
4017
4018 let name_col = batch.column(1).as_string::<i32>();
4019 assert_eq!(name_col.value(0), "Alice");
4020 assert_eq!(name_col.value(1), "Bob");
4021 assert_eq!(name_col.value(2), "Charlie");
4022 assert_eq!(name_col.value(3), "Dave");
4023 }
4024}