// iceberg/arrow/delete_file_loader.rs

use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};

use crate::arrow::ArrowReader;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{Schema, SchemaRef};
use crate::{Error, ErrorKind, Result};
/// Loads the contents of an Iceberg delete file as a stream of Arrow
/// record batches.
// NOTE(review): `#[allow(unused)]` suggests this trait has no in-crate
// callers yet — confirm it is still required before removing.
#[allow(unused)]
#[async_trait::async_trait]
pub trait DeleteFileLoader {
    /// Reads the delete file referenced by `task`.
    ///
    /// Returns the file's contents as a record-batch stream shaped
    /// according to `schema`, or an error if the file cannot be read.
    async fn read_delete_file(
        &self,
        task: &FileScanTaskDeleteFile,
        schema: SchemaRef,
    ) -> Result<ArrowRecordBatchStream>;
}
42
/// Minimal [`DeleteFileLoader`] implementation that reads Parquet delete
/// files through a [`FileIO`] handle.
#[derive(Clone, Debug)]
pub(crate) struct BasicDeleteFileLoader {
    // Used to open delete files for reading; `FileIO` is cheap to clone.
    file_io: FileIO,
}
47
48#[allow(unused_variables)]
49impl BasicDeleteFileLoader {
50 pub fn new(file_io: FileIO) -> Self {
51 BasicDeleteFileLoader { file_io }
52 }
53 pub(crate) async fn parquet_to_batch_stream(
55 &self,
56 data_file_path: &str,
57 file_size_in_bytes: u64,
58 ) -> Result<ArrowRecordBatchStream> {
59 let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
64 data_file_path,
65 self.file_io.clone(),
66 false,
67 None,
68 None,
69 file_size_in_bytes,
70 )
71 .await?
72 .build()?
73 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));
74
75 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
76 }
77
78 pub(crate) async fn evolve_schema(
83 record_batch_stream: ArrowRecordBatchStream,
84 target_schema: Arc<Schema>,
85 equality_ids: &[i32],
86 ) -> Result<ArrowRecordBatchStream> {
87 let mut record_batch_transformer =
88 RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();
89
90 let record_batch_stream = record_batch_stream.map(move |record_batch| {
91 record_batch.and_then(|record_batch| {
92 record_batch_transformer.process_record_batch(record_batch)
93 })
94 });
95
96 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
97 }
98}
99
100#[async_trait::async_trait]
101impl DeleteFileLoader for BasicDeleteFileLoader {
102 async fn read_delete_file(
103 &self,
104 task: &FileScanTaskDeleteFile,
105 schema: SchemaRef,
106 ) -> Result<ArrowRecordBatchStream> {
107 let raw_batch_stream = self
108 .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
109 .await?;
110
111 let field_ids = match &task.equality_ids {
114 Some(ids) => ids.clone(),
115 None => schema.field_id_to_name_map().keys().cloned().collect(),
116 };
117
118 Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
119 }
120}
121
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::delete_filter::tests::setup;

    /// End-to-end smoke test: loading the first delete file produced by the
    /// shared `setup` fixture yields exactly one record batch.
    #[tokio::test]
    async fn test_basic_delete_file_loader_read_delete_file() {
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path();

        let file_io = FileIO::from_path(table_location.to_str().unwrap())
            .unwrap()
            .build()
            .unwrap();
        let loader = BasicDeleteFileLoader::new(file_io.clone());

        let tasks = setup(table_location);

        let batch_stream = loader
            .read_delete_file(&tasks[0].deletes[0], tasks[0].schema_ref())
            .await
            .unwrap();
        let batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 1);
    }
}