iceberg/spec/
snapshot.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18/*!
19 * Snapshots
20 */
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::table_metadata::SnapshotLog;
29use crate::error::{Result, timestamp_ms_to_utc};
30use crate::io::FileIO;
31use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
32use crate::{Error, ErrorKind};
33
34/// The ref name of the main branch of the table.
35pub const MAIN_BRANCH: &str = "main";
36/// Placeholder for snapshot ID. The field with this value must be replaced with the actual snapshot ID before it is committed.
37pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
38
39/// Reference to [`Snapshot`].
40pub type SnapshotRef = Arc<Snapshot>;
41#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
42#[serde(rename_all = "lowercase")]
43/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
44pub enum Operation {
45    /// Only data files were added and no files were removed.
46    Append,
47    /// Data and delete files were added and removed without changing table data;
48    /// i.e., compaction, changing the data file format, or relocating data files.
49    Replace,
50    /// Data and delete files were added and removed in a logical overwrite operation.
51    Overwrite,
52    /// Data files were removed and their contents logically deleted and/or delete files were added to delete rows.
53    Delete,
54}
55
56impl Operation {
57    /// Returns the string representation (lowercase) of the operation.
58    pub fn as_str(&self) -> &str {
59        match self {
60            Operation::Append => "append",
61            Operation::Replace => "replace",
62            Operation::Overwrite => "overwrite",
63            Operation::Delete => "delete",
64        }
65    }
66}
67
68#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
69/// Summarises the changes in the snapshot.
70pub struct Summary {
71    /// The type of operation in the snapshot
72    pub operation: Operation,
73    /// Other summary data.
74    #[serde(flatten)]
75    pub additional_properties: HashMap<String, String>,
76}
77
78impl Default for Operation {
79    fn default() -> Operation {
80        Self::Append
81    }
82}
83
84#[derive(Debug, PartialEq, Eq, Clone)]
85/// Row range of a snapshot, contains first_row_id and added_rows_count.
86pub struct SnapshotRowRange {
87    /// The first _row_id assigned to the first row in the first data file in the first manifest.
88    pub first_row_id: u64,
89    /// The upper bound of the number of rows with assigned row IDs
90    pub added_rows: u64,
91}
92
93#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
94#[builder(field_defaults(setter(prefix = "with_")))]
95/// A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table.
96pub struct Snapshot {
97    /// A unique long ID
98    pub(crate) snapshot_id: i64,
99    /// The snapshot ID of the snapshot’s parent.
100    /// Omitted for any snapshot with no parent
101    #[builder(default = None)]
102    pub(crate) parent_snapshot_id: Option<i64>,
103    /// A monotonically increasing long that tracks the order of
104    /// changes to a table.
105    pub(crate) sequence_number: i64,
106    /// A timestamp when the snapshot was created, used for garbage
107    /// collection and table inspection
108    pub(crate) timestamp_ms: i64,
109    /// The location of a manifest list for this snapshot that
110    /// tracks manifest files with additional metadata.
111    /// Currently we only support manifest list file, and manifest files are not supported.
112    #[builder(setter(into))]
113    pub(crate) manifest_list: String,
114    /// A string map that summarizes the snapshot changes, including operation.
115    pub(crate) summary: Summary,
116    /// ID of the table’s current schema when the snapshot was created.
117    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
118    pub(crate) schema_id: Option<SchemaId>,
119    /// Encryption Key ID
120    #[builder(default)]
121    pub(crate) encryption_key_id: Option<String>,
122    /// Row range of this snapshot, required when the table version supports row lineage.
123    /// Specify as a tuple of (first_row_id, added_rows_count)
124    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
125    // This is specified as a struct instead of two separate fields to ensure that both fields are either set or not set.
126    // The java implementations uses two separate fields, then sets `added_row_counts` to Null if `first_row_id` is set to Null.
127    // It throws an error if `added_row_counts` is set but `first_row_id` is not set, or if either of the two is negative.
128    // We handle all cases infallible using the rust type system.
129    pub(crate) row_range: Option<SnapshotRowRange>,
130}
131
132impl Snapshot {
133    /// Get the id of the snapshot
134    #[inline]
135    pub fn snapshot_id(&self) -> i64 {
136        self.snapshot_id
137    }
138
139    /// Get parent snapshot id.
140    #[inline]
141    pub fn parent_snapshot_id(&self) -> Option<i64> {
142        self.parent_snapshot_id
143    }
144
145    /// Get sequence_number of the snapshot. Is 0 for Iceberg V1 tables.
146    #[inline]
147    pub fn sequence_number(&self) -> i64 {
148        self.sequence_number
149    }
150    /// Get location of manifest_list file
151    #[inline]
152    pub fn manifest_list(&self) -> &str {
153        &self.manifest_list
154    }
155
156    /// Get summary of the snapshot
157    #[inline]
158    pub fn summary(&self) -> &Summary {
159        &self.summary
160    }
161    /// Get the timestamp of when the snapshot was created
162    #[inline]
163    pub fn timestamp(&self) -> Result<DateTime<Utc>> {
164        timestamp_ms_to_utc(self.timestamp_ms)
165    }
166
167    /// Get the timestamp of when the snapshot was created in milliseconds
168    #[inline]
169    pub fn timestamp_ms(&self) -> i64 {
170        self.timestamp_ms
171    }
172
173    /// Get the schema id of this snapshot.
174    #[inline]
175    pub fn schema_id(&self) -> Option<SchemaId> {
176        self.schema_id
177    }
178
179    /// Get the schema of this snapshot.
180    pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
181        Ok(match self.schema_id() {
182            Some(schema_id) => table_metadata
183                .schema_by_id(schema_id)
184                .ok_or_else(|| {
185                    Error::new(
186                        ErrorKind::DataInvalid,
187                        format!("Schema with id {schema_id} not found"),
188                    )
189                })?
190                .clone(),
191            None => table_metadata.current_schema().clone(),
192        })
193    }
194
195    /// Get parent snapshot.
196    #[cfg(test)]
197    pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
198        match self.parent_snapshot_id {
199            Some(id) => table_metadata.snapshot_by_id(id).cloned(),
200            None => None,
201        }
202    }
203
204    /// Load manifest list.
205    pub async fn load_manifest_list(
206        &self,
207        file_io: &FileIO,
208        table_metadata: &TableMetadata,
209    ) -> Result<ManifestList> {
210        let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
211        ManifestList::parse_with_version(
212            &manifest_list_content,
213            // TODO: You don't really need the version since you could just project any Avro in
214            // the version that you'd like to get (probably always the latest)
215            table_metadata.format_version(),
216        )
217    }
218
219    #[allow(dead_code)]
220    pub(crate) fn log(&self) -> SnapshotLog {
221        SnapshotLog {
222            timestamp_ms: self.timestamp_ms,
223            snapshot_id: self.snapshot_id,
224        }
225    }
226
227    /// The row-id of the first newly added row in this snapshot. All rows added in this snapshot will
228    /// have a row-id assigned to them greater than this value. All rows with a row-id less than this
229    /// value were created in a snapshot that was added to the table (but not necessarily committed to
230    /// this branch) in the past.
231    ///
232    /// This field is optional but is required when the table version supports row lineage.
233    pub fn first_row_id(&self) -> Option<u64> {
234        self.row_range.as_ref().map(|r| r.first_row_id)
235    }
236
237    /// The total number of newly added rows in this snapshot. It should be the summation of {@link
238    /// ManifestFile#ADDED_ROWS_COUNT} for every manifest added in this snapshot.
239    ///
240    /// This field is optional but is required when the table version supports row lineage.
241    pub fn added_rows_count(&self) -> Option<u64> {
242        self.row_range.as_ref().map(|r| r.added_rows)
243    }
244
245    /// Returns the row range of this snapshot, if available.
246    /// This is a tuple containing (first_row_id, added_rows_count).
247    pub fn row_range(&self) -> Option<(u64, u64)> {
248        self.row_range
249            .as_ref()
250            .map(|r| (r.first_row_id, r.added_rows))
251    }
252
253    /// Get encryption key id, if available.
254    pub fn encryption_key_id(&self) -> Option<&str> {
255        self.encryption_key_id.as_deref()
256    }
257}
258
259pub(super) mod _serde {
260    /// This is a helper module that defines types to help with serialization/deserialization.
261    /// For deserialization the input first gets read into either the [SnapshotV1] or [SnapshotV2] struct
262    /// and then converted into the [Snapshot] struct. Serialization works the other way around.
263    /// [SnapshotV1] and [SnapshotV2] are internal struct that are only used for serialization and deserialization.
264    use std::collections::HashMap;
265
266    use serde::{Deserialize, Serialize};
267
268    use super::{Operation, Snapshot, Summary};
269    use crate::spec::SchemaId;
270    use crate::spec::snapshot::SnapshotRowRange;
271    use crate::{Error, ErrorKind};
272
273    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
274    #[serde(rename_all = "kebab-case")]
275    /// Defines the structure of a v3 snapshot for serialization/deserialization
276    pub(crate) struct SnapshotV3 {
277        pub snapshot_id: i64,
278        #[serde(skip_serializing_if = "Option::is_none")]
279        pub parent_snapshot_id: Option<i64>,
280        pub sequence_number: i64,
281        pub timestamp_ms: i64,
282        pub manifest_list: String,
283        pub summary: Summary,
284        #[serde(skip_serializing_if = "Option::is_none")]
285        pub schema_id: Option<SchemaId>,
286        #[serde(skip_serializing_if = "Option::is_none")]
287        pub first_row_id: Option<u64>,
288        #[serde(skip_serializing_if = "Option::is_none")]
289        pub added_rows: Option<u64>,
290        #[serde(skip_serializing_if = "Option::is_none")]
291        pub key_id: Option<String>,
292    }
293
294    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
295    #[serde(rename_all = "kebab-case")]
296    /// Defines the structure of a v2 snapshot for serialization/deserialization
297    pub(crate) struct SnapshotV2 {
298        pub snapshot_id: i64,
299        #[serde(skip_serializing_if = "Option::is_none")]
300        pub parent_snapshot_id: Option<i64>,
301        pub sequence_number: i64,
302        pub timestamp_ms: i64,
303        pub manifest_list: String,
304        pub summary: Summary,
305        #[serde(skip_serializing_if = "Option::is_none")]
306        pub schema_id: Option<SchemaId>,
307    }
308
309    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
310    #[serde(rename_all = "kebab-case")]
311    /// Defines the structure of a v1 snapshot for serialization/deserialization
312    pub(crate) struct SnapshotV1 {
313        pub snapshot_id: i64,
314        #[serde(skip_serializing_if = "Option::is_none")]
315        pub parent_snapshot_id: Option<i64>,
316        pub timestamp_ms: i64,
317        #[serde(skip_serializing_if = "Option::is_none")]
318        pub manifest_list: Option<String>,
319        #[serde(skip_serializing_if = "Option::is_none")]
320        pub manifests: Option<Vec<String>>,
321        #[serde(skip_serializing_if = "Option::is_none")]
322        pub summary: Option<Summary>,
323        #[serde(skip_serializing_if = "Option::is_none")]
324        pub schema_id: Option<SchemaId>,
325    }
326
327    impl From<SnapshotV3> for Snapshot {
328        fn from(s: SnapshotV3) -> Self {
329            Snapshot {
330                snapshot_id: s.snapshot_id,
331                parent_snapshot_id: s.parent_snapshot_id,
332                sequence_number: s.sequence_number,
333                timestamp_ms: s.timestamp_ms,
334                manifest_list: s.manifest_list,
335                summary: s.summary,
336                schema_id: s.schema_id,
337                encryption_key_id: s.key_id,
338                row_range: match (s.first_row_id, s.added_rows) {
339                    (Some(first_row_id), Some(added_rows)) => Some(SnapshotRowRange {
340                        first_row_id,
341                        added_rows,
342                    }),
343                    _ => None,
344                },
345            }
346        }
347    }
348
349    impl TryFrom<Snapshot> for SnapshotV3 {
350        type Error = Error;
351
352        fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
353            let (first_row_id, added_rows) = match s.row_range {
354                Some(row_range) => (Some(row_range.first_row_id), Some(row_range.added_rows)),
355                None => (None, None),
356            };
357
358            Ok(SnapshotV3 {
359                snapshot_id: s.snapshot_id,
360                parent_snapshot_id: s.parent_snapshot_id,
361                sequence_number: s.sequence_number,
362                timestamp_ms: s.timestamp_ms,
363                manifest_list: s.manifest_list,
364                summary: s.summary,
365                schema_id: s.schema_id,
366                first_row_id,
367                added_rows,
368                key_id: s.encryption_key_id,
369            })
370        }
371    }
372
373    impl From<SnapshotV2> for Snapshot {
374        fn from(v2: SnapshotV2) -> Self {
375            Snapshot {
376                snapshot_id: v2.snapshot_id,
377                parent_snapshot_id: v2.parent_snapshot_id,
378                sequence_number: v2.sequence_number,
379                timestamp_ms: v2.timestamp_ms,
380                manifest_list: v2.manifest_list,
381                summary: v2.summary,
382                schema_id: v2.schema_id,
383                encryption_key_id: None,
384                row_range: None,
385            }
386        }
387    }
388
389    impl From<Snapshot> for SnapshotV2 {
390        fn from(v2: Snapshot) -> Self {
391            SnapshotV2 {
392                snapshot_id: v2.snapshot_id,
393                parent_snapshot_id: v2.parent_snapshot_id,
394                sequence_number: v2.sequence_number,
395                timestamp_ms: v2.timestamp_ms,
396                manifest_list: v2.manifest_list,
397                summary: v2.summary,
398                schema_id: v2.schema_id,
399            }
400        }
401    }
402
403    impl TryFrom<SnapshotV1> for Snapshot {
404        type Error = Error;
405
406        fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
407            Ok(Snapshot {
408                snapshot_id: v1.snapshot_id,
409                parent_snapshot_id: v1.parent_snapshot_id,
410                sequence_number: 0,
411                timestamp_ms: v1.timestamp_ms,
412                manifest_list: match (v1.manifest_list, v1.manifests) {
413                    (Some(file), None) => file,
414                    (Some(_), Some(_)) => {
415                        return Err(Error::new(
416                            ErrorKind::DataInvalid,
417                            "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
418                        ));
419                    }
420                    (None, _) => {
421                        return Err(Error::new(
422                            ErrorKind::DataInvalid,
423                            "Unsupported v1 snapshot, only manifest list is supported",
424                        ));
425                    }
426                },
427                summary: v1.summary.unwrap_or(Summary {
428                    operation: Operation::default(),
429                    additional_properties: HashMap::new(),
430                }),
431                schema_id: v1.schema_id,
432                encryption_key_id: None,
433                row_range: None,
434            })
435        }
436    }
437
438    impl From<Snapshot> for SnapshotV1 {
439        fn from(v2: Snapshot) -> Self {
440            SnapshotV1 {
441                snapshot_id: v2.snapshot_id,
442                parent_snapshot_id: v2.parent_snapshot_id,
443                timestamp_ms: v2.timestamp_ms,
444                manifest_list: Some(v2.manifest_list),
445                summary: Some(v2.summary),
446                schema_id: v2.schema_id,
447                manifests: None,
448            }
449        }
450    }
451}
452
453#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
454#[serde(rename_all = "kebab-case")]
455/// Iceberg tables keep track of branches and tags using snapshot references.
456pub struct SnapshotReference {
457    /// A reference’s snapshot ID. The tagged snapshot or latest snapshot of a branch.
458    pub snapshot_id: i64,
459    #[serde(flatten)]
460    /// Snapshot retention policy
461    pub retention: SnapshotRetention,
462}
463
464impl SnapshotReference {
465    /// Returns true if the snapshot reference is a branch.
466    pub fn is_branch(&self) -> bool {
467        matches!(self.retention, SnapshotRetention::Branch { .. })
468    }
469}
470
471impl SnapshotReference {
472    /// Create new snapshot reference
473    pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
474        SnapshotReference {
475            snapshot_id,
476            retention,
477        }
478    }
479}
480
481#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
482#[serde(rename_all = "lowercase", tag = "type")]
483/// The snapshot expiration procedure removes snapshots from table metadata and applies the table’s retention policy.
484pub enum SnapshotRetention {
485    #[serde(rename_all = "kebab-case")]
486    /// Branches are mutable named references that can be updated by committing a new snapshot as
487    /// the branch’s referenced snapshot using the Commit Conflict Resolution and Retry procedures.
488    Branch {
489        /// A positive number for the minimum number of snapshots to keep in a branch while expiring snapshots.
490        /// Defaults to table property history.expire.min-snapshots-to-keep.
491        #[serde(skip_serializing_if = "Option::is_none")]
492        min_snapshots_to_keep: Option<i32>,
493        /// A positive number for the max age of snapshots to keep when expiring, including the latest snapshot.
494        /// Defaults to table property history.expire.max-snapshot-age-ms.
495        #[serde(skip_serializing_if = "Option::is_none")]
496        max_snapshot_age_ms: Option<i64>,
497        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
498        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
499        #[serde(skip_serializing_if = "Option::is_none")]
500        max_ref_age_ms: Option<i64>,
501    },
502    #[serde(rename_all = "kebab-case")]
503    /// Tags are labels for individual snapshots.
504    Tag {
505        /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots.
506        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires.
507        #[serde(skip_serializing_if = "Option::is_none")]
508        max_ref_age_ms: Option<i64>,
509    },
510}
511
512impl SnapshotRetention {
513    /// Create a new branch retention policy
514    pub fn branch(
515        min_snapshots_to_keep: Option<i32>,
516        max_snapshot_age_ms: Option<i64>,
517        max_ref_age_ms: Option<i64>,
518    ) -> Self {
519        SnapshotRetention::Branch {
520            min_snapshots_to_keep,
521            max_snapshot_age_ms,
522            max_ref_age_ms,
523        }
524    }
525}
526
527#[cfg(test)]
528mod tests {
529    use std::collections::HashMap;
530
531    use chrono::{TimeZone, Utc};
532
533    use crate::spec::TableMetadata;
534    use crate::spec::snapshot::_serde::SnapshotV1;
535    use crate::spec::snapshot::{Operation, Snapshot, Summary};
536
537    #[test]
538    fn schema() {
539        let record = r#"
540        {
541            "snapshot-id": 3051729675574597004,
542            "timestamp-ms": 1515100955770,
543            "summary": {
544                "operation": "append"
545            },
546            "manifest-list": "s3://b/wh/.../s1.avro",
547            "schema-id": 0
548        }
549        "#;
550
551        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
552            .unwrap()
553            .try_into()
554            .unwrap();
555        assert_eq!(3051729675574597004, result.snapshot_id());
556        assert_eq!(
557            Utc.timestamp_millis_opt(1515100955770).unwrap(),
558            result.timestamp().unwrap()
559        );
560        assert_eq!(1515100955770, result.timestamp_ms());
561        assert_eq!(
562            Summary {
563                operation: Operation::Append,
564                additional_properties: HashMap::new()
565            },
566            *result.summary()
567        );
568        assert_eq!("s3://b/wh/.../s1.avro".to_string(), *result.manifest_list());
569    }
570
571    #[test]
572    fn test_snapshot_v1_to_v2_projection() {
573        use crate::spec::snapshot::_serde::SnapshotV1;
574
575        // Create a V1 snapshot (without sequence-number field)
576        let v1_snapshot = SnapshotV1 {
577            snapshot_id: 1234567890,
578            parent_snapshot_id: Some(987654321),
579            timestamp_ms: 1515100955770,
580            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
581            manifests: None, // V1 can have either manifest_list or manifests, but not both
582            summary: Some(Summary {
583                operation: Operation::Append,
584                additional_properties: HashMap::from([
585                    ("added-files".to_string(), "5".to_string()),
586                    ("added-records".to_string(), "100".to_string()),
587                ]),
588            }),
589            schema_id: Some(1),
590        };
591
592        // Convert V1 to V2 - this should apply defaults for missing V2 fields
593        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();
594
595        // Verify V1→V2 projection defaults are applied correctly
596        assert_eq!(
597            v2_snapshot.sequence_number(),
598            0,
599            "V1 snapshot sequence_number should default to 0"
600        );
601
602        // Verify other fields are preserved correctly during conversion
603        assert_eq!(v2_snapshot.snapshot_id(), 1234567890);
604        assert_eq!(v2_snapshot.parent_snapshot_id(), Some(987654321));
605        assert_eq!(v2_snapshot.timestamp_ms(), 1515100955770);
606        assert_eq!(
607            v2_snapshot.manifest_list(),
608            "s3://bucket/manifest-list.avro"
609        );
610        assert_eq!(v2_snapshot.schema_id(), Some(1));
611        assert_eq!(v2_snapshot.summary().operation, Operation::Append);
612        assert_eq!(
613            v2_snapshot
614                .summary()
615                .additional_properties
616                .get("added-files"),
617            Some(&"5".to_string())
618        );
619    }
620
621    #[test]
622    fn test_v1_snapshot_with_manifest_list_and_manifests() {
623        {
624            let metadata = r#"
625    {
626        "format-version": 1,
627        "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
628        "location": "s3://bucket/test/location",
629        "last-updated-ms": 1700000000000,
630        "last-column-id": 1,
631        "schema": {
632            "type": "struct",
633            "fields": [
634                {"id": 1, "name": "x", "required": true, "type": "long"}
635            ]
636        },
637        "partition-spec": [],
638        "properties": {},
639        "current-snapshot-id": 111111111,
640        "snapshots": [
641            {
642                "snapshot-id": 111111111,
643                "timestamp-ms": 1600000000000,
644                "summary": {"operation": "append"},
645                "manifest-list": "s3://bucket/metadata/snap-123.avro",
646                "manifests": ["s3://bucket/metadata/manifest-1.avro"]
647            }
648        ]
649    }
650    "#;
651
652            let result_both_manifest_list_and_manifest_set =
653                serde_json::from_str::<TableMetadata>(metadata);
654            assert!(result_both_manifest_list_and_manifest_set.is_err());
655            assert_eq!(
656                result_both_manifest_list_and_manifest_set
657                    .unwrap_err()
658                    .to_string(),
659                "DataInvalid => Invalid v1 snapshot, when manifest list provided, manifest files should be omitted"
660            )
661        }
662
663        {
664            let metadata = r#"
665    {
666        "format-version": 1,
667        "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
668        "location": "s3://bucket/test/location",
669        "last-updated-ms": 1700000000000,
670        "last-column-id": 1,
671        "schema": {
672            "type": "struct",
673            "fields": [
674                {"id": 1, "name": "x", "required": true, "type": "long"}
675            ]
676        },
677        "partition-spec": [],
678        "properties": {},
679        "current-snapshot-id": 111111111,
680        "snapshots": [
681            {
682                "snapshot-id": 111111111,
683                "timestamp-ms": 1600000000000,
684                "summary": {"operation": "append"},
685                "manifests": ["s3://bucket/metadata/manifest-1.avro"]
686            }
687        ]
688    }
689    "#;
690            let result_missing_manifest_list = serde_json::from_str::<TableMetadata>(metadata);
691            assert!(result_missing_manifest_list.is_err());
692            assert_eq!(
693                result_missing_manifest_list.unwrap_err().to_string(),
694                "DataInvalid => Unsupported v1 snapshot, only manifest list is supported"
695            )
696        }
697    }
698
699    #[test]
700    fn test_snapshot_v1_to_v2_with_missing_summary() {
701        use crate::spec::snapshot::_serde::SnapshotV1;
702
703        // Create a V1 snapshot without summary (should get default)
704        let v1_snapshot = SnapshotV1 {
705            snapshot_id: 1111111111,
706            parent_snapshot_id: None,
707            timestamp_ms: 1515100955770,
708            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
709            manifests: None,
710            summary: None, // V1 summary is optional
711            schema_id: None,
712        };
713
714        // Convert V1 to V2 - this should apply default summary
715        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();
716
717        // Verify defaults are applied correctly
718        assert_eq!(
719            v2_snapshot.sequence_number(),
720            0,
721            "V1 snapshot sequence_number should default to 0"
722        );
723        assert_eq!(
724            v2_snapshot.summary().operation,
725            Operation::Append,
726            "Missing V1 summary should default to Append operation"
727        );
728        assert!(
729            v2_snapshot.summary().additional_properties.is_empty(),
730            "Default summary should have empty additional_properties"
731        );
732
733        // Verify other fields
734        assert_eq!(v2_snapshot.snapshot_id(), 1111111111);
735        assert_eq!(v2_snapshot.parent_snapshot_id(), None);
736        assert_eq!(v2_snapshot.schema_id(), None);
737    }
738}