iceberg/arrow/reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parquet file data reader
19
20use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
21use crate::io::FileIO;
22use crate::util::available_parallelism;
23
/// Default gap between byte ranges below which they are coalesced into a
/// single request. Matches object_store's `OBJECT_STORE_COALESCE_DEFAULT`
/// (1 MiB). Presumably consumed by the reader submodules via `super::` —
/// not referenced in this file.
const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024;

/// Default maximum number of coalesced byte ranges fetched concurrently.
/// Matches object_store's `OBJECT_STORE_COALESCE_PARALLEL`.
const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10;

/// Default number of bytes to prefetch when parsing Parquet footer metadata.
/// Matches DataFusion's default `ParquetOptions::metadata_size_hint` (512 KiB).
const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024;
35
36mod file_reader;
37mod options;
38mod pipeline;
39mod positional_deletes;
40mod predicate_visitor;
41mod projection;
42mod row_filter;
43pub use file_reader::ArrowFileReader;
44pub(crate) use options::ParquetReadOptions;
45use predicate_visitor::{CollectFieldIdVisitor, PredicateConverter};
46use projection::{add_fallback_field_ids_to_arrow_schema, apply_name_mapping_to_arrow_schema};
47
/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
    /// Desired record-batch size; `None` keeps the parquet reader's default.
    batch_size: Option<usize>,
    /// File IO abstraction used to open data and delete files.
    file_io: FileIO,
    /// Max number of data files fetched concurrently (defaults to CPU count).
    concurrency_limit_data_files: usize,
    /// Whether row groups may be pruned via statistics (default: true).
    row_group_filtering_enabled: bool,
    /// Whether fine-grained row selection is enabled (default: false).
    row_selection_enabled: bool,
    /// Low-level parquet fetch/coalesce options passed to the file reader.
    parquet_read_options: ParquetReadOptions,
}
57
58impl ArrowReaderBuilder {
59    /// Create a new ArrowReaderBuilder
60    pub fn new(file_io: FileIO) -> Self {
61        let num_cpus = available_parallelism().get();
62
63        ArrowReaderBuilder {
64            batch_size: None,
65            file_io,
66            concurrency_limit_data_files: num_cpus,
67            row_group_filtering_enabled: true,
68            row_selection_enabled: false,
69            parquet_read_options: ParquetReadOptions::builder().build(),
70        }
71    }
72
73    /// Sets the max number of in flight data files that are being fetched
74    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
75        self.concurrency_limit_data_files = val;
76        self
77    }
78
79    /// Sets the desired size of batches in the response
80    /// to something other than the default
81    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
82        self.batch_size = Some(batch_size);
83        self
84    }
85
86    /// Determines whether to enable row group filtering.
87    pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
88        self.row_group_filtering_enabled = row_group_filtering_enabled;
89        self
90    }
91
92    /// Determines whether to enable row selection.
93    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
94        self.row_selection_enabled = row_selection_enabled;
95        self
96    }
97
98    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
99    ///
100    /// This hint can help reduce the number of fetch requests. For more details see the
101    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
102    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
103        self.parquet_read_options.metadata_size_hint = Some(metadata_size_hint);
104        self
105    }
106
107    /// Sets the gap threshold for merging nearby byte ranges into a single request.
108    /// Ranges with gaps smaller than this value will be coalesced.
109    ///
110    /// Defaults to 1 MiB, matching object_store's OBJECT_STORE_COALESCE_DEFAULT.
111    pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self {
112        self.parquet_read_options.range_coalesce_bytes = range_coalesce_bytes;
113        self
114    }
115
116    /// Sets the maximum number of merged byte ranges to fetch concurrently.
117    ///
118    /// Defaults to 10, matching object_store's OBJECT_STORE_COALESCE_PARALLEL.
119    pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self {
120        self.parquet_read_options.range_fetch_concurrency = range_fetch_concurrency;
121        self
122    }
123
124    /// Build the ArrowReader.
125    pub fn build(self) -> ArrowReader {
126        ArrowReader {
127            batch_size: self.batch_size,
128            file_io: self.file_io.clone(),
129            delete_file_loader: CachingDeleteFileLoader::new(
130                self.file_io.clone(),
131                self.concurrency_limit_data_files,
132            ),
133            concurrency_limit_data_files: self.concurrency_limit_data_files,
134            row_group_filtering_enabled: self.row_group_filtering_enabled,
135            row_selection_enabled: self.row_selection_enabled,
136            parquet_read_options: self.parquet_read_options,
137        }
138    }
139}
140
/// Reads data from Parquet files
#[derive(Clone)]
pub struct ArrowReader {
    /// Desired record-batch size; `None` keeps the parquet reader's default.
    batch_size: Option<usize>,
    /// File IO abstraction used to open data and delete files.
    file_io: FileIO,
    /// Loader that fetches and caches delete files for applied deletes.
    delete_file_loader: CachingDeleteFileLoader,

    /// the maximum number of data files that can be fetched at the same time
    concurrency_limit_data_files: usize,

    /// Whether row groups may be pruned via statistics.
    row_group_filtering_enabled: bool,
    /// Whether fine-grained row selection is enabled.
    row_selection_enabled: bool,
    /// Low-level parquet fetch/coalesce options passed to the file reader.
    parquet_read_options: ParquetReadOptions,
}