use std::collections::HashMap;
use std::fmt::Debug;
use std::mem::take;
use std::ops::Deref;
use async_trait::async_trait;
use serde_derive::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use uuid::Uuid;
use crate::spec::{
FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, TableMetadataBuilder,
UnboundPartitionSpec, ViewRepresentations,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
/// Asynchronous interface to a catalog that manages namespaces and tables.
///
/// Implementors must be `Debug + Sync + Send`. Every operation is fallible and
/// returns the crate-level `Result`.
#[async_trait]
pub trait Catalog: Debug + Sync + Send {
/// Lists namespaces, optionally scoped by `parent`.
///
/// NOTE(review): presumably `None` means "list the top-level namespaces" —
/// confirm against the concrete catalog implementations.
async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
-> Result<Vec<NamespaceIdent>>;
/// Creates the namespace `namespace` with the given `properties` and returns it.
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<Namespace>;
/// Fetches the namespace identified by `namespace`.
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;
/// Reports whether the namespace identified by `namespace` exists.
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;
/// Updates the namespace identified by `namespace` using `properties`.
///
/// NOTE(review): whether `properties` replaces or is merged into the existing
/// properties is decided by the implementation — not visible from this trait.
async fn update_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()>;
/// Drops the namespace identified by `namespace`.
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;
/// Lists the identifiers of the tables in `namespace`.
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;
/// Creates a table in `namespace` from the `creation` request and returns it.
async fn create_table(
&self,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> Result<Table>;
/// Loads the table identified by `table`.
async fn load_table(&self, table: &TableIdent) -> Result<Table>;
/// Drops the table identified by `table`.
async fn drop_table(&self, table: &TableIdent) -> Result<()>;
/// Reports whether the table identified by `table` exists.
async fn table_exists(&self, table: &TableIdent) -> Result<bool>;
/// Renames the table `src` to `dest`.
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;
/// Applies `commit` to its table and returns the resulting table.
async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
/// Identifier of a namespace: an ordered list of name parts.
///
/// The wrapped vector is private; it can be read through the `AsRef` and
/// `Deref` implementations or moved out with `inner`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
impl NamespaceIdent {
pub fn new(name: String) -> Self {
Self(vec![name])
}
pub fn from_vec(names: Vec<String>) -> Result<Self> {
if names.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Namespace identifier can't be empty!",
));
}
Ok(Self(names))
}
pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
}
pub fn to_url_string(&self) -> String {
self.as_ref().join("\u{001f}")
}
pub fn inner(self) -> Vec<String> {
self.0
}
}
impl AsRef<Vec<String>> for NamespaceIdent {
fn as_ref(&self) -> &Vec<String> {
&self.0
}
}
impl Deref for NamespaceIdent {
    type Target = [String];

    /// Exposes the name parts as a string slice.
    fn deref(&self) -> &Self::Target {
        self.0.as_slice()
    }
}
/// A namespace: its identifier together with a map of string properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
// Identifier of this namespace.
name: NamespaceIdent,
// Key/value properties attached to this namespace.
properties: HashMap<String, String>,
}
impl Namespace {
pub fn new(name: NamespaceIdent) -> Self {
Self::with_properties(name, HashMap::default())
}
pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
Self { name, properties }
}
pub fn name(&self) -> &NamespaceIdent {
&self.name
}
pub fn properties(&self) -> &HashMap<String, String> {
&self.properties
}
}
/// Identifier of a table: the namespace it lives in plus the table name.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdent {
/// Namespace that contains the table.
pub namespace: NamespaceIdent,
/// Name of the table within `namespace`.
pub name: String,
}
impl TableIdent {
pub fn new(namespace: NamespaceIdent, name: String) -> Self {
Self { namespace, name }
}
pub fn namespace(&self) -> &NamespaceIdent {
&self.namespace
}
pub fn name(&self) -> &str {
&self.name
}
pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
let table_name = vec.pop().ok_or_else(|| {
Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
})?;
let namespace_ident = NamespaceIdent::from_vec(vec)?;
Ok(Self {
namespace: namespace_ident,
name: table_name,
})
}
}
/// Request describing a table to create; constructed through the generated
/// `TableCreation::builder()`.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
/// Name of the table to create.
pub name: String,
/// Optional table location. Defaults to `None`; the builder setter takes the
/// bare `String` (`strip_option`).
#[builder(default, setter(strip_option))]
pub location: Option<String>,
/// Schema of the table.
pub schema: Schema,
/// Optional partition spec. Defaults to `None`; the builder setter accepts any
/// value convertible `into` an `UnboundPartitionSpec`.
#[builder(default, setter(strip_option, into))]
pub partition_spec: Option<UnboundPartitionSpec>,
/// Optional sort order. Defaults to `None`.
#[builder(default, setter(strip_option))]
pub sort_order: Option<SortOrder>,
/// Table properties. Defaults to an empty map.
#[builder(default)]
pub properties: HashMap<String, String>,
}
/// A commit against a single table: requirements plus the updates to apply.
///
/// The generated builder's `build` method is `pub(crate)`, so only code inside
/// this crate can finish constructing a `TableCommit`.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
// Identifier of the table this commit refers to.
ident: TableIdent,
// Requirements carried by this commit.
requirements: Vec<TableRequirement>,
// Updates carried by this commit.
updates: Vec<TableUpdate>,
}
impl TableCommit {
pub fn identifier(&self) -> &TableIdent {
&self.ident
}
pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
take(&mut self.requirements)
}
pub fn take_updates(&mut self) -> Vec<TableUpdate> {
take(&mut self.updates)
}
}
/// A requirement carried by a [`TableCommit`].
///
/// Serialized as an internally tagged enum: the `type` field selects the
/// variant, using the names given by the `rename` attributes below.
///
/// NOTE(review): the checks themselves are not implemented in this file; the
/// per-variant descriptions are inferred from variant names and serialized
/// tags — confirm against the code that evaluates requirements.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
/// `assert-create`: presumably requires that the table does not exist yet.
#[serde(rename = "assert-create")]
NotExist,
/// `assert-table-uuid`: expectation on the table UUID.
#[serde(rename = "assert-table-uuid")]
UuidMatch {
/// Expected UUID.
uuid: Uuid,
},
/// `assert-ref-snapshot-id`: expectation on the snapshot a named ref points to.
#[serde(rename = "assert-ref-snapshot-id")]
RefSnapshotIdMatch {
/// Name of the ref; serialized as `ref`.
r#ref: String,
/// Expected snapshot id, serialized as `snapshot-id`; may be `null`.
#[serde(rename = "snapshot-id")]
snapshot_id: Option<i64>,
},
/// `assert-last-assigned-field-id`: expectation on the last assigned field id.
#[serde(rename = "assert-last-assigned-field-id")]
LastAssignedFieldIdMatch {
/// Expected value, serialized as `last-assigned-field-id`.
#[serde(rename = "last-assigned-field-id")]
last_assigned_field_id: i64,
},
/// `assert-current-schema-id`: expectation on the current schema id.
#[serde(rename = "assert-current-schema-id")]
CurrentSchemaIdMatch {
/// Expected value, serialized as `current-schema-id`.
#[serde(rename = "current-schema-id")]
current_schema_id: i64,
},
/// `assert-last-assigned-partition-id`: expectation on the last assigned
/// partition id.
#[serde(rename = "assert-last-assigned-partition-id")]
LastAssignedPartitionIdMatch {
/// Expected value, serialized as `last-assigned-partition-id`.
#[serde(rename = "last-assigned-partition-id")]
last_assigned_partition_id: i64,
},
/// `assert-default-spec-id`: expectation on the default partition spec id.
#[serde(rename = "assert-default-spec-id")]
DefaultSpecIdMatch {
/// Expected value, serialized as `default-spec-id`.
#[serde(rename = "default-spec-id")]
default_spec_id: i64,
},
/// `assert-default-sort-order-id`: expectation on the default sort order id.
#[serde(rename = "assert-default-sort-order-id")]
DefaultSortOrderIdMatch {
/// Expected value, serialized as `default-sort-order-id`.
#[serde(rename = "default-sort-order-id")]
default_sort_order_id: i64,
},
}
/// A single update carried by a [`TableCommit`].
///
/// Serialized as an internally tagged enum: the `action` field selects the
/// variant, with variant names converted to kebab-case (for example
/// `AssignUuid` is `assign-uuid`). Variants annotated with
/// `rename_all = "kebab-case"` also serialize their field names in kebab-case.
///
/// NOTE(review): per-variant descriptions are inferred from the variant and
/// field names; only `AssignUuid` is handled by `TableUpdate::apply` in this file.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
pub enum TableUpdate {
/// `upgrade-format-version`: carries the target format version.
#[serde(rename_all = "kebab-case")]
UpgradeFormatVersion {
/// Format version, serialized as `format-version`.
format_version: FormatVersion,
},
/// `assign-uuid`: carries the UUID to assign to the table.
#[serde(rename_all = "kebab-case")]
AssignUuid {
/// The UUID to assign.
uuid: Uuid,
},
/// `add-schema`: carries a schema to add.
#[serde(rename_all = "kebab-case")]
AddSchema {
/// The schema to add.
schema: Schema,
/// Optional value serialized as `last-column-id`; may be omitted in JSON.
last_column_id: Option<i32>,
},
/// `set-current-schema`: carries the id of the schema to make current.
#[serde(rename_all = "kebab-case")]
SetCurrentSchema {
/// Schema id, serialized as `schema-id`.
schema_id: i32,
},
/// `add-spec`: carries an unbound partition spec to add.
AddSpec {
/// The partition spec to add.
spec: UnboundPartitionSpec,
},
/// `set-default-spec`: carries the id of the partition spec to make default.
#[serde(rename_all = "kebab-case")]
SetDefaultSpec {
/// Partition spec id, serialized as `spec-id`.
spec_id: i32,
},
/// `add-sort-order`: carries a sort order to add.
#[serde(rename_all = "kebab-case")]
AddSortOrder {
/// Sort order, serialized as `sort-order`.
sort_order: SortOrder,
},
/// `set-default-sort-order`: carries the id of the sort order to make default.
#[serde(rename_all = "kebab-case")]
SetDefaultSortOrder {
/// Sort order id, serialized as `sort-order-id`.
sort_order_id: i64,
},
/// `add-snapshot`: carries a snapshot to add.
#[serde(rename_all = "kebab-case")]
AddSnapshot {
/// The snapshot to add.
snapshot: Snapshot,
},
/// `set-snapshot-ref`: carries a named snapshot reference to set.
#[serde(rename_all = "kebab-case")]
SetSnapshotRef {
/// Name of the ref, serialized as `ref-name`.
ref_name: String,
/// The reference itself; flattened, so its fields appear next to
/// `ref-name` in the same JSON object.
#[serde(flatten)]
reference: SnapshotReference,
},
/// `remove-snapshots`: carries the ids of snapshots to remove.
#[serde(rename_all = "kebab-case")]
RemoveSnapshots {
/// Snapshot ids, serialized as `snapshot-ids`.
snapshot_ids: Vec<i64>,
},
/// `remove-snapshot-ref`: carries the name of the snapshot ref to remove.
#[serde(rename_all = "kebab-case")]
RemoveSnapshotRef {
/// Name of the ref, serialized as `ref-name`.
ref_name: String,
},
/// `set-location`: carries the new table location.
SetLocation {
/// The location string.
location: String,
},
/// `set-properties`: carries properties to set.
SetProperties {
/// Property key/value pairs.
updates: HashMap<String, String>,
},
/// `remove-properties`: carries property keys to remove.
RemoveProperties {
/// Property keys.
removals: Vec<String>,
},
}
impl TableUpdate {
pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
match self {
TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid),
_ => unimplemented!(),
}
}
}
/// Request describing a view to create; constructed through the generated
/// `ViewCreation::builder()`.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
/// Name of the view to create.
pub name: String,
/// Location of the view.
pub location: String,
/// Representations of the view.
pub representations: ViewRepresentations,
/// Schema of the view.
pub schema: Schema,
/// View properties. Defaults to an empty map.
#[builder(default)]
pub properties: HashMap<String, String>,
/// Default namespace for the view.
/// NOTE(review): presumably used to resolve unqualified names in the view
/// definition — confirm with the view implementation.
pub default_namespace: NamespaceIdent,
/// Optional default catalog. Defaults to `None`.
#[builder(default)]
pub default_catalog: Option<String>,
/// Summary key/value pairs. Defaults to an empty map.
#[builder(default)]
pub summary: HashMap<String, String>,
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fmt::Debug;
use serde::de::DeserializeOwned;
use serde::Serialize;
use uuid::uuid;
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate};
/// `TableIdent::from_strs` treats the last item as the table name and the
/// preceding items as the namespace.
#[test]
fn test_create_table_id() {
    let namespace = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
    let expected = TableIdent {
        namespace,
        name: "t1".to_string(),
    };

    let parsed = TableIdent::from_strs(vec!["ns1", "t1"]).unwrap();

    assert_eq!(expected, parsed);
}
/// Test helper: parses `json` into `T`, checks it equals `expected`, then
/// serializes the parsed value, parses it again and checks it still equals
/// `expected`.
fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
    json: impl ToString,
    expected: T,
) {
    let input = json.to_string();

    // First pass: the fixture must deserialize into the expected value.
    let parsed: T = serde_json::from_str(&input).expect("Failed to parse from json");
    assert_eq!(parsed, expected, "Parsed value is not equal to expected");

    // Second pass: serializing and parsing again must give the same value.
    let reserialized = serde_json::to_string(&parsed).expect("Failed to serialize to json");
    let round_tripped: T =
        serde_json::from_str(&reserialized).expect("Failed to parse from serialized json");
    assert_eq!(
        round_tripped, expected,
        "Parsed restored value is not equal to expected"
    );
}
/// Round-trips an `assert-table-uuid` requirement through JSON.
#[test]
fn test_table_uuid() {
test_serde_json(
r#"
{
"type": "assert-table-uuid",
"uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
"#,
TableRequirement::UuidMatch {
uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
},
);
}
/// Round-trips an `assert-create` requirement, which has no fields besides the tag.
#[test]
fn test_assert_table_not_exists() {
test_serde_json(
r#"
{
"type": "assert-create"
}
"#,
TableRequirement::NotExist,
);
}
/// Round-trips `assert-ref-snapshot-id` with both a `null` and a concrete
/// `snapshot-id`.
#[test]
fn test_assert_ref_snapshot_id() {
// `snapshot-id: null` maps to `None`.
test_serde_json(
r#"
{
"type": "assert-ref-snapshot-id",
"ref": "snapshot-name",
"snapshot-id": null
}
"#,
TableRequirement::RefSnapshotIdMatch {
r#ref: "snapshot-name".to_string(),
snapshot_id: None,
},
);
// A numeric `snapshot-id` maps to `Some(id)`.
test_serde_json(
r#"
{
"type": "assert-ref-snapshot-id",
"ref": "snapshot-name",
"snapshot-id": 1
}
"#,
TableRequirement::RefSnapshotIdMatch {
r#ref: "snapshot-name".to_string(),
snapshot_id: Some(1),
},
);
}
/// Round-trips an `assert-last-assigned-field-id` requirement through JSON.
#[test]
fn test_assert_last_assigned_field_id() {
test_serde_json(
r#"
{
"type": "assert-last-assigned-field-id",
"last-assigned-field-id": 12
}
"#,
TableRequirement::LastAssignedFieldIdMatch {
last_assigned_field_id: 12,
},
);
}
/// Round-trips an `assert-current-schema-id` requirement through JSON.
#[test]
fn test_assert_current_schema_id() {
test_serde_json(
r#"
{
"type": "assert-current-schema-id",
"current-schema-id": 4
}
"#,
TableRequirement::CurrentSchemaIdMatch {
current_schema_id: 4,
},
);
}
/// Round-trips an `assert-last-assigned-partition-id` requirement through JSON.
#[test]
fn test_assert_last_assigned_partition_id() {
test_serde_json(
r#"
{
"type": "assert-last-assigned-partition-id",
"last-assigned-partition-id": 1004
}
"#,
TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id: 1004,
},
);
}
/// Round-trips an `assert-default-spec-id` requirement through JSON.
#[test]
fn test_assert_default_spec_id() {
test_serde_json(
r#"
{
"type": "assert-default-spec-id",
"default-spec-id": 5
}
"#,
TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
);
}
/// Round-trips an `assert-default-sort-order-id` requirement through JSON.
#[test]
fn test_assert_default_sort_order() {
let json = r#"
{
"type": "assert-default-sort-order-id",
"default-sort-order-id": 10
}
"#;
let update = TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id: 10,
};
test_serde_json(json, update);
}
/// A requirement object without the `type` tag must fail to deserialize.
#[test]
fn test_parse_assert_invalid() {
assert!(
serde_json::from_str::<TableRequirement>(
r#"
{
"default-sort-order-id": 10
}
"#
)
.is_err(),
"Table requirements should not be parsed without type."
);
}
/// Round-trips an `assign-uuid` update through JSON.
#[test]
fn test_assign_uuid() {
test_serde_json(
r#"
{
"action": "assign-uuid",
"uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
"#,
TableUpdate::AssignUuid {
uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
},
);
}
/// Round-trips an `upgrade-format-version` update; `2` maps to `FormatVersion::V2`.
#[test]
fn test_upgrade_format_version() {
test_serde_json(
r#"
{
"action": "upgrade-format-version",
"format-version": 2
}
"#,
TableUpdate::UpgradeFormatVersion {
format_version: FormatVersion::V2,
},
);
}
/// Round-trips an `add-schema` update, both with and without `last-column-id`.
#[test]
fn test_add_schema() {
// Schema expected from both JSON fixtures below.
let test_schema = Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![2])
.with_fields(vec![
NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
])
.build()
.unwrap();
// With `last-column-id` present.
test_serde_json(
r#"
{
"action": "add-schema",
"schema": {
"type": "struct",
"schema-id": 1,
"fields": [
{
"id": 1,
"name": "foo",
"required": false,
"type": "string"
},
{
"id": 2,
"name": "bar",
"required": true,
"type": "int"
},
{
"id": 3,
"name": "baz",
"required": false,
"type": "boolean"
}
],
"identifier-field-ids": [
2
]
},
"last-column-id": 3
}
"#,
TableUpdate::AddSchema {
schema: test_schema.clone(),
last_column_id: Some(3),
},
);
// With `last-column-id` omitted, which maps to `None`.
test_serde_json(
r#"
{
"action": "add-schema",
"schema": {
"type": "struct",
"schema-id": 1,
"fields": [
{
"id": 1,
"name": "foo",
"required": false,
"type": "string"
},
{
"id": 2,
"name": "bar",
"required": true,
"type": "int"
},
{
"id": 3,
"name": "baz",
"required": false,
"type": "boolean"
}
],
"identifier-field-ids": [
2
]
}
}
"#,
TableUpdate::AddSchema {
schema: test_schema.clone(),
last_column_id: None,
},
);
}
/// Round-trips a `set-current-schema` update through JSON.
#[test]
fn test_set_current_schema() {
test_serde_json(
r#"
{
"action": "set-current-schema",
"schema-id": 23
}
"#,
TableUpdate::SetCurrentSchema { schema_id: 23 },
);
}
/// Round-trips an `add-spec` update with `day`, `bucket[16]` and `truncate[4]`
/// partition fields.
#[test]
fn test_add_spec() {
test_serde_json(
r#"
{
"action": "add-spec",
"spec": {
"fields": [
{
"source-id": 4,
"name": "ts_day",
"transform": "day"
},
{
"source-id": 1,
"name": "id_bucket",
"transform": "bucket[16]"
},
{
"source-id": 2,
"name": "id_truncate",
"transform": "truncate[4]"
}
]
}
}
"#,
TableUpdate::AddSpec {
spec: UnboundPartitionSpec::builder()
.add_partition_field(4, "ts_day".to_string(), Transform::Day)
.unwrap()
.add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
.unwrap()
.add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
.unwrap()
.build(),
},
);
}
/// Round-trips a `set-default-spec` update through JSON.
#[test]
fn test_set_default_spec() {
test_serde_json(
r#"
{
"action": "set-default-spec",
"spec-id": 1
}
"#,
TableUpdate::SetDefaultSpec { spec_id: 1 },
)
}
/// Round-trips an `add-sort-order` update containing two sort fields.
#[test]
fn test_add_sort_order() {
let json = r#"
{
"action": "add-sort-order",
"sort-order": {
"order-id": 1,
"fields": [
{
"transform": "identity",
"source-id": 2,
"direction": "asc",
"null-order": "nulls-first"
},
{
"transform": "bucket[4]",
"source-id": 3,
"direction": "desc",
"null-order": "nulls-last"
}
]
}
}
"#;
// Expected value mirrors the two JSON sort fields in order.
let update = TableUpdate::AddSortOrder {
sort_order: SortOrder::builder()
.with_order_id(1)
.with_sort_field(
SortField::builder()
.source_id(2)
.direction(SortDirection::Ascending)
.null_order(NullOrder::First)
.transform(Transform::Identity)
.build(),
)
.with_sort_field(
SortField::builder()
.source_id(3)
.direction(SortDirection::Descending)
.null_order(NullOrder::Last)
.transform(Transform::Bucket(4))
.build(),
)
.build_unbound()
.unwrap(),
};
test_serde_json(json, update);
}
/// Round-trips a `set-default-sort-order` update through JSON.
#[test]
fn test_set_default_order() {
let json = r#"
{
"action": "set-default-sort-order",
"sort-order-id": 2
}
"#;
let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
test_serde_json(json, update);
}
/// Round-trips an `add-snapshot` update through JSON.
#[test]
fn test_add_snapshot() {
let json = r#"
{
"action": "add-snapshot",
"snapshot": {
"snapshot-id": 3055729675574597000,
"parent-snapshot-id": 3051729675574597000,
"timestamp-ms": 1555100955770,
"sequence-number": 1,
"summary": {
"operation": "append"
},
"manifest-list": "s3://a/b/2.avro",
"schema-id": 1
}
}
"#;
// Expected snapshot built field-by-field from the JSON above.
let update = TableUpdate::AddSnapshot {
snapshot: Snapshot::builder()
.with_snapshot_id(3055729675574597000)
.with_parent_snapshot_id(Some(3051729675574597000))
.with_timestamp_ms(1555100955770)
.with_sequence_number(1)
.with_manifest_list("s3://a/b/2.avro")
.with_schema_id(1)
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::default(),
})
.build(),
};
test_serde_json(json, update);
}
/// Round-trips a `remove-snapshots` update through JSON.
#[test]
fn test_remove_snapshots() {
let json = r#"
{
"action": "remove-snapshots",
"snapshot-ids": [
1,
2
]
}
"#;
let update = TableUpdate::RemoveSnapshots {
snapshot_ids: vec![1, 2],
};
test_serde_json(json, update);
}
/// Round-trips a `remove-snapshot-ref` update through JSON.
#[test]
fn test_remove_snapshot_ref() {
let json = r#"
{
"action": "remove-snapshot-ref",
"ref-name": "snapshot-ref"
}
"#;
let update = TableUpdate::RemoveSnapshotRef {
ref_name: "snapshot-ref".to_string(),
};
test_serde_json(json, update);
}
/// Round-trips a `set-snapshot-ref` update for a `tag` reference; the reference
/// fields are flattened into the same JSON object as `ref-name`.
#[test]
fn test_set_snapshot_ref_tag() {
let json = r#"
{
"action": "set-snapshot-ref",
"type": "tag",
"ref-name": "hank",
"snapshot-id": 1,
"max-ref-age-ms": 1
}
"#;
let update = TableUpdate::SetSnapshotRef {
ref_name: "hank".to_string(),
reference: SnapshotReference {
snapshot_id: 1,
retention: SnapshotRetention::Tag {
max_ref_age_ms: Some(1),
},
},
};
test_serde_json(json, update);
}
/// Round-trips a `set-snapshot-ref` update for a `branch` reference with all
/// retention fields set.
#[test]
fn test_set_snapshot_ref_branch() {
let json = r#"
{
"action": "set-snapshot-ref",
"type": "branch",
"ref-name": "hank",
"snapshot-id": 1,
"min-snapshots-to-keep": 2,
"max-snapshot-age-ms": 3,
"max-ref-age-ms": 4
}
"#;
let update = TableUpdate::SetSnapshotRef {
ref_name: "hank".to_string(),
reference: SnapshotReference {
snapshot_id: 1,
retention: SnapshotRetention::Branch {
min_snapshots_to_keep: Some(2),
max_snapshot_age_ms: Some(3),
max_ref_age_ms: Some(4),
},
},
};
test_serde_json(json, update);
}
/// Round-trips a `set-properties` update through JSON.
#[test]
fn test_set_properties() {
let json = r#"
{
"action": "set-properties",
"updates": {
"prop1": "v1",
"prop2": "v2"
}
}
"#;
let update = TableUpdate::SetProperties {
updates: vec![
("prop1".to_string(), "v1".to_string()),
("prop2".to_string(), "v2".to_string()),
]
.into_iter()
.collect(),
};
test_serde_json(json, update);
}
/// Round-trips a `remove-properties` update through JSON.
#[test]
fn test_remove_properties() {
let json = r#"
{
"action": "remove-properties",
"removals": [
"prop1",
"prop2"
]
}
"#;
let update = TableUpdate::RemoveProperties {
removals: vec!["prop1".to_string(), "prop2".to_string()],
};
test_serde_json(json, update);
}
/// Round-trips a `set-location` update through JSON.
#[test]
fn test_set_location() {
let json = r#"
{
"action": "set-location",
"location": "s3://bucket/warehouse/tbl_location"
}
"#;
let update = TableUpdate::SetLocation {
location: "s3://bucket/warehouse/tbl_location".to_string(),
};
test_serde_json(json, update);
}
/// Applying `TableUpdate::AssignUuid` to a metadata builder yields metadata
/// whose UUID equals the assigned one.
#[test]
fn test_table_update_apply() {
    // Build baseline metadata from a minimal table-creation request.
    let creation = TableCreation::builder()
        .location("s3://db/table".to_string())
        .name("table".to_string())
        .properties(HashMap::new())
        .schema(Schema::builder().build().unwrap())
        .build();
    let base_metadata = TableMetadataBuilder::from_table_creation(creation)
        .unwrap()
        .build()
        .unwrap();

    let builder = TableMetadataBuilder::new(base_metadata);
    let uuid = uuid::Uuid::new_v4();

    let applied = TableUpdate::AssignUuid { uuid }.apply(builder).unwrap();
    let updated_metadata = applied.build().unwrap();

    assert_eq!(updated_metadata.uuid(), uuid);
}
}