1use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::table_metadata::SnapshotLog;
29use crate::error::{Result, timestamp_ms_to_utc};
30use crate::io::FileIO;
31use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
32use crate::{Error, ErrorKind};
33
/// Reference name used for a table's main branch (`"main"`).
pub const MAIN_BRANCH: &str = "main";

/// Shared, reference-counted handle to a [`Snapshot`]; cloning it only bumps the `Arc` count.
pub type SnapshotRef = Arc<Snapshot>;
/// Kind of operation that produced a snapshot.
///
/// Variants are (de)serialized in lowercase (`"append"`, `"replace"`,
/// `"overwrite"`, `"delete"`) because of `rename_all = "lowercase"`.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Operation {
    /// Serialized as `"append"`. This is the `Default` variant, which is also
    /// used when a v1 snapshot has no summary.
    #[default]
    Append,
    /// Serialized as `"replace"`.
    Replace,
    /// Serialized as `"overwrite"`.
    Overwrite,
    /// Serialized as `"delete"`.
    Delete,
}
54
55impl Operation {
56 pub fn as_str(&self) -> &str {
58 match self {
59 Operation::Append => "append",
60 Operation::Replace => "replace",
61 Operation::Overwrite => "overwrite",
62 Operation::Delete => "delete",
63 }
64 }
65}
66
/// Summary attached to a snapshot: the operation that produced it plus any
/// other summary entries as free-form string key/value pairs.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct Summary {
    /// Operation that produced the snapshot, stored under the `operation` key.
    pub operation: Operation,
    /// Every other summary entry. `#[serde(flatten)]` places these keys in the
    /// same JSON object as `operation` instead of under a nested key.
    #[serde(flatten)]
    pub additional_properties: HashMap<String, String>,
}
76
/// Row-id information recorded for a snapshot: the starting row id and the
/// number of rows the snapshot added.
///
/// Both values are kept together so a snapshot either has the complete pair
/// or neither value.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SnapshotRowRange {
    /// First row id assigned to the rows added by the snapshot.
    pub first_row_id: u64,
    /// Number of rows added by the snapshot.
    pub added_rows: u64,
}
85
/// In-memory model of a table snapshot, independent of the metadata format
/// version it was read from (see the `_serde` module for the per-version forms).
///
/// Built via the `TypedBuilder`-generated builder; every setter is prefixed
/// with `with_` (for example `with_snapshot_id`).
#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub struct Snapshot {
    /// Identifier of this snapshot.
    pub(crate) snapshot_id: i64,
    /// Identifier of the parent snapshot, if any. Optional in the builder
    /// (defaults to `None`).
    #[builder(default = None)]
    pub(crate) parent_snapshot_id: Option<i64>,
    /// Sequence number of the snapshot. Snapshots converted from v1 metadata
    /// get 0, since the v1 form has no sequence number.
    pub(crate) sequence_number: i64,
    /// Snapshot timestamp in milliseconds; `timestamp()` converts it to `DateTime<Utc>`.
    pub(crate) timestamp_ms: i64,
    /// Location of the manifest list file. The builder setter accepts any
    /// `Into<String>`.
    #[builder(setter(into))]
    pub(crate) manifest_list: String,
    /// Summary of the operation that produced this snapshot.
    pub(crate) summary: Summary,
    /// Schema id recorded on the snapshot, if any (defaults to `None`).
    /// With `strip_option`, the main setter takes a bare `SchemaId`; the
    /// `schema_id_opt` fallback setter takes an `Option<SchemaId>`.
    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
    pub(crate) schema_id: Option<SchemaId>,
    /// Encryption key id (`key-id` in the v3 serialized form), if any.
    #[builder(default)]
    pub(crate) encryption_key_id: Option<String>,
    /// Row-id range of the rows added by this snapshot, if known. The setter
    /// takes `(first_row_id, added_rows)` and wraps them in `Some(SnapshotRowRange)`.
    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
    pub(crate) row_range: Option<SnapshotRowRange>,
}
124
125impl Snapshot {
126 #[inline]
128 pub fn snapshot_id(&self) -> i64 {
129 self.snapshot_id
130 }
131
132 #[inline]
134 pub fn parent_snapshot_id(&self) -> Option<i64> {
135 self.parent_snapshot_id
136 }
137
138 #[inline]
140 pub fn sequence_number(&self) -> i64 {
141 self.sequence_number
142 }
143 #[inline]
145 pub fn manifest_list(&self) -> &str {
146 &self.manifest_list
147 }
148
149 #[inline]
151 pub fn summary(&self) -> &Summary {
152 &self.summary
153 }
154 #[inline]
156 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
157 timestamp_ms_to_utc(self.timestamp_ms)
158 }
159
160 #[inline]
162 pub fn timestamp_ms(&self) -> i64 {
163 self.timestamp_ms
164 }
165
166 #[inline]
168 pub fn schema_id(&self) -> Option<SchemaId> {
169 self.schema_id
170 }
171
172 pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
174 Ok(match self.schema_id() {
175 Some(schema_id) => table_metadata
176 .schema_by_id(schema_id)
177 .ok_or_else(|| {
178 Error::new(
179 ErrorKind::DataInvalid,
180 format!("Schema with id {schema_id} not found"),
181 )
182 })?
183 .clone(),
184 None => table_metadata.current_schema().clone(),
185 })
186 }
187
188 #[cfg(test)]
190 pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
191 match self.parent_snapshot_id {
192 Some(id) => table_metadata.snapshot_by_id(id).cloned(),
193 None => None,
194 }
195 }
196
197 pub async fn load_manifest_list(
199 &self,
200 file_io: &FileIO,
201 table_metadata: &TableMetadata,
202 ) -> Result<ManifestList> {
203 let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
204 ManifestList::parse_with_version(
205 &manifest_list_content,
206 table_metadata.format_version(),
209 )
210 }
211
212 #[allow(dead_code)]
213 pub(crate) fn log(&self) -> SnapshotLog {
214 SnapshotLog {
215 timestamp_ms: self.timestamp_ms,
216 snapshot_id: self.snapshot_id,
217 }
218 }
219
220 pub fn first_row_id(&self) -> Option<u64> {
227 self.row_range.as_ref().map(|r| r.first_row_id)
228 }
229
230 pub fn added_rows_count(&self) -> Option<u64> {
235 self.row_range.as_ref().map(|r| r.added_rows)
236 }
237
238 pub fn row_range(&self) -> Option<(u64, u64)> {
241 self.row_range
242 .as_ref()
243 .map(|r| (r.first_row_id, r.added_rows))
244 }
245
246 pub fn encryption_key_id(&self) -> Option<&str> {
248 self.encryption_key_id.as_deref()
249 }
250}
251
252pub(super) mod _serde {
253 use std::collections::HashMap;
258
259 use serde::{Deserialize, Serialize};
260
261 use super::{Operation, Snapshot, Summary};
262 use crate::spec::SchemaId;
263 use crate::spec::snapshot::SnapshotRowRange;
264 use crate::{Error, ErrorKind};
265
266 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
267 #[serde(rename_all = "kebab-case")]
268 pub(crate) struct SnapshotV3 {
270 pub snapshot_id: i64,
271 #[serde(skip_serializing_if = "Option::is_none")]
272 pub parent_snapshot_id: Option<i64>,
273 pub sequence_number: i64,
274 pub timestamp_ms: i64,
275 pub manifest_list: String,
276 pub summary: Summary,
277 #[serde(skip_serializing_if = "Option::is_none")]
278 pub schema_id: Option<SchemaId>,
279 #[serde(skip_serializing_if = "Option::is_none")]
280 pub first_row_id: Option<u64>,
281 #[serde(skip_serializing_if = "Option::is_none")]
282 pub added_rows: Option<u64>,
283 #[serde(skip_serializing_if = "Option::is_none")]
284 pub key_id: Option<String>,
285 }
286
287 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
288 #[serde(rename_all = "kebab-case")]
289 pub(crate) struct SnapshotV2 {
291 pub snapshot_id: i64,
292 #[serde(skip_serializing_if = "Option::is_none")]
293 pub parent_snapshot_id: Option<i64>,
294 pub sequence_number: i64,
295 pub timestamp_ms: i64,
296 pub manifest_list: String,
297 pub summary: Summary,
298 #[serde(skip_serializing_if = "Option::is_none")]
299 pub schema_id: Option<SchemaId>,
300 }
301
302 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
303 #[serde(rename_all = "kebab-case")]
304 pub(crate) struct SnapshotV1 {
306 pub snapshot_id: i64,
307 #[serde(skip_serializing_if = "Option::is_none")]
308 pub parent_snapshot_id: Option<i64>,
309 pub timestamp_ms: i64,
310 #[serde(skip_serializing_if = "Option::is_none")]
311 pub manifest_list: Option<String>,
312 #[serde(skip_serializing_if = "Option::is_none")]
313 pub manifests: Option<Vec<String>>,
314 #[serde(skip_serializing_if = "Option::is_none")]
315 pub summary: Option<Summary>,
316 #[serde(skip_serializing_if = "Option::is_none")]
317 pub schema_id: Option<SchemaId>,
318 }
319
320 impl From<SnapshotV3> for Snapshot {
321 fn from(s: SnapshotV3) -> Self {
322 Snapshot {
323 snapshot_id: s.snapshot_id,
324 parent_snapshot_id: s.parent_snapshot_id,
325 sequence_number: s.sequence_number,
326 timestamp_ms: s.timestamp_ms,
327 manifest_list: s.manifest_list,
328 summary: s.summary,
329 schema_id: s.schema_id,
330 encryption_key_id: s.key_id,
331 row_range: match (s.first_row_id, s.added_rows) {
332 (Some(first_row_id), Some(added_rows)) => Some(SnapshotRowRange {
333 first_row_id,
334 added_rows,
335 }),
336 _ => None,
337 },
338 }
339 }
340 }
341
342 impl TryFrom<Snapshot> for SnapshotV3 {
343 type Error = Error;
344
345 fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
346 let (first_row_id, added_rows) = match s.row_range {
347 Some(row_range) => (Some(row_range.first_row_id), Some(row_range.added_rows)),
348 None => (None, None),
349 };
350
351 Ok(SnapshotV3 {
352 snapshot_id: s.snapshot_id,
353 parent_snapshot_id: s.parent_snapshot_id,
354 sequence_number: s.sequence_number,
355 timestamp_ms: s.timestamp_ms,
356 manifest_list: s.manifest_list,
357 summary: s.summary,
358 schema_id: s.schema_id,
359 first_row_id,
360 added_rows,
361 key_id: s.encryption_key_id,
362 })
363 }
364 }
365
366 impl From<SnapshotV2> for Snapshot {
367 fn from(v2: SnapshotV2) -> Self {
368 Snapshot {
369 snapshot_id: v2.snapshot_id,
370 parent_snapshot_id: v2.parent_snapshot_id,
371 sequence_number: v2.sequence_number,
372 timestamp_ms: v2.timestamp_ms,
373 manifest_list: v2.manifest_list,
374 summary: v2.summary,
375 schema_id: v2.schema_id,
376 encryption_key_id: None,
377 row_range: None,
378 }
379 }
380 }
381
382 impl From<Snapshot> for SnapshotV2 {
383 fn from(v2: Snapshot) -> Self {
384 SnapshotV2 {
385 snapshot_id: v2.snapshot_id,
386 parent_snapshot_id: v2.parent_snapshot_id,
387 sequence_number: v2.sequence_number,
388 timestamp_ms: v2.timestamp_ms,
389 manifest_list: v2.manifest_list,
390 summary: v2.summary,
391 schema_id: v2.schema_id,
392 }
393 }
394 }
395
396 impl TryFrom<SnapshotV1> for Snapshot {
397 type Error = Error;
398
399 fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
400 Ok(Snapshot {
401 snapshot_id: v1.snapshot_id,
402 parent_snapshot_id: v1.parent_snapshot_id,
403 sequence_number: 0,
404 timestamp_ms: v1.timestamp_ms,
405 manifest_list: match (v1.manifest_list, v1.manifests) {
406 (Some(file), None) => file,
407 (Some(_), Some(_)) => {
408 return Err(Error::new(
409 ErrorKind::DataInvalid,
410 "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
411 ));
412 }
413 (None, _) => {
414 return Err(Error::new(
415 ErrorKind::DataInvalid,
416 "Unsupported v1 snapshot, only manifest list is supported",
417 ));
418 }
419 },
420 summary: v1.summary.unwrap_or(Summary {
421 operation: Operation::default(),
422 additional_properties: HashMap::new(),
423 }),
424 schema_id: v1.schema_id,
425 encryption_key_id: None,
426 row_range: None,
427 })
428 }
429 }
430
431 impl From<Snapshot> for SnapshotV1 {
432 fn from(v2: Snapshot) -> Self {
433 SnapshotV1 {
434 snapshot_id: v2.snapshot_id,
435 parent_snapshot_id: v2.parent_snapshot_id,
436 timestamp_ms: v2.timestamp_ms,
437 manifest_list: Some(v2.manifest_list),
438 summary: Some(v2.summary),
439 schema_id: v2.schema_id,
440 manifests: None,
441 }
442 }
443 }
444}
445
/// A reference (branch or tag) to a snapshot, together with its retention settings.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotReference {
    /// Id of the referenced snapshot, serialized as `snapshot-id`.
    pub snapshot_id: i64,
    /// Retention settings. `#[serde(flatten)]` writes the retention's `type` tag
    /// and its fields in the same JSON object as `snapshot-id`.
    #[serde(flatten)]
    pub retention: SnapshotRetention,
}
456
457impl SnapshotReference {
458 pub fn is_branch(&self) -> bool {
460 matches!(self.retention, SnapshotRetention::Branch { .. })
461 }
462}
463
464impl SnapshotReference {
465 pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
467 SnapshotReference {
468 snapshot_id,
469 retention,
470 }
471 }
472}
473
/// Retention settings of a snapshot reference.
///
/// Internally tagged: the variant is written to a `type` key as `"branch"`
/// or `"tag"` (`rename_all = "lowercase"`). Variant fields use kebab-case keys
/// and are omitted when `None`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum SnapshotRetention {
    /// Retention settings for a branch reference.
    #[serde(rename_all = "kebab-case")]
    Branch {
        /// Minimum number of snapshots to keep (`min-snapshots-to-keep`), if set.
        #[serde(skip_serializing_if = "Option::is_none")]
        min_snapshots_to_keep: Option<i32>,
        /// Maximum snapshot age in milliseconds (`max-snapshot-age-ms`), if set.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_snapshot_age_ms: Option<i64>,
        /// Maximum age of the reference in milliseconds (`max-ref-age-ms`), if set.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
    /// Retention settings for a tag reference.
    #[serde(rename_all = "kebab-case")]
    Tag {
        /// Maximum age of the reference in milliseconds (`max-ref-age-ms`), if set.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
}
504
505impl SnapshotRetention {
506 pub fn branch(
508 min_snapshots_to_keep: Option<i32>,
509 max_snapshot_age_ms: Option<i64>,
510 max_ref_age_ms: Option<i64>,
511 ) -> Self {
512 SnapshotRetention::Branch {
513 min_snapshots_to_keep,
514 max_snapshot_age_ms,
515 max_ref_age_ms,
516 }
517 }
518}
519
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use crate::spec::TableMetadata;
    use crate::spec::snapshot::_serde::SnapshotV1;
    use crate::spec::snapshot::{Operation, Snapshot, Summary};

    /// A v1 snapshot with a manifest list deserializes and converts into the
    /// in-memory model, keeping its id, timestamp, summary, manifest list and schema id.
    #[test]
    fn schema() {
        let record = r#"
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://b/wh/.../s1.avro",
            "schema-id": 0
        }
        "#;

        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
            .unwrap()
            .try_into()
            .unwrap();
        assert_eq!(3051729675574597004, result.snapshot_id());
        assert_eq!(
            Utc.timestamp_millis_opt(1515100955770).unwrap(),
            result.timestamp().unwrap()
        );
        assert_eq!(1515100955770, result.timestamp_ms());
        assert_eq!(
            Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new()
            },
            *result.summary()
        );
        assert_eq!("s3://b/wh/.../s1.avro", result.manifest_list());
        // The record carries "schema-id": 0; make sure it survives the conversion.
        assert_eq!(Some(0), result.schema_id());
    }

    /// Converting a fully-populated v1 snapshot keeps every field and fills in
    /// sequence number 0.
    #[test]
    fn test_snapshot_v1_to_v2_projection() {
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1234567890,
            parent_snapshot_id: Some(987654321),
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: Some(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([
                    ("added-files".to_string(), "5".to_string()),
                    ("added-records".to_string(), "100".to_string()),
                ]),
            }),
            schema_id: Some(1),
        };

        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        assert_eq!(v2_snapshot.snapshot_id(), 1234567890);
        assert_eq!(v2_snapshot.parent_snapshot_id(), Some(987654321));
        assert_eq!(v2_snapshot.timestamp_ms(), 1515100955770);
        assert_eq!(
            v2_snapshot.manifest_list(),
            "s3://bucket/manifest-list.avro"
        );
        assert_eq!(v2_snapshot.schema_id(), Some(1));

        let summary = v2_snapshot.summary();
        assert_eq!(summary.operation, Operation::Append);
        assert_eq!(
            summary.additional_properties.get("added-files"),
            Some(&"5".to_string())
        );
        assert_eq!(
            summary.additional_properties.get("added-records"),
            Some(&"100".to_string())
        );
    }

    /// v1 snapshots must use a manifest list and must not also carry inline manifests.
    #[test]
    fn test_v1_snapshot_with_manifest_list_and_manifests() {
        // Both "manifest-list" and "manifests" present: rejected as invalid.
        {
            let metadata = r#"
            {
                "format-version": 1,
                "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
                "location": "s3://bucket/test/location",
                "last-updated-ms": 1700000000000,
                "last-column-id": 1,
                "schema": {
                    "type": "struct",
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"}
                    ]
                },
                "partition-spec": [],
                "properties": {},
                "current-snapshot-id": 111111111,
                "snapshots": [
                    {
                        "snapshot-id": 111111111,
                        "timestamp-ms": 1600000000000,
                        "summary": {"operation": "append"},
                        "manifest-list": "s3://bucket/metadata/snap-123.avro",
                        "manifests": ["s3://bucket/metadata/manifest-1.avro"]
                    }
                ]
            }
            "#;

            let err = serde_json::from_str::<TableMetadata>(metadata).unwrap_err();
            assert_eq!(
                err.to_string(),
                "DataInvalid => Invalid v1 snapshot, when manifest list provided, manifest files should be omitted"
            );
        }

        // Only "manifests" present: rejected as unsupported.
        {
            let metadata = r#"
            {
                "format-version": 1,
                "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
                "location": "s3://bucket/test/location",
                "last-updated-ms": 1700000000000,
                "last-column-id": 1,
                "schema": {
                    "type": "struct",
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"}
                    ]
                },
                "partition-spec": [],
                "properties": {},
                "current-snapshot-id": 111111111,
                "snapshots": [
                    {
                        "snapshot-id": 111111111,
                        "timestamp-ms": 1600000000000,
                        "summary": {"operation": "append"},
                        "manifests": ["s3://bucket/metadata/manifest-1.avro"]
                    }
                ]
            }
            "#;

            let err = serde_json::from_str::<TableMetadata>(metadata).unwrap_err();
            assert_eq!(
                err.to_string(),
                "DataInvalid => Unsupported v1 snapshot, only manifest list is supported"
            );
        }
    }

    /// A v1 snapshot without a summary gets a default `Append` summary with no properties.
    #[test]
    fn test_snapshot_v1_to_v2_with_missing_summary() {
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1111111111,
            parent_snapshot_id: None,
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: None,
            schema_id: None,
        };

        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );
        assert_eq!(
            v2_snapshot.summary().operation,
            Operation::Append,
            "Missing V1 summary should default to Append operation"
        );
        assert!(
            v2_snapshot.summary().additional_properties.is_empty(),
            "Default summary should have empty additional_properties"
        );

        assert_eq!(v2_snapshot.snapshot_id(), 1111111111);
        assert_eq!(v2_snapshot.parent_snapshot_id(), None);
        assert_eq!(v2_snapshot.schema_id(), None);
    }
}