// iceberg/arrow/delete_file_loader.rs

use std::sync::Arc;
20use futures::{StreamExt, TryStreamExt};
21use parquet::arrow::ParquetRecordBatchStreamBuilder;
22
23use crate::arrow::ArrowReader;
24use crate::arrow::reader::ParquetReadOptions;
25use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
26use crate::io::FileIO;
27use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
28use crate::spec::{Schema, SchemaRef};
29use crate::{Error, ErrorKind, Result};
30
31#[allow(unused)]
33#[async_trait::async_trait]
34pub trait DeleteFileLoader {
35 async fn read_delete_file(
39 &self,
40 task: &FileScanTaskDeleteFile,
41 schema: SchemaRef,
42 ) -> Result<ArrowRecordBatchStream>;
43}
44
45#[derive(Clone, Debug)]
46pub(crate) struct BasicDeleteFileLoader {
47 file_io: FileIO,
48}
49
50#[allow(unused_variables)]
51impl BasicDeleteFileLoader {
52 pub fn new(file_io: FileIO) -> Self {
53 BasicDeleteFileLoader { file_io }
54 }
55 pub(crate) async fn parquet_to_batch_stream(
57 &self,
58 data_file_path: &str,
59 file_size_in_bytes: u64,
60 ) -> Result<ArrowRecordBatchStream> {
61 let parquet_read_options = ParquetReadOptions::builder().build();
66
67 let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
68 data_file_path,
69 &self.file_io,
70 file_size_in_bytes,
71 parquet_read_options,
72 )
73 .await?;
74
75 let record_batch_stream =
76 ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata)
77 .build()?
78 .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));
79
80 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
81 }
82
83 pub(crate) async fn evolve_schema(
88 record_batch_stream: ArrowRecordBatchStream,
89 target_schema: Arc<Schema>,
90 equality_ids: &[i32],
91 ) -> Result<ArrowRecordBatchStream> {
92 let mut record_batch_transformer =
93 RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();
94
95 let record_batch_stream = record_batch_stream.map(move |record_batch| {
96 record_batch.and_then(|record_batch| {
97 record_batch_transformer.process_record_batch(record_batch)
98 })
99 });
100
101 Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
102 }
103}
104
105#[async_trait::async_trait]
106impl DeleteFileLoader for BasicDeleteFileLoader {
107 async fn read_delete_file(
108 &self,
109 task: &FileScanTaskDeleteFile,
110 schema: SchemaRef,
111 ) -> Result<ArrowRecordBatchStream> {
112 let raw_batch_stream = self
113 .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
114 .await?;
115
116 let field_ids = match &task.equality_ids {
119 Some(ids) => ids.clone(),
120 None => schema.field_id_to_name_map().keys().cloned().collect(),
121 };
122
123 Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
124 }
125}
126
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::delete_filter::tests::setup;

    // End-to-end check: load the first delete file produced by `setup` and
    // verify the stream yields exactly one record batch.
    #[tokio::test]
    async fn test_basic_delete_file_loader_read_delete_file() {
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path();
        let file_io = FileIO::new_with_fs();

        let loader = BasicDeleteFileLoader::new(file_io.clone());
        let file_scan_tasks = setup(table_location);
        let first_task = &file_scan_tasks[0];

        let stream = loader
            .read_delete_file(&first_task.deletes[0], first_task.schema_ref())
            .await
            .unwrap();

        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(batches.len(), 1);
    }
}