use std::collections::HashMap;
use std::sync::Arc;
use _serde::ViewVersionV1;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use super::view_metadata::ViewVersionLog;
use super::INITIAL_VIEW_VERSION_ID;
use crate::catalog::NamespaceIdent;
use crate::error::{timestamp_ms_to_utc, Result};
use crate::spec::{SchemaId, SchemaRef, ViewMetadata};
use crate::{Error, ErrorKind};
/// Shared, reference-counted handle to a [`ViewVersion`].
pub type ViewVersionRef = Arc<ViewVersion>;
/// Integer identifier of a view version.
pub type ViewVersionId = i32;
/// A single version of a view.
///
/// A version records:
/// - the schema it refers to;
/// - its timestamp;
/// - a free-form summary;
/// - how the view is defined (its representations);
/// - a default catalog and namespace.
///
/// (De)serialization goes through the `ViewVersionV1` wire struct (`serde(from, into)`).
/// `ViewVersion::builder()` exposes setters prefixed with `with_`.
/// In the builder, `version_id`, `summary` and `default_catalog` have defaults.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, TypedBuilder)]
#[serde(from = "ViewVersionV1", into = "ViewVersionV1")]
#[builder(field_defaults(setter(prefix = "with_")))]
pub struct ViewVersion {
    /// Identifier of this version; the builder defaults it to `INITIAL_VIEW_VERSION_ID`.
    #[builder(default = INITIAL_VIEW_VERSION_ID)]
    version_id: ViewVersionId,
    /// Id of the schema (looked up in the view metadata) that this version refers to.
    schema_id: SchemaId,
    /// Timestamp in milliseconds since the Unix epoch.
    timestamp_ms: i64,
    /// Free-form string key/value pairs; the builder defaults it to an empty map.
    #[builder(default = HashMap::new())]
    summary: HashMap<String, String>,
    /// The definitions of the view (e.g. SQL text plus dialect).
    representations: ViewRepresentations,
    /// Optional default catalog; the builder defaults it to `None`.
    #[builder(default = None)]
    default_catalog: Option<String>,
    /// Default namespace of this version.
    default_namespace: NamespaceIdent,
}
impl ViewVersion {
    /// Returns the identifier of this view version.
    #[inline]
    pub fn version_id(&self) -> ViewVersionId {
        self.version_id
    }

    /// Returns the id of the schema this version refers to.
    ///
    /// Use [`ViewVersion::schema`] to resolve it against the view metadata.
    #[inline]
    pub fn schema_id(&self) -> SchemaId {
        self.schema_id
    }

    /// Returns the version timestamp as a UTC [`DateTime`].
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `timestamp_ms_to_utc` when `timestamp_ms` cannot
    /// be converted.
    #[inline]
    pub fn timestamp(&self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.timestamp_ms)
    }

    /// Returns the raw version timestamp in milliseconds.
    #[inline]
    pub fn timestamp_ms(&self) -> i64 {
        self.timestamp_ms
    }

    /// Returns the summary key/value pairs of this version.
    #[inline]
    pub fn summary(&self) -> &HashMap<String, String> {
        &self.summary
    }

    /// Returns the representations (e.g. SQL definitions) of this version.
    #[inline]
    pub fn representations(&self) -> &ViewRepresentations {
        &self.representations
    }

    /// Returns the default catalog, if one is set.
    #[inline]
    pub fn default_catalog(&self) -> Option<&String> {
        self.default_catalog.as_ref()
    }

    /// Returns the default namespace of this version.
    #[inline]
    pub fn default_namespace(&self) -> &NamespaceIdent {
        &self.default_namespace
    }

    /// Resolves this version's schema id against `view_metadata`.
    ///
    /// # Errors
    ///
    /// Returns an [`ErrorKind::DataInvalid`] error if `view_metadata` has no schema with
    /// this version's schema id.
    pub fn schema(&self, view_metadata: &ViewMetadata) -> Result<SchemaRef> {
        view_metadata
            .schema_by_id(self.schema_id())
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("Schema with id {} not found", self.schema_id()),
                )
            })
            .cloned()
    }

    /// Builds the [`ViewVersionLog`] entry (version id and timestamp) for this version.
    pub(crate) fn log(&self) -> ViewVersionLog {
        ViewVersionLog::new(self.version_id, self.timestamp_ms)
    }

    /// Returns `true` if `other` defines the same view as `self`.
    ///
    /// `version_id` and `timestamp_ms` are ignored. The following must all be equal:
    /// - summary
    /// - representations
    /// - default catalog
    /// - default namespace
    /// - schema id
    pub(crate) fn behaves_identical_to(&self, other: &Self) -> bool {
        self.summary == other.summary
            && self.representations == other.representations
            && self.default_catalog == other.default_catalog
            && self.default_namespace == other.default_namespace
            && self.schema_id == other.schema_id
    }

    /// Returns this version with its id replaced by `version_id`; other fields are kept.
    pub fn with_version_id(self, version_id: ViewVersionId) -> Self {
        Self { version_id, ..self }
    }

    /// Returns this version with its schema id replaced by `schema_id`; other fields are kept.
    pub fn with_schema_id(self, schema_id: SchemaId) -> Self {
        Self { schema_id, ..self }
    }
}
/// Ordered collection of the representations (definitions) of a view version.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct ViewRepresentations(pub(crate) Vec<ViewRepresentation>);
impl ViewRepresentations {
#[inline]
pub fn len(&self) -> usize {
self.0.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn iter(&self) -> impl ExactSizeIterator<Item = &'_ ViewRepresentation> {
self.0.iter()
}
}
impl IntoIterator for ViewRepresentations {
type Item = ViewRepresentation;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
/// One way of expressing a view's definition.
///
/// Serialized as an internally tagged object: the `"type"` field selects the variant.
/// Currently the only variant is `"sql"`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type")]
pub enum ViewRepresentation {
    /// A SQL definition; tagged as `"type": "sql"`.
    #[serde(rename = "sql")]
    Sql(SqlViewRepresentation),
}
/// A view definition expressed as SQL text in a particular dialect.
///
/// Serializes to an object with `"sql"` and `"dialect"` keys. Inside a
/// [`ViewRepresentation`], the object also carries `"type": "sql"`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
// `rename_all` already maps `sql` -> "sql" and `dialect` -> "dialect"; per-field renames
// would be redundant. It is kept so any future multi-word field follows the spec's casing.
#[serde(rename_all = "kebab-case")]
pub struct SqlViewRepresentation {
    /// The SQL text of the view definition.
    pub sql: String,
    /// Name of the SQL dialect `sql` is written in (e.g. `"spark"`).
    pub dialect: String,
}
pub(super) mod _serde {
    //! Serde wire representation of [`ViewVersion`](super::ViewVersion).
    //!
    //! `ViewVersion` (de)serializes through [`ViewVersionV1`] via `serde(from, into)`.

    use serde::{Deserialize, Serialize};

    use super::{ViewRepresentation, ViewRepresentations, ViewVersion};
    use crate::catalog::NamespaceIdent;

    /// Version-1 wire layout of a view version, using kebab-case keys.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    pub(crate) struct ViewVersionV1 {
        pub version_id: i32,
        pub schema_id: i32,
        pub timestamp_ms: i64,
        pub summary: std::collections::HashMap<String, String>,
        pub representations: Vec<ViewRepresentation>,
        // Left out of the serialized object entirely when there is no default catalog.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_catalog: Option<String>,
        pub default_namespace: NamespaceIdent,
    }

    impl From<ViewVersionV1> for ViewVersion {
        fn from(wire: ViewVersionV1) -> Self {
            // Exhaustive destructuring: adding a field to the wire struct forces an update here.
            let ViewVersionV1 {
                version_id,
                schema_id,
                timestamp_ms,
                summary,
                representations,
                default_catalog,
                default_namespace,
            } = wire;

            Self {
                version_id,
                schema_id,
                timestamp_ms,
                summary,
                representations: ViewRepresentations(representations),
                default_catalog,
                default_namespace,
            }
        }
    }

    impl From<ViewVersion> for ViewVersionV1 {
        fn from(version: ViewVersion) -> Self {
            let ViewVersion {
                version_id,
                schema_id,
                timestamp_ms,
                summary,
                representations: ViewRepresentations(representations),
                default_catalog,
                default_namespace,
            } = version;

            Self {
                version_id,
                schema_id,
                timestamp_ms,
                summary,
                representations,
                default_catalog,
                default_namespace,
            }
        }
    }
}
impl From<SqlViewRepresentation> for ViewRepresentation {
fn from(sql: SqlViewRepresentation) -> Self {
ViewRepresentation::Sql(sql)
}
}
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};

    use crate::spec::view_version::ViewVersion;
    use crate::spec::view_version::_serde::ViewVersionV1;
    use crate::spec::ViewRepresentations;
    use crate::NamespaceIdent;

    /// Checks that a kebab-case JSON view version:
    /// - parses via `ViewVersionV1`;
    /// - serializes back to identical JSON;
    /// - exposes each input value through its accessor.
    #[test]
    fn view_version() {
        let record = serde_json::json!(
        {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292i64,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
                "engine-name" : "Spark",
                "engineVersion" : "3.3.2"
            },
            "representations" : [ {
                "type" : "sql",
                "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
                "dialect" : "spark"
            } ]
        }
        );
        let result: ViewVersion = serde_json::from_value::<ViewVersionV1>(record.clone())
            .unwrap()
            .into();
        // Round trip: serializing the parsed value must reproduce the input JSON exactly.
        assert_eq!(serde_json::to_value(result.clone()).unwrap(), record);
        assert_eq!(result.version_id(), 1);
        // `timestamp()` must agree with chrono's own millisecond conversion.
        assert_eq!(
            result.timestamp().unwrap(),
            Utc.timestamp_millis_opt(1573518431292).unwrap()
        );
        assert_eq!(result.schema_id(), 1);
        assert_eq!(result.default_catalog, Some("prod".to_string()));
        // Summary keys are kept verbatim (no case conversion applied to map keys).
        assert_eq!(result.summary(), &{
            let mut map = std::collections::HashMap::new();
            map.insert("engine-name".to_string(), "Spark".to_string());
            map.insert("engineVersion".to_string(), "3.3.2".to_string());
            map
        });
        assert_eq!(
            result.representations().to_owned(),
            ViewRepresentations(vec![super::ViewRepresentation::Sql(
                super::SqlViewRepresentation {
                    sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2"
                        .to_string(),
                    dialect: "spark".to_string(),
                },
            )])
        );
        assert_eq!(
            result.default_namespace.inner(),
            vec!["default".to_string()]
        );
    }

    /// Checks `behaves_identical_to` in two cases:
    /// - differences in `version_id` and `timestamp_ms` are ignored;
    /// - a different SQL representation is detected.
    #[test]
    fn test_behaves_identical_to() {
        let view_version = ViewVersion::builder()
            .with_version_id(1)
            .with_schema_id(1)
            .with_timestamp_ms(1573518431292)
            .with_summary({
                let mut map = std::collections::HashMap::new();
                map.insert("engine-name".to_string(), "Spark".to_string());
                map.insert("engineVersion".to_string(), "3.3.2".to_string());
                map
            })
            .with_representations(ViewRepresentations(vec![super::ViewRepresentation::Sql(
                super::SqlViewRepresentation {
                    sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2"
                        .to_string(),
                    dialect: "spark".to_string(),
                },
            )]))
            .with_default_catalog(Some("prod".to_string()))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build();

        // Same definition, only id and timestamp changed: must still count as identical.
        let mut identical_view_version = view_version.clone();
        identical_view_version.version_id = 2;
        identical_view_version.timestamp_ms = 1573518431293;

        // Every field copied from `view_version` except the SQL text: must not be identical.
        let different_view_version = ViewVersion::builder()
            .with_version_id(view_version.version_id())
            .with_schema_id(view_version.schema_id())
            .with_timestamp_ms(view_version.timestamp_ms())
            .with_summary(view_version.summary().clone())
            .with_representations(ViewRepresentations(vec![super::ViewRepresentation::Sql(
                super::SqlViewRepresentation {
                    sql: "SELECT * from events".to_string(),
                    dialect: "spark".to_string(),
                },
            )]))
            .with_default_catalog(view_version.default_catalog().cloned())
            .with_default_namespace(view_version.default_namespace().clone())
            .build();

        assert!(view_version.behaves_identical_to(&identical_view_version));
        assert!(!view_version.behaves_identical_to(&different_view_version));
    }
}