// iceberg/transaction/snapshot.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::future::Future;
20use std::ops::RangeFrom;
21
22use uuid::Uuid;
23
24use crate::error::Result;
25use crate::spec::{
26    DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
27    ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot,
28    SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary,
29    TableProperties, update_snapshot_summaries,
30};
31use crate::table::Table;
32use crate::transaction::ActionCommit;
33use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
34
/// Directory, relative to the table location, into which this module writes new manifest
/// files and manifest lists (`{location}/metadata/...`).
const META_ROOT_PATH: &str = "metadata";
36
37/// A trait that defines how different table operations produce new snapshots.
38///
39/// `SnapshotProduceOperation` is used by [`SnapshotProducer`] to customize snapshot creation
40/// based on the type of operation being performed (e.g., `Append`, `Overwrite`, `Delete`, etc.).
41/// Each operation type implements this trait to specify:
42/// - Which operation type to record in the snapshot summary
43/// - Which existing manifest files should be included in the new snapshot
44/// - Which manifest entries should be marked as deleted
45///
46/// # When it accomplishes
47///
48/// This trait is used during the snapshot creation process in [`SnapshotProducer::commit()`]:
49///
50/// 1. **Operation Type Recording**: The `operation()` method determines which operation type
51///    (e.g., `Operation::Append`, `Operation::Overwrite`) is recorded in the snapshot summary.
52///    This metadata helps track what kind of change was made to the table.
53///
54/// 2. **Manifest File Selection**: The `existing_manifest()` method determines which existing
55///    manifest files from the current snapshot should be carried forward to the new snapshot.
56///    For example:
57///    - An `Append` operation typically includes all existing manifests plus new ones
58///    - An `Overwrite` operation might exclude manifests for partitions being overwritten
59///
60/// 3. **Delete Entry Processing**: The `delete_entries()` method is intended for future delete
61///    operations to specify which manifest entries should be marked as deleted.
62pub(crate) trait SnapshotProduceOperation: Send + Sync {
63    /// Returns the operation type that will be recorded in the snapshot summary.
64    ///
65    /// This determines what kind of operation is being performed (e.g., `Append`, `Overwrite`),
66    /// which is stored in the snapshot metadata for tracking and auditing purposes.
67    fn operation(&self) -> Operation;
68
69    /// Returns manifest entries that should be marked as deleted in the new snapshot.
70    #[allow(unused)]
71    fn delete_entries(
72        &self,
73        snapshot_produce: &SnapshotProducer,
74    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
75
76    /// Returns existing manifest files that should be included in the new snapshot.
77    ///
78    /// This method determines which manifest files from the current snapshot should be
79    /// carried forward to the new snapshot. The selection depends on the operation type:
80    ///
81    /// - **Append operations**: Typically include all existing manifests
82    /// - **Overwrite operations**: May exclude manifests for partitions being overwritten
83    /// - **Delete operations**: May exclude manifests for partitions being deleted
84    fn existing_manifest(
85        &self,
86        snapshot_produce: &SnapshotProducer<'_>,
87    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
88}
89
90pub(crate) struct DefaultManifestProcess;
91
92impl ManifestProcess for DefaultManifestProcess {
93    fn process_manifests(
94        &self,
95        _snapshot_produce: &SnapshotProducer<'_>,
96        manifests: Vec<ManifestFile>,
97    ) -> Vec<ManifestFile> {
98        manifests
99    }
100}
101
102pub(crate) trait ManifestProcess: Send + Sync {
103    fn process_manifests(
104        &self,
105        snapshot_produce: &SnapshotProducer<'_>,
106        manifests: Vec<ManifestFile>,
107    ) -> Vec<ManifestFile>;
108}
109
110pub(crate) struct SnapshotProducer<'a> {
111    pub(crate) table: &'a Table,
112    snapshot_id: i64,
113    commit_uuid: Uuid,
114    key_metadata: Option<Vec<u8>>,
115    snapshot_properties: HashMap<String, String>,
116    added_data_files: Vec<DataFile>,
117    // A counter used to generate unique manifest file names.
118    // It starts from 0 and increments for each new manifest file.
119    // Note: This counter is limited to the range of (0..u64::MAX).
120    manifest_counter: RangeFrom<u64>,
121}
122
123impl<'a> SnapshotProducer<'a> {
124    pub(crate) fn new(
125        table: &'a Table,
126        commit_uuid: Uuid,
127        key_metadata: Option<Vec<u8>>,
128        snapshot_properties: HashMap<String, String>,
129        added_data_files: Vec<DataFile>,
130    ) -> Self {
131        Self {
132            table,
133            snapshot_id: Self::generate_unique_snapshot_id(table),
134            commit_uuid,
135            key_metadata,
136            snapshot_properties,
137            added_data_files,
138            manifest_counter: (0..),
139        }
140    }
141
142    pub(crate) fn validate_added_data_files(&self) -> Result<()> {
143        for data_file in &self.added_data_files {
144            if data_file.content_type() != crate::spec::DataContentType::Data {
145                return Err(Error::new(
146                    ErrorKind::DataInvalid,
147                    "Only data content type is allowed for fast append",
148                ));
149            }
150            // Check if the data file partition spec id matches the table default partition spec id.
151            if self.table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
152                return Err(Error::new(
153                    ErrorKind::DataInvalid,
154                    "Data file partition spec id does not match table default partition spec id",
155                ));
156            }
157            Self::validate_partition_value(
158                data_file.partition(),
159                self.table.metadata().default_partition_type(),
160            )?;
161        }
162
163        Ok(())
164    }
165
166    pub(crate) async fn validate_duplicate_files(&self) -> Result<()> {
167        let new_files: HashSet<&str> = self
168            .added_data_files
169            .iter()
170            .map(|df| df.file_path.as_str())
171            .collect();
172
173        let mut referenced_files = Vec::new();
174        if let Some(current_snapshot) = self.table.metadata().current_snapshot() {
175            let manifest_list = current_snapshot
176                .load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
177                .await?;
178            for manifest_list_entry in manifest_list.entries() {
179                let manifest = manifest_list_entry
180                    .load_manifest(self.table.file_io())
181                    .await?;
182                for entry in manifest.entries() {
183                    let file_path = entry.file_path();
184                    if new_files.contains(file_path) && entry.is_alive() {
185                        referenced_files.push(file_path.to_string());
186                    }
187                }
188            }
189        }
190
191        if !referenced_files.is_empty() {
192            return Err(Error::new(
193                ErrorKind::DataInvalid,
194                format!(
195                    "Cannot add files that are already referenced by table, files: {}",
196                    referenced_files.join(", ")
197                ),
198            ));
199        }
200
201        Ok(())
202    }
203
204    fn generate_unique_snapshot_id(table: &Table) -> i64 {
205        let generate_random_id = || -> i64 {
206            let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
207            let snapshot_id = (lhs ^ rhs) as i64;
208            if snapshot_id < 0 {
209                -snapshot_id
210            } else {
211                snapshot_id
212            }
213        };
214        let mut snapshot_id = generate_random_id();
215
216        while table
217            .metadata()
218            .snapshots()
219            .any(|s| s.snapshot_id() == snapshot_id)
220        {
221            snapshot_id = generate_random_id();
222        }
223        snapshot_id
224    }
225
226    fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
227        let new_manifest_path = format!(
228            "{}/{}/{}-m{}.{}",
229            self.table.metadata().location(),
230            META_ROOT_PATH,
231            self.commit_uuid,
232            self.manifest_counter.next().unwrap(),
233            DataFileFormat::Avro
234        );
235        let output_file = self.table.file_io().new_output(new_manifest_path)?;
236        let builder = ManifestWriterBuilder::new(
237            output_file,
238            Some(self.snapshot_id),
239            self.key_metadata.clone(),
240            self.table.metadata().current_schema().clone(),
241            self.table
242                .metadata()
243                .default_partition_spec()
244                .as_ref()
245                .clone(),
246        );
247        match self.table.metadata().format_version() {
248            FormatVersion::V1 => Ok(builder.build_v1()),
249            FormatVersion::V2 => match content {
250                ManifestContentType::Data => Ok(builder.build_v2_data()),
251                ManifestContentType::Deletes => Ok(builder.build_v2_deletes()),
252            },
253            FormatVersion::V3 => match content {
254                ManifestContentType::Data => Ok(builder.build_v3_data()),
255                ManifestContentType::Deletes => Ok(builder.build_v3_deletes()),
256            },
257        }
258    }
259
260    // Check if the partition value is compatible with the partition type.
261    fn validate_partition_value(
262        partition_value: &Struct,
263        partition_type: &StructType,
264    ) -> Result<()> {
265        if partition_value.fields().len() != partition_type.fields().len() {
266            return Err(Error::new(
267                ErrorKind::DataInvalid,
268                "Partition value is not compatible with partition type",
269            ));
270        }
271
272        for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
273            let field = field.field_type.as_primitive_type().ok_or_else(|| {
274                Error::new(
275                    ErrorKind::Unexpected,
276                    "Partition field should only be primitive type.",
277                )
278            })?;
279            if let Some(value) = value
280                && !field.compatible(&value.as_primitive_literal().unwrap())
281            {
282                return Err(Error::new(
283                    ErrorKind::DataInvalid,
284                    "Partition value is not compatible partition type",
285                ));
286            }
287        }
288        Ok(())
289    }
290
291    // Write manifest file for added data files and return the ManifestFile for ManifestList.
292    async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
293        let added_data_files = std::mem::take(&mut self.added_data_files);
294        if added_data_files.is_empty() {
295            return Err(Error::new(
296                ErrorKind::PreconditionFailed,
297                "No added data files found when write an added manifest file",
298            ));
299        }
300
301        let snapshot_id = self.snapshot_id;
302        let format_version = self.table.metadata().format_version();
303        let manifest_entries = added_data_files.into_iter().map(|data_file| {
304            let builder = ManifestEntry::builder()
305                .status(crate::spec::ManifestStatus::Added)
306                .data_file(data_file);
307            if format_version == FormatVersion::V1 {
308                builder.snapshot_id(snapshot_id).build()
309            } else {
310                // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when
311                // commit failed.
312                builder.build()
313            }
314        });
315        let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
316        for entry in manifest_entries {
317            writer.add_entry(entry)?;
318        }
319        writer.write_manifest_file().await
320    }
321
322    async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
323        &mut self,
324        snapshot_produce_operation: &OP,
325        manifest_process: &MP,
326    ) -> Result<Vec<ManifestFile>> {
327        // Assert current snapshot producer contains new content to add to new snapshot.
328        //
329        // TODO: Allowing snapshot property setup with no added data files is a workaround.
330        // We should clean it up after all necessary actions are supported.
331        // For details, please refer to https://github.com/apache/iceberg-rust/issues/1548
332        if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() {
333            return Err(Error::new(
334                ErrorKind::PreconditionFailed,
335                "No added data files or added snapshot properties found when write a manifest file",
336            ));
337        }
338
339        let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
340        let mut manifest_files = existing_manifests;
341
342        // Process added entries.
343        if !self.added_data_files.is_empty() {
344            let added_manifest = self.write_added_manifest().await?;
345            manifest_files.push(added_manifest);
346        }
347
348        // # TODO
349        // Support process delete entries.
350
351        let manifest_files = manifest_process.process_manifests(self, manifest_files);
352        Ok(manifest_files)
353    }
354
355    // Returns a `Summary` of the current snapshot
356    fn summary<OP: SnapshotProduceOperation>(
357        &self,
358        snapshot_produce_operation: &OP,
359    ) -> Result<Summary> {
360        let mut summary_collector = SnapshotSummaryCollector::default();
361        let table_metadata = self.table.metadata_ref();
362
363        let partition_summary_limit = if let Some(limit) = table_metadata
364            .properties()
365            .get(TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
366        {
367            if let Ok(limit) = limit.parse::<u64>() {
368                limit
369            } else {
370                TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
371            }
372        } else {
373            TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
374        };
375
376        summary_collector.set_partition_summary_limit(partition_summary_limit);
377
378        for data_file in &self.added_data_files {
379            summary_collector.add_file(
380                data_file,
381                table_metadata.current_schema().clone(),
382                table_metadata.default_partition_spec().clone(),
383            );
384        }
385
386        let previous_snapshot = table_metadata
387            .snapshot_by_id(self.snapshot_id)
388            .and_then(|snapshot| snapshot.parent_snapshot_id())
389            .and_then(|parent_id| table_metadata.snapshot_by_id(parent_id));
390
391        let mut additional_properties = summary_collector.build();
392        additional_properties.extend(self.snapshot_properties.clone());
393
394        let summary = Summary {
395            operation: snapshot_produce_operation.operation(),
396            additional_properties,
397        };
398
399        update_snapshot_summaries(
400            summary,
401            previous_snapshot.map(|s| s.summary()),
402            snapshot_produce_operation.operation() == Operation::Overwrite,
403        )
404    }
405
406    fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
407        format!(
408            "{}/{}/snap-{}-{}-{}.{}",
409            self.table.metadata().location(),
410            META_ROOT_PATH,
411            self.snapshot_id,
412            attempt,
413            self.commit_uuid,
414            DataFileFormat::Avro
415        )
416    }
417
418    /// Finished building the action and return the [`ActionCommit`] to the transaction.
419    pub(crate) async fn commit<OP: SnapshotProduceOperation, MP: ManifestProcess>(
420        mut self,
421        snapshot_produce_operation: OP,
422        process: MP,
423    ) -> Result<ActionCommit> {
424        let manifest_list_path = self.generate_manifest_list_file_path(0);
425        let next_seq_num = self.table.metadata().next_sequence_number();
426        let first_row_id = self.table.metadata().next_row_id();
427        let mut manifest_list_writer = match self.table.metadata().format_version() {
428            FormatVersion::V1 => ManifestListWriter::v1(
429                self.table
430                    .file_io()
431                    .new_output(manifest_list_path.clone())?,
432                self.snapshot_id,
433                self.table.metadata().current_snapshot_id(),
434            ),
435            FormatVersion::V2 => ManifestListWriter::v2(
436                self.table
437                    .file_io()
438                    .new_output(manifest_list_path.clone())?,
439                self.snapshot_id,
440                self.table.metadata().current_snapshot_id(),
441                next_seq_num,
442            ),
443            FormatVersion::V3 => ManifestListWriter::v3(
444                self.table
445                    .file_io()
446                    .new_output(manifest_list_path.clone())?,
447                self.snapshot_id,
448                self.table.metadata().current_snapshot_id(),
449                next_seq_num,
450                Some(first_row_id),
451            ),
452        };
453
454        // Calling self.summary() before self.manifest_file() is important because self.added_data_files
455        // will be set to an empty vec after self.manifest_file() returns, resulting in an empty summary
456        // being generated.
457        let summary = self.summary(&snapshot_produce_operation).map_err(|err| {
458            Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.").with_source(err)
459        })?;
460
461        let new_manifests = self
462            .manifest_file(&snapshot_produce_operation, &process)
463            .await?;
464
465        manifest_list_writer.add_manifests(new_manifests.into_iter())?;
466        let writer_next_row_id = manifest_list_writer.next_row_id();
467        manifest_list_writer.close().await?;
468
469        let commit_ts = chrono::Utc::now().timestamp_millis();
470        let new_snapshot = Snapshot::builder()
471            .with_manifest_list(manifest_list_path)
472            .with_snapshot_id(self.snapshot_id)
473            .with_parent_snapshot_id(self.table.metadata().current_snapshot_id())
474            .with_sequence_number(next_seq_num)
475            .with_summary(summary)
476            .with_schema_id(self.table.metadata().current_schema_id())
477            .with_timestamp_ms(commit_ts);
478
479        let new_snapshot = if let Some(writer_next_row_id) = writer_next_row_id {
480            let assigned_rows = writer_next_row_id - self.table.metadata().next_row_id();
481            new_snapshot
482                .with_row_range(first_row_id, assigned_rows)
483                .build()
484        } else {
485            new_snapshot.build()
486        };
487
488        let updates = vec![
489            TableUpdate::AddSnapshot {
490                snapshot: new_snapshot,
491            },
492            TableUpdate::SetSnapshotRef {
493                ref_name: MAIN_BRANCH.to_string(),
494                reference: SnapshotReference::new(
495                    self.snapshot_id,
496                    SnapshotRetention::branch(None, None, None),
497                ),
498            },
499        ];
500
501        let requirements = vec![
502            TableRequirement::UuidMatch {
503                uuid: self.table.metadata().uuid(),
504            },
505            TableRequirement::RefSnapshotIdMatch {
506                r#ref: MAIN_BRANCH.to_string(),
507                snapshot_id: self.table.metadata().current_snapshot_id(),
508            },
509        ];
510
511        Ok(ActionCommit::new(updates, requirements))
512    }
513}