iceberg/arrow/reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet file data reader
19
20use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
21use crate::io::FileIO;
22use crate::runtime::Runtime;
23use crate::util::available_parallelism;
24
/// Default coalescing threshold, in bytes (1 MiB): byte ranges separated by
/// a gap smaller than this are merged into a single request. Same value as
/// object_store's `OBJECT_STORE_COALESCE_DEFAULT`.
const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1 << 20;

/// Default upper bound on how many coalesced byte ranges are fetched at the
/// same time. Same value as object_store's `OBJECT_STORE_COALESCE_PARALLEL`.
const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10;

/// Default prefetch size, in bytes (512 KiB), used when parsing the Parquet
/// footer metadata. Same value as DataFusion's default
/// `ParquetOptions::metadata_size_hint`.
const DEFAULT_METADATA_SIZE_HINT: usize = 512 << 10;
36
37mod file_reader;
38mod options;
39mod pipeline;
40mod positional_deletes;
41mod predicate_visitor;
42mod projection;
43mod row_filter;
44pub use file_reader::ArrowFileReader;
45pub(crate) use options::ParquetReadOptions;
46use predicate_visitor::{CollectFieldIdVisitor, PredicateConverter};
47use projection::{add_fallback_field_ids_to_arrow_schema, apply_name_mapping_to_arrow_schema};
48
49/// Builder to create ArrowReader
50pub struct ArrowReaderBuilder {
51    batch_size: Option<usize>,
52    file_io: FileIO,
53    concurrency_limit_data_files: usize,
54    row_group_filtering_enabled: bool,
55    row_selection_enabled: bool,
56    parquet_read_options: ParquetReadOptions,
57    runtime: Runtime,
58}
59
60impl ArrowReaderBuilder {
61    /// Create a new ArrowReaderBuilder
62    pub fn new(file_io: FileIO, runtime: Runtime) -> Self {
63        let num_cpus = available_parallelism().get();
64
65        ArrowReaderBuilder {
66            batch_size: None,
67            file_io,
68            concurrency_limit_data_files: num_cpus,
69            row_group_filtering_enabled: true,
70            row_selection_enabled: false,
71            parquet_read_options: ParquetReadOptions::builder().build(),
72            runtime,
73        }
74    }
75
76    /// Sets the max number of in flight data files that are being fetched
77    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
78        self.concurrency_limit_data_files = val;
79        self
80    }
81
82    /// Sets the desired size of batches in the response
83    /// to something other than the default
84    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
85        self.batch_size = Some(batch_size);
86        self
87    }
88
89    /// Determines whether to enable row group filtering.
90    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
91        self.row_group_filtering_enabled = row_group_filtering_enabled;
92        self
93    }
94
95    /// Determines whether to enable row selection.
96    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
97        self.row_selection_enabled = row_selection_enabled;
98        self
99    }
100
101    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
102    ///
103    /// This hint can help reduce the number of fetch requests. For more details see the
104    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
105    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
106        self.parquet_read_options.metadata_size_hint = Some(metadata_size_hint);
107        self
108    }
109
110    /// Sets the gap threshold for merging nearby byte ranges into a single request.
111    /// Ranges with gaps smaller than this value will be coalesced.
112    ///
113    /// Defaults to 1 MiB, matching object_store's OBJECT_STORE_COALESCE_DEFAULT.
114    pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self {
115        self.parquet_read_options.range_coalesce_bytes = range_coalesce_bytes;
116        self
117    }
118
119    /// Sets the maximum number of merged byte ranges to fetch concurrently.
120    ///
121    /// Defaults to 10, matching object_store's OBJECT_STORE_COALESCE_PARALLEL.
122    pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self {
123        self.parquet_read_options.range_fetch_concurrency = range_fetch_concurrency;
124        self
125    }
126
127    /// Build the ArrowReader.
128    pub fn build(self) -> ArrowReader {
129        ArrowReader {
130            batch_size: self.batch_size,
131            file_io: self.file_io.clone(),
132            delete_file_loader: CachingDeleteFileLoader::new(
133                self.file_io.clone(),
134                self.concurrency_limit_data_files,
135                self.runtime.clone(),
136            ),
137            concurrency_limit_data_files: self.concurrency_limit_data_files,
138            row_group_filtering_enabled: self.row_group_filtering_enabled,
139            row_selection_enabled: self.row_selection_enabled,
140            parquet_read_options: self.parquet_read_options,
141        }
142    }
143}
144
145/// Reads data from Parquet files
146#[derive(Clone)]
147pub struct ArrowReader {
148    batch_size: Option<usize>,
149    file_io: FileIO,
150    delete_file_loader: CachingDeleteFileLoader,
151
152    /// the maximum number of data files that can be fetched at the same time
153    concurrency_limit_data_files: usize,
154
155    row_group_filtering_enabled: bool,
156    row_selection_enabled: bool,
157    parquet_read_options: ParquetReadOptions,
158}