iceberg/arrow/reader/file_reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Async Parquet file reader that adapts an Iceberg `FileRead` to parquet's `AsyncFileReader`.
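//!
//! A minimal usage sketch; `file_read` and `file_size` are assumptions standing
//! in for any `FileRead` implementation and its known length in bytes:
//!
//! ```ignore
//! use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
//!
//! // Wrap the Iceberg `FileRead` so parquet can drive the I/O.
//! let reader = ArrowFileReader::new(FileMetadata { size: file_size }, file_read);
//! // Hand the adapter to parquet's async record batch reader.
//! let stream = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;
//! ```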

use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};

use super::ParquetReadOptions;
use crate::io::{FileMetadata, FileRead};

/// `ArrowFileReader` wraps a `FileRead` and implements parquet's `AsyncFileReader`.
pub struct ArrowFileReader {
    meta: FileMetadata,
    parquet_read_options: ParquetReadOptions,
    r: Box<dyn FileRead>,
}

impl ArrowFileReader {
    /// Create a new ArrowFileReader.
    pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
        Self {
            meta,
            parquet_read_options: ParquetReadOptions::builder().build(),
            r,
        }
    }

    /// Configure all Parquet read options.
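    ///
    /// A sketch with illustrative values (`reader` is an `ArrowFileReader`):
    ///
    /// ```ignore
    /// let reader = reader.with_parquet_read_options(
    ///     ParquetReadOptions::builder()
    ///         .with_range_coalesce_bytes(1024 * 1024)
    ///         .with_range_fetch_concurrency(8)
    ///         .build(),
    /// );
    /// ```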
    pub(crate) fn with_parquet_read_options(mut self, options: ParquetReadOptions) -> Self {
        self.parquet_read_options = options;
        self
    }
}

impl AsyncFileReader for ArrowFileReader {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start..range.end)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    /// Override the default `get_byte_ranges`, which calls `get_bytes` sequentially.
    /// The parquet reader calls this to fetch column chunks for a row group, so
    /// without this override each column chunk is a serial round-trip to object storage.
    /// Adapted from object_store's `coalesce_ranges` in `util.rs`.
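    ///
    /// For example, with a coalesce threshold of 1024 bytes, requested ranges
    /// `0..100`, `200..300`, and `500..600` are fetched as a single read of
    /// `0..600` and then sliced back into three buffers (see
    /// `test_get_byte_ranges_with_coalesce` below).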
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
        let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes();
        let concurrency = self.parquet_read_options.range_fetch_concurrency().max(1);

        async move {
            // Merge nearby ranges to reduce the number of object store requests.
            let fetch_ranges = merge_ranges(&ranges, coalesce_bytes);
            let r = &self.r;

            // Fetch merged ranges concurrently.
            let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
                .map(|range| async move {
                    r.read(range)
                        .await
                        .map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
                })
                .buffered(concurrency)
                .try_collect()
                .await?;

            // Slice the fetched data back into the originally requested ranges.
            Ok(ranges
                .iter()
                .map(|range| {
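                    // `fetch_ranges` is sorted by start and each requested range
                    // is fully contained in one merged range, so the containing
                    // range is the last one whose start is <= `range.start`.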
                    let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
                    let fetch_range = &fetch_ranges[idx];
                    let fetch_bytes = &fetched[idx];
                    let start = (range.start - fetch_range.start) as usize;
                    let end = (range.end - fetch_range.start) as usize;
                    fetch_bytes.slice(start..end.min(fetch_bytes.len()))
                })
                .collect())
        }
        .boxed()
    }

    // TODO: we currently don't respect `ArrowReaderOptions` because it doesn't
    // expose any method to access its option fields; this will be fixed once
    // `v55.1.0` is released, see https://github.com/apache/arrow-rs/issues/7393.
    fn get_metadata(
        &mut self,
        _options: Option<&'_ ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new()
                .with_prefetch_hint(self.parquet_read_options.metadata_size_hint())
                // Set the page policy first because it updates both column and offset policies.
                .with_page_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_page_index(),
                ))
                .with_column_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_column_index(),
                ))
                .with_offset_index_policy(PageIndexPolicy::from(
                    self.parquet_read_options.preload_offset_index(),
                ));
            let size = self.meta.size;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

/// Merge overlapping or nearby byte ranges, combining ranges with gaps <= `coalesce` bytes.
/// Adapted from object_store's `merge_ranges` in `util.rs`.
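///
/// For example (mirroring the unit tests below): `merge_ranges(&[0..100,
/// 200..300, 500..600], 1024)` yields `[0..600]`, while
/// `merge_ranges(&[0..100, 1_000_000..1_000_100], 1024)` leaves both ranges
/// unmerged.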
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    if ranges.is_empty() {
        return vec![];
    }

    let mut ranges = ranges.to_vec();
    ranges.sort_unstable_by_key(|r| r.start);

    let mut merged = Vec::with_capacity(ranges.len());
    let mut start_idx = 0;
    let mut end_idx = 1;

    while start_idx != ranges.len() {
        let mut range_end = ranges[start_idx].end;

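        // Absorb subsequent ranges while the gap to the next range is at most
        // `coalesce` bytes. `checked_sub` returns `None` when the next range
        // starts before `range_end` (i.e. the ranges overlap), which also merges.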
        while end_idx != ranges.len()
            && ranges[end_idx]
                .start
                .checked_sub(range_end)
                .map(|delta| delta <= coalesce)
                .unwrap_or(true)
        {
            range_end = range_end.max(ranges[end_idx].end);
            end_idx += 1;
        }

        merged.push(ranges[start_idx].start..range_end);
        start_idx = end_idx;
        end_idx += 1;
    }

    merged
}

#[cfg(test)]
mod tests {
    use std::ops::Range;

    use parquet::arrow::async_reader::AsyncFileReader;

    use super::{ArrowFileReader, ParquetReadOptions, merge_ranges};
    use crate::io::{FileMetadata, FileRead};

    #[test]
    fn test_merge_ranges_empty() {
        assert_eq!(merge_ranges(&[], 1024), Vec::<Range<u64>>::new());
    }

    #[test]
    fn test_merge_ranges_no_coalesce() {
        // Ranges far apart should not be merged
        let ranges = vec![0..100, 1_000_000..1_000_100];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
    }

    #[test]
    fn test_merge_ranges_coalesce() {
        // Ranges within the gap threshold should be merged
        let ranges = vec![0..100, 200..300, 500..600];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

    #[test]
    fn test_merge_ranges_overlapping() {
        let ranges = vec![0..200, 100..300];
        let merged = merge_ranges(&ranges, 0);
        assert_eq!(merged, vec![0..300]);
    }

    #[test]
    fn test_merge_ranges_unsorted() {
        let ranges = vec![500..600, 0..100, 200..300];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

    /// Mock FileRead backed by a flat byte buffer.
    struct MockFileRead {
        data: bytes::Bytes,
    }

    impl MockFileRead {
        fn new(size: usize) -> Self {
            // Fill with sequential byte values so slices are verifiable.
            let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
            Self {
                data: bytes::Bytes::from(data),
            }
        }
    }

    #[async_trait::async_trait]
    impl FileRead for MockFileRead {
        async fn read(&self, range: Range<u64>) -> crate::Result<bytes::Bytes> {
            Ok(self.data.slice(range.start as usize..range.end as usize))
        }
    }

    #[tokio::test]
    async fn test_get_byte_ranges_no_coalesce() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_with_coalesce() {
        let mock = MockFileRead::new(1024);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(200..300);
        let expected_2 = mock.data.slice(500..600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(1024)
                    .build(),
            );

        // All ranges are within the coalesce threshold, so they should merge into one fetch.
        let result = reader
            .get_byte_ranges(vec![0..100, 200..300, 500..600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_empty() {
        let mock = MockFileRead::new(1024);
        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock));

        let result = reader.get_byte_ranges(vec![]).await.unwrap();
        assert!(result.is_empty());
    }

    #[tokio::test]
    async fn test_get_byte_ranges_coalesce_max() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(u64::MAX)
                    .build(),
            );

        // With u64::MAX coalescing, all ranges merge into a single fetch.
        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_zero() {
        // concurrency=0 is clamped to 1, so this should not hang.
        let mock = MockFileRead::new(1024);
        let expected = mock.data.slice(0..100);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_fetch_concurrency(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 200..300])
            .await
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_one() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(500..600);
        let expected_2 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .with_range_fetch_concurrency(1)
                    .build(),
            );

        // concurrency=1 with no coalescing: fetches happen sequentially.
        let result = reader
            .get_byte_ranges(vec![0..100, 500..600, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }
}
368}