iceberg/arrow/
delete_file_loader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use futures::{StreamExt, TryStreamExt};
21
22use crate::arrow::ArrowReader;
23use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
24use crate::io::FileIO;
25use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
26use crate::spec::{Schema, SchemaRef};
27use crate::{Error, ErrorKind, Result};
28
/// Delete File Loader
///
/// Abstraction over reading an Iceberg delete file (positional or equality)
/// into a stream of Arrow record batches. Implementors are expected to apply
/// schema evolution so the emitted batches match the supplied `schema`.
// NOTE(review): `#[allow(unused)]` predates current callers — confirm it is
// still needed before removing.
#[allow(unused)]
#[async_trait::async_trait]
pub trait DeleteFileLoader {
    /// Read the delete file referred to in the task
    ///
    /// Returns the contents of the delete file as a RecordBatch stream. Applies schema evolution.
    ///
    /// # Errors
    /// Returns an error if the underlying file cannot be opened or decoded.
    async fn read_delete_file(
        &self,
        task: &FileScanTaskDeleteFile,
        schema: SchemaRef,
    ) -> Result<ArrowRecordBatchStream>;
}
42
/// Minimal [`DeleteFileLoader`] implementation that reads Parquet delete
/// files through a [`FileIO`].
#[derive(Clone, Debug)]
pub(crate) struct BasicDeleteFileLoader {
    // Storage abstraction used to open delete files by path.
    file_io: FileIO,
}
47
48#[allow(unused_variables)]
49impl BasicDeleteFileLoader {
50    pub fn new(file_io: FileIO) -> Self {
51        BasicDeleteFileLoader { file_io }
52    }
53    /// Loads a RecordBatchStream for a given datafile.
54    pub(crate) async fn parquet_to_batch_stream(
55        &self,
56        data_file_path: &str,
57        file_size_in_bytes: u64,
58    ) -> Result<ArrowRecordBatchStream> {
59        /*
60           Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
61           as that introduces a circular dependency.
62        */
63        let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
64            data_file_path,
65            self.file_io.clone(),
66            false,
67            None,
68            None,
69            file_size_in_bytes,
70        )
71        .await?
72        .build()?
73        .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));
74
75        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
76    }
77
78    /// Evolves the schema of the RecordBatches from an equality delete file.
79    ///
80    /// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files),
81    /// only evolves the specified `equality_ids` columns, not all table columns.
82    pub(crate) async fn evolve_schema(
83        record_batch_stream: ArrowRecordBatchStream,
84        target_schema: Arc<Schema>,
85        equality_ids: &[i32],
86    ) -> Result<ArrowRecordBatchStream> {
87        let mut record_batch_transformer =
88            RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();
89
90        let record_batch_stream = record_batch_stream.map(move |record_batch| {
91            record_batch.and_then(|record_batch| {
92                record_batch_transformer.process_record_batch(record_batch)
93            })
94        });
95
96        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
97    }
98}
99
100#[async_trait::async_trait]
101impl DeleteFileLoader for BasicDeleteFileLoader {
102    async fn read_delete_file(
103        &self,
104        task: &FileScanTaskDeleteFile,
105        schema: SchemaRef,
106    ) -> Result<ArrowRecordBatchStream> {
107        let raw_batch_stream = self
108            .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
109            .await?;
110
111        // For equality deletes, only evolve the equality_ids columns.
112        // For positional deletes (equality_ids is None), use all field IDs.
113        let field_ids = match &task.equality_ids {
114            Some(ids) => ids.clone(),
115            None => schema.field_id_to_name_map().keys().cloned().collect(),
116        };
117
118        Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
119    }
120}
121
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::delete_filter::tests::setup;

    /// Smoke test: loading the delete file produced by `setup` yields
    /// exactly one record batch.
    #[tokio::test]
    async fn test_basic_delete_file_loader_read_delete_file() {
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path();
        let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
            .unwrap()
            .build()
            .unwrap();

        let loader = BasicDeleteFileLoader::new(file_io);

        let tasks = setup(table_location);

        let batch_stream = loader
            .read_delete_file(&tasks[0].deletes[0], tasks[0].schema_ref())
            .await
            .unwrap();

        let batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 1);
    }
}