iceberg/inspect/
snapshots.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use arrow_array::RecordBatch;
22use arrow_array::builder::{MapBuilder, MapFieldNames, PrimitiveBuilder, StringBuilder};
23use arrow_array::types::{Int64Type, TimestampMicrosecondType};
24use arrow_schema::{DataType, Field};
25use futures::{StreamExt, stream};
26use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
27
28use crate::Result;
29use crate::arrow::{DEFAULT_MAP_FIELD_NAME, schema_to_arrow_schema};
30use crate::scan::ArrowRecordBatchStream;
31use crate::spec::{
32    MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedField, PrimitiveType, Type,
33};
34use crate::table::Table;
35
36/// Snapshots table.
37pub struct SnapshotsTable<'a> {
38    table: &'a Table,
39}
40
41impl<'a> SnapshotsTable<'a> {
42    /// Create a new Snapshots table instance.
43    pub fn new(table: &'a Table) -> Self {
44        Self { table }
45    }
46
47    /// Returns the iceberg schema of the snapshots table.
48    pub fn schema(&self) -> crate::spec::Schema {
49        let fields = vec![
50            NestedField::required(
51                1,
52                "committed_at",
53                Type::Primitive(PrimitiveType::Timestamptz),
54            ),
55            NestedField::required(2, "snapshot_id", Type::Primitive(PrimitiveType::Long)),
56            NestedField::optional(3, "parent_id", Type::Primitive(PrimitiveType::Long)),
57            NestedField::optional(4, "operation", Type::Primitive(PrimitiveType::String)),
58            NestedField::optional(5, "manifest_list", Type::Primitive(PrimitiveType::String)),
59            NestedField::optional(
60                6,
61                "summary",
62                Type::Map(MapType {
63                    key_field: Arc::new(NestedField::map_key_element(
64                        7,
65                        Type::Primitive(PrimitiveType::String),
66                    )),
67                    value_field: Arc::new(NestedField::map_value_element(
68                        8,
69                        Type::Primitive(PrimitiveType::String),
70                        false,
71                    )),
72                }),
73            ),
74        ];
75        crate::spec::Schema::builder()
76            .with_fields(fields.into_iter().map(|f| f.into()))
77            .build()
78            .unwrap()
79    }
80
81    /// Scans the snapshots table.
82    pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
83        let schema = schema_to_arrow_schema(&self.schema())?;
84
85        let mut committed_at =
86            PrimitiveBuilder::<TimestampMicrosecondType>::new().with_timezone("+00:00");
87        let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
88        let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
89        let mut operation = StringBuilder::new();
90        let mut manifest_list = StringBuilder::new();
91        let mut summary = MapBuilder::new(
92            Some(MapFieldNames {
93                entry: DEFAULT_MAP_FIELD_NAME.to_string(),
94                key: MAP_KEY_FIELD_NAME.to_string(),
95                value: MAP_VALUE_FIELD_NAME.to_string(),
96            }),
97            StringBuilder::new(),
98            StringBuilder::new(),
99        )
100        .with_keys_field(Arc::new(
101            Field::new(MAP_KEY_FIELD_NAME, DataType::Utf8, false).with_metadata(HashMap::from([(
102                PARQUET_FIELD_ID_META_KEY.to_string(),
103                "7".to_string(),
104            )])),
105        ))
106        .with_values_field(Arc::new(
107            Field::new(MAP_VALUE_FIELD_NAME, DataType::Utf8, true).with_metadata(HashMap::from([
108                (PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string()),
109            ])),
110        ));
111        for snapshot in self.table.metadata().snapshots() {
112            committed_at.append_value(snapshot.timestamp_ms() * 1000);
113            snapshot_id.append_value(snapshot.snapshot_id());
114            parent_id.append_option(snapshot.parent_snapshot_id());
115            manifest_list.append_value(snapshot.manifest_list());
116            operation.append_value(snapshot.summary().operation.as_str());
117            for (key, value) in &snapshot.summary().additional_properties {
118                summary.keys().append_value(key);
119                summary.values().append_value(value);
120            }
121            summary.append(true)?;
122        }
123
124        let batch = RecordBatch::try_new(Arc::new(schema), vec![
125            Arc::new(committed_at.finish()),
126            Arc::new(snapshot_id.finish()),
127            Arc::new(parent_id.finish()),
128            Arc::new(operation.finish()),
129            Arc::new(manifest_list.finish()),
130            Arc::new(summary.finish()),
131        ])?;
132
133        Ok(stream::iter(vec![Ok(batch)]).boxed())
134    }
135}
136
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use futures::TryStreamExt;

    use crate::scan::tests::TableTestFixture;
    use crate::test_utils::check_record_batches;

    /// Scans the snapshots table of the fixture table and compares the Arrow
    /// schema and the rendered column data against inline snapshots.
    #[tokio::test]
    async fn test_snapshots_table() {
        let fixture_table = TableTestFixture::new().table;

        let snapshot_batches = fixture_table.inspect().snapshots().scan().await.unwrap();
        let collected: Vec<_> = snapshot_batches.try_collect().await.unwrap();

        let expected_schema = expect![[r#"
                Field { name: "committed_at", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
                Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
                Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
                Field { name: "operation", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
                Field { name: "manifest_list", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
                Field { name: "summary", data_type: Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} }"#]];

        let expected_data = expect![[r#"
                committed_at: PrimitiveArray<Timestamp(Microsecond, Some("+00:00"))>
                [
                  2018-01-04T21:22:35.770+00:00,
                  2019-04-12T20:29:15.770+00:00,
                ],
                snapshot_id: PrimitiveArray<Int64>
                [
                  3051729675574597004,
                  3055729675574597004,
                ],
                parent_id: PrimitiveArray<Int64>
                [
                  null,
                  3051729675574597004,
                ],
                operation: StringArray
                [
                  "append",
                  "append",
                ],
                manifest_list: (skipped),
                summary: MapArray
                [
                  StructArray
                -- validity:
                [
                ]
                [
                -- child 0: "key" (Utf8)
                StringArray
                [
                ]
                -- child 1: "value" (Utf8)
                StringArray
                [
                ]
                ],
                  StructArray
                -- validity:
                [
                ]
                [
                -- child 0: "key" (Utf8)
                StringArray
                [
                ]
                -- child 1: "value" (Utf8)
                StringArray
                [
                ]
                ],
                ]"#]];

        // `manifest_list` is excluded from the data comparison (it shows up as
        // "(skipped)" in the snapshot); `committed_at` is passed as the sort column.
        check_record_batches(
            collected,
            expected_schema,
            expected_data,
            &["manifest_list"],
            Some("committed_at"),
        );
    }
}
217}