use std::collections::HashMap;
use std::fmt::Debug;
use std::mem::take;
use std::ops::Deref;
use _serde::deserialize_snapshot;
use async_trait::async_trait;
use serde_derive::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use uuid::Uuid;
use crate::spec::{
FormatVersion, Schema, SchemaId, Snapshot, SnapshotReference, SortOrder, TableMetadata,
TableMetadataBuilder, UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations,
ViewVersion,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
/// The interface implemented by every catalog in this crate.
///
/// Implementors must be `Debug + Sync + Send`, so a catalog can be shared across
/// threads and async tasks. All methods are async (via `async_trait`) and report
/// failures through the crate's [`Result`] type.
#[async_trait]
pub trait Catalog: Debug + Sync + Send {
/// List namespaces, optionally scoped by `parent`.
///
/// NOTE(review): presumably `Some(parent)` lists the children of `parent` and
/// `None` lists top-level namespaces — exact semantics are implementation-defined.
async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
-> Result<Vec<NamespaceIdent>>;
/// Create `namespace` with the given initial `properties` and return the
/// resulting [`Namespace`].
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<Namespace>;
/// Load the [`Namespace`] (identifier and properties) named by `namespace`.
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;
/// Report whether `namespace` exists.
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;
/// Update the properties of `namespace`.
///
/// NOTE(review): whether `properties` replaces or merges with the existing
/// properties is up to the implementation — confirm per catalog.
async fn update_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()>;
/// Drop `namespace`.
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;
/// List the identifiers of the tables in `namespace`.
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;
/// Create a table in `namespace` as described by `creation` and return it.
async fn create_table(
&self,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> Result<Table>;
/// Load the table identified by `table`.
async fn load_table(&self, table: &TableIdent) -> Result<Table>;
/// Drop the table identified by `table`.
async fn drop_table(&self, table: &TableIdent) -> Result<()>;
/// Report whether the table identified by `table` exists.
async fn table_exists(&self, table: &TableIdent) -> Result<bool>;
/// Rename table `src` to `dest`.
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;
/// Commit `commit` (its requirements and updates) and return the resulting table.
async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
impl NamespaceIdent {
pub fn new(name: String) -> Self {
Self(vec![name])
}
pub fn from_vec(names: Vec<String>) -> Result<Self> {
if names.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Namespace identifier can't be empty!",
));
}
Ok(Self(names))
}
pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
}
pub fn to_url_string(&self) -> String {
self.as_ref().join("\u{001f}")
}
pub fn inner(self) -> Vec<String> {
self.0
}
pub fn parent(&self) -> Option<Self> {
self.0.split_last().and_then(|(_, parent)| {
if parent.is_empty() {
None
} else {
Some(Self(parent.to_vec()))
}
})
}
}
impl AsRef<Vec<String>> for NamespaceIdent {
fn as_ref(&self) -> &Vec<String> {
&self.0
}
}
impl Deref for NamespaceIdent {
type Target = [String];
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// A namespace: its identifier plus a string-to-string property map.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    name: NamespaceIdent,
    properties: HashMap<String, String>,
}

impl Namespace {
    /// Creates a namespace with an empty property map.
    pub fn new(name: NamespaceIdent) -> Self {
        Self {
            name,
            properties: HashMap::new(),
        }
    }

    /// Creates a namespace with the given properties.
    pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
        Namespace { name, properties }
    }

    /// Returns the namespace identifier.
    pub fn name(&self) -> &NamespaceIdent {
        let Namespace { name, .. } = self;
        name
    }

    /// Returns the namespace properties.
    pub fn properties(&self) -> &HashMap<String, String> {
        let Namespace { properties, .. } = self;
        properties
    }
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdent {
pub namespace: NamespaceIdent,
pub name: String,
}
impl TableIdent {
pub fn new(namespace: NamespaceIdent, name: String) -> Self {
Self { namespace, name }
}
pub fn namespace(&self) -> &NamespaceIdent {
&self.namespace
}
pub fn name(&self) -> &str {
&self.name
}
pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
let table_name = vec.pop().ok_or_else(|| {
Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
})?;
let namespace_ident = NamespaceIdent::from_vec(vec)?;
Ok(Self {
namespace: namespace_ident,
name: table_name,
})
}
}
/// Parameters for [`Catalog::create_table`].
///
/// Built through the `TypedBuilder`-generated `TableCreation::builder()`. `name`
/// and `schema` must be set; every field marked `#[builder(default)]` may be
/// omitted. `strip_option` setters take the inner value, not an `Option`.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
/// Name of the table to create.
pub name: String,
/// Optional table location; `None` when not set on the builder.
#[builder(default, setter(strip_option))]
pub location: Option<String>,
/// Schema of the new table.
pub schema: Schema,
/// Optional partition spec. The setter also accepts anything convertible
/// `into` an [`UnboundPartitionSpec`].
#[builder(default, setter(strip_option, into))]
pub partition_spec: Option<UnboundPartitionSpec>,
/// Optional sort order.
#[builder(default, setter(strip_option))]
pub sort_order: Option<SortOrder>,
/// Table properties; empty when not set on the builder.
#[builder(default)]
pub properties: HashMap<String, String>,
}
/// A batch of changes to commit against one table: the requirements that must
/// hold on its current metadata, plus the updates to apply.
///
/// The generated builder's `build` method is `pub(crate)`, so only this crate
/// can finalize a commit.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    /// Table the commit targets.
    ident: TableIdent,
    /// Preconditions (presumably checked by the catalog before `updates` are
    /// applied — see `TableRequirement::check`).
    requirements: Vec<TableRequirement>,
    /// Metadata changes to apply.
    updates: Vec<TableUpdate>,
}

impl TableCommit {
    /// Returns the identifier of the targeted table.
    pub fn identifier(&self) -> &TableIdent {
        let Self { ident, .. } = self;
        ident
    }

    /// Moves the requirements out of the commit, leaving an empty list behind.
    pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
        let requirements = &mut self.requirements;
        take(requirements)
    }

    /// Moves the updates out of the commit, leaving an empty list behind.
    pub fn take_updates(&mut self) -> Vec<TableUpdate> {
        let updates = &mut self.updates;
        take(updates)
    }
}
/// A precondition on a table's current metadata, evaluated by
/// [`TableRequirement::check`].
///
/// Serialized as a JSON object tagged by a `type` field, e.g.
/// `{"type": "assert-create"}`; field names are renamed explicitly per variant.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
/// The table must not exist yet.
#[serde(rename = "assert-create")]
NotExist,
/// The table's UUID must equal `uuid`.
#[serde(rename = "assert-table-uuid")]
UuidMatch {
uuid: Uuid,
},
/// Branch or tag `ref` must point at `snapshot_id`. When `snapshot_id` is
/// `None`, the ref must not exist.
#[serde(rename = "assert-ref-snapshot-id")]
RefSnapshotIdMatch {
r#ref: String,
#[serde(rename = "snapshot-id")]
snapshot_id: Option<i64>,
},
/// The table's last assigned column id (`last_column_id`) must equal
/// `last_assigned_field_id`.
#[serde(rename = "assert-last-assigned-field-id")]
LastAssignedFieldIdMatch {
#[serde(rename = "last-assigned-field-id")]
last_assigned_field_id: i32,
},
/// The table's current schema id must equal `current_schema_id`.
#[serde(rename = "assert-current-schema-id")]
CurrentSchemaIdMatch {
#[serde(rename = "current-schema-id")]
current_schema_id: SchemaId,
},
/// The table's last assigned partition id must equal `last_assigned_partition_id`.
#[serde(rename = "assert-last-assigned-partition-id")]
LastAssignedPartitionIdMatch {
#[serde(rename = "last-assigned-partition-id")]
last_assigned_partition_id: i32,
},
/// The table's default partition spec id must equal `default_spec_id`.
#[serde(rename = "assert-default-spec-id")]
DefaultSpecIdMatch {
#[serde(rename = "default-spec-id")]
default_spec_id: i32,
},
/// The table's default sort order id must equal `default_sort_order_id`.
#[serde(rename = "assert-default-sort-order-id")]
DefaultSortOrderIdMatch {
#[serde(rename = "default-sort-order-id")]
default_sort_order_id: i64,
},
}
/// A single change to table metadata.
///
/// Serialized as a JSON object tagged by an `action` field holding the kebab-case
/// variant name, e.g. `{"action": "assign-uuid", "uuid": "..."}`. Variants with
/// multi-word fields also rename those fields to kebab-case. Variants without a
/// variant-level `rename_all` only have single-word fields, so no renaming is needed.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
pub enum TableUpdate {
/// Upgrade the table format version to `format_version`.
#[serde(rename_all = "kebab-case")]
UpgradeFormatVersion {
format_version: FormatVersion,
},
/// Assign `uuid` as the table UUID.
#[serde(rename_all = "kebab-case")]
AssignUuid {
uuid: Uuid,
},
/// Add `schema`. The `last-column-id` field is optional when deserializing.
#[serde(rename_all = "kebab-case")]
AddSchema {
schema: Schema,
last_column_id: Option<i32>,
},
/// Set the current schema to `schema_id`.
#[serde(rename_all = "kebab-case")]
SetCurrentSchema {
schema_id: i32,
},
/// Add an unbound partition spec.
AddSpec {
spec: UnboundPartitionSpec,
},
/// Set the default partition spec to `spec_id`.
#[serde(rename_all = "kebab-case")]
SetDefaultSpec {
spec_id: i32,
},
/// Add a sort order.
#[serde(rename_all = "kebab-case")]
AddSortOrder {
sort_order: SortOrder,
},
/// Set the default sort order to `sort_order_id`.
#[serde(rename_all = "kebab-case")]
SetDefaultSortOrder {
sort_order_id: i64,
},
/// Add a snapshot.
#[serde(rename_all = "kebab-case")]
AddSnapshot {
// Deserialized through `_serde::deserialize_snapshot`, which tolerates a
// missing `sequence-number` (defaults to 0) and an optional `schema-id`.
#[serde(deserialize_with = "deserialize_snapshot")]
snapshot: Snapshot,
},
/// Create or update the branch or tag `ref_name`.
#[serde(rename_all = "kebab-case")]
SetSnapshotRef {
ref_name: String,
// Reference fields (e.g. `type`, `snapshot-id`) sit in the same JSON
// object as `ref-name` rather than in a nested object.
#[serde(flatten)]
reference: SnapshotReference,
},
/// Remove the snapshots with the given ids.
#[serde(rename_all = "kebab-case")]
RemoveSnapshots {
snapshot_ids: Vec<i64>,
},
/// Remove the branch or tag `ref_name`.
#[serde(rename_all = "kebab-case")]
RemoveSnapshotRef {
ref_name: String,
},
/// Set the table location.
SetLocation {
location: String,
},
/// Set (insert or overwrite) the given table properties.
SetProperties {
updates: HashMap<String, String>,
},
/// Remove the given table property keys.
RemoveProperties {
removals: Vec<String>,
},
}
impl TableUpdate {
pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
match self {
TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid),
_ => unimplemented!(),
}
}
}
impl TableRequirement {
pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
if let Some(metadata) = metadata {
match self {
TableRequirement::NotExist => {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Requirement failed: Table with id {} already exists",
metadata.uuid()
),
));
}
TableRequirement::UuidMatch { uuid } => {
if &metadata.uuid() != uuid {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Table UUID does not match",
)
.with_context("expected", *uuid)
.with_context("found", metadata.uuid()));
}
}
TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
if metadata.current_schema_id != *current_schema_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Current schema id does not match",
)
.with_context("expected", current_schema_id.to_string())
.with_context("found", metadata.current_schema_id.to_string()));
}
}
TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id,
} => {
if metadata.default_sort_order().order_id != *default_sort_order_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Default sort order id does not match",
)
.with_context("expected", default_sort_order_id.to_string())
.with_context(
"found",
metadata.default_sort_order().order_id.to_string(),
));
}
}
TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
let snapshot_ref = metadata.snapshot_for_ref(r#ref);
if let Some(snapshot_id) = snapshot_id {
let snapshot_ref = snapshot_ref.ok_or(Error::new(
ErrorKind::DataInvalid,
format!("Requirement failed: Branch or tag `{}` not found", r#ref),
))?;
if snapshot_ref.snapshot_id() != *snapshot_id {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Requirement failed: Branch or tag `{}`'s snapshot has changed",
r#ref
),
)
.with_context("expected", snapshot_id.to_string())
.with_context("found", snapshot_ref.snapshot_id().to_string()));
}
} else if snapshot_ref.is_some() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Requirement failed: Branch or tag `{}` already exists",
r#ref
),
));
}
}
TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
if metadata.default_partition_spec_id() != *default_spec_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Default partition spec id does not match",
)
.with_context("expected", default_spec_id.to_string())
.with_context("found", metadata.default_partition_spec_id().to_string()));
}
}
TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id,
} => {
if metadata.last_partition_id != *last_assigned_partition_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Last assigned partition id does not match",
)
.with_context("expected", last_assigned_partition_id.to_string())
.with_context("found", metadata.last_partition_id.to_string()));
}
}
TableRequirement::LastAssignedFieldIdMatch {
last_assigned_field_id,
} => {
if &metadata.last_column_id != last_assigned_field_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Last assigned field id does not match",
)
.with_context("expected", last_assigned_field_id.to_string())
.with_context("found", metadata.last_column_id.to_string()));
}
}
};
} else {
match self {
TableRequirement::NotExist => {}
_ => {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Table does not exist",
));
}
}
}
Ok(())
}
}
/// Catalog-specific serde helpers used by [`TableUpdate`].
pub(super) mod _serde {
    use serde::{Deserialize as _, Deserializer};

    use super::*;
    use crate::spec::{SchemaId, Summary};

    /// Deserializes a [`Snapshot`] in the shape used inside `add-snapshot` table
    /// updates, where `sequence-number` may be absent (it then defaults to 0) and
    /// `schema-id` is optional.
    pub(super) fn deserialize_snapshot<'de, D>(
        deserializer: D,
    ) -> std::result::Result<Snapshot, D::Error>
    where
        D: Deserializer<'de>,
    {
        CatalogSnapshot::deserialize(deserializer).map(Snapshot::from)
    }

    /// Wire shape of a snapshot inside a catalog update.
    ///
    /// Only `Deserialize` is derived, so no serialization attributes are needed.
    /// The former `skip_serializing_if` attributes were dead code.
    #[derive(Debug, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    struct CatalogSnapshot {
        snapshot_id: i64,
        parent_snapshot_id: Option<i64>,
        // An absent `sequence-number` deserializes as 0.
        #[serde(default)]
        sequence_number: i64,
        timestamp_ms: i64,
        manifest_list: String,
        summary: Summary,
        schema_id: Option<SchemaId>,
    }

    impl From<CatalogSnapshot> for Snapshot {
        fn from(snapshot: CatalogSnapshot) -> Self {
            let CatalogSnapshot {
                snapshot_id,
                parent_snapshot_id,
                sequence_number,
                timestamp_ms,
                manifest_list,
                schema_id,
                summary,
            } = snapshot;
            let builder = Snapshot::builder()
                .with_snapshot_id(snapshot_id)
                .with_parent_snapshot_id(parent_snapshot_id)
                .with_sequence_number(sequence_number)
                .with_timestamp_ms(timestamp_ms)
                .with_manifest_list(manifest_list)
                .with_summary(summary);
            // `with_schema_id` takes a plain id rather than an `Option`, so the
            // two cases are finished separately.
            match schema_id {
                Some(schema_id) => builder.with_schema_id(schema_id).build(),
                None => builder.build(),
            }
        }
    }
}
/// Parameters for creating a view.
///
/// Built through the `TypedBuilder`-generated `ViewCreation::builder()`. Fields
/// marked `#[builder(default)]` may be omitted; all others must be set.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
/// Name of the view to create.
pub name: String,
/// View location.
pub location: String,
/// Representations (e.g. SQL text per dialect) of the view.
pub representations: ViewRepresentations,
/// Schema of the view.
pub schema: Schema,
/// View properties; empty when not set on the builder.
#[builder(default)]
pub properties: HashMap<String, String>,
/// Default namespace for the view's representations.
pub default_namespace: NamespaceIdent,
/// Default catalog. The setter takes an `Option` (no `strip_option`), and the
/// value is `None` when not set.
#[builder(default)]
pub default_catalog: Option<String>,
/// Summary map; empty when not set on the builder.
#[builder(default)]
pub summary: HashMap<String, String>,
}
/// A single change to view metadata.
///
/// Serialized as a JSON object tagged by an `action` field holding the kebab-case
/// variant name, with kebab-case field names, e.g.
/// `{"action": "set-current-view-version", "view-version-id": 1}`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
pub enum ViewUpdate {
/// Assign `uuid` as the view UUID.
#[serde(rename_all = "kebab-case")]
AssignUuid {
uuid: uuid::Uuid,
},
/// Upgrade the view format version to `format_version`.
#[serde(rename_all = "kebab-case")]
UpgradeFormatVersion {
format_version: ViewFormatVersion,
},
/// Add `schema`. The `last-column-id` field is optional when deserializing.
#[serde(rename_all = "kebab-case")]
AddSchema {
schema: Schema,
last_column_id: Option<i32>,
},
/// Set the view location.
#[serde(rename_all = "kebab-case")]
SetLocation {
location: String,
},
/// Set (insert or overwrite) the given view properties.
#[serde(rename_all = "kebab-case")]
SetProperties {
updates: HashMap<String, String>,
},
/// Remove the given view property keys.
#[serde(rename_all = "kebab-case")]
RemoveProperties {
removals: Vec<String>,
},
/// Add a new view version.
#[serde(rename_all = "kebab-case")]
AddViewVersion {
view_version: ViewVersion,
},
/// Set the current view version to `view_version_id`.
#[serde(rename_all = "kebab-case")]
SetCurrentViewVersion {
view_version_id: i32,
},
}
#[cfg(test)]
mod tests {
// Unit tests for catalog identifiers, `TableRequirement::check`, and JSON
// (de)serialization of table requirements and table/view updates.
use std::collections::HashMap;
use std::fmt::Debug;
use serde::de::DeserializeOwned;
use serde::Serialize;
use uuid::uuid;
use super::ViewUpdate;
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
SqlViewRepresentation, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
ViewVersion,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate};
// `parent()` drops the innermost level; a single-level namespace has no parent.
#[test]
fn test_parent_namespace() {
let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
assert_eq!(ns1.parent(), None);
assert_eq!(ns2.parent(), Some(ns1.clone()));
assert_eq!(ns3.parent(), Some(ns2.clone()));
}
// `TableIdent::from_strs` treats the last part as the table name.
#[test]
fn test_create_table_id() {
let table_id = TableIdent {
namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
name: "t1".to_string(),
};
assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
}
// Parses `json`, asserts it equals `expected`, then serializes the parsed value
// and parses it again to check the round trip is lossless.
fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
json: impl ToString,
expected: T,
) {
let json_str = json.to_string();
let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
assert_eq!(actual, expected, "Parsed value is not equal to expected");
let restored: T = serde_json::from_str(
&serde_json::to_string(&actual).expect("Failed to serialize to json"),
)
.expect("Failed to parse from serialized json");
assert_eq!(
restored, expected,
"Parsed restored value is not equal to expected"
);
}
// Fixture: metadata for a table with an empty schema at `/path/to/table`,
// with its UUID forced to the nil UUID so tests can match it.
fn metadata() -> TableMetadata {
let tbl_creation = TableCreation::builder()
.name("table".to_string())
.location("/path/to/table".to_string())
.schema(Schema::builder().build().unwrap())
.build();
TableMetadataBuilder::from_table_creation(tbl_creation)
.unwrap()
.assign_uuid(uuid::Uuid::nil())
.unwrap()
.build()
.unwrap()
}
#[test]
fn test_check_requirement_not_exist() {
let metadata = metadata();
let requirement = TableRequirement::NotExist;
assert!(requirement.check(Some(&metadata)).is_err());
assert!(requirement.check(None).is_ok());
}
#[test]
fn test_check_table_uuid() {
let metadata = metadata();
// A fresh v7 UUID cannot equal the fixture's nil UUID.
let requirement = TableRequirement::UuidMatch {
uuid: uuid::Uuid::now_v7(),
};
assert!(requirement.check(Some(&metadata)).is_err());
let requirement = TableRequirement::UuidMatch {
uuid: uuid::Uuid::nil(),
};
assert!(requirement.check(Some(&metadata)).is_ok());
}
#[test]
fn test_check_ref_snapshot_id() {
let metadata = metadata();
// Ref missing: asserting a concrete snapshot fails, asserting absence passes.
let requirement = TableRequirement::RefSnapshotIdMatch {
r#ref: "my_branch".to_string(),
snapshot_id: Some(1),
};
assert!(requirement.check(Some(&metadata)).is_err());
let requirement = TableRequirement::RefSnapshotIdMatch {
r#ref: "my_branch".to_string(),
snapshot_id: None,
};
assert!(requirement.check(Some(&metadata)).is_ok());
let record = r#"
{
"snapshot-id": 3051729675574597004,
"sequence-number": 10,
"timestamp-ms": 1515100955770,
"summary": {
"operation": "append"
},
"manifest-list": "s3://b/wh/.../s1.avro",
"schema-id": 0
}
"#;
let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
let mut metadata = metadata;
// The assertions below expect `append_snapshot` to point `main` at the new snapshot.
metadata.append_snapshot(snapshot);
let requirement = TableRequirement::RefSnapshotIdMatch {
r#ref: "main".to_string(),
snapshot_id: Some(3051729675574597004),
};
assert!(requirement.check(Some(&metadata)).is_ok());
let requirement = TableRequirement::RefSnapshotIdMatch {
r#ref: "main".to_string(),
snapshot_id: Some(1),
};
assert!(requirement.check(Some(&metadata)).is_err());
}
// The remaining `check` tests expect the fixture's ids to be 0.
#[test]
fn test_check_last_assigned_field_id() {
let metadata = metadata();
let requirement = TableRequirement::LastAssignedFieldIdMatch {
last_assigned_field_id: 1,
};
assert!(requirement.check(Some(&metadata)).is_err());
let requirement = TableRequirement::LastAssignedFieldIdMatch {
last_assigned_field_id: 0,
};
assert!(requirement.check(Some(&metadata)).is_ok());
}
#[test]
fn test_check_current_schema_id() {
let metadata = metadata();
let requirement = TableRequirement::CurrentSchemaIdMatch {
current_schema_id: 1,
};
assert!(requirement.check(Some(&metadata)).is_err());
let requirement = TableRequirement::CurrentSchemaIdMatch {
current_schema_id: 0,
};
assert!(requirement.check(Some(&metadata)).is_ok());
}
#[test]
fn test_check_last_assigned_partition_id() {
let metadata = metadata();
let requirement = TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id: 1,
};
assert!(requirement.check(Some(&metadata)).is_err());
let requirement = TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id: 0,
};
assert!(requirement.check(Some(&metadata)).is_ok());
}
#[test]
fn test_check_default_spec_id() {
let metadata = metadata();
let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
assert!(requirement.check(Some(&metadata)).is_err());
let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
assert!(requirement.check(Some(&metadata)).is_ok());
}
#[test]
fn test_check_default_sort_order_id() {
let metadata = metadata();
let requirement = TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id: 1,
};
assert!(requirement.check(Some(&metadata)).is_err());
let requirement = TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id: 0,
};
assert!(requirement.check(Some(&metadata)).is_ok());
}
// --- JSON round trips of `TableRequirement` ---
#[test]
fn test_table_uuid() {
test_serde_json(
r#"
{
"type": "assert-table-uuid",
"uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
"#,
TableRequirement::UuidMatch {
uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
},
);
}
#[test]
fn test_assert_table_not_exists() {
test_serde_json(
r#"
{
"type": "assert-create"
}
"#,
TableRequirement::NotExist,
);
}
// Covers both a `null` snapshot id (ref must not exist) and a concrete one.
#[test]
fn test_assert_ref_snapshot_id() {
test_serde_json(
r#"
{
"type": "assert-ref-snapshot-id",
"ref": "snapshot-name",
"snapshot-id": null
}
"#,
TableRequirement::RefSnapshotIdMatch {
r#ref: "snapshot-name".to_string(),
snapshot_id: None,
},
);
test_serde_json(
r#"
{
"type": "assert-ref-snapshot-id",
"ref": "snapshot-name",
"snapshot-id": 1
}
"#,
TableRequirement::RefSnapshotIdMatch {
r#ref: "snapshot-name".to_string(),
snapshot_id: Some(1),
},
);
}
#[test]
fn test_assert_last_assigned_field_id() {
test_serde_json(
r#"
{
"type": "assert-last-assigned-field-id",
"last-assigned-field-id": 12
}
"#,
TableRequirement::LastAssignedFieldIdMatch {
last_assigned_field_id: 12,
},
);
}
#[test]
fn test_assert_current_schema_id() {
test_serde_json(
r#"
{
"type": "assert-current-schema-id",
"current-schema-id": 4
}
"#,
TableRequirement::CurrentSchemaIdMatch {
current_schema_id: 4,
},
);
}
#[test]
fn test_assert_last_assigned_partition_id() {
test_serde_json(
r#"
{
"type": "assert-last-assigned-partition-id",
"last-assigned-partition-id": 1004
}
"#,
TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id: 1004,
},
);
}
#[test]
fn test_assert_default_spec_id() {
test_serde_json(
r#"
{
"type": "assert-default-spec-id",
"default-spec-id": 5
}
"#,
TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
);
}
#[test]
fn test_assert_default_sort_order() {
let json = r#"
{
"type": "assert-default-sort-order-id",
"default-sort-order-id": 10
}
"#;
let update = TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id: 10,
};
test_serde_json(json, update);
}
// The `type` tag is mandatory: an untagged object must not parse.
#[test]
fn test_parse_assert_invalid() {
assert!(
serde_json::from_str::<TableRequirement>(
r#"
{
"default-sort-order-id": 10
}
"#
)
.is_err(),
"Table requirements should not be parsed without type."
);
}
// --- JSON round trips of `TableUpdate` ---
#[test]
fn test_assign_uuid() {
test_serde_json(
r#"
{
"action": "assign-uuid",
"uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
"#,
TableUpdate::AssignUuid {
uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
},
);
}
#[test]
fn test_upgrade_format_version() {
test_serde_json(
r#"
{
"action": "upgrade-format-version",
"format-version": 2
}
"#,
TableUpdate::UpgradeFormatVersion {
format_version: FormatVersion::V2,
},
);
}
// Covers `add-schema` with and without the optional `last-column-id`.
#[test]
fn test_add_schema() {
let test_schema = Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![2])
.with_fields(vec![
NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
])
.build()
.unwrap();
test_serde_json(
r#"
{
"action": "add-schema",
"schema": {
"type": "struct",
"schema-id": 1,
"fields": [
{
"id": 1,
"name": "foo",
"required": false,
"type": "string"
},
{
"id": 2,
"name": "bar",
"required": true,
"type": "int"
},
{
"id": 3,
"name": "baz",
"required": false,
"type": "boolean"
}
],
"identifier-field-ids": [
2
]
},
"last-column-id": 3
}
"#,
TableUpdate::AddSchema {
schema: test_schema.clone(),
last_column_id: Some(3),
},
);
test_serde_json(
r#"
{
"action": "add-schema",
"schema": {
"type": "struct",
"schema-id": 1,
"fields": [
{
"id": 1,
"name": "foo",
"required": false,
"type": "string"
},
{
"id": 2,
"name": "bar",
"required": true,
"type": "int"
},
{
"id": 3,
"name": "baz",
"required": false,
"type": "boolean"
}
],
"identifier-field-ids": [
2
]
}
}
"#,
TableUpdate::AddSchema {
schema: test_schema.clone(),
last_column_id: None,
},
);
}
#[test]
fn test_set_current_schema() {
test_serde_json(
r#"
{
"action": "set-current-schema",
"schema-id": 23
}
"#,
TableUpdate::SetCurrentSchema { schema_id: 23 },
);
}
#[test]
fn test_add_spec() {
test_serde_json(
r#"
{
"action": "add-spec",
"spec": {
"fields": [
{
"source-id": 4,
"name": "ts_day",
"transform": "day"
},
{
"source-id": 1,
"name": "id_bucket",
"transform": "bucket[16]"
},
{
"source-id": 2,
"name": "id_truncate",
"transform": "truncate[4]"
}
]
}
}
"#,
TableUpdate::AddSpec {
spec: UnboundPartitionSpec::builder()
.add_partition_field(4, "ts_day".to_string(), Transform::Day)
.unwrap()
.add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
.unwrap()
.add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
.unwrap()
.build(),
},
);
}
#[test]
fn test_set_default_spec() {
test_serde_json(
r#"
{
"action": "set-default-spec",
"spec-id": 1
}
"#,
TableUpdate::SetDefaultSpec { spec_id: 1 },
)
}
#[test]
fn test_add_sort_order() {
let json = r#"
{
"action": "add-sort-order",
"sort-order": {
"order-id": 1,
"fields": [
{
"transform": "identity",
"source-id": 2,
"direction": "asc",
"null-order": "nulls-first"
},
{
"transform": "bucket[4]",
"source-id": 3,
"direction": "desc",
"null-order": "nulls-last"
}
]
}
}
"#;
let update = TableUpdate::AddSortOrder {
sort_order: SortOrder::builder()
.with_order_id(1)
.with_sort_field(
SortField::builder()
.source_id(2)
.direction(SortDirection::Ascending)
.null_order(NullOrder::First)
.transform(Transform::Identity)
.build(),
)
.with_sort_field(
SortField::builder()
.source_id(3)
.direction(SortDirection::Descending)
.null_order(NullOrder::Last)
.transform(Transform::Bucket(4))
.build(),
)
.build_unbound()
.unwrap(),
};
test_serde_json(json, update);
}
#[test]
fn test_set_default_order() {
let json = r#"
{
"action": "set-default-sort-order",
"sort-order-id": 2
}
"#;
let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
test_serde_json(json, update);
}
#[test]
fn test_add_snapshot() {
let json = r#"
{
"action": "add-snapshot",
"snapshot": {
"snapshot-id": 3055729675574597000,
"parent-snapshot-id": 3051729675574597000,
"timestamp-ms": 1555100955770,
"sequence-number": 1,
"summary": {
"operation": "append"
},
"manifest-list": "s3://a/b/2.avro",
"schema-id": 1
}
}
"#;
let update = TableUpdate::AddSnapshot {
snapshot: Snapshot::builder()
.with_snapshot_id(3055729675574597000)
.with_parent_snapshot_id(Some(3051729675574597000))
.with_timestamp_ms(1555100955770)
.with_sequence_number(1)
.with_manifest_list("s3://a/b/2.avro")
.with_schema_id(1)
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::default(),
})
.build(),
};
test_serde_json(json, update);
}
// The snapshot omits `sequence-number` and `schema-id`. Only deserialization is
// checked, and the missing sequence number is expected to default to 0.
#[test]
fn test_add_snapshot_v1() {
let json = r#"
{
"action": "add-snapshot",
"snapshot": {
"snapshot-id": 3055729675574597000,
"parent-snapshot-id": 3051729675574597000,
"timestamp-ms": 1555100955770,
"summary": {
"operation": "append"
},
"manifest-list": "s3://a/b/2.avro"
}
}
"#;
let update = TableUpdate::AddSnapshot {
snapshot: Snapshot::builder()
.with_snapshot_id(3055729675574597000)
.with_parent_snapshot_id(Some(3051729675574597000))
.with_timestamp_ms(1555100955770)
.with_sequence_number(0)
.with_manifest_list("s3://a/b/2.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::default(),
})
.build(),
};
let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
assert_eq!(actual, update, "Parsed value is not equal to expected");
}
#[test]
fn test_remove_snapshots() {
let json = r#"
{
"action": "remove-snapshots",
"snapshot-ids": [
1,
2
]
}
"#;
let update = TableUpdate::RemoveSnapshots {
snapshot_ids: vec![1, 2],
};
test_serde_json(json, update);
}
#[test]
fn test_remove_snapshot_ref() {
let json = r#"
{
"action": "remove-snapshot-ref",
"ref-name": "snapshot-ref"
}
"#;
let update = TableUpdate::RemoveSnapshotRef {
ref_name: "snapshot-ref".to_string(),
};
test_serde_json(json, update);
}
// `set-snapshot-ref` carries the flattened reference fields next to `ref-name`.
#[test]
fn test_set_snapshot_ref_tag() {
let json = r#"
{
"action": "set-snapshot-ref",
"type": "tag",
"ref-name": "hank",
"snapshot-id": 1,
"max-ref-age-ms": 1
}
"#;
let update = TableUpdate::SetSnapshotRef {
ref_name: "hank".to_string(),
reference: SnapshotReference {
snapshot_id: 1,
retention: SnapshotRetention::Tag {
max_ref_age_ms: Some(1),
},
},
};
test_serde_json(json, update);
}
#[test]
fn test_set_snapshot_ref_branch() {
let json = r#"
{
"action": "set-snapshot-ref",
"type": "branch",
"ref-name": "hank",
"snapshot-id": 1,
"min-snapshots-to-keep": 2,
"max-snapshot-age-ms": 3,
"max-ref-age-ms": 4
}
"#;
let update = TableUpdate::SetSnapshotRef {
ref_name: "hank".to_string(),
reference: SnapshotReference {
snapshot_id: 1,
retention: SnapshotRetention::Branch {
min_snapshots_to_keep: Some(2),
max_snapshot_age_ms: Some(3),
max_ref_age_ms: Some(4),
},
},
};
test_serde_json(json, update);
}
#[test]
fn test_set_properties() {
let json = r#"
{
"action": "set-properties",
"updates": {
"prop1": "v1",
"prop2": "v2"
}
}
"#;
let update = TableUpdate::SetProperties {
updates: vec![
("prop1".to_string(), "v1".to_string()),
("prop2".to_string(), "v2".to_string()),
]
.into_iter()
.collect(),
};
test_serde_json(json, update);
}
#[test]
fn test_remove_properties() {
let json = r#"
{
"action": "remove-properties",
"removals": [
"prop1",
"prop2"
]
}
"#;
let update = TableUpdate::RemoveProperties {
removals: vec!["prop1".to_string(), "prop2".to_string()],
};
test_serde_json(json, update);
}
#[test]
fn test_set_location() {
let json = r#"
{
"action": "set-location",
"location": "s3://bucket/warehouse/tbl_location"
}
"#;
let update = TableUpdate::SetLocation {
location: "s3://bucket/warehouse/tbl_location".to_string(),
};
test_serde_json(json, update);
}
// `TableUpdate::apply` with `AssignUuid` must set the UUID on the built metadata.
#[test]
fn test_table_update_apply() {
let table_creation = TableCreation::builder()
.location("s3://db/table".to_string())
.name("table".to_string())
.properties(HashMap::new())
.schema(Schema::builder().build().unwrap())
.build();
let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
.unwrap();
let table_metadata_builder = TableMetadataBuilder::new(table_metadata);
let uuid = uuid::Uuid::new_v4();
let update = TableUpdate::AssignUuid { uuid };
let updated_metadata = update
.apply(table_metadata_builder)
.unwrap()
.build()
.unwrap();
assert_eq!(updated_metadata.uuid(), uuid);
}
// --- JSON round trips of `ViewUpdate` ---
#[test]
fn test_view_assign_uuid() {
test_serde_json(
r#"
{
"action": "assign-uuid",
"uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
"#,
ViewUpdate::AssignUuid {
uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
},
);
}
#[test]
fn test_view_upgrade_format_version() {
test_serde_json(
r#"
{
"action": "upgrade-format-version",
"format-version": 1
}
"#,
ViewUpdate::UpgradeFormatVersion {
format_version: ViewFormatVersion::V1,
},
);
}
#[test]
fn test_view_add_schema() {
let test_schema = Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![2])
.with_fields(vec![
NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
])
.build()
.unwrap();
test_serde_json(
r#"
{
"action": "add-schema",
"schema": {
"type": "struct",
"schema-id": 1,
"fields": [
{
"id": 1,
"name": "foo",
"required": false,
"type": "string"
},
{
"id": 2,
"name": "bar",
"required": true,
"type": "int"
},
{
"id": 3,
"name": "baz",
"required": false,
"type": "boolean"
}
],
"identifier-field-ids": [
2
]
},
"last-column-id": 3
}
"#,
ViewUpdate::AddSchema {
schema: test_schema.clone(),
last_column_id: Some(3),
},
);
}
#[test]
fn test_view_set_location() {
test_serde_json(
r#"
{
"action": "set-location",
"location": "s3://db/view"
}
"#,
ViewUpdate::SetLocation {
location: "s3://db/view".to_string(),
},
);
}
#[test]
fn test_view_set_properties() {
test_serde_json(
r#"
{
"action": "set-properties",
"updates": {
"prop1": "v1",
"prop2": "v2"
}
}
"#,
ViewUpdate::SetProperties {
updates: vec![
("prop1".to_string(), "v1".to_string()),
("prop2".to_string(), "v2".to_string()),
]
.into_iter()
.collect(),
},
);
}
#[test]
fn test_view_remove_properties() {
test_serde_json(
r#"
{
"action": "remove-properties",
"removals": [
"prop1",
"prop2"
]
}
"#,
ViewUpdate::RemoveProperties {
removals: vec!["prop1".to_string(), "prop2".to_string()],
},
);
}
#[test]
fn test_view_add_view_version() {
test_serde_json(
r#"
{
"action": "add-view-version",
"view-version": {
"version-id" : 1,
"timestamp-ms" : 1573518431292,
"schema-id" : 1,
"default-catalog" : "prod",
"default-namespace" : [ "default" ],
"summary" : {
"engine-name" : "Spark"
},
"representations" : [ {
"type" : "sql",
"sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
"dialect" : "spark"
} ]
}
}
"#,
ViewUpdate::AddViewVersion {
view_version: ViewVersion::builder()
.with_version_id(1)
.with_timestamp_ms(1573518431292)
.with_schema_id(1)
.with_default_catalog(Some("prod".to_string()))
.with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
.with_summary(
vec![("engine-name".to_string(), "Spark".to_string())]
.into_iter()
.collect(),
)
.with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
dialect: "spark".to_string(),
})]))
.build(),
},
);
}
#[test]
fn test_view_set_current_view_version() {
test_serde_json(
r#"
{
"action": "set-current-view-version",
"view-version-id": 1
}
"#,
ViewUpdate::SetCurrentViewVersion { view_version_id: 1 },
);
}
}