use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use _serde::ViewMetadataEnum;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;
use super::view_version::{ViewVersion, ViewVersionId, ViewVersionRef};
use super::{SchemaId, SchemaRef};
use crate::catalog::ViewCreation;
use crate::error::{timestamp_ms_to_utc, Result};
pub type ViewMetadataRef = Arc<ViewMetadata>;
pub(crate) static INITIAL_VIEW_VERSION_ID: i32 = 1;
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "ViewMetadataEnum", into = "ViewMetadataEnum")]
pub struct ViewMetadata {
pub(crate) format_version: ViewFormatVersion,
pub(crate) view_uuid: Uuid,
pub(crate) location: String,
pub(crate) current_version_id: ViewVersionId,
pub(crate) versions: HashMap<ViewVersionId, ViewVersionRef>,
pub(crate) version_log: Vec<ViewVersionLog>,
pub(crate) schemas: HashMap<i32, SchemaRef>,
pub(crate) properties: HashMap<String, String>,
}
impl ViewMetadata {
#[inline]
pub fn format_version(&self) -> ViewFormatVersion {
self.format_version
}
#[inline]
pub fn uuid(&self) -> Uuid {
self.view_uuid
}
#[inline]
pub fn location(&self) -> &str {
self.location.as_str()
}
#[inline]
pub fn current_version_id(&self) -> ViewVersionId {
self.current_version_id
}
#[inline]
pub fn versions(&self) -> impl ExactSizeIterator<Item = &ViewVersionRef> {
self.versions.values()
}
#[inline]
pub fn version_by_id(&self, version_id: ViewVersionId) -> Option<&ViewVersionRef> {
self.versions.get(&version_id)
}
#[inline]
pub fn current_version(&self) -> &ViewVersionRef {
self.versions
.get(&self.current_version_id)
.expect("Current version id set, but not found in view versions")
}
#[inline]
pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
self.schemas.values()
}
#[inline]
pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
self.schemas.get(&schema_id)
}
#[inline]
pub fn current_schema(&self) -> &SchemaRef {
let schema_id = self.current_version().schema_id();
self.schema_by_id(schema_id)
.expect("Current schema id set, but not found in view metadata")
}
#[inline]
pub fn properties(&self) -> &HashMap<String, String> {
&self.properties
}
#[inline]
pub fn history(&self) -> &[ViewVersionLog] {
&self.version_log
}
}
pub struct ViewMetadataBuilder(ViewMetadata);
impl ViewMetadataBuilder {
pub fn new(origin: ViewMetadata) -> Self {
Self(origin)
}
pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
let ViewCreation {
location,
schema,
properties,
name: _,
representations,
default_catalog,
default_namespace,
summary,
} = view_creation;
let initial_version_id = super::INITIAL_VIEW_VERSION_ID;
let version = ViewVersion::builder()
.with_default_catalog(default_catalog)
.with_default_namespace(default_namespace)
.with_representations(representations)
.with_schema_id(schema.schema_id())
.with_summary(summary)
.with_timestamp_ms(Utc::now().timestamp_millis())
.with_version_id(initial_version_id)
.build();
let versions = HashMap::from_iter(vec![(initial_version_id, version.into())]);
let view_metadata = ViewMetadata {
format_version: ViewFormatVersion::V1,
view_uuid: Uuid::now_v7(),
location,
current_version_id: initial_version_id,
versions,
version_log: Vec::new(),
schemas: HashMap::from_iter(vec![(schema.schema_id(), Arc::new(schema))]),
properties,
};
Ok(Self(view_metadata))
}
pub fn assign_uuid(mut self, uuid: Uuid) -> Result<Self> {
self.0.view_uuid = uuid;
Ok(self)
}
pub fn build(self) -> Result<ViewMetadata> {
Ok(self.0)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct ViewVersionLog {
version_id: ViewVersionId,
timestamp_ms: i64,
}
impl ViewVersionLog {
#[inline]
pub fn new(version_id: ViewVersionId, timestamp: i64) -> Self {
Self {
version_id,
timestamp_ms: timestamp,
}
}
#[inline]
pub fn version_id(&self) -> ViewVersionId {
self.version_id
}
#[inline]
pub fn timestamp_ms(&self) -> i64 {
self.timestamp_ms
}
pub fn timestamp(self) -> Result<DateTime<Utc>> {
timestamp_ms_to_utc(self.timestamp_ms)
}
}
pub(super) mod _serde {
use std::{collections::HashMap, sync::Arc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::{ViewFormatVersion, ViewVersionId, ViewVersionLog};
use crate::spec::schema::_serde::SchemaV2;
use crate::spec::table_metadata::_serde::VersionNumber;
use crate::spec::view_version::_serde::ViewVersionV1;
use crate::spec::{ViewMetadata, ViewVersion};
use crate::{Error, ErrorKind};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub(super) enum ViewMetadataEnum {
V1(ViewMetadataV1),
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub(super) struct ViewMetadataV1 {
pub format_version: VersionNumber<1>,
pub(super) view_uuid: Uuid,
pub(super) location: String,
pub(super) current_version_id: ViewVersionId,
pub(super) versions: Vec<ViewVersionV1>,
pub(super) version_log: Vec<ViewVersionLog>,
pub(super) schemas: Vec<SchemaV2>,
pub(super) properties: Option<std::collections::HashMap<String, String>>,
}
impl Serialize for ViewMetadata {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
let metadata_enum: ViewMetadataEnum =
self.clone().try_into().map_err(serde::ser::Error::custom)?;
metadata_enum.serialize(serializer)
}
}
impl TryFrom<ViewMetadataEnum> for ViewMetadata {
type Error = Error;
fn try_from(value: ViewMetadataEnum) -> Result<Self, Error> {
match value {
ViewMetadataEnum::V1(value) => value.try_into(),
}
}
}
impl TryFrom<ViewMetadata> for ViewMetadataEnum {
type Error = Error;
fn try_from(value: ViewMetadata) -> Result<Self, Error> {
Ok(match value.format_version {
ViewFormatVersion::V1 => ViewMetadataEnum::V1(value.into()),
})
}
}
impl TryFrom<ViewMetadataV1> for ViewMetadata {
type Error = Error;
fn try_from(value: ViewMetadataV1) -> Result<Self, self::Error> {
let schemas = HashMap::from_iter(
value
.schemas
.into_iter()
.map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
.collect::<Result<Vec<_>, Error>>()?,
);
let versions = HashMap::from_iter(
value
.versions
.into_iter()
.map(|x| Ok((x.version_id, Arc::new(ViewVersion::from(x)))))
.collect::<Result<Vec<_>, Error>>()?,
);
let current_version =
versions
.get(&value.current_version_id)
.ok_or(self::Error::new(
ErrorKind::DataInvalid,
format!(
"No version exists with the current version id {}.",
value.current_version_id
),
))?;
if !schemas.contains_key(¤t_version.schema_id()) {
return Err(self::Error::new(
ErrorKind::DataInvalid,
format!(
"No schema exists with the schema id {}.",
current_version.schema_id()
),
));
}
Ok(ViewMetadata {
format_version: ViewFormatVersion::V1,
view_uuid: value.view_uuid,
location: value.location,
schemas,
properties: value.properties.unwrap_or_default(),
current_version_id: value.current_version_id,
versions,
version_log: value.version_log,
})
}
}
impl From<ViewMetadata> for ViewMetadataV1 {
fn from(v: ViewMetadata) -> Self {
let schemas = v
.schemas
.into_values()
.map(|x| {
Arc::try_unwrap(x)
.unwrap_or_else(|schema| schema.as_ref().clone())
.into()
})
.collect();
let versions = v
.versions
.into_values()
.map(|x| {
Arc::try_unwrap(x)
.unwrap_or_else(|version| version.as_ref().clone())
.into()
})
.collect();
ViewMetadataV1 {
format_version: VersionNumber::<1>,
view_uuid: v.view_uuid,
location: v.location,
schemas,
properties: Some(v.properties),
current_version_id: v.current_version_id,
versions,
version_log: v.version_log,
}
}
}
}
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
pub enum ViewFormatVersion {
V1 = 1u8,
}
impl PartialOrd for ViewFormatVersion {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ViewFormatVersion {
fn cmp(&self, other: &Self) -> Ordering {
(*self as u8).cmp(&(*other as u8))
}
}
impl Display for ViewFormatVersion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ViewFormatVersion::V1 => write!(f, "v1"),
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use anyhow::Result;
use pretty_assertions::assert_eq;
use uuid::Uuid;
use super::{ViewFormatVersion, ViewMetadataBuilder, ViewVersionLog};
use crate::spec::{
NestedField, PrimitiveType, Schema, SqlViewRepresentation, Type, ViewMetadata,
ViewRepresentations, ViewVersion,
};
use crate::{NamespaceIdent, ViewCreation};
fn check_view_metadata_serde(json: &str, expected_type: ViewMetadata) {
let desered_type: ViewMetadata = serde_json::from_str(json).unwrap();
assert_eq!(desered_type, expected_type);
let sered_json = serde_json::to_string(&expected_type).unwrap();
let parsed_json_value = serde_json::from_str::<ViewMetadata>(&sered_json).unwrap();
assert_eq!(parsed_json_value, desered_type);
}
fn get_test_view_metadata(file_name: &str) -> ViewMetadata {
let path = format!("testdata/view_metadata/{}", file_name);
let metadata: String = fs::read_to_string(path).unwrap();
serde_json::from_str(&metadata).unwrap()
}
#[test]
fn test_view_data_v1() {
let data = r#"
{
"view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
"format-version" : 1,
"location" : "s3://bucket/warehouse/default.db/event_agg",
"current-version-id" : 1,
"properties" : {
"comment" : "Daily event counts"
},
"versions" : [ {
"version-id" : 1,
"timestamp-ms" : 1573518431292,
"schema-id" : 1,
"default-catalog" : "prod",
"default-namespace" : [ "default" ],
"summary" : {
"engine-name" : "Spark",
"engineVersion" : "3.3.2"
},
"representations" : [ {
"type" : "sql",
"sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
"dialect" : "spark"
} ]
} ],
"schemas": [ {
"schema-id": 1,
"type" : "struct",
"fields" : [ {
"id" : 1,
"name" : "event_count",
"required" : false,
"type" : "int",
"doc" : "Count of events"
} ]
} ],
"version-log" : [ {
"timestamp-ms" : 1573518431292,
"version-id" : 1
} ]
}
"#;
let schema = Schema::builder()
.with_schema_id(1)
.with_fields(vec![Arc::new(
NestedField::optional(1, "event_count", Type::Primitive(PrimitiveType::Int))
.with_doc("Count of events"),
)])
.build()
.unwrap();
let version = ViewVersion::builder()
.with_version_id(1)
.with_timestamp_ms(1573518431292)
.with_schema_id(1)
.with_default_catalog("prod".to_string().into())
.with_default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
.with_summary(HashMap::from_iter(vec![
("engineVersion".to_string(), "3.3.2".to_string()),
("engine-name".to_string(), "Spark".to_string()),
]))
.with_representations(ViewRepresentations(vec![SqlViewRepresentation {
sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2"
.to_string(),
dialect: "spark".to_string(),
}
.into()]))
.build();
let expected = ViewMetadata {
format_version: ViewFormatVersion::V1,
view_uuid: Uuid::parse_str("fa6506c3-7681-40c8-86dc-e36561f83385").unwrap(),
location: "s3://bucket/warehouse/default.db/event_agg".to_string(),
current_version_id: 1,
versions: HashMap::from_iter(vec![(1, Arc::new(version))]),
version_log: vec![ViewVersionLog {
timestamp_ms: 1573518431292,
version_id: 1,
}],
schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
properties: HashMap::from_iter(vec![(
"comment".to_string(),
"Daily event counts".to_string(),
)]),
};
check_view_metadata_serde(data, expected);
}
#[test]
fn test_invalid_view_uuid() -> Result<()> {
let data = r#"
{
"format-version" : 1,
"view-uuid": "xxxx"
}
"#;
assert!(serde_json::from_str::<ViewMetadata>(data).is_err());
Ok(())
}
#[test]
fn test_view_builder_from_view_creation() {
let representations = ViewRepresentations(vec![SqlViewRepresentation {
sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2"
.to_string(),
dialect: "spark".to_string(),
}
.into()]);
let creation = ViewCreation::builder()
.location("s3://bucket/warehouse/default.db/event_agg".to_string())
.name("view".to_string())
.schema(Schema::builder().build().unwrap())
.default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
.representations(representations)
.build();
let metadata = ViewMetadataBuilder::from_view_creation(creation)
.unwrap()
.build()
.unwrap();
assert_eq!(
metadata.location(),
"s3://bucket/warehouse/default.db/event_agg"
);
assert_eq!(metadata.current_version_id(), 1);
assert_eq!(metadata.versions().count(), 1);
assert_eq!(metadata.schemas_iter().count(), 1);
assert_eq!(metadata.properties().len(), 0);
}
#[test]
fn test_view_metadata_v1_file_valid() {
let metadata =
fs::read_to_string("testdata/view_metadata/ViewMetadataV1Valid.json").unwrap();
let schema = Schema::builder()
.with_schema_id(1)
.with_fields(vec![
Arc::new(
NestedField::optional(1, "event_count", Type::Primitive(PrimitiveType::Int))
.with_doc("Count of events"),
),
Arc::new(NestedField::optional(
2,
"event_date",
Type::Primitive(PrimitiveType::Date),
)),
])
.build()
.unwrap();
let version = ViewVersion::builder()
.with_version_id(1)
.with_timestamp_ms(1573518431292)
.with_schema_id(1)
.with_default_catalog("prod".to_string().into())
.with_default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
.with_summary(HashMap::from_iter(vec![
("engineVersion".to_string(), "3.3.2".to_string()),
("engine-name".to_string(), "Spark".to_string()),
]))
.with_representations(ViewRepresentations(vec![SqlViewRepresentation {
sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2"
.to_string(),
dialect: "spark".to_string(),
}
.into()]))
.build();
let expected = ViewMetadata {
format_version: ViewFormatVersion::V1,
view_uuid: Uuid::parse_str("fa6506c3-7681-40c8-86dc-e36561f83385").unwrap(),
location: "s3://bucket/warehouse/default.db/event_agg".to_string(),
current_version_id: 1,
versions: HashMap::from_iter(vec![(1, Arc::new(version))]),
version_log: vec![ViewVersionLog {
timestamp_ms: 1573518431292,
version_id: 1,
}],
schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
properties: HashMap::from_iter(vec![(
"comment".to_string(),
"Daily event counts".to_string(),
)]),
};
check_view_metadata_serde(&metadata, expected);
}
#[test]
fn test_view_builder_assign_uuid() {
let metadata = get_test_view_metadata("ViewMetadataV1Valid.json");
let metadata_builder = ViewMetadataBuilder::new(metadata);
let uuid = Uuid::new_v4();
let metadata = metadata_builder.assign_uuid(uuid).unwrap().build().unwrap();
assert_eq!(metadata.uuid(), uuid);
}
#[test]
fn test_view_metadata_v1_unsupported_version() {
let metadata =
fs::read_to_string("testdata/view_metadata/ViewMetadataUnsupportedVersion.json")
.unwrap();
let desered: Result<ViewMetadata, serde_json::Error> = serde_json::from_str(&metadata);
assert_eq!(
desered.unwrap_err().to_string(),
"data did not match any variant of untagged enum ViewMetadataEnum"
)
}
#[test]
fn test_view_metadata_v1_version_not_found() {
let metadata =
fs::read_to_string("testdata/view_metadata/ViewMetadataV1CurrentVersionNotFound.json")
.unwrap();
let desered: Result<ViewMetadata, serde_json::Error> = serde_json::from_str(&metadata);
assert_eq!(
desered.unwrap_err().to_string(),
"DataInvalid => No version exists with the current version id 2."
)
}
#[test]
fn test_view_metadata_v1_schema_not_found() {
let metadata =
fs::read_to_string("testdata/view_metadata/ViewMetadataV1SchemaNotFound.json").unwrap();
let desered: Result<ViewMetadata, serde_json::Error> = serde_json::from_str(&metadata);
assert_eq!(
desered.unwrap_err().to_string(),
"DataInvalid => No schema exists with the schema id 2."
)
}
#[test]
fn test_view_metadata_v1_missing_schema_for_version() {
let metadata =
fs::read_to_string("testdata/view_metadata/ViewMetadataV1MissingSchema.json").unwrap();
let desered: Result<ViewMetadata, serde_json::Error> = serde_json::from_str(&metadata);
assert_eq!(
desered.unwrap_err().to_string(),
"data did not match any variant of untagged enum ViewMetadataEnum"
)
}
#[test]
fn test_view_metadata_v1_missing_current_version() {
let metadata =
fs::read_to_string("testdata/view_metadata/ViewMetadataV1MissingCurrentVersion.json")
.unwrap();
let desered: Result<ViewMetadata, serde_json::Error> = serde_json::from_str(&metadata);
assert_eq!(
desered.unwrap_err().to_string(),
"data did not match any variant of untagged enum ViewMetadataEnum"
)
}
}