iceberg/arrow/
scan_metrics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Scan metrics and I/O counting for Parquet data file reads.
19
20use std::ops::Range;
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU64, Ordering};
23
24use bytes::Bytes;
25
26use crate::error::Result;
27use crate::io::FileRead;
28use crate::scan::ArrowRecordBatchStream;
29
30/// Wraps a [`FileRead`] to count bytes read via a shared atomic counter.
31pub(crate) struct CountingFileRead<F: FileRead> {
32    inner: F,
33    bytes_read: Arc<AtomicU64>,
34}
35
36impl<F: FileRead> CountingFileRead<F> {
37    pub(crate) fn new(inner: F, bytes_read: Arc<AtomicU64>) -> Self {
38        Self { inner, bytes_read }
39    }
40}
41
42#[async_trait::async_trait]
43impl<F: FileRead> FileRead for CountingFileRead<F> {
44    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
45        debug_assert!(range.end >= range.start);
46        self.bytes_read
47            .fetch_add(range.end - range.start, Ordering::Relaxed);
48        self.inner.read(range).await
49    }
50}
51
/// Counters gathered while an Iceberg scan runs.
///
/// Cloning is cheap: every clone shares the same underlying atomic counter.
#[derive(Debug, Clone)]
pub struct ScanMetrics {
    /// Running byte total, shared between all clones of this value.
    bytes_read: Arc<AtomicU64>,
}

impl ScanMetrics {
    /// Creates a metrics handle whose byte counter starts at zero.
    pub(crate) fn new() -> Self {
        // `AtomicU64::default()` is zero, so this matches an explicit `new(0)`.
        Self {
            bytes_read: Arc::default(),
        }
    }

    /// Borrows the shared counter so it can be handed to code that records reads.
    pub(crate) fn bytes_read_counter(&self) -> &Arc<AtomicU64> {
        &self.bytes_read
    }

    /// Total bytes read from storage during this scan, including data files and delete files.
    pub fn bytes_read(&self) -> u64 {
        let counter: &AtomicU64 = self.bytes_read_counter();
        counter.load(Ordering::Relaxed)
    }
}
74
75/// Result of [`ArrowReader::read`](super::ArrowReader::read), containing the
76/// record batch stream and metrics collected during the scan.
77pub struct ScanResult {
78    stream: ArrowRecordBatchStream,
79    metrics: ScanMetrics,
80}
81
82impl ScanResult {
83    pub(crate) fn new(stream: ArrowRecordBatchStream, metrics: ScanMetrics) -> Self {
84        Self { stream, metrics }
85    }
86
87    /// Consumes the result, returning only the record batch stream.
88    pub fn stream(self) -> ArrowRecordBatchStream {
89        self.stream
90    }
91
92    /// Returns a reference to the scan metrics.
93    pub fn metrics(&self) -> &ScanMetrics {
94        &self.metrics
95    }
96}