iceberg/spec/
snapshot.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18/*!
19 * Snapshots
20 */
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::table_metadata::SnapshotLog;
29use crate::error::{Result, timestamp_ms_to_utc};
30use crate::io::FileIO;
31use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
32use crate::{Error, ErrorKind};
33
34/// The ref name of the main branch of the table.
35pub const MAIN_BRANCH: &str = "main";
36
37/// Reference to [`Snapshot`].
38pub type SnapshotRef = Arc<Snapshot>;
39#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
40#[serde(rename_all = "lowercase")]
41/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
42pub enum Operation {
43    /// Only data files were added and no files were removed.
44    #[default]
45    Append,
46    /// Data and delete files were added and removed without changing table data;
47    /// i.e., compaction, changing the data file format, or relocating data files.
48    Replace,
49    /// Data and delete files were added and removed in a logical overwrite operation.
50    Overwrite,
51    /// Data files were removed and their contents logically deleted and/or delete files were added to delete rows.
52    Delete,
53}
54
55impl Operation {
56    /// Returns the string representation (lowercase) of the operation.
57    pub fn as_str(&self) -> &str {
58        match self {
59            Operation::Append => "append",
60            Operation::Replace => "replace",
61            Operation::Overwrite => "overwrite",
62            Operation::Delete => "delete",
63        }
64    }
65}
66
67#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
68/// Summarises the changes in the snapshot.
69pub struct Summary {
70    /// The type of operation in the snapshot
71    pub operation: Operation,
72    /// Other summary data.
73    #[serde(flatten)]
74    pub additional_properties: HashMap<String, String>,
75}
76
77#[derive(Debug, PartialEq, Eq, Clone)]
78/// Row range of a snapshot, contains first_row_id and added_rows_count.
79pub struct SnapshotRowRange {
80    /// The first _row_id assigned to the first row in the first data file in the first manifest.
81    pub first_row_id: u64,
82    /// The upper bound of the number of rows with assigned row IDs
83    pub added_rows: u64,
84}
85
86#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
87#[builder(field_defaults(setter(prefix = "with_")))]
88/// A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table.
89pub struct Snapshot {
90    /// A unique long ID
91    pub(crate) snapshot_id: i64,
92    /// The snapshot ID of the snapshot’s parent.
93    /// Omitted for any snapshot with no parent
94    #[builder(default = None)]
95    pub(crate) parent_snapshot_id: Option<i64>,
96    /// A monotonically increasing long that tracks the order of
97    /// changes to a table.
98    pub(crate) sequence_number: i64,
99    /// A timestamp when the snapshot was created, used for garbage
100    /// collection and table inspection
101    pub(crate) timestamp_ms: i64,
102    /// The location of a manifest list for this snapshot that
103    /// tracks manifest files with additional metadata.
104    /// Currently we only support manifest list file, and manifest files are not supported.
105    #[builder(setter(into))]
106    pub(crate) manifest_list: String,
107    /// A string map that summarizes the snapshot changes, including operation.
108    pub(crate) summary: Summary,
109    /// ID of the table’s current schema when the snapshot was created.
110    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
111    pub(crate) schema_id: Option<SchemaId>,
112    /// Encryption Key ID
113    #[builder(default)]
114    pub(crate) encryption_key_id: Option<String>,
115    /// Row range of this snapshot, required when the table version supports row lineage.
116    /// Specify as a tuple of (first_row_id, added_rows_count)
117    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
118    // This is specified as a struct instead of two separate fields to ensure that both fields are either set or not set.
119    // The java implementations uses two separate fields, then sets `added_row_counts` to Null if `first_row_id` is set to Null.
120    // It throws an error if `added_row_counts` is set but `first_row_id` is not set, or if either of the two is negative.
121    // We handle all cases infallible using the rust type system.
122    pub(crate) row_range: Option<SnapshotRowRange>,
123}
124
125impl Snapshot {
126    /// Get the id of the snapshot
127    #[inline]
128    pub fn snapshot_id(&self) -> i64 {
129        self.snapshot_id
130    }
131
132    /// Get parent snapshot id.
133    #[inline]
134    pub fn parent_snapshot_id(&self) -> Option<i64> {
135        self.parent_snapshot_id
136    }
137
138    /// Get sequence_number of the snapshot. Is 0 for Iceberg V1 tables.
139    #[inline]
140    pub fn sequence_number(&self) -> i64 {
141        self.sequence_number
142    }
143    /// Get location of manifest_list file
144    #[inline]
145    pub fn manifest_list(&self) -> &str {
146        &self.manifest_list
147    }
148
149    /// Get summary of the snapshot
150    #[inline]
151    pub fn summary(&self) -> &Summary {
152        &self.summary
153    }
154    /// Get the timestamp of when the snapshot was created
155    #[inline]
156    pub fn timestamp(&self) -> Result<DateTime<Utc>> {
157        timestamp_ms_to_utc(self.timestamp_ms)
158    }
159
160    /// Get the timestamp of when the snapshot was created in milliseconds
161    #[inline]
162    pub fn timestamp_ms(&self) -> i64 {
163        self.timestamp_ms
164    }
165
166    /// Get the schema id of this snapshot.
167    #[inline]
168    pub fn schema_id(&self) -> Option<SchemaId> {
169        self.schema_id
170    }
171
172    /// Get the schema of this snapshot.
173    pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
174        Ok(match self.schema_id() {
175            Some(schema_id) => table_metadata
176                .schema_by_id(schema_id)
177                .ok_or_else(|| {
178                    Error::new(
179                        ErrorKind::DataInvalid,
180                        format!("Schema with id {schema_id} not found"),
181                    )
182                })?
183                .clone(),
184            None => table_metadata.current_schema().clone(),
185        })
186    }
187
188    /// Get parent snapshot.
189    #[cfg(test)]
190    pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
191        match self.parent_snapshot_id {
192            Some(id) => table_metadata.snapshot_by_id(id).cloned(),
193            None => None,
194        }
195    }
196
197    /// Load manifest list.
198    pub async fn load_manifest_list(
199        &self,
200        file_io: &FileIO,
201        table_metadata: &TableMetadata,
202    ) -> Result<ManifestList> {
203        let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
204        ManifestList::parse_with_version(
205            &manifest_list_content,
206            // TODO: You don't really need the version since you could just project any Avro in
207            // the version that you'd like to get (probably always the latest)
208            table_metadata.format_version(),
209        )
210    }
211
212    #[allow(dead_code)]
213    pub(crate) fn log(&self) -> SnapshotLog {
214        SnapshotLog {
215            timestamp_ms: self.timestamp_ms,
216            snapshot_id: self.snapshot_id,
217        }
218    }
219
220    /// The row-id of the first newly added row in this snapshot. All rows added in this snapshot will
221    /// have a row-id assigned to them greater than this value. All rows with a row-id less than this
222    /// value were created in a snapshot that was added to the table (but not necessarily committed to
223    /// this branch) in the past.
224    ///
225    /// This field is optional but is required when the table version supports row lineage.
226    pub fn first_row_id(&self) -> Option<u64> {
227        self.row_range.as_ref().map(|r| r.first_row_id)
228    }
229
230    /// The total number of newly added rows in this snapshot. It should be the summation of {@link
231    /// ManifestFile#ADDED_ROWS_COUNT} for every manifest added in this snapshot.
232    ///
233    /// This field is optional but is required when the table version supports row lineage.
234    pub fn added_rows_count(&self) -> Option<u64> {
235        self.row_range.as_ref().map(|r| r.added_rows)
236    }
237
238    /// Returns the row range of this snapshot, if available.
239    /// This is a tuple containing (first_row_id, added_rows_count).
240    pub fn row_range(&self) -> Option<(u64, u64)> {
241        self.row_range
242            .as_ref()
243            .map(|r| (r.first_row_id, r.added_rows))
244    }
245
246    /// Get encryption key id, if available.
247    pub fn encryption_key_id(&self) -> Option<&str> {
248        self.encryption_key_id.as_deref()
249    }
250}
251
252pub(super) mod _serde {
253    /// This is a helper module that defines types to help with serialization/deserialization.
254    /// For deserialization the input first gets read into either the [SnapshotV1] or [SnapshotV2] struct
255    /// and then converted into the [Snapshot] struct. Serialization works the other way around.
256    /// [SnapshotV1] and [SnapshotV2] are internal struct that are only used for serialization and deserialization.
257    use std::collections::HashMap;
258
259    use serde::{Deserialize, Serialize};
260
261    use super::{Operation, Snapshot, Summary};
262    use crate::spec::SchemaId;
263    use crate::spec::snapshot::SnapshotRowRange;
264    use crate::{Error, ErrorKind};
265
266    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
267    #[serde(rename_all = "kebab-case")]
268    /// Defines the structure of a v3 snapshot for serialization/deserialization
269    pub(crate) struct SnapshotV3 {
270        pub snapshot_id: i64,
271        #[serde(skip_serializing_if = "Option::is_none")]
272        pub parent_snapshot_id: Option<i64>,
273        pub sequence_number: i64,
274        pub timestamp_ms: i64,
275        pub manifest_list: String,
276        pub summary: Summary,
277        #[serde(skip_serializing_if = "Option::is_none")]
278        pub schema_id: Option<SchemaId>,
279        #[serde(skip_serializing_if = "Option::is_none")]
280        pub first_row_id: Option<u64>,
281        #[serde(skip_serializing_if = "Option::is_none")]
282        pub added_rows: Option<u64>,
283        #[serde(skip_serializing_if = "Option::is_none")]
284        pub key_id: Option<String>,
285    }
286
287    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
288    #[serde(rename_all = "kebab-case")]
289    /// Defines the structure of a v2 snapshot for serialization/deserialization
290    pub(crate) struct SnapshotV2 {
291        pub snapshot_id: i64,
292        #[serde(skip_serializing_if = "Option::is_none")]
293        pub parent_snapshot_id: Option<i64>,
294        #[serde(default)]
295        pub sequence_number: i64,
296        pub timestamp_ms: i64,
297        pub manifest_list: String,
298        pub summary: Summary,
299        #[serde(skip_serializing_if = "Option::is_none")]
300        pub schema_id: Option<SchemaId>,
301    }
302
303    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
304    #[serde(rename_all = "kebab-case")]
305    /// Defines the structure of a v1 snapshot for serialization/deserialization
306    pub(crate) struct SnapshotV1 {
307        pub snapshot_id: i64,
308        #[serde(skip_serializing_if = "Option::is_none")]
309        pub parent_snapshot_id: Option<i64>,
310        pub timestamp_ms: i64,
311        #[serde(skip_serializing_if = "Option::is_none")]
312        pub manifest_list: Option<String>,
313        #[serde(skip_serializing_if = "Option::is_none")]
314        pub manifests: Option<Vec<String>>,
315        #[serde(skip_serializing_if = "Option::is_none")]
316        pub summary: Option<Summary>,
317        #[serde(skip_serializing_if = "Option::is_none")]
318        pub schema_id: Option<SchemaId>,
319    }
320
321    impl From<SnapshotV3> for Snapshot {
322        fn from(s: SnapshotV3) -> Self {
323            Snapshot {
324                snapshot_id: s.snapshot_id,
325                parent_snapshot_id: s.parent_snapshot_id,
326                sequence_number: s.sequence_number,
327                timestamp_ms: s.timestamp_ms,
328                manifest_list: s.manifest_list,
329                summary: s.summary,
330                schema_id: s.schema_id,
331                encryption_key_id: s.key_id,
332                row_range: match (s.first_row_id, s.added_rows) {
333                    (Some(first_row_id), Some(added_rows)) => Some(SnapshotRowRange {
334                        first_row_id,
335                        added_rows,
336                    }),
337                    _ => None,
338                },
339            }
340        }
341    }
342
343    impl TryFrom<Snapshot> for SnapshotV3 {
344        type Error = Error;
345
346        fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
347            let (first_row_id, added_rows) = match s.row_range {
348                Some(row_range) => (Some(row_range.first_row_id), Some(row_range.added_rows)),
349                None => (None, None),
350            };
351
352            Ok(SnapshotV3 {
353                snapshot_id: s.snapshot_id,
354                parent_snapshot_id: s.parent_snapshot_id,
355                sequence_number: s.sequence_number,
356                timestamp_ms: s.timestamp_ms,
357                manifest_list: s.manifest_list,
358                summary: s.summary,
359                schema_id: s.schema_id,
360                first_row_id,
361                added_rows,
362                key_id: s.encryption_key_id,
363            })
364        }
365    }
366
367    impl From<SnapshotV2> for Snapshot {
368        fn from(v2: SnapshotV2) -> Self {
369            Snapshot {
370                snapshot_id: v2.snapshot_id,
371                parent_snapshot_id: v2.parent_snapshot_id,
372                sequence_number: v2.sequence_number,
373                timestamp_ms: v2.timestamp_ms,
374                manifest_list: v2.manifest_list,
375                summary: v2.summary,
376                schema_id: v2.schema_id,
377                encryption_key_id: None,
378                row_range: None,
379            }
380        }
381    }
382
383    impl From<Snapshot> for SnapshotV2 {
384        fn from(v2: Snapshot) -> Self {
385            SnapshotV2 {
386                snapshot_id: v2.snapshot_id,
387                parent_snapshot_id: v2.parent_snapshot_id,
388                sequence_number: v2.sequence_number,
389                timestamp_ms: v2.timestamp_ms,
390                manifest_list: v2.manifest_list,
391                summary: v2.summary,
392                schema_id: v2.schema_id,
393            }
394        }
395    }
396
397    impl TryFrom<SnapshotV1> for Snapshot {
398        type Error = Error;
399
400        fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
401            Ok(Snapshot {
402                snapshot_id: v1.snapshot_id,
403                parent_snapshot_id: v1.parent_snapshot_id,
404                sequence_number: 0,
405                timestamp_ms: v1.timestamp_ms,
406                manifest_list: match (v1.manifest_list, v1.manifests) {
407                    (Some(file), None) => file,
408                    (Some(_), Some(_)) => {
409                        return Err(Error::new(
410                            ErrorKind::DataInvalid,
411                            "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
412                        ));
413                    }
414                    (None, _) => {
415                        return Err(Error::new(
416                            ErrorKind::DataInvalid,
417                            "Unsupported v1 snapshot, only manifest list is supported",
418                        ));
419                    }
420                },
421                summary: v1.summary.unwrap_or(Summary {
422                    operation: Operation::default(),
423                    additional_properties: HashMap::new(),
424                }),
425                schema_id: v1.schema_id,
426                encryption_key_id: None,
427                row_range: None,
428            })
429        }
430    }
431
432    impl From<Snapshot> for SnapshotV1 {
433        fn from(v2: Snapshot) -> Self {
434            SnapshotV1 {
435                snapshot_id: v2.snapshot_id,
436                parent_snapshot_id: v2.parent_snapshot_id,
437                timestamp_ms: v2.timestamp_ms,
438                manifest_list: Some(v2.manifest_list),
439                summary: Some(v2.summary),
440                schema_id: v2.schema_id,
441                manifests: None,
442            }
443        }
444    }
445}
446
447#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
448#[serde(rename_all = "kebab-case")]
449/// Iceberg tables keep track of branches and tags using snapshot references.
450pub struct SnapshotReference {
451    /// A reference’s snapshot ID. The tagged snapshot or latest snapshot of a branch.
452    pub snapshot_id: i64,
453    #[serde(flatten)]
454    /// Snapshot retention policy
455    pub retention: SnapshotRetention,
456}
457
458impl SnapshotReference {
459    /// Returns true if the snapshot reference is a branch.
460    pub fn is_branch(&self) -> bool {
461        matches!(self.retention, SnapshotRetention::Branch { .. })
462    }
463}
464
465impl SnapshotReference {
466    /// Create new snapshot reference
467    pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
468        SnapshotReference {
469            snapshot_id,
470            retention,
471        }
472    }
473}
474
475#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
476#[serde(rename_all = "lowercase", tag = "type")]
477/// The snapshot expiration procedure removes snapshots from table metadata and applies the table’s retention policy.
478pub enum SnapshotRetention {
479    #[serde(rename_all = "kebab-case")]
480    /// Branches are mutable named references that can be updated by committing a new snapshot as
481    /// the branch’s referenced snapshot using the Commit Conflict Resolution and Retry procedures.
482    Branch {
483        /// A positive number for the minimum number of snapshots to keep in a branch while expiring snapshots.
484        /// Defaults to table property history.expire.min-snapshots-to-keep.
485        #[serde(skip_serializing_if = "Option::is_none")]
486        min_snapshots_to_keep: Option<i32>,
487        /// A positive number for the max age of snapshots to keep when expiring, including the latest snapshot.
488        /// Defaults to table property history.expire.max-snapshot-age-ms.
489        #[serde(skip_serializing_if = "Option::is_none")]
490        max_snapshot_age_ms: Option<i64>,
491        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
492        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
493        #[serde(skip_serializing_if = "Option::is_none")]
494        max_ref_age_ms: Option<i64>,
495    },
496    #[serde(rename_all = "kebab-case")]
497    /// Tags are labels for individual snapshots.
498    Tag {
499        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
500        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
501        #[serde(skip_serializing_if = "Option::is_none")]
502        max_ref_age_ms: Option<i64>,
503    },
504}
505
506impl SnapshotRetention {
507    /// Create a new branch retention policy
508    pub fn branch(
509        min_snapshots_to_keep: Option<i32>,
510        max_snapshot_age_ms: Option<i64>,
511        max_ref_age_ms: Option<i64>,
512    ) -> Self {
513        SnapshotRetention::Branch {
514            min_snapshots_to_keep,
515            max_snapshot_age_ms,
516            max_ref_age_ms,
517        }
518    }
519}
520
521#[cfg(test)]
522mod tests {
523    use std::collections::HashMap;
524
525    use chrono::{TimeZone, Utc};
526
527    use crate::spec::TableMetadata;
528    use crate::spec::snapshot::_serde::SnapshotV1;
529    use crate::spec::snapshot::{Operation, Snapshot, Summary};
530
531    #[test]
532    fn schema() {
533        let record = r#"
534        {
535            "snapshot-id": 3051729675574597004,
536            "timestamp-ms": 1515100955770,
537            "summary": {
538                "operation": "append"
539            },
540            "manifest-list": "s3://b/wh/.../s1.avro",
541            "schema-id": 0
542        }
543        "#;
544
545        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
546            .unwrap()
547            .try_into()
548            .unwrap();
549        assert_eq!(3051729675574597004, result.snapshot_id());
550        assert_eq!(
551            Utc.timestamp_millis_opt(1515100955770).unwrap(),
552            result.timestamp().unwrap()
553        );
554        assert_eq!(1515100955770, result.timestamp_ms());
555        assert_eq!(
556            Summary {
557                operation: Operation::Append,
558                additional_properties: HashMap::new()
559            },
560            *result.summary()
561        );
562        assert_eq!("s3://b/wh/.../s1.avro".to_string(), *result.manifest_list());
563    }
564
565    #[test]
566    fn test_snapshot_v1_to_v2_projection() {
567        use crate::spec::snapshot::_serde::SnapshotV1;
568
569        // Create a V1 snapshot (without sequence-number field)
570        let v1_snapshot = SnapshotV1 {
571            snapshot_id: 1234567890,
572            parent_snapshot_id: Some(987654321),
573            timestamp_ms: 1515100955770,
574            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
575            manifests: None, // V1 can have either manifest_list or manifests, but not both
576            summary: Some(Summary {
577                operation: Operation::Append,
578                additional_properties: HashMap::from([
579                    ("added-files".to_string(), "5".to_string()),
580                    ("added-records".to_string(), "100".to_string()),
581                ]),
582            }),
583            schema_id: Some(1),
584        };
585
586        // Convert V1 to V2 - this should apply defaults for missing V2 fields
587        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();
588
589        // Verify V1→V2 projection defaults are applied correctly
590        assert_eq!(
591            v2_snapshot.sequence_number(),
592            0,
593            "V1 snapshot sequence_number should default to 0"
594        );
595
596        // Verify other fields are preserved correctly during conversion
597        assert_eq!(v2_snapshot.snapshot_id(), 1234567890);
598        assert_eq!(v2_snapshot.parent_snapshot_id(), Some(987654321));
599        assert_eq!(v2_snapshot.timestamp_ms(), 1515100955770);
600        assert_eq!(
601            v2_snapshot.manifest_list(),
602            "s3://bucket/manifest-list.avro"
603        );
604        assert_eq!(v2_snapshot.schema_id(), Some(1));
605        assert_eq!(v2_snapshot.summary().operation, Operation::Append);
606        assert_eq!(
607            v2_snapshot
608                .summary()
609                .additional_properties
610                .get("added-files"),
611            Some(&"5".to_string())
612        );
613    }
614
615    #[test]
616    fn test_v1_snapshot_with_manifest_list_and_manifests() {
617        {
618            let metadata = r#"
619    {
620        "format-version": 1,
621        "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
622        "location": "s3://bucket/test/location",
623        "last-updated-ms": 1700000000000,
624        "last-column-id": 1,
625        "schema": {
626            "type": "struct",
627            "fields": [
628                {"id": 1, "name": "x", "required": true, "type": "long"}
629            ]
630        },
631        "partition-spec": [],
632        "properties": {},
633        "current-snapshot-id": 111111111,
634        "snapshots": [
635            {
636                "snapshot-id": 111111111,
637                "timestamp-ms": 1600000000000,
638                "summary": {"operation": "append"},
639                "manifest-list": "s3://bucket/metadata/snap-123.avro",
640                "manifests": ["s3://bucket/metadata/manifest-1.avro"]
641            }
642        ]
643    }
644    "#;
645
646            let result_both_manifest_list_and_manifest_set =
647                serde_json::from_str::<TableMetadata>(metadata);
648            assert!(result_both_manifest_list_and_manifest_set.is_err());
649            assert_eq!(
650                result_both_manifest_list_and_manifest_set
651                    .unwrap_err()
652                    .to_string(),
653                "DataInvalid => Invalid v1 snapshot, when manifest list provided, manifest files should be omitted"
654            )
655        }
656
657        {
658            let metadata = r#"
659    {
660        "format-version": 1,
661        "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
662        "location": "s3://bucket/test/location",
663        "last-updated-ms": 1700000000000,
664        "last-column-id": 1,
665        "schema": {
666            "type": "struct",
667            "fields": [
668                {"id": 1, "name": "x", "required": true, "type": "long"}
669            ]
670        },
671        "partition-spec": [],
672        "properties": {},
673        "current-snapshot-id": 111111111,
674        "snapshots": [
675            {
676                "snapshot-id": 111111111,
677                "timestamp-ms": 1600000000000,
678                "summary": {"operation": "append"},
679                "manifests": ["s3://bucket/metadata/manifest-1.avro"]
680            }
681        ]
682    }
683    "#;
684            let result_missing_manifest_list = serde_json::from_str::<TableMetadata>(metadata);
685            assert!(result_missing_manifest_list.is_err());
686            assert_eq!(
687                result_missing_manifest_list.unwrap_err().to_string(),
688                "DataInvalid => Unsupported v1 snapshot, only manifest list is supported"
689            )
690        }
691    }
692
693    #[test]
694    fn test_snapshot_v1_to_v2_with_missing_summary() {
695        use crate::spec::snapshot::_serde::SnapshotV1;
696
697        // Create a V1 snapshot without summary (should get default)
698        let v1_snapshot = SnapshotV1 {
699            snapshot_id: 1111111111,
700            parent_snapshot_id: None,
701            timestamp_ms: 1515100955770,
702            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
703            manifests: None,
704            summary: None, // V1 summary is optional
705            schema_id: None,
706        };
707
708        // Convert V1 to V2 - this should apply default summary
709        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();
710
711        // Verify defaults are applied correctly
712        assert_eq!(
713            v2_snapshot.sequence_number(),
714            0,
715            "V1 snapshot sequence_number should default to 0"
716        );
717        assert_eq!(
718            v2_snapshot.summary().operation,
719            Operation::Append,
720            "Missing V1 summary should default to Append operation"
721        );
722        assert!(
723            v2_snapshot.summary().additional_properties.is_empty(),
724            "Default summary should have empty additional_properties"
725        );
726
727        // Verify other fields
728        assert_eq!(v2_snapshot.snapshot_id(), 1111111111);
729        assert_eq!(v2_snapshot.parent_snapshot_id(), None);
730        assert_eq!(v2_snapshot.schema_id(), None);
731    }
732}