iceberg/transaction/
append.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use uuid::Uuid;
23
24use crate::error::Result;
25use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
26use crate::table::Table;
27use crate::transaction::snapshot::{
28    DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
29};
30use crate::transaction::{ActionCommit, TransactionAction};
31
32/// FastAppendAction is a transaction action for fast append data files to the table.
33pub struct FastAppendAction {
34    check_duplicate: bool,
35    // below are properties used to create SnapshotProducer when commit
36    commit_uuid: Option<Uuid>,
37    key_metadata: Option<Vec<u8>>,
38    snapshot_properties: HashMap<String, String>,
39    added_data_files: Vec<DataFile>,
40}
41
42impl FastAppendAction {
43    pub(crate) fn new() -> Self {
44        Self {
45            check_duplicate: true,
46            commit_uuid: None,
47            key_metadata: None,
48            snapshot_properties: HashMap::default(),
49            added_data_files: vec![],
50        }
51    }
52
53    /// Set whether to check duplicate files
54    pub fn with_check_duplicate(mut self, v: bool) -> Self {
55        self.check_duplicate = v;
56        self
57    }
58
59    /// Add data files to the snapshot.
60    pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
61        self.added_data_files.extend(data_files);
62        self
63    }
64
65    /// Set commit UUID for the snapshot.
66    pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
67        self.commit_uuid = Some(commit_uuid);
68        self
69    }
70
71    /// Set key metadata for manifest files.
72    pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
73        self.key_metadata = Some(key_metadata);
74        self
75    }
76
77    /// Set snapshot summary properties.
78    pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
79        self.snapshot_properties = snapshot_properties;
80        self
81    }
82}
83
84#[async_trait]
85impl TransactionAction for FastAppendAction {
86    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
87        let snapshot_producer = SnapshotProducer::new(
88            table,
89            self.commit_uuid.unwrap_or_else(Uuid::now_v7),
90            self.key_metadata.clone(),
91            self.snapshot_properties.clone(),
92            self.added_data_files.clone(),
93        );
94
95        // validate added files
96        snapshot_producer.validate_added_data_files()?;
97
98        // Checks duplicate files
99        if self.check_duplicate {
100            snapshot_producer.validate_duplicate_files().await?;
101        }
102
103        snapshot_producer
104            .commit(FastAppendOperation, DefaultManifestProcess)
105            .await
106    }
107}
108
109struct FastAppendOperation;
110
111impl SnapshotProduceOperation for FastAppendOperation {
112    fn operation(&self) -> Operation {
113        Operation::Append
114    }
115
116    async fn delete_entries(
117        &self,
118        _snapshot_produce: &SnapshotProducer<'_>,
119    ) -> Result<Vec<ManifestEntry>> {
120        Ok(vec![])
121    }
122
123    async fn existing_manifest(
124        &self,
125        snapshot_produce: &SnapshotProducer<'_>,
126    ) -> Result<Vec<ManifestFile>> {
127        let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
128            return Ok(vec![]);
129        };
130
131        let manifest_list = snapshot
132            .load_manifest_list(
133                snapshot_produce.table.file_io(),
134                &snapshot_produce.table.metadata_ref(),
135            )
136            .await?;
137
138        Ok(manifest_list
139            .entries()
140            .iter()
141            .filter(|entry| entry.has_added_files() || entry.has_existing_files())
142            .cloned()
143            .collect())
144    }
145}
146
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
    };
    use crate::table::Table;
    use crate::transaction::tests::make_v2_minimal_table;
    use crate::transaction::{Transaction, TransactionAction};
    use crate::{TableRequirement, TableUpdate};

    /// Builds a one-record, 100-byte Parquet data file at `path`.
    ///
    /// The file uses the table's default partition spec and a single partition value.
    fn make_data_file(table: &Table, path: &str, partition_value: Literal) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(path.to_string())
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(1)
            .partition_spec_id(table.metadata().default_partition_spec_id())
            .partition(Struct::from_iter([Some(partition_value)]))
            .build()
            .unwrap()
    }

    /// Snapshot properties with the single entry `key -> val`.
    fn key_val_properties() -> HashMap<String, String> {
        HashMap::from([("key".to_string(), "val".to_string())])
    }

    /// Asserts that the first update adds a snapshot whose summary contains `key -> val`.
    fn assert_summary_has_key_val(updates: &[TableUpdate]) {
        let TableUpdate::AddSnapshot { snapshot } = &updates[0] else {
            unreachable!("first update should add a snapshot")
        };
        assert_eq!(
            snapshot
                .summary()
                .additional_properties
                .get("key")
                .map(String::as_str),
            Some("val")
        );
    }

    #[tokio::test]
    async fn test_empty_data_append_action() {
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);
        let action = tx.fast_append().add_data_files(vec![]);
        assert!(Arc::new(action).commit(&table).await.is_err());
    }

    #[tokio::test]
    async fn test_set_snapshot_properties() {
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);
        let data_file = make_data_file(&table, "test/1.parquet", Literal::long(300));

        let action = tx
            .fast_append()
            .set_snapshot_properties(key_val_properties())
            .add_data_files(vec![data_file]);
        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
        let updates = action_commit.take_updates();

        // Customized properties must be carried into the snapshot summary.
        assert_summary_has_key_val(&updates);
    }

    #[tokio::test]
    async fn test_append_snapshot_properties() {
        // Properties set *after* staging data files must still reach the summary.
        // A data file is required here: committing an append with no staged files fails
        // (see `test_empty_data_append_action`).
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);
        let data_file = make_data_file(&table, "test/2.parquet", Literal::long(300));

        let action = tx
            .fast_append()
            .add_data_files(vec![data_file])
            .set_snapshot_properties(key_val_properties());
        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
        let updates = action_commit.take_updates();

        assert_summary_has_key_val(&updates);
    }

    #[tokio::test]
    async fn test_fast_append_file_with_incompatible_partition_value() {
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);

        // A string partition value does not match the table's partition type.
        let data_file = make_data_file(&table, "test/3.parquet", Literal::string("test"));
        let action = tx.fast_append().add_data_files(vec![data_file]);

        assert!(Arc::new(action).commit(&table).await.is_err());
    }

    #[tokio::test]
    async fn test_fast_append() {
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);
        let data_file = make_data_file(&table, "test/3.parquet", Literal::long(300));

        let action = tx.fast_append().add_data_files(vec![data_file.clone()]);
        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
        let updates = action_commit.take_updates();
        let requirements = action_commit.take_requirements();

        // check updates and requirements
        assert!(matches!(
            (&updates[0], &updates[1]),
            (
                TableUpdate::AddSnapshot { snapshot },
                TableUpdate::SetSnapshotRef { reference, ref_name },
            ) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH
        ));
        assert_eq!(
            vec![
                TableRequirement::UuidMatch {
                    uuid: table.metadata().uuid()
                },
                TableRequirement::RefSnapshotIdMatch {
                    r#ref: MAIN_BRANCH.to_string(),
                    snapshot_id: table.metadata().current_snapshot_id
                }
            ],
            requirements
        );

        // check manifest list
        let TableUpdate::AddSnapshot {
            snapshot: new_snapshot,
        } = &updates[0]
        else {
            unreachable!("first update should add a snapshot")
        };
        let manifest_list = new_snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(1, manifest_list.entries().len());
        assert_eq!(
            manifest_list.entries()[0].sequence_number,
            new_snapshot.sequence_number()
        );

        // check manifest
        let manifest = manifest_list.entries()[0]
            .load_manifest(table.file_io())
            .await
            .unwrap();
        assert_eq!(1, manifest.entries().len());
        assert_eq!(
            new_snapshot.sequence_number(),
            manifest.entries()[0]
                .sequence_number()
                .expect("Inherit sequence number by load manifest")
        );

        assert_eq!(
            new_snapshot.snapshot_id(),
            manifest.entries()[0].snapshot_id().unwrap()
        );
        assert_eq!(data_file, *manifest.entries()[0].data_file());
    }
}