1use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use futures::future::BoxFuture;
25use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
26use parquet::arrow::arrow_reader::ArrowReaderOptions;
27use parquet::arrow::async_reader::AsyncFileReader;
28use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
29
30use super::ParquetReadOptions;
31use crate::io::{FileMetadata, FileRead};
32
/// An [`AsyncFileReader`] adapter that serves parquet byte-range reads from a
/// [`FileRead`] source, applying the configured [`ParquetReadOptions`]
/// (range coalescing, fetch concurrency, metadata/index preload).
pub struct ArrowFileReader {
    /// File metadata; `meta.size` is passed to the parquet metadata loader.
    meta: FileMetadata,
    /// Tuning options: range coalescing threshold, range fetch concurrency,
    /// metadata size hint and page/column/offset index preload policies.
    parquet_read_options: ParquetReadOptions,
    /// The underlying random-access reader all byte ranges are served from.
    r: Box<dyn FileRead>,
}
39
40impl ArrowFileReader {
41 pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
43 Self {
44 meta,
45 parquet_read_options: ParquetReadOptions::builder().build(),
46 r,
47 }
48 }
49
50 pub(crate) fn with_parquet_read_options(mut self, options: ParquetReadOptions) -> Self {
52 self.parquet_read_options = options;
53 self
54 }
55}
56
impl AsyncFileReader for ArrowFileReader {
    /// Reads a single byte range from the underlying file, mapping any I/O
    /// error into [`parquet::errors::ParquetError::External`].
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start..range.end)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    /// Reads several byte ranges: nearby requests are first coalesced into
    /// fewer, larger reads (see [`merge_ranges`]), those merged reads are
    /// fetched with bounded concurrency, and each caller-requested range is
    /// then sliced back out of the merged buffers.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
        let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes();
        // Clamp to at least 1: `buffered(0)` would never poll any fetch.
        let concurrency = self.parquet_read_options.range_fetch_concurrency().max(1);

        async move {
            let fetch_ranges = merge_ranges(&ranges, coalesce_bytes);
            // Borrow the reader once so each per-range closure can share it.
            let r = &self.r;

            // `buffered` polls up to `concurrency` reads at a time and yields
            // results in input order, so `fetched[i]` corresponds to
            // `fetch_ranges[i]`; `try_collect` short-circuits on first error.
            let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
                .map(|range| async move {
                    r.read(range)
                        .await
                        .map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
                })
                .buffered(concurrency)
                .try_collect()
                .await?;

            Ok(ranges
                .iter()
                .map(|range| {
                    // `fetch_ranges` is sorted by start and every requested
                    // start lies inside some merged range, so
                    // `partition_point` locates the covering range and the
                    // `- 1` cannot underflow.
                    let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
                    let fetch_range = &fetch_ranges[idx];
                    let fetch_bytes = &fetched[idx];
                    // Re-slice the caller's range out of the merged buffer.
                    let start = (range.start - fetch_range.start) as usize;
                    let end = (range.end - fetch_range.start) as usize;
                    // Defensive clamp in case the underlying read returned
                    // fewer bytes than requested (short read).
                    fetch_bytes.slice(start..end.min(fetch_bytes.len()))
                })
                .collect())
        }
        .boxed()
    }

    /// Loads the parquet footer metadata, honouring the configured prefetch
    /// size hint and the page/column/offset index preload policies.
    ///
    /// NOTE(review): the incoming `ArrowReaderOptions` are ignored here;
    /// metadata loading is driven solely by `parquet_read_options`.
    fn get_metadata(
        &mut self,
        _options: Option<&'_ ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new()
                .with_prefetch_hint(self.parquet_read_options.metadata_size_hint())
                .with_page_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_page_index(),
                ))
                .with_column_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_column_index(),
                ))
                .with_offset_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_offset_index(),
                ));
            let size = self.meta.size;
            // `self` implements `AsyncFileReader`, so the metadata reader
            // issues its footer reads through `get_bytes` above.
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}
136
/// Merges a set of byte ranges, coalescing neighbours whose gap is at most
/// `coalesce` bytes; overlapping or contained ranges always merge.
///
/// Returns the merged ranges sorted by start offset. Fewer, larger ranges
/// translate into fewer read calls against the underlying file.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    // Sort a copy so callers may pass ranges in any order.
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for range in sorted {
        if let Some(last) = merged.last_mut() {
            // `checked_sub` yields None when `range` starts inside `last`
            // (overlap), which always coalesces; otherwise merge only when
            // the gap between them is within the threshold.
            if range
                .start
                .checked_sub(last.end)
                .map_or(true, |gap| gap <= coalesce)
            {
                // A fully contained range must not shrink the merged end.
                last.end = last.end.max(range.end);
                continue;
            }
        }
        merged.push(range);
    }
    merged
}
172
#[cfg(test)]
mod tests {
    use std::ops::Range;

    use parquet::arrow::async_reader::AsyncFileReader;

    use super::{ArrowFileReader, ParquetReadOptions, merge_ranges};
    use crate::io::{FileMetadata, FileRead};

    #[test]
    fn test_merge_ranges_empty() {
        assert_eq!(merge_ranges(&[], 1024), Vec::<Range<u64>>::new());
    }

    #[test]
    fn test_merge_ranges_no_coalesce() {
        // Gap (999,900 bytes) far exceeds the 1 KiB threshold: stay separate.
        let ranges = vec![0..100, 1_000_000..1_000_100];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
    }

    #[test]
    fn test_merge_ranges_coalesce() {
        // All gaps are within the threshold, so everything collapses.
        let ranges = vec![0..100, 200..300, 500..600];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

    #[test]
    fn test_merge_ranges_overlapping() {
        // Overlapping ranges merge even with a zero coalesce threshold.
        let ranges = vec![0..200, 100..300];
        let merged = merge_ranges(&ranges, 0);
        assert_eq!(merged, vec![0..300]);
    }

    #[test]
    fn test_merge_ranges_unsorted() {
        // Input order must not matter; output is sorted and merged.
        let ranges = vec![500..600, 0..100, 200..300];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

    /// In-memory [`FileRead`] backed by a deterministic byte pattern
    /// (`data[i] == i % 256`), so sliced results are easy to verify.
    struct MockFileRead {
        data: bytes::Bytes,
    }

    impl MockFileRead {
        fn new(size: usize) -> Self {
            let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
            Self {
                data: bytes::Bytes::from(data),
            }
        }
    }

    #[async_trait::async_trait]
    impl FileRead for MockFileRead {
        async fn read(&self, range: Range<u64>) -> crate::Result<bytes::Bytes> {
            Ok(self.data.slice(range.start as usize..range.end as usize))
        }
    }

    #[tokio::test]
    async fn test_get_byte_ranges_no_coalesce() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_with_coalesce() {
        let mock = MockFileRead::new(1024);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(200..300);
        let expected_2 = mock.data.slice(500..600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(1024)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 200..300, 500..600])
            .await
            .unwrap();

        // Three requested ranges come back individually even though they
        // were served by a single coalesced fetch.
        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_empty() {
        let mock = MockFileRead::new(1024);
        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock));

        let result = reader.get_byte_ranges(vec![]).await.unwrap();
        assert!(result.is_empty());
    }

    /// `u64::MAX` coalescing must not overflow and still returns one result
    /// per requested range.
    #[tokio::test]
    async fn test_get_byte_ranges_coalesce_max() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(u64::MAX)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    /// A configured concurrency of 0 must be clamped to 1 rather than
    /// stalling the buffered stream.
    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_zero() {
        let mock = MockFileRead::new(1024);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(200..300);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_fetch_concurrency(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 200..300])
            .await
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        // Previously unchecked: verify the second range's content too.
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_one() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(500..600);
        let expected_2 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .with_range_fetch_concurrency(1)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 500..600, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }
}