iceberg/arrow/reader/
file_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Async Parquet file reader that adapts an Iceberg `FileRead` to parquet's `AsyncFileReader`.
19
20use std::ops::Range;
21use std::sync::Arc;
22
23use bytes::Bytes;
24use futures::future::BoxFuture;
25use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
26use parquet::arrow::arrow_reader::ArrowReaderOptions;
27use parquet::arrow::async_reader::AsyncFileReader;
28use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
29
30use super::ParquetReadOptions;
31use crate::io::{FileMetadata, FileRead};
32
/// `ArrowFileReader` wraps a [`FileRead`] and implements parquet's [`AsyncFileReader`]
/// on top of it.
pub struct ArrowFileReader {
    /// Metadata of the file being read; `meta.size` is passed to the Parquet
    /// metadata loader as the total file size.
    meta: FileMetadata,
    /// Read tuning options: range coalescing threshold, fetch concurrency,
    /// metadata prefetch hint and page/column/offset index preload flags.
    parquet_read_options: ParquetReadOptions,
    /// Underlying reader that serves every byte-range request.
    r: Box<dyn FileRead>,
}
39
40impl ArrowFileReader {
41    /// Create a new ArrowFileReader
42    pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
43        Self {
44            meta,
45            parquet_read_options: ParquetReadOptions::builder().build(),
46            r,
47        }
48    }
49
50    /// Configure all Parquet read options.
51    pub(crate) fn with_parquet_read_options(mut self, options: ParquetReadOptions) -> Self {
52        self.parquet_read_options = options;
53        self
54    }
55}
56
57impl AsyncFileReader for ArrowFileReader {
58    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
59        Box::pin(
60            self.r
61                .read(range.start..range.end)
62                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
63        )
64    }
65
66    /// Override the default `get_byte_ranges` which calls `get_bytes` sequentially.
67    /// The parquet reader calls this to fetch column chunks for a row group, so
68    /// without this override each column chunk is a serial round-trip to object storage.
69    /// Adapted from object_store's `coalesce_ranges` in `util.rs`.
70    fn get_byte_ranges(
71        &mut self,
72        ranges: Vec<Range<u64>>,
73    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
74        let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes();
75        let concurrency = self.parquet_read_options.range_fetch_concurrency().max(1);
76
77        async move {
78            // Merge nearby ranges to reduce the number of object store requests.
79            let fetch_ranges = merge_ranges(&ranges, coalesce_bytes);
80            let r = &self.r;
81
82            // Fetch merged ranges concurrently.
83            let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
84                .map(|range| async move {
85                    r.read(range)
86                        .await
87                        .map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
88                })
89                .buffered(concurrency)
90                .try_collect()
91                .await?;
92
93            // Slice the fetched data back into the originally requested ranges.
94            Ok(ranges
95                .iter()
96                .map(|range| {
97                    let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
98                    let fetch_range = &fetch_ranges[idx];
99                    let fetch_bytes = &fetched[idx];
100                    let start = (range.start - fetch_range.start) as usize;
101                    let end = (range.end - fetch_range.start) as usize;
102                    fetch_bytes.slice(start..end.min(fetch_bytes.len()))
103                })
104                .collect())
105        }
106        .boxed()
107    }
108
109    fn get_metadata(
110        &mut self,
111        options: Option<&'_ ArrowReaderOptions>,
112    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
113        let decryption_properties = options
114            .and_then(|opts| opts.file_decryption_properties())
115            .cloned();
116
117        let metadata_options = options.map(|opts| opts.metadata_options().clone());
118
119        async move {
120            let reader = ParquetMetaDataReader::new()
121                .with_prefetch_hint(self.parquet_read_options.metadata_size_hint())
122                // Set the page policy first because it updates both column and offset policies.
123                .with_page_index_policy(PageIndexPolicy::from(
124                    self.parquet_read_options.preload_page_index(),
125                ))
126                .with_column_index_policy(PageIndexPolicy::from(
127                    self.parquet_read_options.preload_column_index(),
128                ))
129                .with_offset_index_policy(PageIndexPolicy::from(
130                    self.parquet_read_options.preload_offset_index(),
131                ))
132                .with_metadata_options(metadata_options)
133                .with_decryption_properties(decryption_properties);
134            let size = self.meta.size;
135            let meta = reader.load_and_finish(self, size).await?;
136
137            Ok(Arc::new(meta))
138        }
139        .boxed()
140    }
141}
142
/// Coalesce byte ranges that overlap or sit within `coalesce` bytes of each other.
///
/// The input may be in any order; it is copied and sorted by start offset first.
/// The result is sorted by start offset, and every input range is covered by
/// one of the returned ranges. An empty input yields an empty result.
/// Adapted from object_store's `merge_ranges` in `util.rs`.
fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for next in sorted {
        if let Some(last) = merged.last_mut() {
            // A start before `last.end` saturates to a zero gap, i.e. an overlap,
            // which always qualifies for merging.
            if next.start.saturating_sub(last.end) <= coalesce {
                last.end = last.end.max(next.end);
                continue;
            }
        }
        // Either the first range, or the gap to the previous group is too large.
        merged.push(next);
    }

    merged
}
178
#[cfg(test)]
mod tests {
    use std::ops::Range;

    use parquet::arrow::async_reader::AsyncFileReader;

    use super::{ArrowFileReader, ParquetReadOptions, merge_ranges};
    use crate::io::{FileMetadata, FileRead};

    #[test]
    fn test_merge_ranges_empty() {
        assert_eq!(merge_ranges(&[], 1024), Vec::<Range<u64>>::new());
    }

    #[test]
    fn test_merge_ranges_no_coalesce() {
        // Ranges far apart should not be merged
        let ranges = vec![0..100, 1_000_000..1_000_100];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
    }

    #[test]
    fn test_merge_ranges_coalesce() {
        // Ranges within the gap threshold should be merged
        let ranges = vec![0..100, 200..300, 500..600];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

    #[test]
    fn test_merge_ranges_overlapping() {
        // Overlapping ranges merge even when no gap is tolerated.
        let ranges = vec![0..200, 100..300];
        let merged = merge_ranges(&ranges, 0);
        assert_eq!(merged, vec![0..300]);
    }

    #[test]
    fn test_merge_ranges_adjacent_zero_coalesce() {
        // A zero-byte gap merges; a one-byte gap does not when coalesce is 0.
        assert_eq!(merge_ranges(&[0..100, 100..200], 0), vec![0..200]);
        assert_eq!(
            merge_ranges(&[0..100, 101..200], 0),
            vec![0..100, 101..200]
        );
    }

    #[test]
    fn test_merge_ranges_unsorted() {
        // Input order must not matter; ranges are sorted before merging.
        let ranges = vec![500..600, 0..100, 200..300];
        let merged = merge_ranges(&ranges, 1024);
        assert_eq!(merged, vec![0..600]);
    }

    /// Mock FileRead backed by a flat byte buffer.
    struct MockFileRead {
        data: bytes::Bytes,
    }

    impl MockFileRead {
        /// Creates a mock holding `size` bytes.
        fn new(size: usize) -> Self {
            // Fill with sequential byte values so slices are verifiable.
            let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
            Self {
                data: bytes::Bytes::from(data),
            }
        }
    }

    #[async_trait::async_trait]
    impl FileRead for MockFileRead {
        async fn read(&self, range: Range<u64>) -> crate::Result<bytes::Bytes> {
            Ok(self.data.slice(range.start as usize..range.end as usize))
        }
    }

    #[tokio::test]
    async fn test_get_byte_ranges_no_coalesce() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_with_coalesce() {
        let mock = MockFileRead::new(1024);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(200..300);
        let expected_2 = mock.data.slice(500..600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(1024)
                    .build(),
            );

        // All ranges within coalesce threshold — should merge into one fetch.
        let result = reader
            .get_byte_ranges(vec![0..100, 200..300, 500..600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_empty() {
        let mock = MockFileRead::new(1024);
        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock));

        let result = reader.get_byte_ranges(vec![]).await.unwrap();
        assert!(result.is_empty());
    }

    #[tokio::test]
    async fn test_get_byte_ranges_coalesce_max() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(u64::MAX)
                    .build(),
            );

        // u64::MAX coalesce — all ranges merge into a single fetch.
        let result = reader
            .get_byte_ranges(vec![0..100, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_zero() {
        // concurrency=0 is clamped to 1, so this should not hang.
        let mock = MockFileRead::new(1024);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(200..300);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 1024 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_fetch_concurrency(0)
                    .build(),
            );

        let result = reader
            .get_byte_ranges(vec![0..100, 200..300])
            .await
            .unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0], expected_0);
        // Verify the second range too, so a slicing bug past the first entry is caught.
        assert_eq!(result[1], expected_1);
    }

    #[tokio::test]
    async fn test_get_byte_ranges_concurrency_one() {
        let mock = MockFileRead::new(2048);
        let expected_0 = mock.data.slice(0..100);
        let expected_1 = mock.data.slice(500..600);
        let expected_2 = mock.data.slice(1500..1600);

        let mut reader = ArrowFileReader::new(FileMetadata { size: 2048 }, Box::new(mock))
            .with_parquet_read_options(
                ParquetReadOptions::builder()
                    .with_range_coalesce_bytes(0)
                    .with_range_fetch_concurrency(1)
                    .build(),
            );

        // concurrency=1 with no coalescing — sequential fetches.
        let result = reader
            .get_byte_ranges(vec![0..100, 500..600, 1500..1600])
            .await
            .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0], expected_0);
        assert_eq!(result[1], expected_1);
        assert_eq!(result[2], expected_2);
    }
}