iceberg/util/
snapshot.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::spec::{SnapshotRef, TableMetadataRef};
19
20struct Ancestors {
21    next: Option<SnapshotRef>,
22    get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
23}
24
25impl Iterator for Ancestors {
26    type Item = SnapshotRef;
27
28    fn next(&mut self) -> Option<Self::Item> {
29        let snapshot = self.next.take()?;
30        self.next = snapshot
31            .parent_snapshot_id()
32            .and_then(|id| (self.get_snapshot)(id));
33        Some(snapshot)
34    }
35}
36
37/// Iterate starting from `snapshot_id` (inclusive) to the root snapshot.
38pub fn ancestors_of(
39    table_metadata: &TableMetadataRef,
40    snapshot_id: i64,
41) -> impl Iterator<Item = SnapshotRef> + Send {
42    let initial = table_metadata.snapshot_by_id(snapshot_id).cloned();
43    let table_metadata = table_metadata.clone();
44    Ancestors {
45        next: initial,
46        get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
47    }
48}
49
50/// Iterate starting from `latest_snapshot_id` (inclusive) to `oldest_snapshot_id` (exclusive).
51pub fn ancestors_between(
52    table_metadata: &TableMetadataRef,
53    latest_snapshot_id: i64,
54    oldest_snapshot_id: Option<i64>,
55) -> impl Iterator<Item = SnapshotRef> + Send {
56    ancestors_of(table_metadata, latest_snapshot_id).take_while(move |snapshot| {
57        oldest_snapshot_id
58            .map(|id| snapshot.snapshot_id() != id)
59            .unwrap_or(true)
60    })
61}
62
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scan::tests::TableTestFixture;

    // Snapshot lineage used by these tests, oldest first:
    // S1 (root) -> S2 -> S3 -> S4 -> S5 (current)
    const S1: i64 = 3051729675574597004;
    const S2: i64 = 3055729675574597004;
    const S3: i64 = 3056729675574597004;
    const S4: i64 = 3057729675574597004;
    const S5: i64 = 3059729675574597004;

    /// Builds shared table metadata from the deep-history fixture.
    fn metadata() -> TableMetadataRef {
        let deep_history = TableTestFixture::new_with_deep_history();
        let owned = deep_history.table.metadata().clone();
        std::sync::Arc::new(owned)
    }

    /// Drains a snapshot iterator into the list of snapshot ids it yielded.
    fn collect_ids(snapshots: impl Iterator<Item = SnapshotRef>) -> Vec<i64> {
        snapshots.map(|snapshot| snapshot.snapshot_id()).collect()
    }

    // --- ancestors_of ---

    #[test]
    fn test_ancestors_of_nonexistent_snapshot_returns_empty() {
        let meta = metadata();
        let ids = collect_ids(ancestors_of(&meta, 999));
        assert!(ids.is_empty());
    }

    #[test]
    fn test_ancestors_of_root_returns_only_root() {
        let meta = metadata();
        let ids = collect_ids(ancestors_of(&meta, S1));
        assert_eq!(ids, [S1]);
    }

    #[test]
    fn test_ancestors_of_leaf_returns_full_chain() {
        let meta = metadata();
        let ids = collect_ids(ancestors_of(&meta, S5));
        assert_eq!(ids, [S5, S4, S3, S2, S1]);
    }

    #[test]
    fn test_ancestors_of_mid_chain_returns_partial_chain() {
        let meta = metadata();
        let ids = collect_ids(ancestors_of(&meta, S3));
        assert_eq!(ids, [S3, S2, S1]);
    }

    #[test]
    fn test_ancestors_of_second_snapshot() {
        let meta = metadata();
        let ids = collect_ids(ancestors_of(&meta, S2));
        assert_eq!(ids, [S2, S1]);
    }

    // --- ancestors_between ---

    #[test]
    fn test_ancestors_between_same_id_returns_empty() {
        let meta = metadata();
        let ids = collect_ids(ancestors_between(&meta, S3, Some(S3)));
        assert!(ids.is_empty());
    }

    #[test]
    fn test_ancestors_between_no_oldest_returns_all_ancestors() {
        let meta = metadata();
        let ids = collect_ids(ancestors_between(&meta, S5, None));
        assert_eq!(ids, [S5, S4, S3, S2, S1]);
    }

    #[test]
    fn test_ancestors_between_excludes_oldest_snapshot() {
        let meta = metadata();
        // From S5 down to, but excluding, S2.
        let ids = collect_ids(ancestors_between(&meta, S5, Some(S2)));
        assert_eq!(ids, [S5, S4, S3]);
    }

    #[test]
    fn test_ancestors_between_adjacent_snapshots() {
        let meta = metadata();
        // S2 is the direct parent of S3, so only S3 is yielded.
        let ids = collect_ids(ancestors_between(&meta, S3, Some(S2)));
        assert_eq!(ids, [S3]);
    }

    #[test]
    fn test_ancestors_between_leaf_and_root() {
        let meta = metadata();
        // From S5 down to, but excluding, S1.
        let ids = collect_ids(ancestors_between(&meta, S5, Some(S1)));
        assert_eq!(ids, [S5, S4, S3, S2]);
    }

    #[test]
    fn test_ancestors_between_nonexistent_oldest_returns_full_chain() {
        let meta = metadata();
        // The exclusive bound is never encountered, so the walk reaches the root.
        let ids = collect_ids(ancestors_between(&meta, S5, Some(999)));
        assert_eq!(ids, [S5, S4, S3, S2, S1]);
    }

    #[test]
    fn test_ancestors_between_nonexistent_latest_returns_empty() {
        let meta = metadata();
        let ids = collect_ids(ancestors_between(&meta, 999, Some(S1)));
        assert!(ids.is_empty());
    }
}