iceberg/transaction/
append.rs1use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use uuid::Uuid;
23
24use crate::error::Result;
25use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
26use crate::table::Table;
27use crate::transaction::snapshot::{
28 DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
29};
30use crate::transaction::{ActionCommit, TransactionAction};
31
32pub struct FastAppendAction {
34 check_duplicate: bool,
35 commit_uuid: Option<Uuid>,
37 key_metadata: Option<Vec<u8>>,
38 snapshot_properties: HashMap<String, String>,
39 added_data_files: Vec<DataFile>,
40}
41
42impl FastAppendAction {
43 pub(crate) fn new() -> Self {
44 Self {
45 check_duplicate: true,
46 commit_uuid: None,
47 key_metadata: None,
48 snapshot_properties: HashMap::default(),
49 added_data_files: vec![],
50 }
51 }
52
53 pub fn with_check_duplicate(mut self, v: bool) -> Self {
55 self.check_duplicate = v;
56 self
57 }
58
59 pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
61 self.added_data_files.extend(data_files);
62 self
63 }
64
65 pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
67 self.commit_uuid = Some(commit_uuid);
68 self
69 }
70
71 pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
73 self.key_metadata = Some(key_metadata);
74 self
75 }
76
77 pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
79 self.snapshot_properties = snapshot_properties;
80 self
81 }
82}
83
84#[async_trait]
85impl TransactionAction for FastAppendAction {
86 async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
87 let snapshot_producer = SnapshotProducer::new(
88 table,
89 self.commit_uuid.unwrap_or_else(Uuid::now_v7),
90 self.key_metadata.clone(),
91 self.snapshot_properties.clone(),
92 self.added_data_files.clone(),
93 );
94
95 snapshot_producer.validate_added_data_files()?;
97
98 if self.check_duplicate {
100 snapshot_producer.validate_duplicate_files().await?;
101 }
102
103 snapshot_producer
104 .commit(FastAppendOperation, DefaultManifestProcess)
105 .await
106 }
107}
108
/// Stateless marker type whose `SnapshotProduceOperation` implementation
/// describes a fast append to the snapshot producer.
struct FastAppendOperation;
110
111impl SnapshotProduceOperation for FastAppendOperation {
112 fn operation(&self) -> Operation {
113 Operation::Append
114 }
115
116 async fn delete_entries(
117 &self,
118 _snapshot_produce: &SnapshotProducer<'_>,
119 ) -> Result<Vec<ManifestEntry>> {
120 Ok(vec![])
121 }
122
123 async fn existing_manifest(
124 &self,
125 snapshot_produce: &SnapshotProducer<'_>,
126 ) -> Result<Vec<ManifestFile>> {
127 let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
128 return Ok(vec![]);
129 };
130
131 let manifest_list = snapshot
132 .load_manifest_list(
133 snapshot_produce.table.file_io(),
134 &snapshot_produce.table.metadata_ref(),
135 )
136 .await?;
137
138 Ok(manifest_list
139 .entries()
140 .iter()
141 .filter(|entry| entry.has_added_files() || entry.has_existing_files())
142 .cloned()
143 .collect())
144 }
145}
146
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
    };
    use crate::table::Table;
    use crate::transaction::tests::make_v2_minimal_table;
    use crate::transaction::{Transaction, TransactionAction};
    use crate::{TableRequirement, TableUpdate};

    /// Builds a single-record, 100-byte Parquet data file at `path` that uses
    /// the table's default partition spec id and a one-field partition tuple
    /// holding `partition_value`.
    fn parquet_data_file(table: &Table, path: &str, partition_value: Literal) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(path.to_string())
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(1)
            .partition_spec_id(table.metadata().default_partition_spec_id())
            .partition(Struct::from_iter([Some(partition_value)]))
            .build()
            .unwrap()
    }

    /// Committing with neither data files nor snapshot properties must fail.
    #[tokio::test]
    async fn test_empty_data_append_action() {
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);
        let action = tx.fast_append().add_data_files(vec![]);

        let outcome = Arc::new(action).commit(&table).await;
        assert!(outcome.is_err());
    }

    /// Custom snapshot properties show up in the new snapshot's summary when
    /// a data file is appended alongside them.
    #[tokio::test]
    async fn test_set_snapshot_properties() {
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);

        let snapshot_properties = HashMap::from([("key".to_string(), "val".to_string())]);
        let data_file = parquet_data_file(&table, "test/1.parquet", Literal::long(300));

        let action = tx
            .fast_append()
            .set_snapshot_properties(snapshot_properties)
            .add_data_files(vec![data_file]);
        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
        let updates = action_commit.take_updates();

        // The first update is expected to carry the newly produced snapshot.
        let TableUpdate::AddSnapshot {
            snapshot: new_snapshot,
        } = &updates[0]
        else {
            unreachable!()
        };
        let stored = new_snapshot
            .summary()
            .additional_properties
            .get("key")
            .unwrap();
        assert_eq!(stored, "val");
    }

    /// Custom snapshot properties show up in the new snapshot's summary even
    /// when no data files are appended.
    #[tokio::test]
    async fn test_append_snapshot_properties() {
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);

        let snapshot_properties = HashMap::from([("key".to_string(), "val".to_string())]);

        let action = tx
            .fast_append()
            .set_snapshot_properties(snapshot_properties);
        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
        let updates = action_commit.take_updates();

        // The first update is expected to carry the newly produced snapshot.
        let TableUpdate::AddSnapshot {
            snapshot: new_snapshot,
        } = &updates[0]
        else {
            unreachable!()
        };
        let stored = new_snapshot
            .summary()
            .additional_properties
            .get("key")
            .unwrap();
        assert_eq!(stored, "val");
    }

    /// A data file built with a string partition value (the other tests in
    /// this module use a long) must be rejected at commit time.
    #[tokio::test]
    async fn test_fast_append_file_with_incompatible_partition_value() {
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);

        let data_file = parquet_data_file(&table, "test/3.parquet", Literal::string("test"));
        let action = tx.fast_append().add_data_files(vec![data_file]);

        let outcome = Arc::new(action).commit(&table).await;
        assert!(outcome.is_err());
    }

    /// End-to-end check of a successful fast append: emitted updates and
    /// requirements, the written manifest list, and the manifest entry.
    #[tokio::test]
    async fn test_fast_append() {
        let table = make_v2_minimal_table();
        let tx = Transaction::new(&table);

        let data_file = parquet_data_file(&table, "test/3.parquet", Literal::long(300));

        let action = tx.fast_append().add_data_files(vec![data_file.clone()]);
        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
        let updates = action_commit.take_updates();
        let requirements = action_commit.take_requirements();

        // Updates: add the snapshot, then point the main branch at it.
        assert!(matches!(
            (&updates[0], &updates[1]),
            (
                TableUpdate::AddSnapshot { snapshot },
                TableUpdate::SetSnapshotRef {
                    reference,
                    ref_name
                }
            ) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH
        ));

        // Requirements: same table UUID and an unchanged main-branch snapshot.
        let expected_requirements = vec![
            TableRequirement::UuidMatch {
                uuid: table.metadata().uuid(),
            },
            TableRequirement::RefSnapshotIdMatch {
                r#ref: MAIN_BRANCH.to_string(),
                snapshot_id: table.metadata().current_snapshot_id,
            },
        ];
        assert_eq!(expected_requirements, requirements);

        let TableUpdate::AddSnapshot {
            snapshot: new_snapshot,
        } = &updates[0]
        else {
            unreachable!()
        };

        // The manifest list holds exactly one manifest, stamped with the
        // new snapshot's sequence number.
        let manifest_list = new_snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(1, manifest_list.entries().len());
        let manifest_file = &manifest_list.entries()[0];
        assert_eq!(
            manifest_file.sequence_number,
            new_snapshot.sequence_number()
        );

        // The manifest holds exactly one entry describing the appended file.
        let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap();
        assert_eq!(1, manifest.entries().len());
        let entry = &manifest.entries()[0];
        assert_eq!(
            new_snapshot.sequence_number(),
            entry
                .sequence_number()
                .expect("Inherit sequence number by load manifest")
        );

        assert_eq!(new_snapshot.snapshot_id(), entry.snapshot_id().unwrap());
        assert_eq!(data_file, *entry.data_file());
    }
}