iceberg/arrow/delete_file_loader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use futures::{StreamExt, TryStreamExt};
21use parquet::arrow::ParquetRecordBatchStreamBuilder;
22
23use crate::arrow::ArrowReader;
24use crate::arrow::reader::ParquetReadOptions;
25use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
26use crate::arrow::scan_metrics::ScanMetrics;
27use crate::io::FileIO;
28use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
29use crate::spec::{Schema, SchemaRef};
30use crate::{Error, ErrorKind, Result};
31
32/// Delete File Loader
33#[allow(unused)]
34#[async_trait::async_trait]
35pub trait DeleteFileLoader {
36    /// Read the delete file referred to in the task
37    ///
38    /// Returns the contents of the delete file as a RecordBatch stream. Applies schema evolution.
39    async fn read_delete_file(
40        &self,
41        task: &FileScanTaskDeleteFile,
42        schema: SchemaRef,
43    ) -> Result<ArrowRecordBatchStream>;
44}
45
46#[derive(Clone, Debug)]
47pub(crate) struct BasicDeleteFileLoader {
48    file_io: FileIO,
49    scan_metrics: ScanMetrics,
50}
51
52#[allow(unused_variables)]
53impl BasicDeleteFileLoader {
54    pub fn new(file_io: FileIO, scan_metrics: ScanMetrics) -> Self {
55        BasicDeleteFileLoader {
56            file_io,
57            scan_metrics,
58        }
59    }
60
61    pub(crate) fn file_io(&self) -> &FileIO {
62        &self.file_io
63    }
64
65    /// Loads a RecordBatchStream for a given datafile.
66    pub(crate) async fn parquet_to_batch_stream(
67        &self,
68        data_file_path: &str,
69        file_size_in_bytes: u64,
70    ) -> Result<ArrowRecordBatchStream> {
71        /*
72           Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
73           as that introduces a circular dependency.
74        */
75        let parquet_read_options = ParquetReadOptions::builder().build();
76
77        let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
78            data_file_path,
79            &self.file_io,
80            file_size_in_bytes,
81            parquet_read_options,
82            self.scan_metrics.bytes_read_counter(),
83        )
84        .await?;
85
86        let record_batch_stream =
87            ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata)
88                .build()?
89                .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));
90
91        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
92    }
93
94    /// Evolves the schema of the RecordBatches from an equality delete file.
95    ///
96    /// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files),
97    /// only evolves the specified `equality_ids` columns, not all table columns.
98    pub(crate) async fn evolve_schema(
99        record_batch_stream: ArrowRecordBatchStream,
100        target_schema: Arc<Schema>,
101        equality_ids: &[i32],
102    ) -> Result<ArrowRecordBatchStream> {
103        let mut record_batch_transformer =
104            RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();
105
106        let record_batch_stream = record_batch_stream.map(move |record_batch| {
107            record_batch.and_then(|record_batch| {
108                record_batch_transformer.process_record_batch(record_batch)
109            })
110        });
111
112        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
113    }
114}
115
116#[async_trait::async_trait]
117impl DeleteFileLoader for BasicDeleteFileLoader {
118    async fn read_delete_file(
119        &self,
120        task: &FileScanTaskDeleteFile,
121        schema: SchemaRef,
122    ) -> Result<ArrowRecordBatchStream> {
123        let raw_batch_stream = self
124            .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
125            .await?;
126
127        // For equality deletes, only evolve the equality_ids columns.
128        // For positional deletes (equality_ids is None), use all field IDs.
129        let field_ids = match &task.equality_ids {
130            Some(ids) => ids.clone(),
131            None => schema.field_id_to_name_map().keys().cloned().collect(),
132        };
133
134        Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
135    }
136}
137
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::arrow::delete_filter::tests::setup;

    /// Reads the first delete file of the first fixture task end to end and
    /// checks that it produces exactly one record batch.
    #[tokio::test]
    async fn test_basic_delete_file_loader_read_delete_file() {
        // Bind the TempDir guard so the fixture directory outlives the test body.
        let tmp_dir = TempDir::new().unwrap();

        let loader = BasicDeleteFileLoader::new(FileIO::new_with_fs(), ScanMetrics::new());

        let tasks = setup(tmp_dir.path());
        let first_task = &tasks[0];

        let stream = loader
            .read_delete_file(&first_task.deletes[0], first_task.schema_ref())
            .await
            .unwrap();

        let batches: Vec<_> = stream.try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
    }
}