1use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::table_metadata::SnapshotLog;
29use crate::error::{Result, timestamp_ms_to_utc};
30use crate::io::FileIO;
31use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
32use crate::{Error, ErrorKind};
33
/// Name of the main branch: the literal string `"main"`.
// NOTE(review): presumably the key under which the main branch reference is stored;
// the consumers of this constant are outside this file — confirm.
pub const MAIN_BRANCH: &str = "main";
/// Sentinel snapshot id with the value `-1`.
// NOTE(review): judging by the name, this marks a snapshot id that has not been assigned
// yet; how callers use it is not visible in this file — confirm before relying on it.
pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
38
/// Shared, reference-counted ([`Arc`]) handle to a [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
/// Operation recorded in a snapshot [`Summary`].
///
/// Because of `#[serde(rename_all = "lowercase")]`, every variant is (de)serialized as its
/// lowercase name.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Operation {
    /// (De)serialized as `"append"`.
    Append,
    /// (De)serialized as `"replace"`.
    Replace,
    /// (De)serialized as `"overwrite"`.
    Overwrite,
    /// (De)serialized as `"delete"`.
    Delete,
}
55
56impl Operation {
57 pub fn as_str(&self) -> &str {
59 match self {
60 Operation::Append => "append",
61 Operation::Replace => "replace",
62 Operation::Overwrite => "overwrite",
63 Operation::Delete => "delete",
64 }
65 }
66}
67
/// Summary attached to a [`Snapshot`]: the operation plus free-form string properties.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct Summary {
    /// Operation this summary records.
    pub operation: Operation,
    /// All remaining key/value pairs. `#[serde(flatten)]` places these entries next to
    /// `operation` in the same serialized map instead of under a nested key.
    #[serde(flatten)]
    pub additional_properties: HashMap<String, String>,
}
77
78impl Default for Operation {
79 fn default() -> Operation {
80 Self::Append
81 }
82}
83
/// Row-id information carried by a [`Snapshot`]: a first row id together with the number
/// of added rows.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SnapshotRowRange {
    /// First row id of the range.
    pub first_row_id: u64,
    /// Number of rows added (exposed through `Snapshot::added_rows_count`).
    pub added_rows: u64,
}
92
/// In-memory representation of a table snapshot.
///
/// Instances are built through the `TypedBuilder`-generated builder; the
/// `field_defaults(setter(prefix = "with_"))` attribute gives every setter a `with_` prefix.
#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub struct Snapshot {
    /// Identifier of this snapshot.
    pub(crate) snapshot_id: i64,
    /// Identifier of the parent snapshot, if any. Defaults to `None` in the builder.
    #[builder(default = None)]
    pub(crate) parent_snapshot_id: Option<i64>,
    /// Sequence number of this snapshot (snapshots converted from the V1 shape get `0`).
    pub(crate) sequence_number: i64,
    /// Timestamp in milliseconds; `Snapshot::timestamp` converts it to a UTC `DateTime`.
    pub(crate) timestamp_ms: i64,
    /// Location of the manifest list file, as handed to `FileIO::new_input` when loading.
    /// The builder setter accepts anything convertible `Into<String>`.
    #[builder(setter(into))]
    pub(crate) manifest_list: String,
    /// Summary (operation and extra properties) of this snapshot.
    pub(crate) summary: Summary,
    /// Id of the schema associated with this snapshot, if recorded. Defaults to `None`.
    /// The regular setter takes the bare id (`strip_option`); the `schema_id_opt` fallback
    /// setter takes the `Option` itself.
    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
    pub(crate) schema_id: Option<SchemaId>,
    /// Id of the encryption key, if any. Defaults to `None` in the builder.
    #[builder(default)]
    pub(crate) encryption_key_id: Option<String>,
    /// Optional row range. The builder setter takes `(first_row_id, added_rows)` as two
    /// `u64` arguments and stores them as `Some(SnapshotRowRange { .. })`; when the setter
    /// is not called the field stays `None`.
    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
    pub(crate) row_range: Option<SnapshotRowRange>,
}
131
132impl Snapshot {
133 #[inline]
135 pub fn snapshot_id(&self) -> i64 {
136 self.snapshot_id
137 }
138
139 #[inline]
141 pub fn parent_snapshot_id(&self) -> Option<i64> {
142 self.parent_snapshot_id
143 }
144
145 #[inline]
147 pub fn sequence_number(&self) -> i64 {
148 self.sequence_number
149 }
150 #[inline]
152 pub fn manifest_list(&self) -> &str {
153 &self.manifest_list
154 }
155
156 #[inline]
158 pub fn summary(&self) -> &Summary {
159 &self.summary
160 }
161 #[inline]
163 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
164 timestamp_ms_to_utc(self.timestamp_ms)
165 }
166
167 #[inline]
169 pub fn timestamp_ms(&self) -> i64 {
170 self.timestamp_ms
171 }
172
173 #[inline]
175 pub fn schema_id(&self) -> Option<SchemaId> {
176 self.schema_id
177 }
178
179 pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
181 Ok(match self.schema_id() {
182 Some(schema_id) => table_metadata
183 .schema_by_id(schema_id)
184 .ok_or_else(|| {
185 Error::new(
186 ErrorKind::DataInvalid,
187 format!("Schema with id {schema_id} not found"),
188 )
189 })?
190 .clone(),
191 None => table_metadata.current_schema().clone(),
192 })
193 }
194
195 #[cfg(test)]
197 pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
198 match self.parent_snapshot_id {
199 Some(id) => table_metadata.snapshot_by_id(id).cloned(),
200 None => None,
201 }
202 }
203
204 pub async fn load_manifest_list(
206 &self,
207 file_io: &FileIO,
208 table_metadata: &TableMetadata,
209 ) -> Result<ManifestList> {
210 let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
211 ManifestList::parse_with_version(
212 &manifest_list_content,
213 table_metadata.format_version(),
216 )
217 }
218
219 #[allow(dead_code)]
220 pub(crate) fn log(&self) -> SnapshotLog {
221 SnapshotLog {
222 timestamp_ms: self.timestamp_ms,
223 snapshot_id: self.snapshot_id,
224 }
225 }
226
227 pub fn first_row_id(&self) -> Option<u64> {
234 self.row_range.as_ref().map(|r| r.first_row_id)
235 }
236
237 pub fn added_rows_count(&self) -> Option<u64> {
242 self.row_range.as_ref().map(|r| r.added_rows)
243 }
244
245 pub fn row_range(&self) -> Option<(u64, u64)> {
248 self.row_range
249 .as_ref()
250 .map(|r| (r.first_row_id, r.added_rows))
251 }
252
253 pub fn encryption_key_id(&self) -> Option<&str> {
255 self.encryption_key_id.as_deref()
256 }
257}
258
259pub(super) mod _serde {
260 use std::collections::HashMap;
265
266 use serde::{Deserialize, Serialize};
267
268 use super::{Operation, Snapshot, Summary};
269 use crate::spec::SchemaId;
270 use crate::spec::snapshot::SnapshotRowRange;
271 use crate::{Error, ErrorKind};
272
273 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
274 #[serde(rename_all = "kebab-case")]
275 pub(crate) struct SnapshotV3 {
277 pub snapshot_id: i64,
278 #[serde(skip_serializing_if = "Option::is_none")]
279 pub parent_snapshot_id: Option<i64>,
280 pub sequence_number: i64,
281 pub timestamp_ms: i64,
282 pub manifest_list: String,
283 pub summary: Summary,
284 #[serde(skip_serializing_if = "Option::is_none")]
285 pub schema_id: Option<SchemaId>,
286 pub first_row_id: u64,
287 pub added_rows: u64,
288 #[serde(skip_serializing_if = "Option::is_none")]
289 pub key_id: Option<String>,
290 }
291
292 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
293 #[serde(rename_all = "kebab-case")]
294 pub(crate) struct SnapshotV2 {
296 pub snapshot_id: i64,
297 #[serde(skip_serializing_if = "Option::is_none")]
298 pub parent_snapshot_id: Option<i64>,
299 pub sequence_number: i64,
300 pub timestamp_ms: i64,
301 pub manifest_list: String,
302 pub summary: Summary,
303 #[serde(skip_serializing_if = "Option::is_none")]
304 pub schema_id: Option<SchemaId>,
305 }
306
307 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
308 #[serde(rename_all = "kebab-case")]
309 pub(crate) struct SnapshotV1 {
311 pub snapshot_id: i64,
312 #[serde(skip_serializing_if = "Option::is_none")]
313 pub parent_snapshot_id: Option<i64>,
314 pub timestamp_ms: i64,
315 #[serde(skip_serializing_if = "Option::is_none")]
316 pub manifest_list: Option<String>,
317 #[serde(skip_serializing_if = "Option::is_none")]
318 pub manifests: Option<Vec<String>>,
319 #[serde(skip_serializing_if = "Option::is_none")]
320 pub summary: Option<Summary>,
321 #[serde(skip_serializing_if = "Option::is_none")]
322 pub schema_id: Option<SchemaId>,
323 }
324
325 impl From<SnapshotV3> for Snapshot {
326 fn from(s: SnapshotV3) -> Self {
327 Snapshot {
328 snapshot_id: s.snapshot_id,
329 parent_snapshot_id: s.parent_snapshot_id,
330 sequence_number: s.sequence_number,
331 timestamp_ms: s.timestamp_ms,
332 manifest_list: s.manifest_list,
333 summary: s.summary,
334 schema_id: s.schema_id,
335 encryption_key_id: s.key_id,
336 row_range: Some(SnapshotRowRange {
337 first_row_id: s.first_row_id,
338 added_rows: s.added_rows,
339 }),
340 }
341 }
342 }
343
344 impl TryFrom<Snapshot> for SnapshotV3 {
345 type Error = Error;
346
347 fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
348 let row_range = s.row_range.ok_or_else(|| {
349 Error::new(
350 crate::ErrorKind::DataInvalid,
351 "v3 Snapshots must have first-row-id and rows-added fields set.".to_string(),
352 )
353 })?;
354
355 Ok(SnapshotV3 {
356 snapshot_id: s.snapshot_id,
357 parent_snapshot_id: s.parent_snapshot_id,
358 sequence_number: s.sequence_number,
359 timestamp_ms: s.timestamp_ms,
360 manifest_list: s.manifest_list,
361 summary: s.summary,
362 schema_id: s.schema_id,
363 first_row_id: row_range.first_row_id,
364 added_rows: row_range.added_rows,
365 key_id: s.encryption_key_id,
366 })
367 }
368 }
369
370 impl From<SnapshotV2> for Snapshot {
371 fn from(v2: SnapshotV2) -> Self {
372 Snapshot {
373 snapshot_id: v2.snapshot_id,
374 parent_snapshot_id: v2.parent_snapshot_id,
375 sequence_number: v2.sequence_number,
376 timestamp_ms: v2.timestamp_ms,
377 manifest_list: v2.manifest_list,
378 summary: v2.summary,
379 schema_id: v2.schema_id,
380 encryption_key_id: None,
381 row_range: None,
382 }
383 }
384 }
385
386 impl From<Snapshot> for SnapshotV2 {
387 fn from(v2: Snapshot) -> Self {
388 SnapshotV2 {
389 snapshot_id: v2.snapshot_id,
390 parent_snapshot_id: v2.parent_snapshot_id,
391 sequence_number: v2.sequence_number,
392 timestamp_ms: v2.timestamp_ms,
393 manifest_list: v2.manifest_list,
394 summary: v2.summary,
395 schema_id: v2.schema_id,
396 }
397 }
398 }
399
400 impl TryFrom<SnapshotV1> for Snapshot {
401 type Error = Error;
402
403 fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
404 Ok(Snapshot {
405 snapshot_id: v1.snapshot_id,
406 parent_snapshot_id: v1.parent_snapshot_id,
407 sequence_number: 0,
408 timestamp_ms: v1.timestamp_ms,
409 manifest_list: match (v1.manifest_list, v1.manifests) {
410 (Some(file), None) => file,
411 (Some(_), Some(_)) => {
412 return Err(Error::new(
413 ErrorKind::DataInvalid,
414 "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
415 ));
416 }
417 (None, _) => {
418 return Err(Error::new(
419 ErrorKind::DataInvalid,
420 "Unsupported v1 snapshot, only manifest list is supported",
421 ));
422 }
423 },
424 summary: v1.summary.unwrap_or(Summary {
425 operation: Operation::default(),
426 additional_properties: HashMap::new(),
427 }),
428 schema_id: v1.schema_id,
429 encryption_key_id: None,
430 row_range: None,
431 })
432 }
433 }
434
435 impl From<Snapshot> for SnapshotV1 {
436 fn from(v2: Snapshot) -> Self {
437 SnapshotV1 {
438 snapshot_id: v2.snapshot_id,
439 parent_snapshot_id: v2.parent_snapshot_id,
440 timestamp_ms: v2.timestamp_ms,
441 manifest_list: Some(v2.manifest_list),
442 summary: Some(v2.summary),
443 schema_id: v2.schema_id,
444 manifests: None,
445 }
446 }
447 }
448}
449
/// Reference to a snapshot: the referenced snapshot id plus its retention settings.
///
/// Field names are (de)serialized in kebab-case.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotReference {
    /// Id of the referenced snapshot; serialized as `snapshot-id`.
    pub snapshot_id: i64,
    /// Retention settings. `#[serde(flatten)]` puts the retention's `type` tag and fields
    /// at the same level as `snapshot-id` instead of under a nested key.
    #[serde(flatten)]
    pub retention: SnapshotRetention,
}
460
461impl SnapshotReference {
462 pub fn is_branch(&self) -> bool {
464 matches!(self.retention, SnapshotRetention::Branch { .. })
465 }
466}
467
468impl SnapshotReference {
469 pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
471 SnapshotReference {
472 snapshot_id,
473 retention,
474 }
475 }
476}
477
/// Retention settings of a [`SnapshotReference`].
///
/// Serialized as an internally tagged enum: the `type` key holds the lowercase variant
/// name (`"branch"` or `"tag"`), the variant fields use kebab-case names, and fields that
/// are `None` are omitted on serialization.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum SnapshotRetention {
    /// Retention settings for a branch reference (`"type": "branch"`).
    #[serde(rename_all = "kebab-case")]
    Branch {
        /// Serialized as `min-snapshots-to-keep`.
        // NOTE(review): presumably the minimum number of snapshots to keep on the branch;
        // the code that enforces it is outside this file — confirm.
        #[serde(skip_serializing_if = "Option::is_none")]
        min_snapshots_to_keep: Option<i32>,
        /// Serialized as `max-snapshot-age-ms`.
        // NOTE(review): presumably a maximum snapshot age in milliseconds — confirm.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_snapshot_age_ms: Option<i64>,
        /// Serialized as `max-ref-age-ms`.
        // NOTE(review): presumably a maximum age of the reference itself in
        // milliseconds — confirm.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
    /// Retention settings for a tag reference (`"type": "tag"`).
    #[serde(rename_all = "kebab-case")]
    Tag {
        /// Serialized as `max-ref-age-ms`.
        // NOTE(review): presumably a maximum age of the reference itself in
        // milliseconds — confirm.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
}
508
509impl SnapshotRetention {
510 pub fn branch(
512 min_snapshots_to_keep: Option<i32>,
513 max_snapshot_age_ms: Option<i64>,
514 max_ref_age_ms: Option<i64>,
515 ) -> Self {
516 SnapshotRetention::Branch {
517 min_snapshots_to_keep,
518 max_snapshot_age_ms,
519 max_ref_age_ms,
520 }
521 }
522}
523
// Unit tests for the V1 snapshot serde shape and its conversion into `Snapshot`.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use crate::spec::TableMetadata;
    use crate::spec::snapshot::_serde::SnapshotV1;
    use crate::spec::snapshot::{Operation, Snapshot, Summary};

    // Deserializes a V1 snapshot from JSON, converts it into `Snapshot` and checks the
    // id, timestamp (both raw and as a UTC `DateTime`), summary and manifest list.
    #[test]
    fn schema() {
        let record = r#"
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://b/wh/.../s1.avro",
            "schema-id": 0
        }
        "#;

        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
            .unwrap()
            .try_into()
            .unwrap();
        assert_eq!(3051729675574597004, result.snapshot_id());
        assert_eq!(
            Utc.timestamp_millis_opt(1515100955770).unwrap(),
            result.timestamp().unwrap()
        );
        assert_eq!(1515100955770, result.timestamp_ms());
        assert_eq!(
            Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new()
            },
            *result.summary()
        );
        assert_eq!("s3://b/wh/.../s1.avro".to_string(), *result.manifest_list());
    }

    // Converts a fully populated `SnapshotV1` and checks that every field is carried over
    // and that the sequence number, which V1 lacks, is set to 0.
    #[test]
    fn test_snapshot_v1_to_v2_projection() {
        use crate::spec::snapshot::_serde::SnapshotV1;

        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1234567890,
            parent_snapshot_id: Some(987654321),
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: Some(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([
                    ("added-files".to_string(), "5".to_string()),
                    ("added-records".to_string(), "100".to_string()),
                ]),
            }),
            schema_id: Some(1),
        };

        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        assert_eq!(v2_snapshot.snapshot_id(), 1234567890);
        assert_eq!(v2_snapshot.parent_snapshot_id(), Some(987654321));
        assert_eq!(v2_snapshot.timestamp_ms(), 1515100955770);
        assert_eq!(
            v2_snapshot.manifest_list(),
            "s3://bucket/manifest-list.avro"
        );
        assert_eq!(v2_snapshot.schema_id(), Some(1));
        assert_eq!(v2_snapshot.summary().operation, Operation::Append);
        assert_eq!(
            v2_snapshot
                .summary()
                .additional_properties
                .get("added-files"),
            Some(&"5".to_string())
        );
    }

    // Parses format-version 1 table metadata and checks the two rejected V1 snapshot
    // layouts, asserting the exact error text in each case.
    #[test]
    fn test_v1_snapshot_with_manifest_list_and_manifests() {
        // Case 1: both `manifest-list` and `manifests` are present.
        {
            let metadata = r#"
            {
                "format-version": 1,
                "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
                "location": "s3://bucket/test/location",
                "last-updated-ms": 1700000000000,
                "last-column-id": 1,
                "schema": {
                    "type": "struct",
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"}
                    ]
                },
                "partition-spec": [],
                "properties": {},
                "current-snapshot-id": 111111111,
                "snapshots": [
                    {
                        "snapshot-id": 111111111,
                        "timestamp-ms": 1600000000000,
                        "summary": {"operation": "append"},
                        "manifest-list": "s3://bucket/metadata/snap-123.avro",
                        "manifests": ["s3://bucket/metadata/manifest-1.avro"]
                    }
                ]
            }
            "#;

            let result_both_manifest_list_and_manifest_set =
                serde_json::from_str::<TableMetadata>(metadata);
            assert!(result_both_manifest_list_and_manifest_set.is_err());
            assert_eq!(
                result_both_manifest_list_and_manifest_set
                    .unwrap_err()
                    .to_string(),
                "DataInvalid => Invalid v1 snapshot, when manifest list provided, manifest files should be omitted"
            )
        }

        // Case 2: only `manifests` is present and `manifest-list` is missing.
        {
            let metadata = r#"
            {
                "format-version": 1,
                "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
                "location": "s3://bucket/test/location",
                "last-updated-ms": 1700000000000,
                "last-column-id": 1,
                "schema": {
                    "type": "struct",
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"}
                    ]
                },
                "partition-spec": [],
                "properties": {},
                "current-snapshot-id": 111111111,
                "snapshots": [
                    {
                        "snapshot-id": 111111111,
                        "timestamp-ms": 1600000000000,
                        "summary": {"operation": "append"},
                        "manifests": ["s3://bucket/metadata/manifest-1.avro"]
                    }
                ]
            }
            "#;
            let result_missing_manifest_list = serde_json::from_str::<TableMetadata>(metadata);
            assert!(result_missing_manifest_list.is_err());
            assert_eq!(
                result_missing_manifest_list.unwrap_err().to_string(),
                "DataInvalid => Unsupported v1 snapshot, only manifest list is supported"
            )
        }
    }

    // Converts a `SnapshotV1` without a summary and checks that the result gets the
    // default summary (Append, no additional properties) and sequence number 0.
    #[test]
    fn test_snapshot_v1_to_v2_with_missing_summary() {
        use crate::spec::snapshot::_serde::SnapshotV1;

        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1111111111,
            parent_snapshot_id: None,
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: None,
            schema_id: None,
        };

        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );
        assert_eq!(
            v2_snapshot.summary().operation,
            Operation::Append,
            "Missing V1 summary should default to Append operation"
        );
        assert!(
            v2_snapshot.summary().additional_properties.is_empty(),
            "Default summary should have empty additional_properties"
        );

        assert_eq!(v2_snapshot.snapshot_id(), 1111111111);
        assert_eq!(v2_snapshot.parent_snapshot_id(), None);
        assert_eq!(v2_snapshot.schema_id(), None);
    }
}