1use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::table_metadata::SnapshotLog;
29use crate::error::{Result, timestamp_ms_to_utc};
30use crate::spec::{SchemaId, SchemaRef, TableMetadata};
31use crate::{Error, ErrorKind};
32
/// Name of the main branch reference (`"main"`).
pub const MAIN_BRANCH: &str = "main";
35
/// Shared, reference-counted ([`Arc`]) handle to a [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
/// Kind of operation recorded in a snapshot [`Summary`].
///
/// Variants are (de)serialized as lowercase strings (`"append"`, `"replace"`,
/// `"overwrite"`, `"delete"`). [`Operation::Append`] is the `Default` value.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Operation {
    /// Append operation; this is the default variant.
    #[default]
    Append,
    /// Replace operation.
    Replace,
    /// Overwrite operation.
    Overwrite,
    /// Delete operation.
    Delete,
}
53
54impl Operation {
55 pub fn as_str(&self) -> &str {
57 match self {
58 Operation::Append => "append",
59 Operation::Replace => "replace",
60 Operation::Overwrite => "overwrite",
61 Operation::Delete => "delete",
62 }
63 }
64}
65
/// Summary attached to a snapshot: the operation plus free-form string properties.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct Summary {
    /// The operation recorded for the snapshot.
    pub operation: Operation,
    /// All remaining key/value pairs of the summary.
    ///
    /// Flattened by serde, so these entries are (de)serialized as siblings of
    /// `operation` rather than as a nested object.
    #[serde(flatten)]
    pub additional_properties: HashMap<String, String>,
}
75
/// Row-id information carried by a snapshot: a first row id and a count of added rows.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SnapshotRowRange {
    /// First row id of the range.
    pub first_row_id: u64,
    /// Number of rows added.
    pub added_rows: u64,
}
84
/// A table snapshot.
///
/// Instances are created through the `TypedBuilder`-derived builder, whose
/// setters are prefixed with `with_`, or through the versioned serde
/// representations in the `_serde` module.
#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub struct Snapshot {
    /// Id of this snapshot.
    pub(crate) snapshot_id: i64,
    /// Id of the parent snapshot, if any. The builder defaults this to `None`.
    #[builder(default = None)]
    pub(crate) parent_snapshot_id: Option<i64>,
    /// Sequence number of this snapshot.
    pub(crate) sequence_number: i64,
    /// Snapshot timestamp in milliseconds; see [`Snapshot::timestamp`] for the UTC conversion.
    pub(crate) timestamp_ms: i64,
    /// Manifest list of this snapshot, stored as a string.
    /// The builder setter accepts anything convertible `Into<String>`.
    #[builder(setter(into))]
    pub(crate) manifest_list: String,
    /// Summary (operation and additional properties) of this snapshot.
    pub(crate) summary: Summary,
    /// Id of the schema associated with this snapshot, if recorded.
    ///
    /// The builder setter takes a bare `SchemaId` (`strip_option`), with an
    /// `Option`-accepting fallback declared as `schema_id_opt`; defaults to `None`.
    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
    pub(crate) schema_id: Option<SchemaId>,
    /// Id of the encryption key, if any. The builder defaults this to `None`.
    #[builder(default)]
    pub(crate) encryption_key_id: Option<String>,
    /// Optional row range. The builder setter takes `(first_row_id, added_rows)`
    /// and wraps them in `Some(SnapshotRowRange { .. })`; defaults to `None`.
    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
    pub(crate) row_range: Option<SnapshotRowRange>,
}
123
124impl Snapshot {
125 #[inline]
127 pub fn snapshot_id(&self) -> i64 {
128 self.snapshot_id
129 }
130
131 #[inline]
133 pub fn parent_snapshot_id(&self) -> Option<i64> {
134 self.parent_snapshot_id
135 }
136
137 #[inline]
139 pub fn sequence_number(&self) -> i64 {
140 self.sequence_number
141 }
142 #[inline]
144 pub fn manifest_list(&self) -> &str {
145 &self.manifest_list
146 }
147
148 #[inline]
150 pub fn summary(&self) -> &Summary {
151 &self.summary
152 }
153 #[inline]
155 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
156 timestamp_ms_to_utc(self.timestamp_ms)
157 }
158
159 #[inline]
161 pub fn timestamp_ms(&self) -> i64 {
162 self.timestamp_ms
163 }
164
165 #[inline]
167 pub fn schema_id(&self) -> Option<SchemaId> {
168 self.schema_id
169 }
170
171 pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
173 Ok(match self.schema_id() {
174 Some(schema_id) => table_metadata
175 .schema_by_id(schema_id)
176 .ok_or_else(|| {
177 Error::new(
178 ErrorKind::DataInvalid,
179 format!("Schema with id {schema_id} not found"),
180 )
181 })?
182 .clone(),
183 None => table_metadata.current_schema().clone(),
184 })
185 }
186
187 #[cfg(test)]
189 pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
190 match self.parent_snapshot_id {
191 Some(id) => table_metadata.snapshot_by_id(id).cloned(),
192 None => None,
193 }
194 }
195
196 #[allow(dead_code)]
197 pub(crate) fn log(&self) -> SnapshotLog {
198 SnapshotLog {
199 timestamp_ms: self.timestamp_ms,
200 snapshot_id: self.snapshot_id,
201 }
202 }
203
204 pub fn first_row_id(&self) -> Option<u64> {
211 self.row_range.as_ref().map(|r| r.first_row_id)
212 }
213
214 pub fn added_rows_count(&self) -> Option<u64> {
219 self.row_range.as_ref().map(|r| r.added_rows)
220 }
221
222 pub fn row_range(&self) -> Option<(u64, u64)> {
225 self.row_range
226 .as_ref()
227 .map(|r| (r.first_row_id, r.added_rows))
228 }
229
230 pub fn encryption_key_id(&self) -> Option<&str> {
232 self.encryption_key_id.as_deref()
233 }
234}
235
236pub(super) mod _serde {
237 use std::collections::HashMap;
242
243 use serde::{Deserialize, Serialize};
244
245 use super::{Operation, Snapshot, Summary};
246 use crate::spec::SchemaId;
247 use crate::spec::snapshot::SnapshotRowRange;
248 use crate::{Error, ErrorKind};
249
250 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
251 #[serde(rename_all = "kebab-case")]
252 pub(crate) struct SnapshotV3 {
254 pub snapshot_id: i64,
255 #[serde(skip_serializing_if = "Option::is_none")]
256 pub parent_snapshot_id: Option<i64>,
257 pub sequence_number: i64,
258 pub timestamp_ms: i64,
259 pub manifest_list: String,
260 pub summary: Summary,
261 #[serde(skip_serializing_if = "Option::is_none")]
262 pub schema_id: Option<SchemaId>,
263 #[serde(skip_serializing_if = "Option::is_none")]
264 pub first_row_id: Option<u64>,
265 #[serde(skip_serializing_if = "Option::is_none")]
266 pub added_rows: Option<u64>,
267 #[serde(skip_serializing_if = "Option::is_none")]
268 pub key_id: Option<String>,
269 }
270
271 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
272 #[serde(rename_all = "kebab-case")]
273 pub(crate) struct SnapshotV2 {
275 pub snapshot_id: i64,
276 #[serde(skip_serializing_if = "Option::is_none")]
277 pub parent_snapshot_id: Option<i64>,
278 #[serde(default)]
279 pub sequence_number: i64,
280 pub timestamp_ms: i64,
281 pub manifest_list: String,
282 pub summary: Summary,
283 #[serde(skip_serializing_if = "Option::is_none")]
284 pub schema_id: Option<SchemaId>,
285 }
286
287 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
288 #[serde(rename_all = "kebab-case")]
289 pub(crate) struct SnapshotV1 {
291 pub snapshot_id: i64,
292 #[serde(skip_serializing_if = "Option::is_none")]
293 pub parent_snapshot_id: Option<i64>,
294 pub timestamp_ms: i64,
295 #[serde(skip_serializing_if = "Option::is_none")]
296 pub manifest_list: Option<String>,
297 #[serde(skip_serializing_if = "Option::is_none")]
298 pub manifests: Option<Vec<String>>,
299 #[serde(skip_serializing_if = "Option::is_none")]
300 pub summary: Option<Summary>,
301 #[serde(skip_serializing_if = "Option::is_none")]
302 pub schema_id: Option<SchemaId>,
303 }
304
305 impl From<SnapshotV3> for Snapshot {
306 fn from(s: SnapshotV3) -> Self {
307 Snapshot {
308 snapshot_id: s.snapshot_id,
309 parent_snapshot_id: s.parent_snapshot_id,
310 sequence_number: s.sequence_number,
311 timestamp_ms: s.timestamp_ms,
312 manifest_list: s.manifest_list,
313 summary: s.summary,
314 schema_id: s.schema_id,
315 encryption_key_id: s.key_id,
316 row_range: match (s.first_row_id, s.added_rows) {
317 (Some(first_row_id), Some(added_rows)) => Some(SnapshotRowRange {
318 first_row_id,
319 added_rows,
320 }),
321 _ => None,
322 },
323 }
324 }
325 }
326
327 impl TryFrom<Snapshot> for SnapshotV3 {
328 type Error = Error;
329
330 fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
331 let (first_row_id, added_rows) = match s.row_range {
332 Some(row_range) => (Some(row_range.first_row_id), Some(row_range.added_rows)),
333 None => (None, None),
334 };
335
336 Ok(SnapshotV3 {
337 snapshot_id: s.snapshot_id,
338 parent_snapshot_id: s.parent_snapshot_id,
339 sequence_number: s.sequence_number,
340 timestamp_ms: s.timestamp_ms,
341 manifest_list: s.manifest_list,
342 summary: s.summary,
343 schema_id: s.schema_id,
344 first_row_id,
345 added_rows,
346 key_id: s.encryption_key_id,
347 })
348 }
349 }
350
351 impl From<SnapshotV2> for Snapshot {
352 fn from(v2: SnapshotV2) -> Self {
353 Snapshot {
354 snapshot_id: v2.snapshot_id,
355 parent_snapshot_id: v2.parent_snapshot_id,
356 sequence_number: v2.sequence_number,
357 timestamp_ms: v2.timestamp_ms,
358 manifest_list: v2.manifest_list,
359 summary: v2.summary,
360 schema_id: v2.schema_id,
361 encryption_key_id: None,
362 row_range: None,
363 }
364 }
365 }
366
367 impl From<Snapshot> for SnapshotV2 {
368 fn from(v2: Snapshot) -> Self {
369 SnapshotV2 {
370 snapshot_id: v2.snapshot_id,
371 parent_snapshot_id: v2.parent_snapshot_id,
372 sequence_number: v2.sequence_number,
373 timestamp_ms: v2.timestamp_ms,
374 manifest_list: v2.manifest_list,
375 summary: v2.summary,
376 schema_id: v2.schema_id,
377 }
378 }
379 }
380
381 impl TryFrom<SnapshotV1> for Snapshot {
382 type Error = Error;
383
384 fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
385 Ok(Snapshot {
386 snapshot_id: v1.snapshot_id,
387 parent_snapshot_id: v1.parent_snapshot_id,
388 sequence_number: 0,
389 timestamp_ms: v1.timestamp_ms,
390 manifest_list: match (v1.manifest_list, v1.manifests) {
391 (Some(file), None) => file,
392 (Some(_), Some(_)) => {
393 return Err(Error::new(
394 ErrorKind::DataInvalid,
395 "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
396 ));
397 }
398 (None, _) => {
399 return Err(Error::new(
400 ErrorKind::DataInvalid,
401 "Unsupported v1 snapshot, only manifest list is supported",
402 ));
403 }
404 },
405 summary: v1.summary.unwrap_or(Summary {
406 operation: Operation::default(),
407 additional_properties: HashMap::new(),
408 }),
409 schema_id: v1.schema_id,
410 encryption_key_id: None,
411 row_range: None,
412 })
413 }
414 }
415
416 impl From<Snapshot> for SnapshotV1 {
417 fn from(v2: Snapshot) -> Self {
418 SnapshotV1 {
419 snapshot_id: v2.snapshot_id,
420 parent_snapshot_id: v2.parent_snapshot_id,
421 timestamp_ms: v2.timestamp_ms,
422 manifest_list: Some(v2.manifest_list),
423 summary: Some(v2.summary),
424 schema_id: v2.schema_id,
425 manifests: None,
426 }
427 }
428 }
429}
430
/// A named reference to a snapshot, together with its retention settings.
///
/// Field names are kebab-case on the wire (`snapshot-id`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotReference {
    /// Id of the referenced snapshot.
    pub snapshot_id: i64,
    /// Retention settings of the reference.
    ///
    /// Flattened by serde, so the retention's `type` tag and fields are
    /// (de)serialized as siblings of `snapshot-id`.
    #[serde(flatten)]
    pub retention: SnapshotRetention,
}
441
442impl SnapshotReference {
443 pub fn is_branch(&self) -> bool {
445 matches!(self.retention, SnapshotRetention::Branch { .. })
446 }
447}
448
449impl SnapshotReference {
450 pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
452 SnapshotReference {
453 snapshot_id,
454 retention,
455 }
456 }
457}
458
/// Retention settings of a snapshot reference.
///
/// Serialized as an internally tagged enum: the `type` field holds the
/// lowercase variant name (`"branch"` or `"tag"`), and the variant fields use
/// kebab-case names. Fields that are `None` are omitted from the output.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum SnapshotRetention {
    /// Retention settings for a branch reference.
    #[serde(rename_all = "kebab-case")]
    Branch {
        /// Serialized as `min-snapshots-to-keep`.
        #[serde(skip_serializing_if = "Option::is_none")]
        min_snapshots_to_keep: Option<i32>,
        /// Serialized as `max-snapshot-age-ms`.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_snapshot_age_ms: Option<i64>,
        /// Serialized as `max-ref-age-ms`.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
    /// Retention settings for a tag reference.
    #[serde(rename_all = "kebab-case")]
    Tag {
        /// Serialized as `max-ref-age-ms`.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
}
489
490impl SnapshotRetention {
491 pub fn branch(
493 min_snapshots_to_keep: Option<i32>,
494 max_snapshot_age_ms: Option<i64>,
495 max_ref_age_ms: Option<i64>,
496 ) -> Self {
497 SnapshotRetention::Branch {
498 min_snapshots_to_keep,
499 max_snapshot_age_ms,
500 max_ref_age_ms,
501 }
502 }
503}
504
/// Unit tests for snapshot deserialization and the V1 conversions.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use crate::spec::TableMetadata;
    use crate::spec::snapshot::_serde::SnapshotV1;
    use crate::spec::snapshot::{Operation, Snapshot, Summary};

    /// Deserializes a V1 snapshot from JSON and checks the resulting `Snapshot` accessors.
    #[test]
    fn schema() {
        let record = r#"
 {
 "snapshot-id": 3051729675574597004,
 "timestamp-ms": 1515100955770,
 "summary": {
 "operation": "append"
 },
 "manifest-list": "s3://b/wh/.../s1.avro",
 "schema-id": 0
 }
 "#;

        // Parse as the V1 wire form, then convert into the in-memory snapshot.
        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
            .unwrap()
            .try_into()
            .unwrap();
        assert_eq!(3051729675574597004, result.snapshot_id());
        assert_eq!(
            Utc.timestamp_millis_opt(1515100955770).unwrap(),
            result.timestamp().unwrap()
        );
        assert_eq!(1515100955770, result.timestamp_ms());
        assert_eq!(
            Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new()
            },
            *result.summary()
        );
        assert_eq!("s3://b/wh/.../s1.avro".to_string(), *result.manifest_list());
    }

    /// Converting a fully populated V1 snapshot keeps its fields and sets the
    /// sequence number to 0.
    #[test]
    fn test_snapshot_v1_to_v2_projection() {
        use crate::spec::snapshot::_serde::SnapshotV1;

        // V1 snapshot: the struct has no sequence-number field.
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1234567890,
            parent_snapshot_id: Some(987654321),
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: Some(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([
                    ("added-files".to_string(), "5".to_string()),
                    ("added-records".to_string(), "100".to_string()),
                ]),
            }),
            schema_id: Some(1),
        };

        // Convert into the in-memory snapshot; defaults are applied here.
        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        // The conversion fixes the sequence number to 0.
        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        // All other fields must be carried over unchanged.
        assert_eq!(v2_snapshot.snapshot_id(), 1234567890);
        assert_eq!(v2_snapshot.parent_snapshot_id(), Some(987654321));
        assert_eq!(v2_snapshot.timestamp_ms(), 1515100955770);
        assert_eq!(
            v2_snapshot.manifest_list(),
            "s3://bucket/manifest-list.avro"
        );
        assert_eq!(v2_snapshot.schema_id(), Some(1));
        assert_eq!(v2_snapshot.summary().operation, Operation::Append);
        assert_eq!(
            v2_snapshot
                .summary()
                .additional_properties
                .get("added-files"),
            Some(&"5".to_string())
        );
    }

    /// V1 table metadata is rejected when a snapshot has both `manifest-list`
    /// and `manifests`, and when it has `manifests` without `manifest-list`.
    #[test]
    fn test_v1_snapshot_with_manifest_list_and_manifests() {
        // Case 1: both `manifest-list` and `manifests` are present.
        {
            let metadata = r#"
 {
 "format-version": 1,
 "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
 "location": "s3://bucket/test/location",
 "last-updated-ms": 1700000000000,
 "last-column-id": 1,
 "schema": {
 "type": "struct",
 "fields": [
 {"id": 1, "name": "x", "required": true, "type": "long"}
 ]
 },
 "partition-spec": [],
 "properties": {},
 "current-snapshot-id": 111111111,
 "snapshots": [
 {
 "snapshot-id": 111111111,
 "timestamp-ms": 1600000000000,
 "summary": {"operation": "append"},
 "manifest-list": "s3://bucket/metadata/snap-123.avro",
 "manifests": ["s3://bucket/metadata/manifest-1.avro"]
 }
 ]
 }
 "#;

            let result_both_manifest_list_and_manifest_set =
                serde_json::from_str::<TableMetadata>(metadata);
            assert!(result_both_manifest_list_and_manifest_set.is_err());
            assert_eq!(
                result_both_manifest_list_and_manifest_set
                    .unwrap_err()
                    .to_string(),
                "DataInvalid => Invalid v1 snapshot, when manifest list provided, manifest files should be omitted"
            )
        }

        // Case 2: only `manifests` is present, with no `manifest-list`.
        {
            let metadata = r#"
 {
 "format-version": 1,
 "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
 "location": "s3://bucket/test/location",
 "last-updated-ms": 1700000000000,
 "last-column-id": 1,
 "schema": {
 "type": "struct",
 "fields": [
 {"id": 1, "name": "x", "required": true, "type": "long"}
 ]
 },
 "partition-spec": [],
 "properties": {},
 "current-snapshot-id": 111111111,
 "snapshots": [
 {
 "snapshot-id": 111111111,
 "timestamp-ms": 1600000000000,
 "summary": {"operation": "append"},
 "manifests": ["s3://bucket/metadata/manifest-1.avro"]
 }
 ]
 }
 "#;
            let result_missing_manifest_list = serde_json::from_str::<TableMetadata>(metadata);
            assert!(result_missing_manifest_list.is_err());
            assert_eq!(
                result_missing_manifest_list.unwrap_err().to_string(),
                "DataInvalid => Unsupported v1 snapshot, only manifest list is supported"
            )
        }
    }

    /// A V1 snapshot without a summary converts to a default summary
    /// (`Append`, no additional properties).
    #[test]
    fn test_snapshot_v1_to_v2_with_missing_summary() {
        use crate::spec::snapshot::_serde::SnapshotV1;

        // V1 snapshot with neither a summary nor a schema id.
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1111111111,
            parent_snapshot_id: None,
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: None,
            schema_id: None,
        };

        // Convert into the in-memory snapshot; defaults are applied here.
        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        // Defaults: sequence number 0 and an empty `Append` summary.
        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );
        assert_eq!(
            v2_snapshot.summary().operation,
            Operation::Append,
            "Missing V1 summary should default to Append operation"
        );
        assert!(
            v2_snapshot.summary().additional_properties.is_empty(),
            "Default summary should have empty additional_properties"
        );

        // The remaining fields are carried over unchanged.
        assert_eq!(v2_snapshot.snapshot_id(), 1111111111);
        assert_eq!(v2_snapshot.parent_snapshot_id(), None);
        assert_eq!(v2_snapshot.schema_id(), None);
    }
}