1use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::table_metadata::SnapshotLog;
29use crate::error::{Result, timestamp_ms_to_utc};
30use crate::io::FileIO;
31use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
32use crate::{Error, ErrorKind};
33
/// Name of the `main` branch reference.
pub const MAIN_BRANCH: &str = "main";
/// Placeholder snapshot id (`-1`); presumably used where no snapshot id has been assigned
/// yet — NOTE(review): confirm against callers.
pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;

/// Shared, reference-counted handle to a [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
/// The kind of change a snapshot made to the table, recorded in its [`Summary`].
///
/// Serialized as the lowercase variant name (`"append"`, `"replace"`, `"overwrite"`,
/// `"delete"`). See the Iceberg table spec for the precise semantics of each operation.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Operation {
    /// `"append"`. This is the default, which is also what a v1 snapshot without a
    /// summary is given when converted to a [`Snapshot`].
    #[default]
    Append,
    /// `"replace"`.
    Replace,
    /// `"overwrite"`.
    Overwrite,
    /// `"delete"`.
    Delete,
}
56
57impl Operation {
58 pub fn as_str(&self) -> &str {
60 match self {
61 Operation::Append => "append",
62 Operation::Replace => "replace",
63 Operation::Overwrite => "overwrite",
64 Operation::Delete => "delete",
65 }
66 }
67}
68
/// Summary of a snapshot: the operation that produced it plus free-form string properties.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct Summary {
    /// The operation that produced the snapshot (serialized under the `operation` key).
    pub operation: Operation,
    /// All other summary entries. Flattened, so they are (de)serialized as sibling keys of
    /// `operation` in the same JSON object (e.g. `"added-files": "5"`).
    #[serde(flatten)]
    pub additional_properties: HashMap<String, String>,
}
78
/// Row-id range attached to a snapshot; carried as `first-row-id` / `added-rows` in v3
/// snapshot metadata.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SnapshotRowRange {
    /// First row id of the range.
    pub first_row_id: u64,
    /// Length of the range (presumably the number of rows added by the snapshot —
    /// NOTE(review): confirm).
    pub added_rows: u64,
}
87
/// A snapshot of a table, independent of the metadata format version it was read from.
///
/// Instances are built with the generated typed builder (setters carry a `with_` prefix,
/// e.g. `with_snapshot_id`) or converted from the version-specific serde shapes in
/// `_serde`.
#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub struct Snapshot {
    /// Identifier of this snapshot.
    pub(crate) snapshot_id: i64,
    /// Id of the parent snapshot, if any; the builder defaults it to `None`.
    #[builder(default = None)]
    pub(crate) parent_snapshot_id: Option<i64>,
    /// Sequence number of the snapshot; snapshots converted from v1 metadata get `0`.
    pub(crate) sequence_number: i64,
    /// Creation timestamp in milliseconds; see `Snapshot::timestamp` for the UTC form.
    pub(crate) timestamp_ms: i64,
    /// Location of the manifest list file; the builder setter accepts any `Into<String>`.
    #[builder(setter(into))]
    pub(crate) manifest_list: String,
    /// Operation and additional summary properties.
    pub(crate) summary: Summary,
    /// Id of the schema recorded on the snapshot. The builder setter takes a bare
    /// `SchemaId`; the `fallback` setter accepts a full `Option<SchemaId>`. Defaults to `None`.
    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
    pub(crate) schema_id: Option<SchemaId>,
    /// Encryption key id (serialized as `key-id` in v3 metadata); defaults to `None`.
    #[builder(default)]
    pub(crate) encryption_key_id: Option<String>,
    /// Row-id range. The builder setter takes `(first_row_id, added_rows)` and wraps them
    /// in `Some(SnapshotRowRange { .. })`; defaults to `None`.
    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
    pub(crate) row_range: Option<SnapshotRowRange>,
}
126
127impl Snapshot {
128 #[inline]
130 pub fn snapshot_id(&self) -> i64 {
131 self.snapshot_id
132 }
133
134 #[inline]
136 pub fn parent_snapshot_id(&self) -> Option<i64> {
137 self.parent_snapshot_id
138 }
139
140 #[inline]
142 pub fn sequence_number(&self) -> i64 {
143 self.sequence_number
144 }
145 #[inline]
147 pub fn manifest_list(&self) -> &str {
148 &self.manifest_list
149 }
150
151 #[inline]
153 pub fn summary(&self) -> &Summary {
154 &self.summary
155 }
156 #[inline]
158 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
159 timestamp_ms_to_utc(self.timestamp_ms)
160 }
161
162 #[inline]
164 pub fn timestamp_ms(&self) -> i64 {
165 self.timestamp_ms
166 }
167
168 #[inline]
170 pub fn schema_id(&self) -> Option<SchemaId> {
171 self.schema_id
172 }
173
174 pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
176 Ok(match self.schema_id() {
177 Some(schema_id) => table_metadata
178 .schema_by_id(schema_id)
179 .ok_or_else(|| {
180 Error::new(
181 ErrorKind::DataInvalid,
182 format!("Schema with id {schema_id} not found"),
183 )
184 })?
185 .clone(),
186 None => table_metadata.current_schema().clone(),
187 })
188 }
189
190 #[cfg(test)]
192 pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
193 match self.parent_snapshot_id {
194 Some(id) => table_metadata.snapshot_by_id(id).cloned(),
195 None => None,
196 }
197 }
198
199 pub async fn load_manifest_list(
201 &self,
202 file_io: &FileIO,
203 table_metadata: &TableMetadata,
204 ) -> Result<ManifestList> {
205 let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
206 ManifestList::parse_with_version(
207 &manifest_list_content,
208 table_metadata.format_version(),
211 )
212 }
213
214 #[allow(dead_code)]
215 pub(crate) fn log(&self) -> SnapshotLog {
216 SnapshotLog {
217 timestamp_ms: self.timestamp_ms,
218 snapshot_id: self.snapshot_id,
219 }
220 }
221
222 pub fn first_row_id(&self) -> Option<u64> {
229 self.row_range.as_ref().map(|r| r.first_row_id)
230 }
231
232 pub fn added_rows_count(&self) -> Option<u64> {
237 self.row_range.as_ref().map(|r| r.added_rows)
238 }
239
240 pub fn row_range(&self) -> Option<(u64, u64)> {
243 self.row_range
244 .as_ref()
245 .map(|r| (r.first_row_id, r.added_rows))
246 }
247
248 pub fn encryption_key_id(&self) -> Option<&str> {
250 self.encryption_key_id.as_deref()
251 }
252}
253
254pub(super) mod _serde {
255 use std::collections::HashMap;
260
261 use serde::{Deserialize, Serialize};
262
263 use super::{Operation, Snapshot, Summary};
264 use crate::spec::SchemaId;
265 use crate::spec::snapshot::SnapshotRowRange;
266 use crate::{Error, ErrorKind};
267
268 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
269 #[serde(rename_all = "kebab-case")]
270 pub(crate) struct SnapshotV3 {
272 pub snapshot_id: i64,
273 #[serde(skip_serializing_if = "Option::is_none")]
274 pub parent_snapshot_id: Option<i64>,
275 pub sequence_number: i64,
276 pub timestamp_ms: i64,
277 pub manifest_list: String,
278 pub summary: Summary,
279 #[serde(skip_serializing_if = "Option::is_none")]
280 pub schema_id: Option<SchemaId>,
281 #[serde(skip_serializing_if = "Option::is_none")]
282 pub first_row_id: Option<u64>,
283 #[serde(skip_serializing_if = "Option::is_none")]
284 pub added_rows: Option<u64>,
285 #[serde(skip_serializing_if = "Option::is_none")]
286 pub key_id: Option<String>,
287 }
288
289 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
290 #[serde(rename_all = "kebab-case")]
291 pub(crate) struct SnapshotV2 {
293 pub snapshot_id: i64,
294 #[serde(skip_serializing_if = "Option::is_none")]
295 pub parent_snapshot_id: Option<i64>,
296 pub sequence_number: i64,
297 pub timestamp_ms: i64,
298 pub manifest_list: String,
299 pub summary: Summary,
300 #[serde(skip_serializing_if = "Option::is_none")]
301 pub schema_id: Option<SchemaId>,
302 }
303
304 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
305 #[serde(rename_all = "kebab-case")]
306 pub(crate) struct SnapshotV1 {
308 pub snapshot_id: i64,
309 #[serde(skip_serializing_if = "Option::is_none")]
310 pub parent_snapshot_id: Option<i64>,
311 pub timestamp_ms: i64,
312 #[serde(skip_serializing_if = "Option::is_none")]
313 pub manifest_list: Option<String>,
314 #[serde(skip_serializing_if = "Option::is_none")]
315 pub manifests: Option<Vec<String>>,
316 #[serde(skip_serializing_if = "Option::is_none")]
317 pub summary: Option<Summary>,
318 #[serde(skip_serializing_if = "Option::is_none")]
319 pub schema_id: Option<SchemaId>,
320 }
321
322 impl From<SnapshotV3> for Snapshot {
323 fn from(s: SnapshotV3) -> Self {
324 Snapshot {
325 snapshot_id: s.snapshot_id,
326 parent_snapshot_id: s.parent_snapshot_id,
327 sequence_number: s.sequence_number,
328 timestamp_ms: s.timestamp_ms,
329 manifest_list: s.manifest_list,
330 summary: s.summary,
331 schema_id: s.schema_id,
332 encryption_key_id: s.key_id,
333 row_range: match (s.first_row_id, s.added_rows) {
334 (Some(first_row_id), Some(added_rows)) => Some(SnapshotRowRange {
335 first_row_id,
336 added_rows,
337 }),
338 _ => None,
339 },
340 }
341 }
342 }
343
344 impl TryFrom<Snapshot> for SnapshotV3 {
345 type Error = Error;
346
347 fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
348 let (first_row_id, added_rows) = match s.row_range {
349 Some(row_range) => (Some(row_range.first_row_id), Some(row_range.added_rows)),
350 None => (None, None),
351 };
352
353 Ok(SnapshotV3 {
354 snapshot_id: s.snapshot_id,
355 parent_snapshot_id: s.parent_snapshot_id,
356 sequence_number: s.sequence_number,
357 timestamp_ms: s.timestamp_ms,
358 manifest_list: s.manifest_list,
359 summary: s.summary,
360 schema_id: s.schema_id,
361 first_row_id,
362 added_rows,
363 key_id: s.encryption_key_id,
364 })
365 }
366 }
367
368 impl From<SnapshotV2> for Snapshot {
369 fn from(v2: SnapshotV2) -> Self {
370 Snapshot {
371 snapshot_id: v2.snapshot_id,
372 parent_snapshot_id: v2.parent_snapshot_id,
373 sequence_number: v2.sequence_number,
374 timestamp_ms: v2.timestamp_ms,
375 manifest_list: v2.manifest_list,
376 summary: v2.summary,
377 schema_id: v2.schema_id,
378 encryption_key_id: None,
379 row_range: None,
380 }
381 }
382 }
383
384 impl From<Snapshot> for SnapshotV2 {
385 fn from(v2: Snapshot) -> Self {
386 SnapshotV2 {
387 snapshot_id: v2.snapshot_id,
388 parent_snapshot_id: v2.parent_snapshot_id,
389 sequence_number: v2.sequence_number,
390 timestamp_ms: v2.timestamp_ms,
391 manifest_list: v2.manifest_list,
392 summary: v2.summary,
393 schema_id: v2.schema_id,
394 }
395 }
396 }
397
398 impl TryFrom<SnapshotV1> for Snapshot {
399 type Error = Error;
400
401 fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
402 Ok(Snapshot {
403 snapshot_id: v1.snapshot_id,
404 parent_snapshot_id: v1.parent_snapshot_id,
405 sequence_number: 0,
406 timestamp_ms: v1.timestamp_ms,
407 manifest_list: match (v1.manifest_list, v1.manifests) {
408 (Some(file), None) => file,
409 (Some(_), Some(_)) => {
410 return Err(Error::new(
411 ErrorKind::DataInvalid,
412 "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
413 ));
414 }
415 (None, _) => {
416 return Err(Error::new(
417 ErrorKind::DataInvalid,
418 "Unsupported v1 snapshot, only manifest list is supported",
419 ));
420 }
421 },
422 summary: v1.summary.unwrap_or(Summary {
423 operation: Operation::default(),
424 additional_properties: HashMap::new(),
425 }),
426 schema_id: v1.schema_id,
427 encryption_key_id: None,
428 row_range: None,
429 })
430 }
431 }
432
433 impl From<Snapshot> for SnapshotV1 {
434 fn from(v2: Snapshot) -> Self {
435 SnapshotV1 {
436 snapshot_id: v2.snapshot_id,
437 parent_snapshot_id: v2.parent_snapshot_id,
438 timestamp_ms: v2.timestamp_ms,
439 manifest_list: Some(v2.manifest_list),
440 summary: Some(v2.summary),
441 schema_id: v2.schema_id,
442 manifests: None,
443 }
444 }
445 }
446}
447
/// A reference (branch or tag) to a snapshot.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotReference {
    /// Id of the referenced snapshot (`snapshot-id` in JSON).
    pub snapshot_id: i64,
    /// Reference kind and retention settings. Flattened, so the `type` tag and the
    /// retention fields appear alongside `snapshot-id` in the same JSON object.
    #[serde(flatten)]
    pub retention: SnapshotRetention,
}
458
459impl SnapshotReference {
460 pub fn is_branch(&self) -> bool {
462 matches!(self.retention, SnapshotRetention::Branch { .. })
463 }
464}
465
466impl SnapshotReference {
467 pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
469 SnapshotReference {
470 snapshot_id,
471 retention,
472 }
473 }
474}
475
/// Kind of a snapshot reference together with its retention settings.
///
/// Serialized as an internally tagged enum: a `"type"` field holds `"branch"` or `"tag"`.
/// Retention fields that are `None` are omitted from the output.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum SnapshotRetention {
    /// A branch reference; all retention settings are optional.
    #[serde(rename_all = "kebab-case")]
    Branch {
        /// Minimum number of snapshots to keep (`min-snapshots-to-keep`).
        #[serde(skip_serializing_if = "Option::is_none")]
        min_snapshots_to_keep: Option<i32>,
        /// Maximum snapshot age in milliseconds (`max-snapshot-age-ms`).
        #[serde(skip_serializing_if = "Option::is_none")]
        max_snapshot_age_ms: Option<i64>,
        /// Maximum age of the reference in milliseconds (`max-ref-age-ms`).
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
    /// A tag reference; only the reference age limit applies.
    #[serde(rename_all = "kebab-case")]
    Tag {
        /// Maximum age of the reference in milliseconds (`max-ref-age-ms`).
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
}
506
507impl SnapshotRetention {
508 pub fn branch(
510 min_snapshots_to_keep: Option<i32>,
511 max_snapshot_age_ms: Option<i64>,
512 max_ref_age_ms: Option<i64>,
513 ) -> Self {
514 SnapshotRetention::Branch {
515 min_snapshots_to_keep,
516 max_snapshot_age_ms,
517 max_ref_age_ms,
518 }
519 }
520}
521
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use crate::spec::TableMetadata;
    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV3};
    use crate::spec::snapshot::{Operation, Snapshot, Summary};

    /// Deserializes a v1 snapshot and checks every field carried over into `Snapshot`,
    /// including the recorded schema id.
    #[test]
    fn schema() {
        let record = r#"
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://b/wh/.../s1.avro",
            "schema-id": 0
        }
        "#;

        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
            .unwrap()
            .try_into()
            .unwrap();
        assert_eq!(3051729675574597004, result.snapshot_id());
        assert_eq!(
            Utc.timestamp_millis_opt(1515100955770).unwrap(),
            result.timestamp().unwrap()
        );
        assert_eq!(1515100955770, result.timestamp_ms());
        assert_eq!(
            Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new()
            },
            *result.summary()
        );
        assert_eq!(result.manifest_list(), "s3://b/wh/.../s1.avro");
        assert_eq!(result.schema_id(), Some(0));
        assert_eq!(result.sequence_number(), 0);
    }

    #[test]
    fn test_snapshot_v1_to_v2_projection() {
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1234567890,
            parent_snapshot_id: Some(987654321),
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: Some(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([
                    ("added-files".to_string(), "5".to_string()),
                    ("added-records".to_string(), "100".to_string()),
                ]),
            }),
            schema_id: Some(1),
        };

        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        assert_eq!(v2_snapshot.snapshot_id(), 1234567890);
        assert_eq!(v2_snapshot.parent_snapshot_id(), Some(987654321));
        assert_eq!(v2_snapshot.timestamp_ms(), 1515100955770);
        assert_eq!(
            v2_snapshot.manifest_list(),
            "s3://bucket/manifest-list.avro"
        );
        assert_eq!(v2_snapshot.schema_id(), Some(1));
        assert_eq!(v2_snapshot.summary().operation, Operation::Append);
        assert_eq!(
            v2_snapshot
                .summary()
                .additional_properties
                .get("added-files"),
            Some(&"5".to_string())
        );
    }

    #[test]
    fn test_v1_snapshot_with_manifest_list_and_manifests() {
        {
            let metadata = r#"
            {
                "format-version": 1,
                "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
                "location": "s3://bucket/test/location",
                "last-updated-ms": 1700000000000,
                "last-column-id": 1,
                "schema": {
                    "type": "struct",
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"}
                    ]
                },
                "partition-spec": [],
                "properties": {},
                "current-snapshot-id": 111111111,
                "snapshots": [
                    {
                        "snapshot-id": 111111111,
                        "timestamp-ms": 1600000000000,
                        "summary": {"operation": "append"},
                        "manifest-list": "s3://bucket/metadata/snap-123.avro",
                        "manifests": ["s3://bucket/metadata/manifest-1.avro"]
                    }
                ]
            }
            "#;

            let result_both_manifest_list_and_manifest_set =
                serde_json::from_str::<TableMetadata>(metadata);
            assert!(result_both_manifest_list_and_manifest_set.is_err());
            assert_eq!(
                result_both_manifest_list_and_manifest_set
                    .unwrap_err()
                    .to_string(),
                "DataInvalid => Invalid v1 snapshot, when manifest list provided, manifest files should be omitted"
            )
        }

        {
            let metadata = r#"
            {
                "format-version": 1,
                "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
                "location": "s3://bucket/test/location",
                "last-updated-ms": 1700000000000,
                "last-column-id": 1,
                "schema": {
                    "type": "struct",
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"}
                    ]
                },
                "partition-spec": [],
                "properties": {},
                "current-snapshot-id": 111111111,
                "snapshots": [
                    {
                        "snapshot-id": 111111111,
                        "timestamp-ms": 1600000000000,
                        "summary": {"operation": "append"},
                        "manifests": ["s3://bucket/metadata/manifest-1.avro"]
                    }
                ]
            }
            "#;
            let result_missing_manifest_list = serde_json::from_str::<TableMetadata>(metadata);
            assert!(result_missing_manifest_list.is_err());
            assert_eq!(
                result_missing_manifest_list.unwrap_err().to_string(),
                "DataInvalid => Unsupported v1 snapshot, only manifest list is supported"
            )
        }
    }

    #[test]
    fn test_snapshot_v1_to_v2_with_missing_summary() {
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1111111111,
            parent_snapshot_id: None,
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: None,
            schema_id: None,
        };

        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );
        assert_eq!(
            v2_snapshot.summary().operation,
            Operation::Append,
            "Missing V1 summary should default to Append operation"
        );
        assert!(
            v2_snapshot.summary().additional_properties.is_empty(),
            "Default summary should have empty additional_properties"
        );

        assert_eq!(v2_snapshot.snapshot_id(), 1111111111);
        assert_eq!(v2_snapshot.parent_snapshot_id(), None);
        assert_eq!(v2_snapshot.schema_id(), None);
    }

    /// The v3-only fields (row range and key id) survive a `SnapshotV3 -> Snapshot ->
    /// SnapshotV3` round trip unchanged.
    #[test]
    fn test_snapshot_v3_row_range_round_trip() {
        let make_v3 = || SnapshotV3 {
            snapshot_id: 42,
            parent_snapshot_id: Some(41),
            sequence_number: 7,
            timestamp_ms: 1515100955770,
            manifest_list: "s3://bucket/manifest-list.avro".to_string(),
            summary: Summary {
                operation: Operation::Overwrite,
                additional_properties: HashMap::new(),
            },
            schema_id: Some(2),
            first_row_id: Some(100),
            added_rows: Some(25),
            key_id: Some("key-1".to_string()),
        };

        let snapshot = Snapshot::from(make_v3());
        assert_eq!(snapshot.first_row_id(), Some(100));
        assert_eq!(snapshot.added_rows_count(), Some(25));
        assert_eq!(snapshot.row_range(), Some((100, 25)));
        assert_eq!(snapshot.encryption_key_id(), Some("key-1"));
        assert_eq!(snapshot.sequence_number(), 7);

        let round_tripped = SnapshotV3::try_from(snapshot).unwrap();
        assert_eq!(round_tripped, make_v3());
    }
}