iceberg/arrow/
scan_metrics.rs1use std::ops::Range;
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU64, Ordering};
23
24use bytes::Bytes;
25
26use crate::error::Result;
27use crate::io::FileRead;
28use crate::scan::ArrowRecordBatchStream;
29
30pub(crate) struct CountingFileRead<F: FileRead> {
32 inner: F,
33 bytes_read: Arc<AtomicU64>,
34}
35
36impl<F: FileRead> CountingFileRead<F> {
37 pub(crate) fn new(inner: F, bytes_read: Arc<AtomicU64>) -> Self {
38 Self { inner, bytes_read }
39 }
40}
41
42#[async_trait::async_trait]
43impl<F: FileRead> FileRead for CountingFileRead<F> {
44 async fn read(&self, range: Range<u64>) -> Result<Bytes> {
45 debug_assert!(range.end >= range.start);
46 self.bytes_read
47 .fetch_add(range.end - range.start, Ordering::Relaxed);
48 self.inner.read(range).await
49 }
50}
51
/// Handle onto a byte counter stored behind an `Arc`.
///
/// Cloning produces another handle onto the same underlying counter, so every
/// clone observes the same total.
#[derive(Debug, Clone)]
pub struct ScanMetrics {
    /// Shared running total of bytes read.
    bytes_read: Arc<AtomicU64>,
}

impl ScanMetrics {
    /// Creates a metrics handle whose counter starts at zero.
    pub(crate) fn new() -> Self {
        let counter = Arc::new(AtomicU64::new(0));
        ScanMetrics {
            bytes_read: counter,
        }
    }

    /// Borrows the shared counter so that other components can add to it.
    pub(crate) fn bytes_read_counter(&self) -> &Arc<AtomicU64> {
        let Self { bytes_read } = self;
        bytes_read
    }

    /// Returns the current value of the byte counter (relaxed atomic load).
    pub fn bytes_read(&self) -> u64 {
        let counter: &AtomicU64 = &self.bytes_read;
        counter.load(Ordering::Relaxed)
    }
}
74
75pub struct ScanResult {
78 stream: ArrowRecordBatchStream,
79 metrics: ScanMetrics,
80}
81
82impl ScanResult {
83 pub(crate) fn new(stream: ArrowRecordBatchStream, metrics: ScanMetrics) -> Self {
84 Self { stream, metrics }
85 }
86
87 pub fn stream(self) -> ArrowRecordBatchStream {
89 self.stream
90 }
91
92 pub fn metrics(&self) -> &ScanMetrics {
94 &self.metrics
95 }
96}