1use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::table_metadata::SnapshotLog;
29use crate::error::{Result, timestamp_ms_to_utc};
30use crate::io::FileIO;
31use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
32use crate::{Error, ErrorKind};
33
/// Reference name `"main"`.
// NOTE(review): presumably the name of the table's main branch ref — confirm against callers.
pub const MAIN_BRANCH: &str = "main";
/// Placeholder snapshot id (`-1`).
// NOTE(review): looks like a sentinel for "no snapshot id assigned yet" — confirm against callers.
pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;

/// Shared, reference-counted handle to a [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
41#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
42#[serde(rename_all = "lowercase")]
43pub enum Operation {
45 Append,
47 Replace,
50 Overwrite,
52 Delete,
54}
55
56impl Operation {
57 pub fn as_str(&self) -> &str {
59 match self {
60 Operation::Append => "append",
61 Operation::Replace => "replace",
62 Operation::Overwrite => "overwrite",
63 Operation::Delete => "delete",
64 }
65 }
66}
67
68#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
69pub struct Summary {
71 pub operation: Operation,
73 #[serde(flatten)]
75 pub additional_properties: HashMap<String, String>,
76}
77
78impl Default for Operation {
79 fn default() -> Operation {
80 Self::Append
81 }
82}
83
/// Row-id range recorded for a snapshot: a starting row id together with the
/// number of rows added.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SnapshotRowRange {
    /// First row id of the range.
    pub first_row_id: u64,
    /// Number of rows added, counted from `first_row_id`.
    pub added_rows: u64,
}
92
93#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
94#[builder(field_defaults(setter(prefix = "with_")))]
95pub struct Snapshot {
97 pub(crate) snapshot_id: i64,
99 #[builder(default = None)]
102 pub(crate) parent_snapshot_id: Option<i64>,
103 pub(crate) sequence_number: i64,
106 pub(crate) timestamp_ms: i64,
109 #[builder(setter(into))]
113 pub(crate) manifest_list: String,
114 pub(crate) summary: Summary,
116 #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
118 pub(crate) schema_id: Option<SchemaId>,
119 #[builder(default)]
121 pub(crate) encryption_key_id: Option<String>,
122 #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
125 pub(crate) row_range: Option<SnapshotRowRange>,
130}
131
132impl Snapshot {
133 #[inline]
135 pub fn snapshot_id(&self) -> i64 {
136 self.snapshot_id
137 }
138
139 #[inline]
141 pub fn parent_snapshot_id(&self) -> Option<i64> {
142 self.parent_snapshot_id
143 }
144
145 #[inline]
147 pub fn sequence_number(&self) -> i64 {
148 self.sequence_number
149 }
150 #[inline]
152 pub fn manifest_list(&self) -> &str {
153 &self.manifest_list
154 }
155
156 #[inline]
158 pub fn summary(&self) -> &Summary {
159 &self.summary
160 }
161 #[inline]
163 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
164 timestamp_ms_to_utc(self.timestamp_ms)
165 }
166
167 #[inline]
169 pub fn timestamp_ms(&self) -> i64 {
170 self.timestamp_ms
171 }
172
173 #[inline]
175 pub fn schema_id(&self) -> Option<SchemaId> {
176 self.schema_id
177 }
178
179 pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
181 Ok(match self.schema_id() {
182 Some(schema_id) => table_metadata
183 .schema_by_id(schema_id)
184 .ok_or_else(|| {
185 Error::new(
186 ErrorKind::DataInvalid,
187 format!("Schema with id {schema_id} not found"),
188 )
189 })?
190 .clone(),
191 None => table_metadata.current_schema().clone(),
192 })
193 }
194
195 #[cfg(test)]
197 pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
198 match self.parent_snapshot_id {
199 Some(id) => table_metadata.snapshot_by_id(id).cloned(),
200 None => None,
201 }
202 }
203
204 pub async fn load_manifest_list(
206 &self,
207 file_io: &FileIO,
208 table_metadata: &TableMetadata,
209 ) -> Result<ManifestList> {
210 let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
211 ManifestList::parse_with_version(
212 &manifest_list_content,
213 table_metadata.format_version(),
216 )
217 }
218
219 #[allow(dead_code)]
220 pub(crate) fn log(&self) -> SnapshotLog {
221 SnapshotLog {
222 timestamp_ms: self.timestamp_ms,
223 snapshot_id: self.snapshot_id,
224 }
225 }
226
227 pub fn first_row_id(&self) -> Option<u64> {
234 self.row_range.as_ref().map(|r| r.first_row_id)
235 }
236
237 pub fn added_rows_count(&self) -> Option<u64> {
242 self.row_range.as_ref().map(|r| r.added_rows)
243 }
244
245 pub fn row_range(&self) -> Option<(u64, u64)> {
248 self.row_range
249 .as_ref()
250 .map(|r| (r.first_row_id, r.added_rows))
251 }
252
253 pub fn encryption_key_id(&self) -> Option<&str> {
255 self.encryption_key_id.as_deref()
256 }
257}
258
259pub(super) mod _serde {
260 use std::collections::HashMap;
265
266 use serde::{Deserialize, Serialize};
267
268 use super::{Operation, Snapshot, Summary};
269 use crate::spec::SchemaId;
270 use crate::spec::snapshot::SnapshotRowRange;
271 use crate::{Error, ErrorKind};
272
273 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
274 #[serde(rename_all = "kebab-case")]
275 pub(crate) struct SnapshotV3 {
277 pub snapshot_id: i64,
278 #[serde(skip_serializing_if = "Option::is_none")]
279 pub parent_snapshot_id: Option<i64>,
280 pub sequence_number: i64,
281 pub timestamp_ms: i64,
282 pub manifest_list: String,
283 pub summary: Summary,
284 #[serde(skip_serializing_if = "Option::is_none")]
285 pub schema_id: Option<SchemaId>,
286 #[serde(skip_serializing_if = "Option::is_none")]
287 pub first_row_id: Option<u64>,
288 #[serde(skip_serializing_if = "Option::is_none")]
289 pub added_rows: Option<u64>,
290 #[serde(skip_serializing_if = "Option::is_none")]
291 pub key_id: Option<String>,
292 }
293
294 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
295 #[serde(rename_all = "kebab-case")]
296 pub(crate) struct SnapshotV2 {
298 pub snapshot_id: i64,
299 #[serde(skip_serializing_if = "Option::is_none")]
300 pub parent_snapshot_id: Option<i64>,
301 pub sequence_number: i64,
302 pub timestamp_ms: i64,
303 pub manifest_list: String,
304 pub summary: Summary,
305 #[serde(skip_serializing_if = "Option::is_none")]
306 pub schema_id: Option<SchemaId>,
307 }
308
309 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
310 #[serde(rename_all = "kebab-case")]
311 pub(crate) struct SnapshotV1 {
313 pub snapshot_id: i64,
314 #[serde(skip_serializing_if = "Option::is_none")]
315 pub parent_snapshot_id: Option<i64>,
316 pub timestamp_ms: i64,
317 #[serde(skip_serializing_if = "Option::is_none")]
318 pub manifest_list: Option<String>,
319 #[serde(skip_serializing_if = "Option::is_none")]
320 pub manifests: Option<Vec<String>>,
321 #[serde(skip_serializing_if = "Option::is_none")]
322 pub summary: Option<Summary>,
323 #[serde(skip_serializing_if = "Option::is_none")]
324 pub schema_id: Option<SchemaId>,
325 }
326
327 impl From<SnapshotV3> for Snapshot {
328 fn from(s: SnapshotV3) -> Self {
329 Snapshot {
330 snapshot_id: s.snapshot_id,
331 parent_snapshot_id: s.parent_snapshot_id,
332 sequence_number: s.sequence_number,
333 timestamp_ms: s.timestamp_ms,
334 manifest_list: s.manifest_list,
335 summary: s.summary,
336 schema_id: s.schema_id,
337 encryption_key_id: s.key_id,
338 row_range: match (s.first_row_id, s.added_rows) {
339 (Some(first_row_id), Some(added_rows)) => Some(SnapshotRowRange {
340 first_row_id,
341 added_rows,
342 }),
343 _ => None,
344 },
345 }
346 }
347 }
348
349 impl TryFrom<Snapshot> for SnapshotV3 {
350 type Error = Error;
351
352 fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
353 let (first_row_id, added_rows) = match s.row_range {
354 Some(row_range) => (Some(row_range.first_row_id), Some(row_range.added_rows)),
355 None => (None, None),
356 };
357
358 Ok(SnapshotV3 {
359 snapshot_id: s.snapshot_id,
360 parent_snapshot_id: s.parent_snapshot_id,
361 sequence_number: s.sequence_number,
362 timestamp_ms: s.timestamp_ms,
363 manifest_list: s.manifest_list,
364 summary: s.summary,
365 schema_id: s.schema_id,
366 first_row_id,
367 added_rows,
368 key_id: s.encryption_key_id,
369 })
370 }
371 }
372
373 impl From<SnapshotV2> for Snapshot {
374 fn from(v2: SnapshotV2) -> Self {
375 Snapshot {
376 snapshot_id: v2.snapshot_id,
377 parent_snapshot_id: v2.parent_snapshot_id,
378 sequence_number: v2.sequence_number,
379 timestamp_ms: v2.timestamp_ms,
380 manifest_list: v2.manifest_list,
381 summary: v2.summary,
382 schema_id: v2.schema_id,
383 encryption_key_id: None,
384 row_range: None,
385 }
386 }
387 }
388
389 impl From<Snapshot> for SnapshotV2 {
390 fn from(v2: Snapshot) -> Self {
391 SnapshotV2 {
392 snapshot_id: v2.snapshot_id,
393 parent_snapshot_id: v2.parent_snapshot_id,
394 sequence_number: v2.sequence_number,
395 timestamp_ms: v2.timestamp_ms,
396 manifest_list: v2.manifest_list,
397 summary: v2.summary,
398 schema_id: v2.schema_id,
399 }
400 }
401 }
402
403 impl TryFrom<SnapshotV1> for Snapshot {
404 type Error = Error;
405
406 fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
407 Ok(Snapshot {
408 snapshot_id: v1.snapshot_id,
409 parent_snapshot_id: v1.parent_snapshot_id,
410 sequence_number: 0,
411 timestamp_ms: v1.timestamp_ms,
412 manifest_list: match (v1.manifest_list, v1.manifests) {
413 (Some(file), None) => file,
414 (Some(_), Some(_)) => {
415 return Err(Error::new(
416 ErrorKind::DataInvalid,
417 "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
418 ));
419 }
420 (None, _) => {
421 return Err(Error::new(
422 ErrorKind::DataInvalid,
423 "Unsupported v1 snapshot, only manifest list is supported",
424 ));
425 }
426 },
427 summary: v1.summary.unwrap_or(Summary {
428 operation: Operation::default(),
429 additional_properties: HashMap::new(),
430 }),
431 schema_id: v1.schema_id,
432 encryption_key_id: None,
433 row_range: None,
434 })
435 }
436 }
437
438 impl From<Snapshot> for SnapshotV1 {
439 fn from(v2: Snapshot) -> Self {
440 SnapshotV1 {
441 snapshot_id: v2.snapshot_id,
442 parent_snapshot_id: v2.parent_snapshot_id,
443 timestamp_ms: v2.timestamp_ms,
444 manifest_list: Some(v2.manifest_list),
445 summary: Some(v2.summary),
446 schema_id: v2.schema_id,
447 manifests: None,
448 }
449 }
450 }
451}
452
453#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
454#[serde(rename_all = "kebab-case")]
455pub struct SnapshotReference {
457 pub snapshot_id: i64,
459 #[serde(flatten)]
460 pub retention: SnapshotRetention,
462}
463
464impl SnapshotReference {
465 pub fn is_branch(&self) -> bool {
467 matches!(self.retention, SnapshotRetention::Branch { .. })
468 }
469}
470
471impl SnapshotReference {
472 pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
474 SnapshotReference {
475 snapshot_id,
476 retention,
477 }
478 }
479}
480
481#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
482#[serde(rename_all = "lowercase", tag = "type")]
483pub enum SnapshotRetention {
485 #[serde(rename_all = "kebab-case")]
486 Branch {
489 #[serde(skip_serializing_if = "Option::is_none")]
492 min_snapshots_to_keep: Option<i32>,
493 #[serde(skip_serializing_if = "Option::is_none")]
496 max_snapshot_age_ms: Option<i64>,
497 #[serde(skip_serializing_if = "Option::is_none")]
500 max_ref_age_ms: Option<i64>,
501 },
502 #[serde(rename_all = "kebab-case")]
503 Tag {
505 #[serde(skip_serializing_if = "Option::is_none")]
508 max_ref_age_ms: Option<i64>,
509 },
510}
511
512impl SnapshotRetention {
513 pub fn branch(
515 min_snapshots_to_keep: Option<i32>,
516 max_snapshot_age_ms: Option<i64>,
517 max_ref_age_ms: Option<i64>,
518 ) -> Self {
519 SnapshotRetention::Branch {
520 min_snapshots_to_keep,
521 max_snapshot_age_ms,
522 max_ref_age_ms,
523 }
524 }
525}
526
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use crate::spec::TableMetadata;
    use crate::spec::snapshot::_serde::SnapshotV1;
    use crate::spec::snapshot::{Operation, Snapshot, Summary};

    /// Parses `json` as `TableMetadata`, asserts that parsing fails, and
    /// returns the rendered error message.
    fn table_metadata_error(json: &str) -> String {
        let outcome = serde_json::from_str::<TableMetadata>(json);
        assert!(outcome.is_err());
        outcome.unwrap_err().to_string()
    }

    /// A V1 snapshot JSON document converts into a `Snapshot` whose getters
    /// expose the parsed values.
    #[test]
    fn schema() {
        let json = r#"
            {
                "snapshot-id": 3051729675574597004,
                "timestamp-ms": 1515100955770,
                "summary": {
                    "operation": "append"
                },
                "manifest-list": "s3://b/wh/.../s1.avro",
                "schema-id": 0
            }
        "#;

        let wire = serde_json::from_str::<SnapshotV1>(json).unwrap();
        let snapshot = Snapshot::try_from(wire).unwrap();

        assert_eq!(3051729675574597004, snapshot.snapshot_id());
        assert_eq!(
            Utc.timestamp_millis_opt(1515100955770).unwrap(),
            snapshot.timestamp().unwrap()
        );
        assert_eq!(1515100955770, snapshot.timestamp_ms());

        let expected_summary = Summary {
            operation: Operation::Append,
            additional_properties: HashMap::new(),
        };
        assert_eq!(expected_summary, *snapshot.summary());
        assert_eq!("s3://b/wh/.../s1.avro", snapshot.manifest_list());
    }

    /// Converting a fully populated V1 snapshot keeps every field and sets the
    /// sequence number to 0.
    #[test]
    fn test_snapshot_v1_to_v2_projection() {
        let properties = HashMap::from([
            ("added-files".to_string(), "5".to_string()),
            ("added-records".to_string(), "100".to_string()),
        ]);
        let wire = SnapshotV1 {
            snapshot_id: 1234567890,
            parent_snapshot_id: Some(987654321),
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: Some(Summary {
                operation: Operation::Append,
                additional_properties: properties,
            }),
            schema_id: Some(1),
        };

        let snapshot = Snapshot::try_from(wire).unwrap();

        assert_eq!(
            snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        assert_eq!(snapshot.snapshot_id(), 1234567890);
        assert_eq!(snapshot.parent_snapshot_id(), Some(987654321));
        assert_eq!(snapshot.timestamp_ms(), 1515100955770);
        assert_eq!(snapshot.manifest_list(), "s3://bucket/manifest-list.avro");
        assert_eq!(snapshot.schema_id(), Some(1));

        let summary = snapshot.summary();
        assert_eq!(summary.operation, Operation::Append);
        assert_eq!(
            summary.additional_properties.get("added-files"),
            Some(&"5".to_string())
        );
    }

    /// V1 table metadata is rejected when a snapshot carries both a manifest
    /// list and a manifests array, and when it carries only a manifests array.
    #[test]
    fn test_v1_snapshot_with_manifest_list_and_manifests() {
        let both_manifest_list_and_manifests = r#"
            {
                "format-version": 1,
                "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
                "location": "s3://bucket/test/location",
                "last-updated-ms": 1700000000000,
                "last-column-id": 1,
                "schema": {
                    "type": "struct",
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"}
                    ]
                },
                "partition-spec": [],
                "properties": {},
                "current-snapshot-id": 111111111,
                "snapshots": [
                    {
                        "snapshot-id": 111111111,
                        "timestamp-ms": 1600000000000,
                        "summary": {"operation": "append"},
                        "manifest-list": "s3://bucket/metadata/snap-123.avro",
                        "manifests": ["s3://bucket/metadata/manifest-1.avro"]
                    }
                ]
            }
        "#;
        assert_eq!(
            table_metadata_error(both_manifest_list_and_manifests),
            "DataInvalid => Invalid v1 snapshot, when manifest list provided, manifest files should be omitted"
        );

        let missing_manifest_list = r#"
            {
                "format-version": 1,
                "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
                "location": "s3://bucket/test/location",
                "last-updated-ms": 1700000000000,
                "last-column-id": 1,
                "schema": {
                    "type": "struct",
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"}
                    ]
                },
                "partition-spec": [],
                "properties": {},
                "current-snapshot-id": 111111111,
                "snapshots": [
                    {
                        "snapshot-id": 111111111,
                        "timestamp-ms": 1600000000000,
                        "summary": {"operation": "append"},
                        "manifests": ["s3://bucket/metadata/manifest-1.avro"]
                    }
                ]
            }
        "#;
        assert_eq!(
            table_metadata_error(missing_manifest_list),
            "DataInvalid => Unsupported v1 snapshot, only manifest list is supported"
        );
    }

    /// A V1 snapshot without a summary receives the default summary (Append,
    /// no extra properties) when converted.
    #[test]
    fn test_snapshot_v1_to_v2_with_missing_summary() {
        let wire = SnapshotV1 {
            snapshot_id: 1111111111,
            parent_snapshot_id: None,
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: None,
            schema_id: None,
        };

        let snapshot = Snapshot::try_from(wire).unwrap();

        assert_eq!(
            snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        let summary = snapshot.summary();
        assert_eq!(
            summary.operation,
            Operation::Append,
            "Missing V1 summary should default to Append operation"
        );
        assert!(
            summary.additional_properties.is_empty(),
            "Default summary should have empty additional_properties"
        );

        assert_eq!(snapshot.snapshot_id(), 1111111111);
        assert_eq!(snapshot.parent_snapshot_id(), None);
        assert_eq!(snapshot.schema_id(), None);
    }
}