1use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use crate::error::{Result, timestamp_ms_to_utc};
29use crate::spec::{SchemaId, SchemaRef, TableMetadata};
30use crate::{Error, ErrorKind};
31
/// Name of the `main` branch reference.
pub const MAIN_BRANCH: &str = "main";

/// Shared, reference-counted handle to a [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
/// Kind of change a snapshot made to the table, stored under the `operation` key of the
/// snapshot [`Summary`].
///
/// Variants are (de)serialized in lowercase: `"append"`, `"replace"`, `"overwrite"`, `"delete"`.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Operation {
    /// Data was appended to the table (per the Iceberg spec — verify: only data files added).
    ///
    /// This is the `Default` variant; v1 snapshots without a summary are given it.
    #[default]
    Append,
    /// Files were replaced (per the Iceberg spec — verify: table data itself is unchanged,
    /// e.g. compaction or file relocation).
    Replace,
    /// Data was logically overwritten (files added and removed).
    Overwrite,
    /// Data was deleted.
    Delete,
}
52
53impl Operation {
54 pub fn as_str(&self) -> &str {
56 match self {
57 Operation::Append => "append",
58 Operation::Replace => "replace",
59 Operation::Overwrite => "overwrite",
60 Operation::Delete => "delete",
61 }
62 }
63}
64
/// Summary of the changes made by a snapshot.
///
/// In JSON this is a single flat object: the `operation` key is parsed into [`Operation`],
/// and every other key/value pair is collected into `additional_properties`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct Summary {
    /// The kind of change this snapshot made.
    pub operation: Operation,
    /// All remaining summary entries (e.g. `added-files`), kept as raw strings.
    // `flatten` merges these keys into the same JSON object as `operation`.
    #[serde(flatten)]
    pub additional_properties: HashMap<String, String>,
}
74
/// Row-id range carried by a snapshot (`first-row-id` / `added-rows` in v3 metadata).
///
/// Both values are always present together; a snapshot without them has no range at all.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SnapshotRowRange {
    /// First row id of the range.
    pub first_row_id: u64,
    /// Number of rows counted by the range (`added-rows`).
    pub added_rows: u64,
}
83
/// A snapshot of the table state.
///
/// `Snapshot` is not (de)serialized directly. It is built with the generated
/// `Snapshot::builder()` (setters are prefixed with `with_`) or converted from the
/// per-format-version structs in the `_serde` module.
#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub struct Snapshot {
    /// Unique identifier of this snapshot.
    pub(crate) snapshot_id: i64,
    /// Identifier of the parent snapshot, if any; `None` unless set on the builder.
    #[builder(default = None)]
    pub(crate) parent_snapshot_id: Option<i64>,
    /// Sequence number of this snapshot; snapshots read from v1 metadata get `0`.
    pub(crate) sequence_number: i64,
    /// Creation time in milliseconds since the Unix epoch.
    pub(crate) timestamp_ms: i64,
    /// Location of this snapshot's manifest list.
    // `setter(into)` lets the builder accept anything convertible into `String`.
    #[builder(setter(into))]
    pub(crate) manifest_list: String,
    /// Operation and free-form properties describing the change.
    pub(crate) summary: Summary,
    /// Id of the schema associated with this snapshot, if recorded.
    ///
    /// When absent, `Snapshot::schema` falls back to the table's current schema.
    // The main builder setter strips the `Option`; `fallback` names an extra setter that
    // accepts an `Option` directly.
    #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
    pub(crate) schema_id: Option<SchemaId>,
    /// Encryption key id (`key-id` in v3 metadata), if any.
    #[builder(default)]
    pub(crate) encryption_key_id: Option<String>,
    /// Row-id range (`first-row-id` / `added-rows` in v3 metadata), if present.
    // The builder setter takes the two numbers as separate arguments and wraps them in
    // `Some(SnapshotRowRange { .. })`, so a half-specified range cannot be built.
    #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
    pub(crate) row_range: Option<SnapshotRowRange>,
}
122
123impl Snapshot {
124 #[inline]
126 pub fn snapshot_id(&self) -> i64 {
127 self.snapshot_id
128 }
129
130 #[inline]
132 pub fn parent_snapshot_id(&self) -> Option<i64> {
133 self.parent_snapshot_id
134 }
135
136 #[inline]
138 pub fn sequence_number(&self) -> i64 {
139 self.sequence_number
140 }
141 #[inline]
143 pub fn manifest_list(&self) -> &str {
144 &self.manifest_list
145 }
146
147 #[inline]
149 pub fn summary(&self) -> &Summary {
150 &self.summary
151 }
152 #[inline]
154 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
155 timestamp_ms_to_utc(self.timestamp_ms)
156 }
157
158 #[inline]
160 pub fn timestamp_ms(&self) -> i64 {
161 self.timestamp_ms
162 }
163
164 #[inline]
166 pub fn schema_id(&self) -> Option<SchemaId> {
167 self.schema_id
168 }
169
170 pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
172 Ok(match self.schema_id() {
173 Some(schema_id) => table_metadata
174 .schema_by_id(schema_id)
175 .ok_or_else(|| {
176 Error::new(
177 ErrorKind::DataInvalid,
178 format!("Schema with id {schema_id} not found"),
179 )
180 })?
181 .clone(),
182 None => table_metadata.current_schema().clone(),
183 })
184 }
185
186 #[cfg(test)]
188 pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
189 match self.parent_snapshot_id {
190 Some(id) => table_metadata.snapshot_by_id(id).cloned(),
191 None => None,
192 }
193 }
194
195 pub fn first_row_id(&self) -> Option<u64> {
202 self.row_range.as_ref().map(|r| r.first_row_id)
203 }
204
205 pub fn added_rows_count(&self) -> Option<u64> {
210 self.row_range.as_ref().map(|r| r.added_rows)
211 }
212
213 pub fn row_range(&self) -> Option<(u64, u64)> {
216 self.row_range
217 .as_ref()
218 .map(|r| (r.first_row_id, r.added_rows))
219 }
220
221 pub fn encryption_key_id(&self) -> Option<&str> {
223 self.encryption_key_id.as_deref()
224 }
225}
226
227pub(super) mod _serde {
228 use std::collections::HashMap;
233
234 use serde::{Deserialize, Serialize};
235
236 use super::{Operation, Snapshot, Summary};
237 use crate::spec::SchemaId;
238 use crate::spec::snapshot::SnapshotRowRange;
239 use crate::{Error, ErrorKind};
240
241 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
242 #[serde(rename_all = "kebab-case")]
243 pub(crate) struct SnapshotV3 {
245 pub snapshot_id: i64,
246 #[serde(skip_serializing_if = "Option::is_none")]
247 pub parent_snapshot_id: Option<i64>,
248 pub sequence_number: i64,
249 pub timestamp_ms: i64,
250 pub manifest_list: String,
251 pub summary: Summary,
252 #[serde(skip_serializing_if = "Option::is_none")]
253 pub schema_id: Option<SchemaId>,
254 #[serde(skip_serializing_if = "Option::is_none")]
255 pub first_row_id: Option<u64>,
256 #[serde(skip_serializing_if = "Option::is_none")]
257 pub added_rows: Option<u64>,
258 #[serde(skip_serializing_if = "Option::is_none")]
259 pub key_id: Option<String>,
260 }
261
262 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
263 #[serde(rename_all = "kebab-case")]
264 pub(crate) struct SnapshotV2 {
266 pub snapshot_id: i64,
267 #[serde(skip_serializing_if = "Option::is_none")]
268 pub parent_snapshot_id: Option<i64>,
269 #[serde(default)]
270 pub sequence_number: i64,
271 pub timestamp_ms: i64,
272 pub manifest_list: String,
273 pub summary: Summary,
274 #[serde(skip_serializing_if = "Option::is_none")]
275 pub schema_id: Option<SchemaId>,
276 }
277
278 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
279 #[serde(rename_all = "kebab-case")]
280 pub(crate) struct SnapshotV1 {
282 pub snapshot_id: i64,
283 #[serde(skip_serializing_if = "Option::is_none")]
284 pub parent_snapshot_id: Option<i64>,
285 pub timestamp_ms: i64,
286 #[serde(skip_serializing_if = "Option::is_none")]
287 pub manifest_list: Option<String>,
288 #[serde(skip_serializing_if = "Option::is_none")]
289 pub manifests: Option<Vec<String>>,
290 #[serde(skip_serializing_if = "Option::is_none")]
291 pub summary: Option<Summary>,
292 #[serde(skip_serializing_if = "Option::is_none")]
293 pub schema_id: Option<SchemaId>,
294 }
295
296 impl From<SnapshotV3> for Snapshot {
297 fn from(s: SnapshotV3) -> Self {
298 Snapshot {
299 snapshot_id: s.snapshot_id,
300 parent_snapshot_id: s.parent_snapshot_id,
301 sequence_number: s.sequence_number,
302 timestamp_ms: s.timestamp_ms,
303 manifest_list: s.manifest_list,
304 summary: s.summary,
305 schema_id: s.schema_id,
306 encryption_key_id: s.key_id,
307 row_range: match (s.first_row_id, s.added_rows) {
308 (Some(first_row_id), Some(added_rows)) => Some(SnapshotRowRange {
309 first_row_id,
310 added_rows,
311 }),
312 _ => None,
313 },
314 }
315 }
316 }
317
318 impl TryFrom<Snapshot> for SnapshotV3 {
319 type Error = Error;
320
321 fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
322 let (first_row_id, added_rows) = match s.row_range {
323 Some(row_range) => (Some(row_range.first_row_id), Some(row_range.added_rows)),
324 None => (None, None),
325 };
326
327 Ok(SnapshotV3 {
328 snapshot_id: s.snapshot_id,
329 parent_snapshot_id: s.parent_snapshot_id,
330 sequence_number: s.sequence_number,
331 timestamp_ms: s.timestamp_ms,
332 manifest_list: s.manifest_list,
333 summary: s.summary,
334 schema_id: s.schema_id,
335 first_row_id,
336 added_rows,
337 key_id: s.encryption_key_id,
338 })
339 }
340 }
341
342 impl From<SnapshotV2> for Snapshot {
343 fn from(v2: SnapshotV2) -> Self {
344 Snapshot {
345 snapshot_id: v2.snapshot_id,
346 parent_snapshot_id: v2.parent_snapshot_id,
347 sequence_number: v2.sequence_number,
348 timestamp_ms: v2.timestamp_ms,
349 manifest_list: v2.manifest_list,
350 summary: v2.summary,
351 schema_id: v2.schema_id,
352 encryption_key_id: None,
353 row_range: None,
354 }
355 }
356 }
357
358 impl From<Snapshot> for SnapshotV2 {
359 fn from(v2: Snapshot) -> Self {
360 SnapshotV2 {
361 snapshot_id: v2.snapshot_id,
362 parent_snapshot_id: v2.parent_snapshot_id,
363 sequence_number: v2.sequence_number,
364 timestamp_ms: v2.timestamp_ms,
365 manifest_list: v2.manifest_list,
366 summary: v2.summary,
367 schema_id: v2.schema_id,
368 }
369 }
370 }
371
372 impl TryFrom<SnapshotV1> for Snapshot {
373 type Error = Error;
374
375 fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
376 Ok(Snapshot {
377 snapshot_id: v1.snapshot_id,
378 parent_snapshot_id: v1.parent_snapshot_id,
379 sequence_number: 0,
380 timestamp_ms: v1.timestamp_ms,
381 manifest_list: match (v1.manifest_list, v1.manifests) {
382 (Some(file), None) => file,
383 (Some(_), Some(_)) => {
384 return Err(Error::new(
385 ErrorKind::DataInvalid,
386 "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
387 ));
388 }
389 (None, _) => {
390 return Err(Error::new(
391 ErrorKind::DataInvalid,
392 "Unsupported v1 snapshot, only manifest list is supported",
393 ));
394 }
395 },
396 summary: v1.summary.unwrap_or(Summary {
397 operation: Operation::default(),
398 additional_properties: HashMap::new(),
399 }),
400 schema_id: v1.schema_id,
401 encryption_key_id: None,
402 row_range: None,
403 })
404 }
405 }
406
407 impl From<Snapshot> for SnapshotV1 {
408 fn from(v2: Snapshot) -> Self {
409 SnapshotV1 {
410 snapshot_id: v2.snapshot_id,
411 parent_snapshot_id: v2.parent_snapshot_id,
412 timestamp_ms: v2.timestamp_ms,
413 manifest_list: Some(v2.manifest_list),
414 summary: Some(v2.summary),
415 schema_id: v2.schema_id,
416 manifests: None,
417 }
418 }
419 }
420}
421
/// A named reference (branch or tag) pointing at a snapshot.
///
/// Serialized as one flat JSON object: `snapshot-id` plus the flattened retention fields,
/// including the retention's `type` tag.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SnapshotReference {
    /// Id of the snapshot this reference points at.
    pub snapshot_id: i64,
    /// Whether this is a branch or a tag, with its retention settings.
    #[serde(flatten)]
    pub retention: SnapshotRetention,
}
432
433impl SnapshotReference {
434 pub fn is_branch(&self) -> bool {
436 matches!(self.retention, SnapshotRetention::Branch { .. })
437 }
438}
439
440impl SnapshotReference {
441 pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
443 SnapshotReference {
444 snapshot_id,
445 retention,
446 }
447 }
448}
449
/// Retention policy of a [`SnapshotReference`], internally tagged by a `type` field whose
/// value is `"branch"` or `"tag"`.
///
/// Unset (`None`) settings are omitted when serializing.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum SnapshotRetention {
    /// A branch reference (`"type": "branch"`); fields use kebab-case keys.
    #[serde(rename_all = "kebab-case")]
    Branch {
        /// Minimum number of snapshots to keep on the branch, if configured.
        #[serde(skip_serializing_if = "Option::is_none")]
        min_snapshots_to_keep: Option<i32>,
        /// Maximum snapshot age in milliseconds, if configured.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_snapshot_age_ms: Option<i64>,
        /// Maximum age of the reference itself in milliseconds, if configured.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
    /// A tag reference (`"type": "tag"`); fields use kebab-case keys.
    #[serde(rename_all = "kebab-case")]
    Tag {
        /// Maximum age of the reference itself in milliseconds, if configured.
        #[serde(skip_serializing_if = "Option::is_none")]
        max_ref_age_ms: Option<i64>,
    },
}
480
481impl SnapshotRetention {
482 pub fn branch(
484 min_snapshots_to_keep: Option<i32>,
485 max_snapshot_age_ms: Option<i64>,
486 max_ref_age_ms: Option<i64>,
487 ) -> Self {
488 SnapshotRetention::Branch {
489 min_snapshots_to_keep,
490 max_snapshot_age_ms,
491 max_ref_age_ms,
492 }
493 }
494}
495
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use crate::spec::TableMetadata;
    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV3};
    use crate::spec::snapshot::{Operation, Snapshot, Summary};

    /// Deserializes a v1 snapshot record and checks the basic accessors.
    #[test]
    fn schema() {
        let record = r#"
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://b/wh/.../s1.avro",
            "schema-id": 0
        }
        "#;

        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
            .unwrap()
            .try_into()
            .unwrap();
        assert_eq!(3051729675574597004, result.snapshot_id());
        assert_eq!(
            Utc.timestamp_millis_opt(1515100955770).unwrap(),
            result.timestamp().unwrap()
        );
        assert_eq!(1515100955770, result.timestamp_ms());
        assert_eq!(
            Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new()
            },
            *result.summary()
        );
        assert_eq!("s3://b/wh/.../s1.avro", result.manifest_list());
    }

    /// A v1 snapshot keeps all its fields and gets sequence number 0.
    #[test]
    fn test_snapshot_v1_to_v2_projection() {
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1234567890,
            parent_snapshot_id: Some(987654321),
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: Some(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([
                    ("added-files".to_string(), "5".to_string()),
                    ("added-records".to_string(), "100".to_string()),
                ]),
            }),
            schema_id: Some(1),
        };

        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        assert_eq!(v2_snapshot.snapshot_id(), 1234567890);
        assert_eq!(v2_snapshot.parent_snapshot_id(), Some(987654321));
        assert_eq!(v2_snapshot.timestamp_ms(), 1515100955770);
        assert_eq!(
            v2_snapshot.manifest_list(),
            "s3://bucket/manifest-list.avro"
        );
        assert_eq!(v2_snapshot.schema_id(), Some(1));
        assert_eq!(v2_snapshot.summary().operation, Operation::Append);
        assert_eq!(
            v2_snapshot
                .summary()
                .additional_properties
                .get("added-files"),
            Some(&"5".to_string())
        );
    }

    /// v1 snapshots must reference a manifest list and must not also list manifests inline.
    #[test]
    fn test_v1_snapshot_with_manifest_list_and_manifests() {
        {
            let metadata = r#"
    {
        "format-version": 1,
        "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
        "location": "s3://bucket/test/location",
        "last-updated-ms": 1700000000000,
        "last-column-id": 1,
        "schema": {
            "type": "struct",
            "fields": [
                {"id": 1, "name": "x", "required": true, "type": "long"}
            ]
        },
        "partition-spec": [],
        "properties": {},
        "current-snapshot-id": 111111111,
        "snapshots": [
            {
                "snapshot-id": 111111111,
                "timestamp-ms": 1600000000000,
                "summary": {"operation": "append"},
                "manifest-list": "s3://bucket/metadata/snap-123.avro",
                "manifests": ["s3://bucket/metadata/manifest-1.avro"]
            }
        ]
    }
    "#;

            let result_both_manifest_list_and_manifest_set =
                serde_json::from_str::<TableMetadata>(metadata);
            assert!(result_both_manifest_list_and_manifest_set.is_err());
            assert_eq!(
                result_both_manifest_list_and_manifest_set
                    .unwrap_err()
                    .to_string(),
                "DataInvalid => Invalid v1 snapshot, when manifest list provided, manifest files should be omitted"
            )
        }

        {
            let metadata = r#"
    {
        "format-version": 1,
        "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
        "location": "s3://bucket/test/location",
        "last-updated-ms": 1700000000000,
        "last-column-id": 1,
        "schema": {
            "type": "struct",
            "fields": [
                {"id": 1, "name": "x", "required": true, "type": "long"}
            ]
        },
        "partition-spec": [],
        "properties": {},
        "current-snapshot-id": 111111111,
        "snapshots": [
            {
                "snapshot-id": 111111111,
                "timestamp-ms": 1600000000000,
                "summary": {"operation": "append"},
                "manifests": ["s3://bucket/metadata/manifest-1.avro"]
            }
        ]
    }
    "#;
            let result_missing_manifest_list = serde_json::from_str::<TableMetadata>(metadata);
            assert!(result_missing_manifest_list.is_err());
            assert_eq!(
                result_missing_manifest_list.unwrap_err().to_string(),
                "DataInvalid => Unsupported v1 snapshot, only manifest list is supported"
            )
        }
    }

    /// A v1 snapshot without a summary gets an empty `Append` summary.
    #[test]
    fn test_snapshot_v1_to_v2_with_missing_summary() {
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1111111111,
            parent_snapshot_id: None,
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            summary: None,
            schema_id: None,
        };

        let v2_snapshot: Snapshot = v1_snapshot.try_into().unwrap();

        assert_eq!(
            v2_snapshot.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );
        assert_eq!(
            v2_snapshot.summary().operation,
            Operation::Append,
            "Missing V1 summary should default to Append operation"
        );
        assert!(
            v2_snapshot.summary().additional_properties.is_empty(),
            "Default summary should have empty additional_properties"
        );

        assert_eq!(v2_snapshot.snapshot_id(), 1111111111);
        assert_eq!(v2_snapshot.parent_snapshot_id(), None);
        assert_eq!(v2_snapshot.schema_id(), None);
    }

    /// The v3 row range and key id survive a round trip through `SnapshotV3`.
    #[test]
    fn test_snapshot_v3_row_range_and_key_id_round_trip() {
        let record = r#"
        {
            "snapshot-id": 1,
            "sequence-number": 5,
            "timestamp-ms": 1515100955770,
            "manifest-list": "s3://bucket/manifest-list.avro",
            "summary": {"operation": "append"},
            "first-row-id": 100,
            "added-rows": 20,
            "key-id": "key-1"
        }
        "#;

        let snapshot: Snapshot = serde_json::from_str::<SnapshotV3>(record).unwrap().into();
        assert_eq!(snapshot.sequence_number(), 5);
        assert_eq!(snapshot.row_range(), Some((100, 20)));
        assert_eq!(snapshot.first_row_id(), Some(100));
        assert_eq!(snapshot.added_rows_count(), Some(20));
        assert_eq!(snapshot.encryption_key_id(), Some("key-1"));

        let v3 = SnapshotV3::try_from(snapshot.clone()).unwrap();
        let value = serde_json::to_value(&v3).unwrap();
        assert_eq!(value["first-row-id"], 100);
        assert_eq!(value["added-rows"], 20);
        assert_eq!(value["key-id"], "key-1");

        let round_tripped: Snapshot = v3.into();
        assert_eq!(round_tripped, snapshot);
    }
}