1use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::table_metadata::SnapshotLog;
29use crate::error::{Result, timestamp_ms_to_utc};
30use crate::io::FileIO;
31use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
32use crate::{Error, ErrorKind};
33
/// Name of the main branch reference (`"main"`).
pub const MAIN_BRANCH: &str = "main";

/// Shared, reference-counted handle to an immutable [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
/// The kind of operation that produced a snapshot, recorded in its [`Summary`].
///
/// Serialized in lowercase: `"append"`, `"replace"`, `"overwrite"`, `"delete"`.
/// See the Iceberg table spec for the exact meaning of each operation.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Operation {
    /// Append operation (`"append"`). This is the `Default` variant, which the
    /// v1 conversion uses when a snapshot has no summary.
    #[default]
    Append,
    /// Replace operation (`"replace"`).
    Replace,
    /// Overwrite operation (`"overwrite"`).
    Overwrite,
    /// Delete operation (`"delete"`).
    Delete,
}
54
55impl Operation {
56 pub fn as_str(&self) -> &str {
58 match self {
59 Operation::Append => "append",
60 Operation::Replace => "replace",
61 Operation::Overwrite => "overwrite",
62 Operation::Delete => "delete",
63 }
64 }
65}
66
/// Summary of the changes made by a snapshot.
///
/// `operation` is read from and written to the `"operation"` key. Every other
/// key of the JSON summary object is collected into `additional_properties`
/// through `#[serde(flatten)]`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct Summary {
    /// The operation that produced the snapshot.
    pub operation: Operation,
    /// All remaining summary entries (e.g. `"added-files"`) as string key/value pairs.
    #[serde(flatten)]
    pub additional_properties: HashMap<String, String>,
}
76
/// Row-ID range attached to a snapshot.
///
/// Holds the first row ID and the number of added rows. It corresponds to the
/// v3 `first-row-id` / `added-rows` snapshot fields.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SnapshotRowRange {
    /// First row ID of the range.
    pub first_row_id: u64,
    /// Number of rows added by the snapshot.
    pub added_rows: u64,
}
85
/// In-memory representation of a table snapshot, independent of format version.
///
/// Construct with the generated `Snapshot::builder()`. Every generated setter is
/// prefixed with `with_` (e.g. `with_snapshot_id`).
#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub struct Snapshot {
    /// Identifier of this snapshot.
    pub(crate) snapshot_id: i64,
    /// Identifier of the parent snapshot; the builder defaults it to `None`.
    #[builder(default = None)]
    pub(crate) parent_snapshot_id: Option<i64>,
    /// Sequence number; snapshots converted from v1 metadata get `0`.
    pub(crate) sequence_number: i64,
    /// Snapshot timestamp in milliseconds, converted by `Snapshot::timestamp`
    /// via `timestamp_ms_to_utc` (presumably epoch millis — confirm).
    pub(crate) timestamp_ms: i64,
    /// Location of the manifest list file; the builder setter accepts any
    /// `impl Into<String>`.
    #[builder(setter(into))]
    pub(crate) manifest_list: String,
    /// Operation and additional summary properties.
    pub(crate) summary: Summary,
    /// Schema ID recorded on the snapshot. The main setter takes a bare
    /// `SchemaId`, and a fallback setter accepting `Option<SchemaId>` is also
    /// generated. Defaults to `None`.
    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
    pub(crate) schema_id: Option<SchemaId>,
    /// Encryption key ID (`key-id` in the v3 JSON form); defaults to `None`.
    #[builder(default)]
    pub(crate) encryption_key_id: Option<String>,
    /// Row-ID range. The builder setter takes `(first_row_id, added_rows)` and
    /// wraps them in `Some(SnapshotRowRange { .. })`. Defaults to `None`.
    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
    pub(crate) row_range: Option<SnapshotRowRange>,
}
124
125impl Snapshot {
126 #[inline]
128 pub fn snapshot_id(&self) -> i64 {
129 self.snapshot_id
130 }
131
132 #[inline]
134 pub fn parent_snapshot_id(&self) -> Option<i64> {
135 self.parent_snapshot_id
136 }
137
138 #[inline]
140 pub fn sequence_number(&self) -> i64 {
141 self.sequence_number
142 }
143 #[inline]
145 pub fn manifest_list(&self) -> &str {
146 &self.manifest_list
147 }
148
149 #[inline]
151 pub fn summary(&self) -> &Summary {
152 &self.summary
153 }
154 #[inline]
156 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
157 timestamp_ms_to_utc(self.timestamp_ms)
158 }
159
160 #[inline]
162 pub fn timestamp_ms(&self) -> i64 {
163 self.timestamp_ms
164 }
165
166 #[inline]
168 pub fn schema_id(&self) -> Option<SchemaId> {
169 self.schema_id
170 }
171
172 pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
174 Ok(match self.schema_id() {
175 Some(schema_id) => table_metadata
176 .schema_by_id(schema_id)
177 .ok_or_else(|| {
178 Error::new(
179 ErrorKind::DataInvalid,
180 format!("Schema with id {schema_id} not found"),
181 )
182 })?
183 .clone(),
184 None => table_metadata.current_schema().clone(),
185 })
186 }
187
188 #[cfg(test)]
190 pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
191 match self.parent_snapshot_id {
192 Some(id) => table_metadata.snapshot_by_id(id).cloned(),
193 None => None,
194 }
195 }
196
197 pub async fn load_manifest_list(
199 &self,
200 file_io: &FileIO,
201 table_metadata: &TableMetadata,
202 ) -> Result<ManifestList> {
203 let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
204 ManifestList::parse_with_version(
205 &manifest_list_content,
206 table_metadata.format_version(),
209 )
210 }
211
212 #[allow(dead_code)]
213 pub(crate) fn log(&self) -> SnapshotLog {
214 SnapshotLog {
215 timestamp_ms: self.timestamp_ms,
216 snapshot_id: self.snapshot_id,
217 }
218 }
219
220 pub fn first_row_id(&self) -> Option<u64> {
227 self.row_range.as_ref().map(|r| r.first_row_id)
228 }
229
230 pub fn added_rows_count(&self) -> Option<u64> {
235 self.row_range.as_ref().map(|r| r.added_rows)
236 }
237
238 pub fn row_range(&self) -> Option<(u64, u64)> {
241 self.row_range
242 .as_ref()
243 .map(|r| (r.first_row_id, r.added_rows))
244 }
245
246 pub fn encryption_key_id(&self) -> Option<&str> {
248 self.encryption_key_id.as_deref()
249 }
250}
251
252pub(super) mod _serde {
253 use std::collections::HashMap;
258
259 use serde::{Deserialize, Serialize};
260
261 use super::{Operation, Snapshot, Summary};
262 use crate::spec::SchemaId;
263 use crate::spec::snapshot::SnapshotRowRange;
264 use crate::{Error, ErrorKind};
265
266 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
267 #[serde(rename_all = "kebab-case")]
268 pub(crate) struct SnapshotV3 {
270 pub snapshot_id: i64,
271 #[serde(skip_serializing_if = "Option::is_none")]
272 pub parent_snapshot_id: Option<i64>,
273 pub sequence_number: i64,
274 pub timestamp_ms: i64,
275 pub manifest_list: String,
276 pub summary: Summary,
277 #[serde(skip_serializing_if = "Option::is_none")]
278 pub schema_id: Option<SchemaId>,
279 #[serde(skip_serializing_if = "Option::is_none")]
280 pub first_row_id: Option<u64>,
281 #[serde(skip_serializing_if = "Option::is_none")]
282 pub added_rows: Option<u64>,
283 #[serde(skip_serializing_if = "Option::is_none")]
284 pub key_id: Option<String>,
285 }
286
287 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
288 #[serde(rename_all = "kebab-case")]
289 pub(crate) struct SnapshotV2 {
291 pub snapshot_id: i64,
292 #[serde(skip_serializing_if = "Option::is_none")]
293 pub parent_snapshot_id: Option<i64>,
294 #[serde(default)]
295 pub sequence_number: i64,
296 pub timestamp_ms: i64,
297 pub manifest_list: String,
298 pub summary: Summary,
299 #[serde(skip_serializing_if = "Option::is_none")]
300 pub schema_id: Option<SchemaId>,
301 }
302
303 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
304 #[serde(rename_all = "kebab-case")]
305 pub(crate) struct SnapshotV1 {
307 pub snapshot_id: i64,
308 #[serde(skip_serializing_if = "Option::is_none")]
309 pub parent_snapshot_id: Option<i64>,
310 pub timestamp_ms: i64,
311 #[serde(skip_serializing_if = "Option::is_none")]
312 pub manifest_list: Option<String>,
313 #[serde(skip_serializing_if = "Option::is_none")]
314 pub manifests: Option<Vec<String>>,
315 #[serde(skip_serializing_if = "Option::is_none")]
316 pub summary: Option<Summary>,
317 #[serde(skip_serializing_if = "Option::is_none")]
318 pub schema_id: Option<SchemaId>,
319 }
320
321 impl From<SnapshotV3> for Snapshot {
322 fn from(s: SnapshotV3) -> Self {
323 Snapshot {
324 snapshot_id: s.snapshot_id,
325 parent_snapshot_id: s.parent_snapshot_id,
326 sequence_number: s.sequence_number,
327 timestamp_ms: s.timestamp_ms,
328 manifest_list: s.manifest_list,
329 summary: s.summary,
330 schema_id: s.schema_id,
331 encryption_key_id: s.key_id,
332 row_range: match (s.first_row_id, s.added_rows) {
333 (Some(first_row_id), Some(added_rows)) => Some(SnapshotRowRange {
334 first_row_id,
335 added_rows,
336 }),
337 _ => None,
338 },
339 }
340 }
341 }
342
343 impl TryFrom<Snapshot> for SnapshotV3 {
344 type Error = Error;
345
346 fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
347 let (first_row_id, added_rows) = match s.row_range {
348 Some(row_range) => (Some(row_range.first_row_id), Some(row_range.added_rows)),
349 None => (None, None),
350 };
351
352 Ok(SnapshotV3 {
353 snapshot_id: s.snapshot_id,
354 parent_snapshot_id: s.parent_snapshot_id,
355 sequence_number: s.sequence_number,
356 timestamp_ms: s.timestamp_ms,
357 manifest_list: s.manifest_list,
358 summary: s.summary,
359 schema_id: s.schema_id,
360 first_row_id,
361 added_rows,
362 key_id: s.encryption_key_id,
363 })
364 }
365 }
366
367 impl From<SnapshotV2> for Snapshot {
368 fn from(v2: SnapshotV2) -> Self {
369 Snapshot {
370 snapshot_id: v2.snapshot_id,
371 parent_snapshot_id: v2.parent_snapshot_id,
372 sequence_number: v2.sequence_number,
373 timestamp_ms: v2.timestamp_ms,
374 manifest_list: v2.manifest_list,
375 summary: v2.summary,
376 schema_id: v2.schema_id,
377 encryption_key_id: None,
378 row_range: None,
379 }
380 }
381 }
382
383 impl From<Snapshot> for SnapshotV2 {
384 fn from(v2: Snapshot) -> Self {
385 SnapshotV2 {
386 snapshot_id: v2.snapshot_id,
387 parent_snapshot_id: v2.parent_snapshot_id,
388 sequence_number: v2.sequence_number,
389 timestamp_ms: v2.timestamp_ms,
390 manifest_list: v2.manifest_list,
391 summary: v2.summary,
392 schema_id: v2.schema_id,
393 }
394 }
395 }
396
397 impl TryFrom<SnapshotV1> for Snapshot {
398 type Error = Error;
399
400 fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
401 Ok(Snapshot {
402 snapshot_id: v1.snapshot_id,
403 parent_snapshot_id: v1.parent_snapshot_id,
404 sequence_number: 0,
405 timestamp_ms: v1.timestamp_ms,
406 manifest_list: match (v1.manifest_list, v1.manifests) {
407 (Some(file), None) => file,
408 (Some(_), Some(_)) => {
409 return Err(Error::new(
410 ErrorKind::DataInvalid,
411 "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
412 ));
413 }
414 (None, _) => {
415 return Err(Error::new(
416 ErrorKind::DataInvalid,
417 "Unsupported v1 snapshot, only manifest list is supported",
418 ));
419 }
420 },
421 summary: v1.summary.unwrap_or(Summary {
422 operation: Operation::default(),
423 additional_properties: HashMap::new(),
424 }),
425 schema_id: v1.schema_id,
426 encryption_key_id: None,
427 row_range: None,
428 })
429 }
430 }
431
432 impl From<Snapshot> for SnapshotV1 {
433 fn from(v2: Snapshot) -> Self {
434 SnapshotV1 {
435 snapshot_id: v2.snapshot_id,
436 parent_snapshot_id: v2.parent_snapshot_id,
437 timestamp_ms: v2.timestamp_ms,
438 manifest_list: Some(v2.manifest_list),
439 summary: Some(v2.summary),
440 schema_id: v2.schema_id,
441 manifests: None,
442 }
443 }
444 }
445}
446
/// A named reference (branch or tag) pointing at a snapshot.
///
/// The retention settings, including their `"type"` tag, are flattened into
/// the same JSON object as `"snapshot-id"`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotReference {
    /// ID of the snapshot the reference points to.
    pub snapshot_id: i64,
    /// Branch or tag retention settings.
    #[serde(flatten)]
    pub retention: SnapshotRetention,
}
457
458impl SnapshotReference {
459 pub fn is_branch(&self) -> bool {
461 matches!(self.retention, SnapshotRetention::Branch { .. })
462 }
463}
464
465impl SnapshotReference {
466 pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
468 SnapshotReference {
469 snapshot_id,
470 retention,
471 }
472 }
473}
474
/// Retention settings of a snapshot reference, distinguishing branches from tags.
///
/// Internally tagged: the variant is written as `"type": "branch"` or
/// `"type": "tag"`. Field keys are kebab-case, and `None` fields are omitted
/// on write.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum SnapshotRetention {
    /// A branch reference.
    #[serde(rename_all = "kebab-case")]
    Branch {
        /// `min-snapshots-to-keep`; presumably the minimum number of snapshots
        /// to retain on the branch (see the Iceberg spec).
        #[serde(skip_serializing_if = "Option::is_none")]
        min_snapshots_to_keep: Option<i32>,
        /// `max-snapshot-age-ms`; presumably the maximum age of snapshots to
        /// keep on the branch, in milliseconds.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_snapshot_age_ms: Option<i64>,
        /// `max-ref-age-ms`; presumably the maximum age of the reference
        /// itself, in milliseconds.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
    /// A tag reference.
    #[serde(rename_all = "kebab-case")]
    Tag {
        /// `max-ref-age-ms`; presumably the maximum age of the reference, in
        /// milliseconds.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
}
505
506impl SnapshotRetention {
507 pub fn branch(
509 min_snapshots_to_keep: Option<i32>,
510 max_snapshot_age_ms: Option<i64>,
511 max_ref_age_ms: Option<i64>,
512 ) -> Self {
513 SnapshotRetention::Branch {
514 min_snapshots_to_keep,
515 max_snapshot_age_ms,
516 max_ref_age_ms,
517 }
518 }
519}
520
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use crate::spec::TableMetadata;
    use crate::spec::snapshot::_serde::SnapshotV1;
    use crate::spec::snapshot::{Operation, Snapshot, Summary};

    /// Deserializes a v1 snapshot record and checks every converted field,
    /// including the `schema-id` this test is named after.
    #[test]
    fn schema() {
        let record = r#"
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://b/wh/.../s1.avro",
            "schema-id": 0
        }
        "#;

        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
            .unwrap()
            .try_into()
            .unwrap();
        assert_eq!(3051729675574597004, result.snapshot_id());
        assert_eq!(
            Utc.timestamp_millis_opt(1515100955770).unwrap(),
            result.timestamp().unwrap()
        );
        assert_eq!(1515100955770, result.timestamp_ms());
        assert_eq!(
            Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new()
            },
            *result.summary()
        );
        assert_eq!("s3://b/wh/.../s1.avro", result.manifest_list());
        assert_eq!(Some(0), result.schema_id());
    }

    #[test]
    fn test_snapshot_v1_to_v2_projection() {
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1234567890,
            parent_snapshot_id: Some(987654321),
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: Some(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([
                    ("added-files".to_string(), "5".to_string()),
                    ("added-records".to_string(), "100".to_string()),
                ]),
            }),
            schema_id: Some(1),
        };

        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        assert_eq!(v2_snapshot.snapshot_id(), 1234567890);
        assert_eq!(v2_snapshot.parent_snapshot_id(), Some(987654321));
        assert_eq!(v2_snapshot.timestamp_ms(), 1515100955770);
        assert_eq!(
            v2_snapshot.manifest_list(),
            "s3://bucket/manifest-list.avro"
        );
        assert_eq!(v2_snapshot.schema_id(), Some(1));
        assert_eq!(v2_snapshot.summary().operation, Operation::Append);
        assert_eq!(
            v2_snapshot
                .summary()
                .additional_properties
                .get("added-files"),
            Some(&"5".to_string())
        );
    }

    #[test]
    fn test_v1_snapshot_with_manifest_list_and_manifests() {
        {
            let metadata = r#"
            {
                "format-version": 1,
                "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
                "location": "s3://bucket/test/location",
                "last-updated-ms": 1700000000000,
                "last-column-id": 1,
                "schema": {
                    "type": "struct",
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"}
                    ]
                },
                "partition-spec": [],
                "properties": {},
                "current-snapshot-id": 111111111,
                "snapshots": [
                    {
                        "snapshot-id": 111111111,
                        "timestamp-ms": 1600000000000,
                        "summary": {"operation": "append"},
                        "manifest-list": "s3://bucket/metadata/snap-123.avro",
                        "manifests": ["s3://bucket/metadata/manifest-1.avro"]
                    }
                ]
            }
            "#;

            let result_both_manifest_list_and_manifest_set =
                serde_json::from_str::<TableMetadata>(metadata);
            assert!(result_both_manifest_list_and_manifest_set.is_err());
            assert_eq!(
                result_both_manifest_list_and_manifest_set
                    .unwrap_err()
                    .to_string(),
                "DataInvalid => Invalid v1 snapshot, when manifest list provided, manifest files should be omitted"
            )
        }

        {
            let metadata = r#"
            {
                "format-version": 1,
                "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
                "location": "s3://bucket/test/location",
                "last-updated-ms": 1700000000000,
                "last-column-id": 1,
                "schema": {
                    "type": "struct",
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"}
                    ]
                },
                "partition-spec": [],
                "properties": {},
                "current-snapshot-id": 111111111,
                "snapshots": [
                    {
                        "snapshot-id": 111111111,
                        "timestamp-ms": 1600000000000,
                        "summary": {"operation": "append"},
                        "manifests": ["s3://bucket/metadata/manifest-1.avro"]
                    }
                ]
            }
            "#;
            let result_missing_manifest_list = serde_json::from_str::<TableMetadata>(metadata);
            assert!(result_missing_manifest_list.is_err());
            assert_eq!(
                result_missing_manifest_list.unwrap_err().to_string(),
                "DataInvalid => Unsupported v1 snapshot, only manifest list is supported"
            )
        }
    }

    #[test]
    fn test_snapshot_v1_to_v2_with_missing_summary() {
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1111111111,
            parent_snapshot_id: None,
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: None,
            schema_id: None,
        };

        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );
        assert_eq!(
            v2_snapshot.summary().operation,
            Operation::Append,
            "Missing V1 summary should default to Append operation"
        );
        assert!(
            v2_snapshot.summary().additional_properties.is_empty(),
            "Default summary should have empty additional_properties"
        );

        assert_eq!(v2_snapshot.snapshot_id(), 1111111111);
        assert_eq!(v2_snapshot.parent_snapshot_id(), None);
        assert_eq!(v2_snapshot.schema_id(), None);
    }
}