1use std::collections::HashMap;
22use std::sync::Arc;
23
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::table_metadata::SnapshotLog;
29use crate::error::{Result, timestamp_ms_to_utc};
30use crate::io::FileIO;
31use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
32use crate::{Error, ErrorKind};
33
/// Reference name of a table's main branch (`"main"`).
pub const MAIN_BRANCH: &str = "main";
/// Sentinel snapshot id (`-1`).
///
/// NOTE(review): presumably a placeholder for a snapshot whose real id has not
/// been assigned yet — confirm against the call sites.
pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
38
39pub type SnapshotRef = Arc<Snapshot>;
41#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
42#[serde(rename_all = "lowercase")]
43pub enum Operation {
45 Append,
47 Replace,
50 Overwrite,
52 Delete,
54}
55
56impl Operation {
57 pub fn as_str(&self) -> &str {
59 match self {
60 Operation::Append => "append",
61 Operation::Replace => "replace",
62 Operation::Overwrite => "overwrite",
63 Operation::Delete => "delete",
64 }
65 }
66}
67
68#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
69pub struct Summary {
71 pub operation: Operation,
73 #[serde(flatten)]
75 pub additional_properties: HashMap<String, String>,
76}
77
78impl Default for Operation {
79 fn default() -> Operation {
80 Self::Append
81 }
82}
83
/// Row-id range attached to a snapshot.
///
/// Within this file it is populated for v3 snapshots from their `first-row-id`
/// and `added-rows` fields.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SnapshotRowRange {
    /// Value of the snapshot's `first-row-id` field.
    pub first_row_id: u64,
    /// Value of the snapshot's `added-rows` field.
    pub added_rows: u64,
}
92
93#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
94#[builder(field_defaults(setter(prefix = "with_")))]
95pub struct Snapshot {
97 pub(crate) snapshot_id: i64,
99 #[builder(default = None)]
102 pub(crate) parent_snapshot_id: Option<i64>,
103 pub(crate) sequence_number: i64,
106 pub(crate) timestamp_ms: i64,
109 #[builder(setter(into))]
113 pub(crate) manifest_list: String,
114 pub(crate) summary: Summary,
116 #[builder(setter(strip_option(fallback = schema_id_opt)), default = None)]
118 pub(crate) schema_id: Option<SchemaId>,
119 #[builder(default)]
121 pub(crate) encryption_key_id: Option<String>,
122 #[builder(default, setter(!strip_option, transform = |first_row_id: u64, added_rows: u64| Some(SnapshotRowRange { first_row_id, added_rows })))]
125 pub(crate) row_range: Option<SnapshotRowRange>,
130}
131
132impl Snapshot {
133 #[inline]
135 pub fn snapshot_id(&self) -> i64 {
136 self.snapshot_id
137 }
138
139 #[inline]
141 pub fn parent_snapshot_id(&self) -> Option<i64> {
142 self.parent_snapshot_id
143 }
144
145 #[inline]
147 pub fn sequence_number(&self) -> i64 {
148 self.sequence_number
149 }
150 #[inline]
152 pub fn manifest_list(&self) -> &str {
153 &self.manifest_list
154 }
155
156 #[inline]
158 pub fn summary(&self) -> &Summary {
159 &self.summary
160 }
161 #[inline]
163 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
164 timestamp_ms_to_utc(self.timestamp_ms)
165 }
166
167 #[inline]
169 pub fn timestamp_ms(&self) -> i64 {
170 self.timestamp_ms
171 }
172
173 #[inline]
175 pub fn schema_id(&self) -> Option<SchemaId> {
176 self.schema_id
177 }
178
179 pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
181 Ok(match self.schema_id() {
182 Some(schema_id) => table_metadata
183 .schema_by_id(schema_id)
184 .ok_or_else(|| {
185 Error::new(
186 ErrorKind::DataInvalid,
187 format!("Schema with id {schema_id} not found"),
188 )
189 })?
190 .clone(),
191 None => table_metadata.current_schema().clone(),
192 })
193 }
194
195 #[cfg(test)]
197 pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
198 match self.parent_snapshot_id {
199 Some(id) => table_metadata.snapshot_by_id(id).cloned(),
200 None => None,
201 }
202 }
203
204 pub async fn load_manifest_list(
206 &self,
207 file_io: &FileIO,
208 table_metadata: &TableMetadata,
209 ) -> Result<ManifestList> {
210 let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
211 ManifestList::parse_with_version(
212 &manifest_list_content,
213 table_metadata.format_version(),
216 )
217 }
218
219 #[allow(dead_code)]
220 pub(crate) fn log(&self) -> SnapshotLog {
221 SnapshotLog {
222 timestamp_ms: self.timestamp_ms,
223 snapshot_id: self.snapshot_id,
224 }
225 }
226
227 pub fn first_row_id(&self) -> Option<u64> {
234 self.row_range.as_ref().map(|r| r.first_row_id)
235 }
236
237 pub fn added_rows_count(&self) -> Option<u64> {
242 self.row_range.as_ref().map(|r| r.added_rows)
243 }
244
245 pub fn row_range(&self) -> Option<(u64, u64)> {
248 self.row_range
249 .as_ref()
250 .map(|r| (r.first_row_id, r.added_rows))
251 }
252
253 pub fn encryption_key_id(&self) -> Option<&str> {
255 self.encryption_key_id.as_deref()
256 }
257}
258
259pub(super) mod _serde {
260 use std::collections::HashMap;
265
266 use serde::{Deserialize, Serialize};
267
268 use super::{Operation, Snapshot, Summary};
269 use crate::Error;
270 use crate::spec::SchemaId;
271 use crate::spec::snapshot::SnapshotRowRange;
272
273 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
274 #[serde(rename_all = "kebab-case")]
275 pub(crate) struct SnapshotV3 {
277 pub snapshot_id: i64,
278 #[serde(skip_serializing_if = "Option::is_none")]
279 pub parent_snapshot_id: Option<i64>,
280 pub sequence_number: i64,
281 pub timestamp_ms: i64,
282 pub manifest_list: String,
283 pub summary: Summary,
284 #[serde(skip_serializing_if = "Option::is_none")]
285 pub schema_id: Option<SchemaId>,
286 pub first_row_id: u64,
287 pub added_rows: u64,
288 #[serde(skip_serializing_if = "Option::is_none")]
289 pub key_id: Option<String>,
290 }
291
292 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
293 #[serde(rename_all = "kebab-case")]
294 pub(crate) struct SnapshotV2 {
296 pub snapshot_id: i64,
297 #[serde(skip_serializing_if = "Option::is_none")]
298 pub parent_snapshot_id: Option<i64>,
299 pub sequence_number: i64,
300 pub timestamp_ms: i64,
301 pub manifest_list: String,
302 pub summary: Summary,
303 #[serde(skip_serializing_if = "Option::is_none")]
304 pub schema_id: Option<SchemaId>,
305 }
306
307 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
308 #[serde(rename_all = "kebab-case")]
309 pub(crate) struct SnapshotV1 {
311 pub snapshot_id: i64,
312 #[serde(skip_serializing_if = "Option::is_none")]
313 pub parent_snapshot_id: Option<i64>,
314 pub timestamp_ms: i64,
315 #[serde(skip_serializing_if = "Option::is_none")]
316 pub manifest_list: Option<String>,
317 #[serde(skip_serializing_if = "Option::is_none")]
318 pub manifests: Option<Vec<String>>,
319 #[serde(skip_serializing_if = "Option::is_none")]
320 pub summary: Option<Summary>,
321 #[serde(skip_serializing_if = "Option::is_none")]
322 pub schema_id: Option<SchemaId>,
323 }
324
325 impl From<SnapshotV3> for Snapshot {
326 fn from(s: SnapshotV3) -> Self {
327 Snapshot {
328 snapshot_id: s.snapshot_id,
329 parent_snapshot_id: s.parent_snapshot_id,
330 sequence_number: s.sequence_number,
331 timestamp_ms: s.timestamp_ms,
332 manifest_list: s.manifest_list,
333 summary: s.summary,
334 schema_id: s.schema_id,
335 encryption_key_id: s.key_id,
336 row_range: Some(SnapshotRowRange {
337 first_row_id: s.first_row_id,
338 added_rows: s.added_rows,
339 }),
340 }
341 }
342 }
343
344 impl TryFrom<Snapshot> for SnapshotV3 {
345 type Error = Error;
346
347 fn try_from(s: Snapshot) -> Result<Self, Self::Error> {
348 let row_range = s.row_range.ok_or_else(|| {
349 Error::new(
350 crate::ErrorKind::DataInvalid,
351 "v3 Snapshots must have first-row-id and rows-added fields set.".to_string(),
352 )
353 })?;
354
355 Ok(SnapshotV3 {
356 snapshot_id: s.snapshot_id,
357 parent_snapshot_id: s.parent_snapshot_id,
358 sequence_number: s.sequence_number,
359 timestamp_ms: s.timestamp_ms,
360 manifest_list: s.manifest_list,
361 summary: s.summary,
362 schema_id: s.schema_id,
363 first_row_id: row_range.first_row_id,
364 added_rows: row_range.added_rows,
365 key_id: s.encryption_key_id,
366 })
367 }
368 }
369
370 impl From<SnapshotV2> for Snapshot {
371 fn from(v2: SnapshotV2) -> Self {
372 Snapshot {
373 snapshot_id: v2.snapshot_id,
374 parent_snapshot_id: v2.parent_snapshot_id,
375 sequence_number: v2.sequence_number,
376 timestamp_ms: v2.timestamp_ms,
377 manifest_list: v2.manifest_list,
378 summary: v2.summary,
379 schema_id: v2.schema_id,
380 encryption_key_id: None,
381 row_range: None,
382 }
383 }
384 }
385
386 impl From<Snapshot> for SnapshotV2 {
387 fn from(v2: Snapshot) -> Self {
388 SnapshotV2 {
389 snapshot_id: v2.snapshot_id,
390 parent_snapshot_id: v2.parent_snapshot_id,
391 sequence_number: v2.sequence_number,
392 timestamp_ms: v2.timestamp_ms,
393 manifest_list: v2.manifest_list,
394 summary: v2.summary,
395 schema_id: v2.schema_id,
396 }
397 }
398 }
399
400 impl TryFrom<SnapshotV1> for Snapshot {
401 type Error = Error;
402
403 fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
404 Ok(Snapshot {
405 snapshot_id: v1.snapshot_id,
406 parent_snapshot_id: v1.parent_snapshot_id,
407 sequence_number: 0,
408 timestamp_ms: v1.timestamp_ms,
409 manifest_list: match (v1.manifest_list, v1.manifests) {
410 (Some(file), None) => file,
411 (Some(_), Some(_)) => "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted".to_string(),
412 (None, _) => "Unsupported v1 snapshot, only manifest list is supported".to_string()
413 },
414 summary: v1.summary.unwrap_or(Summary {
415 operation: Operation::default(),
416 additional_properties: HashMap::new(),
417 }),
418 schema_id: v1.schema_id,
419 encryption_key_id: None,
420 row_range: None,
421 })
422 }
423 }
424
425 impl From<Snapshot> for SnapshotV1 {
426 fn from(v2: Snapshot) -> Self {
427 SnapshotV1 {
428 snapshot_id: v2.snapshot_id,
429 parent_snapshot_id: v2.parent_snapshot_id,
430 timestamp_ms: v2.timestamp_ms,
431 manifest_list: Some(v2.manifest_list),
432 summary: Some(v2.summary),
433 schema_id: v2.schema_id,
434 manifests: None,
435 }
436 }
437 }
438}
439
440#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
441#[serde(rename_all = "kebab-case")]
442pub struct SnapshotReference {
444 pub snapshot_id: i64,
446 #[serde(flatten)]
447 pub retention: SnapshotRetention,
449}
450
451impl SnapshotReference {
452 pub fn is_branch(&self) -> bool {
454 matches!(self.retention, SnapshotRetention::Branch { .. })
455 }
456}
457
458impl SnapshotReference {
459 pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
461 SnapshotReference {
462 snapshot_id,
463 retention,
464 }
465 }
466}
467
468#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
469#[serde(rename_all = "lowercase", tag = "type")]
470pub enum SnapshotRetention {
472 #[serde(rename_all = "kebab-case")]
473 Branch {
476 #[serde(skip_serializing_if = "Option::is_none")]
479 min_snapshots_to_keep: Option<i32>,
480 #[serde(skip_serializing_if = "Option::is_none")]
483 max_snapshot_age_ms: Option<i64>,
484 #[serde(skip_serializing_if = "Option::is_none")]
487 max_ref_age_ms: Option<i64>,
488 },
489 #[serde(rename_all = "kebab-case")]
490 Tag {
492 #[serde(skip_serializing_if = "Option::is_none")]
495 max_ref_age_ms: Option<i64>,
496 },
497}
498
499impl SnapshotRetention {
500 pub fn branch(
502 min_snapshots_to_keep: Option<i32>,
503 max_snapshot_age_ms: Option<i64>,
504 max_ref_age_ms: Option<i64>,
505 ) -> Self {
506 SnapshotRetention::Branch {
507 min_snapshots_to_keep,
508 max_snapshot_age_ms,
509 max_ref_age_ms,
510 }
511 }
512}
513
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use crate::spec::snapshot::_serde::SnapshotV1;
    use crate::spec::snapshot::{Operation, Snapshot, Summary};

    /// A v1 snapshot JSON document deserializes and converts into a `Snapshot`
    /// exposing the expected id, timestamps, summary and manifest list.
    #[test]
    fn schema() {
        let json = r#"
        {
            "snapshot-id": 3051729675574597004,
            "timestamp-ms": 1515100955770,
            "summary": {
                "operation": "append"
            },
            "manifest-list": "s3://b/wh/.../s1.avro",
            "schema-id": 0
        }
        "#;

        let v1: SnapshotV1 = serde_json::from_str(json).unwrap();
        let snapshot = Snapshot::try_from(v1).unwrap();

        assert_eq!(snapshot.snapshot_id(), 3051729675574597004);
        assert_eq!(
            snapshot.timestamp().unwrap(),
            Utc.timestamp_millis_opt(1515100955770).unwrap()
        );
        assert_eq!(snapshot.timestamp_ms(), 1515100955770);

        let expected_summary = Summary {
            operation: Operation::Append,
            additional_properties: HashMap::new(),
        };
        assert_eq!(snapshot.summary(), &expected_summary);
        assert_eq!(snapshot.manifest_list(), "s3://b/wh/.../s1.avro");
    }

    /// Converting a fully populated v1 snapshot keeps every field and assigns
    /// sequence number 0.
    #[test]
    fn test_snapshot_v1_to_v2_projection() {
        let summary = Summary {
            operation: Operation::Append,
            additional_properties: HashMap::from([
                ("added-files".to_string(), "5".to_string()),
                ("added-records".to_string(), "100".to_string()),
            ]),
        };
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1234567890,
            parent_snapshot_id: Some(987654321),
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            // v1 may list manifests directly; this fixture uses the manifest list.
            manifests: None,
            summary: Some(summary),
            schema_id: Some(1),
        };

        let converted = Snapshot::try_from(v1_snapshot).unwrap();

        // v1 has no sequence number, so the conversion must default it.
        assert_eq!(
            converted.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        // Everything else is carried over unchanged.
        assert_eq!(converted.snapshot_id(), 1234567890);
        assert_eq!(converted.parent_snapshot_id(), Some(987654321));
        assert_eq!(converted.timestamp_ms(), 1515100955770);
        assert_eq!(converted.manifest_list(), "s3://bucket/manifest-list.avro");
        assert_eq!(converted.schema_id(), Some(1));

        let converted_summary = converted.summary();
        assert_eq!(converted_summary.operation, Operation::Append);
        assert_eq!(
            converted_summary
                .additional_properties
                .get("added-files")
                .map(String::as_str),
            Some("5")
        );
    }

    /// Converting a v1 snapshot without a summary yields the default summary
    /// (append operation, no additional properties).
    #[test]
    fn test_snapshot_v1_to_v2_with_missing_summary() {
        let v1_snapshot = SnapshotV1 {
            snapshot_id: 1111111111,
            parent_snapshot_id: None,
            timestamp_ms: 1515100955770,
            manifest_list: Some("s3://bucket/manifest-list.avro".to_string()),
            manifests: None,
            // The summary is optional in v1.
            summary: None,
            schema_id: None,
        };

        let converted = Snapshot::try_from(v1_snapshot).unwrap();

        assert_eq!(
            converted.sequence_number(),
            0,
            "V1 snapshot sequence_number should default to 0"
        );

        let converted_summary = converted.summary();
        assert_eq!(
            converted_summary.operation,
            Operation::Append,
            "Missing V1 summary should default to Append operation"
        );
        assert!(
            converted_summary.additional_properties.is_empty(),
            "Default summary should have empty additional_properties"
        );

        // The remaining fields are carried over unchanged.
        assert_eq!(converted.snapshot_id(), 1111111111);
        assert_eq!(converted.parent_snapshot_id(), None);
        assert_eq!(converted.schema_id(), None);
    }
}