use std::sync::Arc;
use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use futures::{stream, StreamExt};
use crate::scan::ArrowRecordBatchStream;
use crate::table::Table;
use crate::Result;
/// Metadata view over a table's snapshots.
///
/// Borrows the underlying [`Table`]; `scan` reads the snapshots from that
/// table's metadata and renders one Arrow row per snapshot.
pub struct SnapshotsTable<'a> {
// The table whose metadata is read when building the schema and scanning.
table: &'a Table,
}
impl<'a> SnapshotsTable<'a> {
pub fn new(table: &'a Table) -> Self {
Self { table }
}
pub fn schema(&self) -> Schema {
Schema::new(vec![
Field::new(
"committed_at",
DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
false,
),
Field::new("snapshot_id", DataType::Int64, false),
Field::new("parent_id", DataType::Int64, true),
Field::new("operation", DataType::Utf8, false),
Field::new("manifest_list", DataType::Utf8, false),
Field::new(
"summary",
DataType::Map(
Arc::new(Field::new(
"entries",
DataType::Struct(
vec![
Field::new("keys", DataType::Utf8, false),
Field::new("values", DataType::Utf8, true),
]
.into(),
),
false,
)),
false,
),
false,
),
])
}
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
let mut committed_at =
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
let mut operation = StringBuilder::new();
let mut manifest_list = StringBuilder::new();
let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
for snapshot in self.table.metadata().snapshots() {
committed_at.append_value(snapshot.timestamp_ms());
snapshot_id.append_value(snapshot.snapshot_id());
parent_id.append_option(snapshot.parent_snapshot_id());
manifest_list.append_value(snapshot.manifest_list());
operation.append_value(snapshot.summary().operation.as_str());
for (key, value) in &snapshot.summary().additional_properties {
summary.keys().append_value(key);
summary.values().append_value(value);
}
summary.append(true)?;
}
let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
Arc::new(committed_at.finish()),
Arc::new(snapshot_id.finish()),
Arc::new(parent_id.finish()),
Arc::new(operation.finish()),
Arc::new(manifest_list.finish()),
Arc::new(summary.finish()),
])?;
Ok(stream::iter(vec![Ok(batch)]).boxed())
}
}
// Snapshot-style tests for the snapshots metadata table.
#[cfg(test)]
mod tests {
use expect_test::expect;
use crate::inspect::metadata_table::tests::check_record_batches;
use crate::scan::tests::TableTestFixture;
// Scans the snapshots table of the shared test fixture and compares the
// resulting schema and column data against the inline expected snapshots.
#[tokio::test]
async fn test_snapshots_table() {
let table = TableTestFixture::new().table;
let batch_stream = table.inspect().snapshots().scan().await.unwrap();
check_record_batches(
batch_stream,
// Expected schema rendering, one `Field` per column.
expect![[r#"
Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
// Expected column data: two snapshots, the second one's parent being the
// first, both with empty summary maps.
expect![[r#"
committed_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
[
2018-01-04T21:22:35.770+00:00,
2019-04-12T20:29:15.770+00:00,
],
snapshot_id: PrimitiveArray<Int64>
[
3051729675574597004,
3055729675574597004,
],
parent_id: PrimitiveArray<Int64>
[
null,
3051729675574597004,
],
operation: StringArray
[
"append",
"append",
],
manifest_list: (skipped),
summary: MapArray
[
StructArray
-- validity:
[
]
[
-- child 0: "keys" (Utf8)
StringArray
[
]
-- child 1: "values" (Utf8)
StringArray
[
]
],
StructArray
-- validity:
[
]
[
-- child 0: "keys" (Utf8)
StringArray
[
]
-- child 1: "values" (Utf8)
StringArray
[
]
],
]"#]],
// NOTE(review): presumably the columns whose values are not compared (the
// expected data above shows `manifest_list: (skipped)`) — confirm against
// `check_record_batches`.
&["manifest_list"],
// NOTE(review): presumably the column used to order rows before comparison —
// confirm against `check_record_batches`.
Some("committed_at"),
).await;
}
}