iceberg/spec/
snapshot.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18/*!
19 * Snapshots
20 */
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::table_metadata::SnapshotLog;
29use crate::error::{Result, timestamp_ms_to_utc};
30use crate::io::FileIO;
31use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
32use crate::{Error, ErrorKind};
33
34/// The ref name of the main branch of the table.
35pub const MAIN_BRANCH: &str = "main";
36/// Placeholder for snapshot ID. The field with this value must be replaced with the actual snapshot ID before it is committed.
37pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
38
39/// Reference to [`Snapshot`].
40pub type SnapshotRef = Arc<Snapshot>;
41#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
42#[serde(rename_all = "lowercase")]
43/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
44pub enum Operation {
45    /// Only data files were added and no files were removed.
46    #[default]
47    Append,
48    /// Data and delete files were added and removed without changing table data;
49    /// i.e., compaction, changing the data file format, or relocating data files.
50    Replace,
51    /// Data and delete files were added and removed in a logical overwrite operation.
52    Overwrite,
53    /// Data files were removed and their contents logically deleted and/or delete files were added to delete rows.
54    Delete,
55}
56
57impl Operation {
58    /// Returns the string representation (lowercase) of the operation.
59    pub fn as_str(&self) -> &str {
60        match self {
61            Operation::Append => "append",
62            Operation::Replace => "replace",
63            Operation::Overwrite => "overwrite",
64            Operation::Delete => "delete",
65        }
66    }
67}
68
69#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
70/// Summarises the changes in the snapshot.
71pub struct Summary {
72    /// The type of operation in the snapshot
73    pub operation: Operation,
74    /// Other summary data.
75    #[serde(flatten)]
76    pub additional_properties: HashMap<String, String>,
77}
78
79#[derive(Debug, PartialEq, Eq, Clone)]
80/// Row range of a snapshot, contains first_row_id and added_rows_count.
81pub struct SnapshotRowRange {
82    /// The first _row_id assigned to the first row in the first data file in the first manifest.
83    pub first_row_id: u64,
84    /// The upper bound of the number of rows with assigned row IDs
85    pub added_rows: u64,
86}
87
88#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
89#[builder(field_defaults(setter(prefix = "with_")))]
90/// A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table.
91pub struct Snapshot {
92    /// A unique long ID
93    pub(crate) snapshot_id: i64,
94    /// The snapshot ID of the snapshot’s parent.
95    /// Omitted for any snapshot with no parent
96    #[builder(default = None)]
97    pub(crate) parent_snapshot_id: Option<i64>,
98    /// A monotonically increasing long that tracks the order of
99    /// changes to a table.
100    pub(crate) sequence_number: i64,
101    /// A timestamp when the snapshot was created, used for garbage
102    /// collection and table inspection
103    pub(crate) timestamp_ms: i64,
104    /// The location of a manifest list for this snapshot that
105    /// tracks manifest files with additional metadata.
106    /// Currently we only support manifest list file, and manifest files are not supported.
107    #[builder(setter(into))]
108    pub(crate) manifest_list: String,
109    /// A string map that summarizes the snapshot changes, including operation.
110    pub(crate) summary: Summary,
111    /// ID of the table’s current schema when the snapshot was created.
112    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
113    pub(crate) schema_id: Option<SchemaId>,
114    /// Encryption Key ID
115    #[builder(default)]
116    pub(crate) encryption_key_id: Option<String>,
117    /// Row range of this snapshot, required when the table version supports row lineage.
118    /// Specify as a tuple of (first_row_id, added_rows_count)
119    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
120    // This is specified as a struct instead of two separate fields to ensure that both fields are either set or not set.
121    // The java implementations uses two separate fields, then sets `added_row_counts` to Null if `first_row_id` is set to Null.
122    // It throws an error if `added_row_counts` is set but `first_row_id` is not set, or if either of the two is negative.
123    // We handle all cases infallible using the rust type system.
124    pub(crate) row_range: Option<SnapshotRowRange>,
125}
126
127impl Snapshot {
128    /// Get the id of the snapshot
129    #[inline]
130    pub fn snapshot_id(&self) -> i64 {
131        self.snapshot_id
132    }
133
134    /// Get parent snapshot id.
135    #[inline]
136    pub fn parent_snapshot_id(&self) -> Option<i64> {
137        self.parent_snapshot_id
138    }
139
140    /// Get sequence_number of the snapshot. Is 0 for Iceberg V1 tables.
141    #[inline]
142    pub fn sequence_number(&self) -> i64 {
143        self.sequence_number
144    }
145    /// Get location of manifest_list file
146    #[inline]
147    pub fn manifest_list(&self) -> &str {
148        &self.manifest_list
149    }
150
151    /// Get summary of the snapshot
152    #[inline]
153    pub fn summary(&self) -> &Summary {
154        &self.summary
155    }
156    /// Get the timestamp of when the snapshot was created
157    #[inline]
158    pub fn timestamp(&self) -> Result<DateTime<Utc>> {
159        timestamp_ms_to_utc(self.timestamp_ms)
160    }
161
162    /// Get the timestamp of when the snapshot was created in milliseconds
163    #[inline]
164    pub fn timestamp_ms(&self) -> i64 {
165        self.timestamp_ms
166    }
167
168    /// Get the schema id of this snapshot.
169    #[inline]
170    pub fn schema_id(&self) -> Option<SchemaId> {
171        self.schema_id
172    }
173
174    /// Get the schema of this snapshot.
175    pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
176        Ok(match self.schema_id() {
177            Some(schema_id) => table_metadata
178                .schema_by_id(schema_id)
179                .ok_or_else(|| {
180                    Error::new(
181                        ErrorKind::DataInvalid,
182                        format!("Schema with id {schema_id} not found"),
183                    )
184                })?
185                .clone(),
186            None => table_metadata.current_schema().clone(),
187        })
188    }
189
190    /// Get parent snapshot.
191    #[cfg(test)]
192    pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
193        match self.parent_snapshot_id {
194            Some(id) => table_metadata.snapshot_by_id(id).cloned(),
195            None => None,
196        }
197    }
198
199    /// Load manifest list.
200    pub async fn load_manifest_list(
201        &self,
202        file_io: &FileIO,
203        table_metadata: &TableMetadata,
204    ) -> Result<ManifestList> {
205        let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
206        ManifestList::parse_with_version(
207            &manifest_list_content,
208            // TODO: You don't really need the version since you could just project any Avro in
209            // the version that you'd like to get (probably always the latest)
210            table_metadata.format_version(),
211        )
212    }
213
214    #[allow(dead_code)]
215    pub(crate) fn log(&self) -> SnapshotLog {
216        SnapshotLog {
217            timestamp_ms: self.timestamp_ms,
218            snapshot_id: self.snapshot_id,
219        }
220    }
221
222    /// The row-id of the first newly added row in this snapshot. All rows added in this snapshot will
223    /// have a row-id assigned to them greater than this value. All rows with a row-id less than this
224    /// value were created in a snapshot that was added to the table (but not necessarily committed to
225    /// this branch) in the past.
226    ///
227    /// This field is optional but is required when the table version supports row lineage.
228    pub fn first_row_id(&self) -> Option<u64> {
229        self.row_range.as_ref().map(|r| r.first_row_id)
230    }
231
232    /// The total number of newly added rows in this snapshot. It should be the summation of {@link
233    /// ManifestFile#ADDED_ROWS_COUNT} for every manifest added in this snapshot.
234    ///
235    /// This field is optional but is required when the table version supports row lineage.
236    pub fn added_rows_count(&self) -> Option<u64> {
237        self.row_range.as_ref().map(|r| r.added_rows)
238    }
239
240    /// Returns the row range of this snapshot, if available.
241    /// This is a tuple containing (first_row_id, added_rows_count).
242    pub fn row_range(&self) -> Option<(u64, u64)> {
243        self.row_range
244            .as_ref()
245            .map(|r| (r.first_row_id, r.added_rows))
246    }
247
248    /// Get encryption key id, if available.
249    pub fn encryption_key_id(&self) -> Option<&str> {
250        self.encryption_key_id.as_deref()
251    }
252}
253
254pub(super) mod _serde {
255    /// This is a helper module that defines types to help with serialization/deserialization.
256    /// For deserialization the input first gets read into either the [SnapshotV1] or [SnapshotV2] struct
257    /// and then converted into the [Snapshot] struct. Serialization works the other way around.
258    /// [SnapshotV1] and [SnapshotV2] are internal struct that are only used for serialization and deserialization.
259    use std::collections::HashMap;
260
261    use serde::{Deserialize, Serialize};
262
263    use super::{Operation, Snapshot, Summary};
264    use crate::spec::SchemaId;
265    use crate::spec::snapshot::SnapshotRowRange;
266    use crate::{Error, ErrorKind};
267
268    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
269    #[serde(rename_all = "kebab-case")]
270    /// Defines the structure of a v3 snapshot for serialization/deserialization
271    pub(crate) struct SnapshotV3 {
272        pub snapshot_id: i64,
273        #[serde(skip_serializing_if = "Option::is_none")]
274        pub parent_snapshot_id: Option<i64>,
275        pub sequence_number: i64,
276        pub timestamp_ms: i64,
277        pub manifest_list: String,
278        pub summary: Summary,
279        #[serde(skip_serializing_if = "Option::is_none")]
280        pub schema_id: Option<SchemaId>,
281        #[serde(skip_serializing_if = "Option::is_none")]
282        pub first_row_id: Option<u64>,
283        #[serde(skip_serializing_if = "Option::is_none")]
284        pub added_rows: Option<u64>,
285        #[serde(skip_serializing_if = "Option::is_none")]
286        pub key_id: Option<String>,
287    }
288
289    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
290    #[serde(rename_all = "kebab-case")]
291    /// Defines the structure of a v2 snapshot for serialization/deserialization
292    pub(crate) struct SnapshotV2 {
293        pub snapshot_id: i64,
294        #[serde(skip_serializing_if = "Option::is_none")]
295        pub parent_snapshot_id: Option<i64>,
296        pub sequence_number: i64,
297        pub timestamp_ms: i64,
298        pub manifest_list: String,
299        pub summary: Summary,
300        #[serde(skip_serializing_if = "Option::is_none")]
301        pub schema_id: Option<SchemaId>,
302    }
303
304    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
305    #[serde(rename_all = "kebab-case")]
306    /// Defines the structure of a v1 snapshot for serialization/deserialization
307    pub(crate) struct SnapshotV1 {
308        pub snapshot_id: i64,
309        #[serde(skip_serializing_if = "Option::is_none")]
310        pub parent_snapshot_id: Option<i64>,
311        pub timestamp_ms: i64,
312        #[serde(skip_serializing_if = "Option::is_none")]
313        pub manifest_list: Option<String>,
314        #[serde(skip_serializing_if = "Option::is_none")]
315        pub manifests: Option<Vec<String>>,
316        #[serde(skip_serializing_if = "Option::is_none")]
317        pub summary: Option<Summary>,
318        #[serde(skip_serializing_if = "Option::is_none")]
319        pub schema_id: Option<SchemaId>,
320    }
321
322    impl From<SnapshotV3> for Snapshot {
323        fn from(s: SnapshotV3) -> Self {
324            Snapshot {
325                snapshot_id: s.snapshot_id,
326                parent_snapshot_id: s.parent_snapshot_id,
327                sequence_number: s.sequence_number,
328                timestamp_ms: s.timestamp_ms,
329                manifest_list: s.manifest_list,
330                summary: s.summary,
331                schema_id: s.schema_id,
332                encryption_key_id: s.key_id,
333                row_range: match (s.first_row_id, s.added_rows) {
334                    (Some(first_row_id), Some(added_rows)) => Some(SnapshotRowRange {
335                        first_row_id,
336                        added_rows,
337                    }),
338                    _ => None,
339                },
340            }
341        }
342    }
343
344    impl TryFrom<Snapshot> for SnapshotV3 {
345        type Error = Error;
346
347        fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
348            let (first_row_id, added_rows) = match s.row_range {
349                Some(row_range) => (Some(row_range.first_row_id), Some(row_range.added_rows)),
350                None => (None, None),
351            };
352
353            Ok(SnapshotV3 {
354                snapshot_id: s.snapshot_id,
355                parent_snapshot_id: s.parent_snapshot_id,
356                sequence_number: s.sequence_number,
357                timestamp_ms: s.timestamp_ms,
358                manifest_list: s.manifest_list,
359                summary: s.summary,
360                schema_id: s.schema_id,
361                first_row_id,
362                added_rows,
363                key_id: s.encryption_key_id,
364            })
365        }
366    }
367
368    impl From<SnapshotV2> for Snapshot {
369        fn from(v2: SnapshotV2) -> Self {
370            Snapshot {
371                snapshot_id: v2.snapshot_id,
372                parent_snapshot_id: v2.parent_snapshot_id,
373                sequence_number: v2.sequence_number,
374                timestamp_ms: v2.timestamp_ms,
375                manifest_list: v2.manifest_list,
376                summary: v2.summary,
377                schema_id: v2.schema_id,
378                encryption_key_id: None,
379                row_range: None,
380            }
381        }
382    }
383
384    impl From<Snapshot> for SnapshotV2 {
385        fn from(v2: Snapshot) -> Self {
386            SnapshotV2 {
387                snapshot_id: v2.snapshot_id,
388                parent_snapshot_id: v2.parent_snapshot_id,
389                sequence_number: v2.sequence_number,
390                timestamp_ms: v2.timestamp_ms,
391                manifest_list: v2.manifest_list,
392                summary: v2.summary,
393                schema_id: v2.schema_id,
394            }
395        }
396    }
397
398    impl TryFrom<SnapshotV1> for Snapshot {
399        type Error = Error;
400
401        fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
402            Ok(Snapshot {
403                snapshot_id: v1.snapshot_id,
404                parent_snapshot_id: v1.parent_snapshot_id,
405                sequence_number: 0,
406                timestamp_ms: v1.timestamp_ms,
407                manifest_list: match (v1.manifest_list, v1.manifests) {
408                    (Some(file), None) => file,
409                    (Some(_), Some(_)) => {
410                        return Err(Error::new(
411                            ErrorKind::DataInvalid,
412                            "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
413                        ));
414                    }
415                    (None, _) => {
416                        return Err(Error::new(
417                            ErrorKind::DataInvalid,
418                            "Unsupported v1 snapshot, only manifest list is supported",
419                        ));
420                    }
421                },
422                summary: v1.summary.unwrap_or(Summary {
423                    operation: Operation::default(),
424                    additional_properties: HashMap::new(),
425                }),
426                schema_id: v1.schema_id,
427                encryption_key_id: None,
428                row_range: None,
429            })
430        }
431    }
432
433    impl From<Snapshot> for SnapshotV1 {
434        fn from(v2: Snapshot) -> Self {
435            SnapshotV1 {
436                snapshot_id: v2.snapshot_id,
437                parent_snapshot_id: v2.parent_snapshot_id,
438                timestamp_ms: v2.timestamp_ms,
439                manifest_list: Some(v2.manifest_list),
440                summary: Some(v2.summary),
441                schema_id: v2.schema_id,
442                manifests: None,
443            }
444        }
445    }
446}
447
448#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
449#[serde(rename_all = "kebab-case")]
450/// Iceberg tables keep track of branches and tags using snapshot references.
451pub struct SnapshotReference {
452    /// A reference’s snapshot ID. The tagged snapshot or latest snapshot of a branch.
453    pub snapshot_id: i64,
454    #[serde(flatten)]
455    /// Snapshot retention policy
456    pub retention: SnapshotRetention,
457}
458
459impl SnapshotReference {
460    /// Returns true if the snapshot reference is a branch.
461    pub fn is_branch(&self) -> bool {
462        matches!(self.retention, SnapshotRetention::Branch { .. })
463    }
464}
465
466impl SnapshotReference {
467    /// Create new snapshot reference
468    pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
469        SnapshotReference {
470            snapshot_id,
471            retention,
472        }
473    }
474}
475
476#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
477#[serde(rename_all = "lowercase", tag = "type")]
478/// The snapshot expiration procedure removes snapshots from table metadata and applies the table’s retention policy.
479pub enum SnapshotRetention {
480    #[serde(rename_all = "kebab-case")]
481    /// Branches are mutable named references that can be updated by committing a new snapshot as
482    /// the branch’s referenced snapshot using the Commit Conflict Resolution and Retry procedures.
483    Branch {
484        /// A positive number for the minimum number of snapshots to keep in a branch while expiring snapshots.
485        /// Defaults to table property history.expire.min-snapshots-to-keep.
486        #[serde(skip_serializing_if = "Option::is_none")]
487        min_snapshots_to_keep: Option<i32>,
488        /// A positive number for the max age of snapshots to keep when expiring, including the latest snapshot.
489        /// Defaults to table property history.expire.max-snapshot-age-ms.
490        #[serde(skip_serializing_if = "Option::is_none")]
491        max_snapshot_age_ms: Option<i64>,
492        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
493        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
494        #[serde(skip_serializing_if = "Option::is_none")]
495        max_ref_age_ms: Option<i64>,
496    },
497    #[serde(rename_all = "kebab-case")]
498    /// Tags are labels for individual snapshots.
499    Tag {
500        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
501        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
502        #[serde(skip_serializing_if = "Option::is_none")]
503        max_ref_age_ms: Option<i64>,
504    },
505}
506
507impl SnapshotRetention {
508    /// Create a new branch retention policy
509    pub fn branch(
510        min_snapshots_to_keep: Option<i32>,
511        max_snapshot_age_ms: Option<i64>,
512        max_ref_age_ms: Option<i64>,
513    ) -> Self {
514        SnapshotRetention::Branch {
515            min_snapshots_to_keep,
516            max_snapshot_age_ms,
517            max_ref_age_ms,
518        }
519    }
520}
521
522#[cfg(test)]
523mod tests {
524    use std::collections::HashMap;
525
526    use chrono::{TimeZone, Utc};
527
528    use crate::spec::TableMetadata;
529    use crate::spec::snapshot::_serde::SnapshotV1;
530    use crate::spec::snapshot::{Operation, Snapshot, Summary};
531
532    #[test]
533    fn schema() {
534        let record = r#"
535        {
536            "snapshot-id": 3051729675574597004,
537            "timestamp-ms": 1515100955770,
538            "summary": {
539                "operation": "append"
540            },
541            "manifest-list": "s3://b/wh/.../s1.avro",
542            "schema-id": 0
543        }
544        "#;
545
546        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
547            .unwrap()
548            .try_into()
549            .unwrap();
550        assert_eq!(3051729675574597004, result.snapshot_id());
551        assert_eq!(
552            Utc.timestamp_millis_opt(1515100955770).unwrap(),
553            result.timestamp().unwrap()
554        );
555        assert_eq!(1515100955770, result.timestamp_ms());
556        assert_eq!(
557            Summary {
558                operation: Operation::Append,
559                additional_properties: HashMap::new()
560            },
561            *result.summary()
562        );
563        assert_eq!("s3://b/wh/.../s1.avro".to_string(), *result.manifest_list());
564    }
565
566    #[test]
567    fn test_snapshot_v1_to_v2_projection() {
568        use crate::spec::snapshot::_serde::SnapshotV1;
569
570        // Create a V1 snapshot (without sequence-number field)
571        let v1_snapshot = SnapshotV1 {
572            snapshot_id: 1234567890,
573            parent_snapshot_id: Some(987654321),
574            timestamp_ms: 1515100955770,
575            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
576            manifests: None, // V1 can have either manifest_list or manifests, but not both
577            summary: Some(Summary {
578                operation: Operation::Append,
579                additional_properties: HashMap::from([
580                    ("added-files".to_string(), "5".to_string()),
581                    ("added-records".to_string(), "100".to_string()),
582                ]),
583            }),
584            schema_id: Some(1),
585        };
586
587        // Convert V1 to V2 - this should apply defaults for missing V2 fields
588        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();
589
590        // Verify V1→V2 projection defaults are applied correctly
591        assert_eq!(
592            v2_snapshot.sequence_number(),
593            0,
594            "V1 snapshot sequence_number should default to 0"
595        );
596
597        // Verify other fields are preserved correctly during conversion
598        assert_eq!(v2_snapshot.snapshot_id(), 1234567890);
599        assert_eq!(v2_snapshot.parent_snapshot_id(), Some(987654321));
600        assert_eq!(v2_snapshot.timestamp_ms(), 1515100955770);
601        assert_eq!(
602            v2_snapshot.manifest_list(),
603            "s3://bucket/manifest-list.avro"
604        );
605        assert_eq!(v2_snapshot.schema_id(), Some(1));
606        assert_eq!(v2_snapshot.summary().operation, Operation::Append);
607        assert_eq!(
608            v2_snapshot
609                .summary()
610                .additional_properties
611                .get("added-files"),
612            Some(&"5".to_string())
613        );
614    }
615
616    #[test]
617    fn test_v1_snapshot_with_manifest_list_and_manifests() {
618        {
619            let metadata = r#"
620    {
621        "format-version": 1,
622        "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
623        "location": "s3://bucket/test/location",
624        "last-updated-ms": 1700000000000,
625        "last-column-id": 1,
626        "schema": {
627            "type": "struct",
628            "fields": [
629                {"id": 1, "name": "x", "required": true, "type": "long"}
630            ]
631        },
632        "partition-spec": [],
633        "properties": {},
634        "current-snapshot-id": 111111111,
635        "snapshots": [
636            {
637                "snapshot-id": 111111111,
638                "timestamp-ms": 1600000000000,
639                "summary": {"operation": "append"},
640                "manifest-list": "s3://bucket/metadata/snap-123.avro",
641                "manifests": ["s3://bucket/metadata/manifest-1.avro"]
642            }
643        ]
644    }
645    "#;
646
647            let result_both_manifest_list_and_manifest_set =
648                serde_json::from_str::<TableMetadata>(metadata);
649            assert!(result_both_manifest_list_and_manifest_set.is_err());
650            assert_eq!(
651                result_both_manifest_list_and_manifest_set
652                    .unwrap_err()
653                    .to_string(),
654                "DataInvalid => Invalid v1 snapshot, when manifest list provided, manifest files should be omitted"
655            )
656        }
657
658        {
659            let metadata = r#"
660    {
661        "format-version": 1,
662        "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
663        "location": "s3://bucket/test/location",
664        "last-updated-ms": 1700000000000,
665        "last-column-id": 1,
666        "schema": {
667            "type": "struct",
668            "fields": [
669                {"id": 1, "name": "x", "required": true, "type": "long"}
670            ]
671        },
672        "partition-spec": [],
673        "properties": {},
674        "current-snapshot-id": 111111111,
675        "snapshots": [
676            {
677                "snapshot-id": 111111111,
678                "timestamp-ms": 1600000000000,
679                "summary": {"operation": "append"},
680                "manifests": ["s3://bucket/metadata/manifest-1.avro"]
681            }
682        ]
683    }
684    "#;
685            let result_missing_manifest_list = serde_json::from_str::<TableMetadata>(metadata);
686            assert!(result_missing_manifest_list.is_err());
687            assert_eq!(
688                result_missing_manifest_list.unwrap_err().to_string(),
689                "DataInvalid => Unsupported v1 snapshot, only manifest list is supported"
690            )
691        }
692    }
693
694    #[test]
695    fn test_snapshot_v1_to_v2_with_missing_summary() {
696        use crate::spec::snapshot::_serde::SnapshotV1;
697
698        // Create a V1 snapshot without summary (should get default)
699        let v1_snapshot = SnapshotV1 {
700            snapshot_id: 1111111111,
701            parent_snapshot_id: None,
702            timestamp_ms: 1515100955770,
703            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
704            manifests: None,
705            summary: None, // V1 summary is optional
706            schema_id: None,
707        };
708
709        // Convert V1 to V2 - this should apply default summary
710        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();
711
712        // Verify defaults are applied correctly
713        assert_eq!(
714            v2_snapshot.sequence_number(),
715            0,
716            "V1 snapshot sequence_number should default to 0"
717        );
718        assert_eq!(
719            v2_snapshot.summary().operation,
720            Operation::Append,
721            "Missing V1 summary should default to Append operation"
722        );
723        assert!(
724            v2_snapshot.summary().additional_properties.is_empty(),
725            "Default summary should have empty additional_properties"
726        );
727
728        // Verify other fields
729        assert_eq!(v2_snapshot.snapshot_id(), 1111111111);
730        assert_eq!(v2_snapshot.parent_snapshot_id(), None);
731        assert_eq!(v2_snapshot.schema_id(), None);
732    }
733}