iceberg/arrow/reader/
mod.rs1use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
21use crate::io::FileIO;
22use crate::util::available_parallelism;
23
/// Default gap threshold (1 MiB) below which adjacent byte ranges are
/// coalesced into one fetch — presumably the default for
/// `ParquetReadOptions::range_coalesce_bytes`; confirm in `options.rs`.
const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024;

/// Default number of concurrent range fetches — presumably the default for
/// `ParquetReadOptions::range_fetch_concurrency`; confirm in `options.rs`.
const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10;

/// Default size hint (512 KiB) for reading Parquet footer metadata —
/// presumably the default for `ParquetReadOptions::metadata_size_hint`;
/// confirm in `options.rs`.
const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024;
35
// Submodules making up the Arrow read path.
mod file_reader;
mod options;
mod pipeline;
mod positional_deletes;
mod predicate_visitor;
mod projection;
mod row_filter;
// Public and crate-level re-exports from the submodules above.
pub use file_reader::ArrowFileReader;
pub(crate) use options::ParquetReadOptions;
use predicate_visitor::{CollectFieldIdVisitor, PredicateConverter};
use projection::{add_fallback_field_ids_to_arrow_schema, apply_name_mapping_to_arrow_schema};
47
/// Builder for [`ArrowReader`].
///
/// Collects the configuration (batch size, concurrency, filtering flags, and
/// low-level Parquet read options) and produces the reader via
/// [`ArrowReaderBuilder::build`].
pub struct ArrowReaderBuilder {
    // Record-batch size; `None` leaves the reader's default in effect.
    batch_size: Option<usize>,
    // File IO handle used to open data and delete files.
    file_io: FileIO,
    // Maximum number of data files processed concurrently
    // (defaults to `available_parallelism()` in `new`).
    concurrency_limit_data_files: usize,
    // Row-group filtering toggle (defaults to `true` in `new`).
    row_group_filtering_enabled: bool,
    // Row-selection toggle (defaults to `false` in `new`).
    row_selection_enabled: bool,
    // Low-level Parquet read tuning: metadata size hint, range coalescing,
    // and range-fetch concurrency (see the `with_*` setters).
    parquet_read_options: ParquetReadOptions,
}
57
58impl ArrowReaderBuilder {
59 pub fn new(file_io: FileIO) -> Self {
61 let num_cpus = available_parallelism().get();
62
63 ArrowReaderBuilder {
64 batch_size: None,
65 file_io,
66 concurrency_limit_data_files: num_cpus,
67 row_group_filtering_enabled: true,
68 row_selection_enabled: false,
69 parquet_read_options: ParquetReadOptions::builder().build(),
70 }
71 }
72
73 pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
75 self.concurrency_limit_data_files = val;
76 self
77 }
78
79 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
82 self.batch_size = Some(batch_size);
83 self
84 }
85
86 pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
88 self.row_group_filtering_enabled = row_group_filtering_enabled;
89 self
90 }
91
92 pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
94 self.row_selection_enabled = row_selection_enabled;
95 self
96 }
97
98 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
103 self.parquet_read_options.metadata_size_hint = Some(metadata_size_hint);
104 self
105 }
106
107 pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self {
112 self.parquet_read_options.range_coalesce_bytes = range_coalesce_bytes;
113 self
114 }
115
116 pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self {
120 self.parquet_read_options.range_fetch_concurrency = range_fetch_concurrency;
121 self
122 }
123
124 pub fn build(self) -> ArrowReader {
126 ArrowReader {
127 batch_size: self.batch_size,
128 file_io: self.file_io.clone(),
129 delete_file_loader: CachingDeleteFileLoader::new(
130 self.file_io.clone(),
131 self.concurrency_limit_data_files,
132 ),
133 concurrency_limit_data_files: self.concurrency_limit_data_files,
134 row_group_filtering_enabled: self.row_group_filtering_enabled,
135 row_selection_enabled: self.row_selection_enabled,
136 parquet_read_options: self.parquet_read_options,
137 }
138 }
139}
140
/// Reads data files into Arrow record batches.
///
/// Constructed via [`ArrowReaderBuilder`]; holds the configuration captured
/// at build time plus a [`CachingDeleteFileLoader`].
/// NOTE(review): `Clone` is derived, so cloning also clones every field —
/// assumes `FileIO` and `CachingDeleteFileLoader` are cheap handle clones;
/// confirm in their definitions.
#[derive(Clone)]
pub struct ArrowReader {
    // Record-batch size; `None` leaves the underlying default in effect.
    batch_size: Option<usize>,
    // File IO handle used to open files.
    file_io: FileIO,
    // Delete-file loader, created in `ArrowReaderBuilder::build` with the
    // same FileIO and concurrency limit as this reader.
    delete_file_loader: CachingDeleteFileLoader,

    // Maximum number of data files processed concurrently.
    concurrency_limit_data_files: usize,

    // Row-group filtering toggle (builder default: true).
    row_group_filtering_enabled: bool,
    // Row-selection toggle (builder default: false).
    row_selection_enabled: bool,
    // Low-level Parquet read tuning options.
    parquet_read_options: ParquetReadOptions,
}