iceberg/arrow/reader/
mod.rs1use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
21use crate::io::FileIO;
22use crate::runtime::Runtime;
23use crate::util::available_parallelism;
24
/// Default byte threshold for coalescing read ranges: 1 MiB (`1 << 20` bytes).
///
/// NOTE(review): not referenced in the visible part of this module; presumably used as the
/// default for `ParquetReadOptions::range_coalesce_bytes` in the `options` submodule — confirm.
const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1 << 20;

/// Default number of concurrent range fetches.
///
/// NOTE(review): presumably the default for `ParquetReadOptions::range_fetch_concurrency` — confirm.
const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10;

/// Default Parquet metadata size hint: 512 KiB (`512 << 10` bytes).
///
/// NOTE(review): presumably the default for `ParquetReadOptions::metadata_size_hint` — confirm.
const DEFAULT_METADATA_SIZE_HINT: usize = 512 << 10;
36
37mod file_reader;
38mod options;
39mod pipeline;
40mod positional_deletes;
41mod predicate_visitor;
42mod projection;
43mod row_filter;
44pub use file_reader::ArrowFileReader;
45pub(crate) use options::ParquetReadOptions;
46use predicate_visitor::{CollectFieldIdVisitor, PredicateConverter};
47use projection::{add_fallback_field_ids_to_arrow_schema, apply_name_mapping_to_arrow_schema};
48
49pub struct ArrowReaderBuilder {
51 batch_size: Option<usize>,
52 file_io: FileIO,
53 concurrency_limit_data_files: usize,
54 row_group_filtering_enabled: bool,
55 row_selection_enabled: bool,
56 parquet_read_options: ParquetReadOptions,
57 runtime: Runtime,
58}
59
60impl ArrowReaderBuilder {
61 pub fn new(file_io: FileIO, runtime: Runtime) -> Self {
63 let num_cpus = available_parallelism().get();
64
65 ArrowReaderBuilder {
66 batch_size: None,
67 file_io,
68 concurrency_limit_data_files: num_cpus,
69 row_group_filtering_enabled: true,
70 row_selection_enabled: false,
71 parquet_read_options: ParquetReadOptions::builder().build(),
72 runtime,
73 }
74 }
75
76 pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
78 self.concurrency_limit_data_files = val;
79 self
80 }
81
82 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
85 self.batch_size = Some(batch_size);
86 self
87 }
88
89 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
91 self.row_group_filtering_enabled = row_group_filtering_enabled;
92 self
93 }
94
95 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
97 self.row_selection_enabled = row_selection_enabled;
98 self
99 }
100
101 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
106 self.parquet_read_options.metadata_size_hint = Some(metadata_size_hint);
107 self
108 }
109
110 pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self {
115 self.parquet_read_options.range_coalesce_bytes = range_coalesce_bytes;
116 self
117 }
118
119 pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self {
123 self.parquet_read_options.range_fetch_concurrency = range_fetch_concurrency;
124 self
125 }
126
127 pub fn build(self) -> ArrowReader {
129 ArrowReader {
130 batch_size: self.batch_size,
131 file_io: self.file_io.clone(),
132 delete_file_loader: CachingDeleteFileLoader::new(
133 self.file_io.clone(),
134 self.concurrency_limit_data_files,
135 self.runtime.clone(),
136 ),
137 concurrency_limit_data_files: self.concurrency_limit_data_files,
138 row_group_filtering_enabled: self.row_group_filtering_enabled,
139 row_selection_enabled: self.row_selection_enabled,
140 parquet_read_options: self.parquet_read_options,
141 }
142 }
143}
144
145#[derive(Clone)]
147pub struct ArrowReader {
148 batch_size: Option<usize>,
149 file_io: FileIO,
150 delete_file_loader: CachingDeleteFileLoader,
151
152 concurrency_limit_data_files: usize,
154
155 row_group_filtering_enabled: bool,
156 row_selection_enabled: bool,
157 parquet_read_options: ParquetReadOptions,
158}