1use std::collections::HashMap;
19use std::sync::Arc;
20
21use arrow_array::RecordBatch;
22use arrow_array::builder::{MapBuilder, MapFieldNames, PrimitiveBuilder, StringBuilder};
23use arrow_array::types::{Int64Type, TimestampMicrosecondType};
24use arrow_schema::{DataType, Field};
25use futures::{StreamExt, stream};
26use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
27
28use crate::Result;
29use crate::arrow::{DEFAULT_MAP_FIELD_NAME, schema_to_arrow_schema};
30use crate::scan::ArrowRecordBatchStream;
31use crate::spec::{
32 MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedField, PrimitiveType, Type,
33};
34use crate::table::Table;
35
/// Metadata table listing the snapshots of an Iceberg [`Table`], one row per snapshot.
///
/// The view only borrows the table; rows are materialized by [`SnapshotsTable::scan`].
pub struct SnapshotsTable<'a> {
    /// Table whose metadata snapshots are listed.
    table: &'a Table,
}
40
41impl<'a> SnapshotsTable<'a> {
42 pub fn new(table: &'a Table) -> Self {
44 Self { table }
45 }
46
47 pub fn schema(&self) -> crate::spec::Schema {
49 let fields = vec![
50 NestedField::required(
51 1,
52 "committed_at",
53 Type::Primitive(PrimitiveType::Timestamptz),
54 ),
55 NestedField::required(2, "snapshot_id", Type::Primitive(PrimitiveType::Long)),
56 NestedField::optional(3, "parent_id", Type::Primitive(PrimitiveType::Long)),
57 NestedField::optional(4, "operation", Type::Primitive(PrimitiveType::String)),
58 NestedField::optional(5, "manifest_list", Type::Primitive(PrimitiveType::String)),
59 NestedField::optional(
60 6,
61 "summary",
62 Type::Map(MapType {
63 key_field: Arc::new(NestedField::map_key_element(
64 7,
65 Type::Primitive(PrimitiveType::String),
66 )),
67 value_field: Arc::new(NestedField::map_value_element(
68 8,
69 Type::Primitive(PrimitiveType::String),
70 false,
71 )),
72 }),
73 ),
74 ];
75 crate::spec::Schema::builder()
76 .with_fields(fields.into_iter().map(|f| f.into()))
77 .build()
78 .unwrap()
79 }
80
81 pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
83 let schema = schema_to_arrow_schema(&self.schema())?;
84
85 let mut committed_at =
86 PrimitiveBuilder::<TimestampMicrosecondType>::new().with_timezone("+00:00");
87 let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
88 let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
89 let mut operation = StringBuilder::new();
90 let mut manifest_list = StringBuilder::new();
91 let mut summary = MapBuilder::new(
92 Some(MapFieldNames {
93 entry: DEFAULT_MAP_FIELD_NAME.to_string(),
94 key: MAP_KEY_FIELD_NAME.to_string(),
95 value: MAP_VALUE_FIELD_NAME.to_string(),
96 }),
97 StringBuilder::new(),
98 StringBuilder::new(),
99 )
100 .with_keys_field(Arc::new(
101 Field::new(MAP_KEY_FIELD_NAME, DataType::Utf8, false).with_metadata(HashMap::from([(
102 PARQUET_FIELD_ID_META_KEY.to_string(),
103 "7".to_string(),
104 )])),
105 ))
106 .with_values_field(Arc::new(
107 Field::new(MAP_VALUE_FIELD_NAME, DataType::Utf8, true).with_metadata(HashMap::from([
108 (PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string()),
109 ])),
110 ));
111 for snapshot in self.table.metadata().snapshots() {
112 committed_at.append_value(snapshot.timestamp_ms() * 1000);
113 snapshot_id.append_value(snapshot.snapshot_id());
114 parent_id.append_option(snapshot.parent_snapshot_id());
115 manifest_list.append_value(snapshot.manifest_list());
116 operation.append_value(snapshot.summary().operation.as_str());
117 for (key, value) in &snapshot.summary().additional_properties {
118 summary.keys().append_value(key);
119 summary.values().append_value(value);
120 }
121 summary.append(true)?;
122 }
123
124 let batch = RecordBatch::try_new(Arc::new(schema), vec![
125 Arc::new(committed_at.finish()),
126 Arc::new(snapshot_id.finish()),
127 Arc::new(parent_id.finish()),
128 Arc::new(operation.finish()),
129 Arc::new(manifest_list.finish()),
130 Arc::new(summary.finish()),
131 ])?;
132
133 Ok(stream::iter(vec![Ok(batch)]).boxed())
134 }
135}
136
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use futures::TryStreamExt;

    use crate::scan::tests::TableTestFixture;
    use crate::test_utils::check_record_batches;

    /// Scans the snapshots metadata table of the shared test fixture and snapshots both the
    /// resulting Arrow schema and the row data.
    ///
    /// `manifest_list` is excluded from the data comparison, since its paths are presumably
    /// fixture-specific. `Some("committed_at")` is presumably the column used to order rows
    /// before comparison; TODO confirm against `check_record_batches`.
    #[tokio::test]
    async fn test_snapshots_table() {
        let table = TableTestFixture::new().table;

        let batch_stream = table.inspect().snapshots().scan().await.unwrap();

        check_record_batches(
            batch_stream.try_collect::<Vec<_>>().await.unwrap(),
            // Expected Arrow schema, including the Parquet field-id metadata on every field.
            expect![[r#"
                Field { name: "committed_at", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
                Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
                Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
                Field { name: "operation", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
                Field { name: "manifest_list", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
                Field { name: "summary", data_type: Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} }"#]],
            // Expected data: two snapshots; the fixture's summaries carry no extra properties.
            expect![[r#"
                committed_at: PrimitiveArray<Timestamp(Microsecond, Some("+00:00"))>
                [
                  2018-01-04T21:22:35.770+00:00,
                  2019-04-12T20:29:15.770+00:00,
                ],
                snapshot_id: PrimitiveArray<Int64>
                [
                  3051729675574597004,
                  3055729675574597004,
                ],
                parent_id: PrimitiveArray<Int64>
                [
                  null,
                  3051729675574597004,
                ],
                operation: StringArray
                [
                  "append",
                  "append",
                ],
                manifest_list: (skipped),
                summary: MapArray
                [
                  StructArray
                -- validity:
                [
                ]
                [
                -- child 0: "key" (Utf8)
                StringArray
                [
                ]
                -- child 1: "value" (Utf8)
                StringArray
                [
                ]
                ],
                  StructArray
                -- validity:
                [
                ]
                [
                -- child 0: "key" (Utf8)
                StringArray
                [
                ]
                -- child 1: "value" (Utf8)
                StringArray
                [
                ]
                ],
                ]"#]],
            &["manifest_list"],
            Some("committed_at"),
        );
    }
}