iceberg/arrow/delete_file_loader.rs

use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};

use crate::arrow::ArrowReader;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{Schema, SchemaRef};
use crate::{Error, ErrorKind, Result};

#[allow(unused)]
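/// Loads the content of Iceberg delete files as Arrow record batch streams.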
#[async_trait::async_trait]
pub trait DeleteFileLoader {
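    /// Read the delete file referred to in the task.
    ///
    /// Returns the contents of the delete file as a stream of Arrow `RecordBatch`es.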
    async fn read_delete_file(
        &self,
        task: &FileScanTaskDeleteFile,
        schema: SchemaRef,
    ) -> Result<ArrowRecordBatchStream>;
}
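/// A [`DeleteFileLoader`] that reads Parquet delete files through a [`FileIO`].
///
/// A minimal usage sketch; the `file_io`, `task`, and `schema` bindings are
/// assumed to be in scope:
/// ```ignore
/// let loader = BasicDeleteFileLoader::new(file_io);
/// let batch_stream = loader.read_delete_file(&task, schema).await?;
/// ```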
#[derive(Clone, Debug)]
pub(crate) struct BasicDeleteFileLoader {
    file_io: FileIO,
}

#[allow(unused_variables)]
impl BasicDeleteFileLoader {
    pub fn new(file_io: FileIO) -> Self {
        BasicDeleteFileLoader { file_io }
    }
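    /// Loads the Parquet file at `data_file_path` as a stream of `RecordBatch`es.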
    pub(crate) async fn parquet_to_batch_stream(
        &self,
        data_file_path: &str,
    ) -> Result<ArrowRecordBatchStream> {
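        // Build a plain Parquet record batch stream for the file; any error raised
        // by the Parquet reader is mapped into an iceberg `Error` below.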
        let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
            data_file_path,
            self.file_io.clone(),
            false,
            None,
        )
        .await?
        .build()?
        .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }
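    /// Evolves the schema of the record batches in `record_batch_stream` to match
    /// `target_schema`, projecting only the columns whose field IDs appear in
    /// `equality_ids`.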
    pub(crate) async fn evolve_schema(
        record_batch_stream: ArrowRecordBatchStream,
        target_schema: Arc<Schema>,
        equality_ids: &[i32],
    ) -> Result<ArrowRecordBatchStream> {
        let mut record_batch_transformer =
            RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();

        let record_batch_stream = record_batch_stream.map(move |record_batch| {
            record_batch.and_then(|record_batch| {
                record_batch_transformer.process_record_batch(record_batch)
            })
        });

        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }
}

#[async_trait::async_trait]
impl DeleteFileLoader for BasicDeleteFileLoader {
    async fn read_delete_file(
        &self,
        task: &FileScanTaskDeleteFile,
        schema: SchemaRef,
    ) -> Result<ArrowRecordBatchStream> {
        let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?;
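        // Equality delete files carry only the columns named by their equality IDs;
        // positional delete files (no equality IDs) are evolved against every
        // column in the schema.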
        let field_ids = match &task.equality_ids {
            Some(ids) => ids.clone(),
            None => schema.field_id_to_name_map().keys().cloned().collect(),
        };

        Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
    }
}

#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::delete_filter::tests::setup;

    #[tokio::test]
    async fn test_basic_delete_file_loader_read_delete_file() {
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path();
        let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
            .unwrap()
            .build()
            .unwrap();

        let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
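        // `setup` (from the delete_filter test module) is assumed to write example
        // data and delete files under `table_location` and return scan tasks that
        // reference them.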
        let file_scan_tasks = setup(table_location);

        let result = delete_file_loader
            .read_delete_file(
                &file_scan_tasks[0].deletes[0],
                file_scan_tasks[0].schema_ref(),
            )
            .await
            .unwrap();

        let result = result.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(result.len(), 1);
    }
}