iceberg/arrow/
delete_file_loader.rs1use std::sync::Arc;
19
20use futures::{StreamExt, TryStreamExt};
21use parquet::arrow::ParquetRecordBatchStreamBuilder;
22
23use crate::arrow::ArrowReader;
24use crate::arrow::reader::ParquetReadOptions;
25use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
26use crate::arrow::scan_metrics::ScanMetrics;
27use crate::io::FileIO;
28use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
29use crate::spec::{Schema, SchemaRef};
30use crate::{Error, ErrorKind, Result};
31
32#[allow(unused)]
34#[async_trait::async_trait]
35pub trait DeleteFileLoader {
36 async fn read_delete_file(
40 &self,
41 task: &FileScanTaskDeleteFile,
42 schema: SchemaRef,
43 ) -> Result<ArrowRecordBatchStream>;
44}
45
46#[derive(Clone, Debug)]
47pub(crate) struct BasicDeleteFileLoader {
48 file_io: FileIO,
49 scan_metrics: ScanMetrics,
50}
51
52#[allow(unused_variables)]
53impl BasicDeleteFileLoader {
54 pub fn new(file_io: FileIO, scan_metrics: ScanMetrics) -> Self {
55 BasicDeleteFileLoader {
56 file_io,
57 scan_metrics,
58 }
59 }
60
61 pub(crate) fn file_io(&self) -> &FileIO {
62 &self.file_io
63 }
64
65 pub(crate) async fn parquet_to_batch_stream(
67 &self,
68 data_file_path: &str,
69 file_size_in_bytes: u64,
70 ) -> Result<ArrowRecordBatchStream> {
71 let parquet_read_options = ParquetReadOptions::builder().build();
76
77 let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
78 data_file_path,
79 &self.file_io,
80 file_size_in_bytes,
81 parquet_read_options,
82 self.scan_metrics.bytes_read_counter(),
83 )
84 .await?;
85
86 let record_batch_stream =
87 ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata)
88 .build()?
89 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));
90
91 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
92 }
93
94 pub(crate) async fn evolve_schema(
99 record_batch_stream: ArrowRecordBatchStream,
100 target_schema: Arc<Schema>,
101 equality_ids: &[i32],
102 ) -> Result<ArrowRecordBatchStream> {
103 let mut record_batch_transformer =
104 RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();
105
106 let record_batch_stream = record_batch_stream.map(move |record_batch| {
107 record_batch.and_then(|record_batch| {
108 record_batch_transformer.process_record_batch(record_batch)
109 })
110 });
111
112 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
113 }
114}
115
116#[async_trait::async_trait]
117impl DeleteFileLoader for BasicDeleteFileLoader {
118 async fn read_delete_file(
119 &self,
120 task: &FileScanTaskDeleteFile,
121 schema: SchemaRef,
122 ) -> Result<ArrowRecordBatchStream> {
123 let raw_batch_stream = self
124 .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
125 .await?;
126
127 let field_ids = match &task.equality_ids {
130 Some(ids) => ids.clone(),
131 None => schema.field_id_to_name_map().keys().cloned().collect(),
132 };
133
134 Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
135 }
136}
137
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::delete_filter::tests::setup;

    /// Reads the first delete file of the first scan task produced by the shared `setup`
    /// fixture and expects exactly one record batch back.
    #[tokio::test]
    async fn test_basic_delete_file_loader_read_delete_file() {
        // Keep the temp dir bound for the whole test so the fixture files outlive the read.
        let tmp_dir = TempDir::new().unwrap();
        let loader = BasicDeleteFileLoader::new(FileIO::new_with_fs(), ScanMetrics::new());

        let tasks = setup(tmp_dir.path());
        let first_task = &tasks[0];

        let batches = loader
            .read_delete_file(&first_task.deletes[0], first_task.schema_ref())
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(batches.len(), 1);
    }
}