1use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use futures::future::BoxFuture;
25use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
26use parquet::arrow::arrow_reader::ArrowReaderOptions;
27use parquet::arrow::async_reader::AsyncFileReader;
28use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
29
30use super::ParquetReadOptions;
31use crate::io::{FileMetadata, FileRead};
32
/// Adapter that exposes a [`FileRead`] as a parquet [`AsyncFileReader`].
///
/// Byte-range requests are delegated to the wrapped reader; multi-range
/// requests may be coalesced and fetched concurrently according to
/// `parquet_read_options`.
pub struct ArrowFileReader {
    /// Metadata of the file being read; its `size` is handed to the parquet
    /// metadata reader as the total file length.
    meta: FileMetadata,
    /// Options controlling range coalescing, fetch concurrency, the metadata
    /// prefetch hint and page-index loading.
    parquet_read_options: ParquetReadOptions,
    /// Underlying reader that every byte-range request is forwarded to.
    r: Box<dyn FileRead>,
}
39
40impl ArrowFileReader {
41 pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
43 Self {
44 meta,
45 parquet_read_options: ParquetReadOptions::builder().build(),
46 r,
47 }
48 }
49
50 pub(crate) fn with_parquet_read_options(mut self, options: ParquetReadOptions) -> Self {
52 self.parquet_read_options = options;
53 self
54 }
55}
56
57impl AsyncFileReader for ArrowFileReader {
58 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
59 Box::pin(
60 self.r
61 .read(range.start..range.end)
62 .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
63 )
64 }
65
66 fn get_byte_ranges(
71 &mut self,
72 ranges: Vec<Range<u64>>,
73 ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
74 let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes();
75 let concurrency = self.parquet_read_options.range_fetch_concurrency().max(1);
76
77 async move {
78 let fetch_ranges = merge_ranges(&ranges, coalesce_bytes);
80 let r = &self.r;
81
82 let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
84 .map(|range| async move {
85 r.read(range)
86 .await
87 .map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
88 })
89 .buffered(concurrency)
90 .try_collect()
91 .await?;
92
93 Ok(ranges
95 .iter()
96 .map(|range| {
97 let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
98 let fetch_range = &fetch_ranges[idx];
99 let fetch_bytes = &fetched[idx];
100 let start = (range.start - fetch_range.start) as usize;
101 let end = (range.end - fetch_range.start) as usize;
102 fetch_bytes.slice(start..end.min(fetch_bytes.len()))
103 })
104 .collect())
105 }
106 .boxed()
107 }
108
109 fn get_metadata(
110 &mut self,
111 options: Option<&'_ ArrowReaderOptions>,
112 ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
113 let decryption_properties = options
114 .and_then(|opts| opts.file_decryption_properties())
115 .cloned();
116
117 let metadata_options = options.map(|opts| opts.metadata_options().clone());
118
119 async move {
120 let reader = ParquetMetaDataReader::new()
121 .with_prefetch_hint(self.parquet_read_options.metadata_size_hint())
122 .with_page_index_policy(PageIndexPolicy::from(
124 self.parquet_read_options.preload_page_index(),
125 ))
126 .with_column_index_policy(PageIndexPolicy::from(
127 self.parquet_read_options.preload_column_index(),
128 ))
129 .with_offset_index_policy(PageIndexPolicy::from(
130 self.parquet_read_options.preload_offset_index(),
131 ))
132 .with_metadata_options(metadata_options)
133 .with_decryption_properties(decryption_properties);
134 let size = self.meta.size;
135 let meta = reader.load_and_finish(self, size).await?;
136
137 Ok(Arc::new(meta))
138 }
139 .boxed()
140 }
141}
142
/// Coalesces byte ranges so fewer, larger reads can be issued.
///
/// The input does not need to be sorted; a sorted copy is made first. A range
/// is folded into the current merged range when it overlaps it, or when the gap
/// between the merged range's end and its start is at most `coalesce` bytes. A
/// folded range extends the merged end to the larger of the two ends.
///
/// The returned ranges are sorted by start and pairwise disjoint. Empty input
/// yields an empty vector.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for next in sorted {
        match merged.last_mut() {
            // An overlap saturates the gap to 0, which is always within `coalesce`.
            Some(current) if next.start.saturating_sub(current.end) <= coalesce => {
                current.end = current.end.max(next.end);
            }
            _ => merged.push(next),
        }
    }
    merged
}
178
#[cfg(test)]
mod tests {
    use std::ops::Range;

    use parquet::arrow::async_reader::AsyncFileReader;

    use super::{ArrowFileReader, ParquetReadOptions, merge_ranges};
    use crate::io::{FileMetadata, FileRead};

    #[test]
    fn test_merge_ranges_empty() {
        assert_eq!(merge_ranges(&[], 1024), Vec::<Range<u64>>::new());
    }

    #[test]
    fn test_merge_ranges_no_coalesce() {
        let ranges = vec![0..100, 1_000_000..1_000_100];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
    }

    #[test]
    fn test_merge_ranges_coalesce() {
        let ranges = vec![0..100, 200..300, 500..600];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

    #[test]
    fn test_merge_ranges_overlapping() {
        let ranges = vec![0..200, 100..300];
        let merged = merge_ranges(&ranges, 0);
        assert_eq!(merged, vec![0..300]);
    }

    #[test]
    fn test_merge_ranges_unsorted() {
        let ranges = vec![500..600, 0..100, 200..300];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

    /// A gap exactly equal to the coalesce threshold is merged; one byte more is not.
    #[test]
    fn test_merge_ranges_gap_boundary() {
        assert_eq!(merge_ranges(&[0..100, 200..300], 100), vec![0..300]);
        assert_eq!(merge_ranges(&[0..100, 200..300], 99), vec![0..100, 200..300]);
    }

    /// A range fully contained in an earlier one does not shrink the merged end.
    #[test]
    fn test_merge_ranges_contained() {
        assert_eq!(merge_ranges(&[0..500, 100..200], 0), vec![0..500]);
    }

    /// In-memory `FileRead` whose byte at offset `i` is `i % 256`.
    struct MockFileRead {
        data: bytes::Bytes,
    }

    impl MockFileRead {
        fn new(size: usize) -> Self {
            let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
            Self {
                data: bytes::Bytes::from(data),
            }
        }
    }

    #[async_trait::async_trait]
    impl FileRead for MockFileRead {
        async fn read(&self, range: Range<u64>) -> crate::Result<bytes::Bytes> {
            Ok(self.data.slice(range.start as usize..range.end as usize))
        }
    }

    #[tokio::test]
    async fn test_get_byte_ranges_no_coalesce() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_with_coalesce() {
        let mock = MockFileRead::new(1024);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(200..300);
        let expected_2 = mock.data.slice(500..600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(1024)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 200..300, 500..600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_empty() {
        let mock = MockFileRead::new(1024);
        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock));

        let result = reader.get_byte_ranges(vec![]).await.unwrap();
        assert!(result.is_empty());
    }

    #[tokio::test]
    async fn test_get_byte_ranges_coalesce_max() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(u64::MAX)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_zero() {
        let mock = MockFileRead::new(1024);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(200..300);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_fetch_concurrency(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 200..300])
            .await
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_one() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(500..600);
        let expected_2 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .with_range_fetch_concurrency(1)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 500..600, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }

    /// Results follow request order even when requests are unsorted and overlap,
    /// so several requested ranges are served from one merged fetch.
    #[tokio::test]
    async fn test_get_byte_ranges_preserves_request_order() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(1500..1600);
        let expected_1 = mock.data.slice(0..100);
        let expected_2 = mock.data.slice(50..150);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![1500..1600, 0..100, 50..150])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }
}