use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use chrono::Utc;
use itertools::Itertools;
use uuid::Uuid;
use super::{
Schema, SchemaId, TableMetadataBuilder, ViewFormatVersion, ViewMetadata, ViewRepresentation,
ViewVersion, ViewVersionLog, ViewVersionRef, DEFAULT_SCHEMA_ID, INITIAL_VIEW_VERSION_ID,
ONE_MINUTE_MS, VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED,
VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VIEW_PROPERTY_VERSION_HISTORY_SIZE,
VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT,
};
use crate::catalog::ViewUpdate;
use crate::error::{Error, ErrorKind, Result};
use crate::io::is_truthy;
use crate::ViewCreation;
/// Incrementally constructs or modifies a `ViewMetadata`, recording every
/// applied modification as a `ViewUpdate`.
#[derive(Debug, Clone)]
pub struct ViewMetadataBuilder {
    /// Metadata being assembled; mutated in place by the builder methods.
    metadata: ViewMetadata,
    /// Updates recorded so far, in the order they were applied.
    changes: Vec<ViewUpdate>,
    /// Id of the schema most recently added through this builder, if any.
    last_added_schema_id: Option<SchemaId>,
    /// Id of the view version most recently added through this builder, if any.
    /// Typed as `i32` like every other view-version id handled by the builder
    /// (it is not a schema id).
    last_added_version_id: Option<i32>,
    /// Version-log entry staged when a new current version is selected;
    /// appended to the version log by `build`.
    history_entry: Option<ViewVersionLog>,
    /// Current version of the metadata this builder started from; `build`
    /// uses it to detect dropped SQL dialects.
    previous_view_version: Option<ViewVersionRef>,
}
/// Value returned by `ViewMetadataBuilder::build`.
#[derive(Debug, Clone, PartialEq)]
pub struct ViewMetadataBuildResult {
/// The finished view metadata.
pub metadata: ViewMetadata,
/// Every update the builder recorded, in the order it was applied.
pub changes: Vec<ViewUpdate>,
}
impl ViewMetadataBuilder {
/// Sentinel id standing for "the schema / view version most recently added
/// through this builder"; shares its value with `TableMetadataBuilder::LAST_ADDED`.
const LAST_ADDED: i32 = TableMetadataBuilder::LAST_ADDED;
/// Creates a builder for a brand-new view.
///
/// A fresh v7 UUID is generated. Location, current version (together with its
/// schema) and properties are then applied through the regular builder
/// methods, so the corresponding `ViewUpdate`s are recorded.
///
/// # Errors
/// Propagates any error returned by `set_current_version` or `set_properties`.
pub fn new(
    location: String,
    schema: Schema,
    view_version: ViewVersion,
    format_version: ViewFormatVersion,
    properties: HashMap<String, String>,
) -> Result<Self> {
    // `location` and `current_version_id` are placeholders here; the setter
    // calls below replace them.
    let blank_metadata = ViewMetadata {
        format_version,
        view_uuid: Uuid::now_v7(),
        location: String::new(),
        current_version_id: -1,
        versions: HashMap::new(),
        version_log: Vec::new(),
        schemas: HashMap::new(),
        properties: HashMap::new(),
    };

    let initial = Self {
        metadata: blank_metadata,
        changes: Vec::new(),
        last_added_schema_id: None,
        last_added_version_id: None,
        history_entry: None,
        previous_view_version: None,
    };

    initial
        .set_location(location)
        .set_current_version(view_version, schema)?
        .set_properties(properties)
}
#[must_use]
pub fn new_from_metadata(previous: ViewMetadata) -> Self {
let previous_view_version = previous.current_version().clone();
Self {
metadata: previous,
changes: Vec::default(),
last_added_schema_id: None,
last_added_version_id: None,
history_entry: None,
previous_view_version: Some(previous_view_version),
}
}
/// Creates a builder for a new view described by a `ViewCreation` request.
///
/// The initial version gets `INITIAL_VIEW_VERSION_ID`, the id of the supplied
/// schema and the current wall-clock time in milliseconds. The view name
/// carried by the request is not used here. The format version is always V1.
///
/// # Errors
/// Propagates any error returned by `ViewMetadataBuilder::new`.
pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
    let ViewCreation {
        name: _unused_name,
        location,
        schema,
        properties,
        representations,
        default_catalog,
        default_namespace,
        summary,
    } = view_creation;

    let created_at_ms = Utc::now().timestamp_millis();
    let initial_version = ViewVersion::builder()
        .with_version_id(INITIAL_VIEW_VERSION_ID)
        .with_schema_id(schema.schema_id())
        .with_timestamp_ms(created_at_ms)
        .with_summary(summary)
        .with_representations(representations)
        .with_default_catalog(default_catalog)
        .with_default_namespace(default_namespace)
        .build();

    Self::new(
        location,
        schema,
        initial_version,
        ViewFormatVersion::V1,
        properties,
    )
}
pub fn upgrade_format_version(self, format_version: ViewFormatVersion) -> Result<Self> {
if format_version < self.metadata.format_version {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot downgrade ViewFormatVersion from {} to {}",
self.metadata.format_version, format_version
),
));
}
if format_version != self.metadata.format_version {
match format_version {
ViewFormatVersion::V1 => {
}
}
}
Ok(self)
}
/// Sets the view location after stripping all trailing `/` characters.
///
/// Nothing is recorded when the normalized location equals the current one.
pub fn set_location(mut self, location: String) -> Self {
    let normalized = location.trim_end_matches('/');
    if self.metadata.location == normalized {
        return self;
    }

    let normalized = normalized.to_string();
    self.changes.push(ViewUpdate::SetLocation {
        location: normalized.clone(),
    });
    self.metadata.location = normalized;
    self
}
/// Selects the current view version by id.
///
/// `version_id` may be the `LAST_ADDED` sentinel, in which case it is replaced
/// by the id of the version most recently added through this builder.
///
/// Selecting the id that is already current is a no-op. Otherwise a
/// `SetCurrentViewVersion` update is recorded (carrying the `LAST_ADDED`
/// sentinel when the chosen version is the one last added here) and a
/// version-log entry is staged for `build`.
///
/// # Errors
/// `ErrorKind::DataInvalid` when `LAST_ADDED` is passed before any version has
/// been added, or when no version with the resolved id exists.
pub fn set_current_version_id(mut self, version_id: i32) -> Result<Self> {
    // The sentinel must be resolved whenever it is passed. Resolving it only
    // when nothing had been added left `-1` unresolved in the one case where
    // it could be resolved, and the lookup below then failed on id -1.
    let version_id = if version_id == Self::LAST_ADDED {
        self.last_added_version_id.ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                "Cannot set current version id to last added version: no version has been added.",
            )
        })?
    } else {
        version_id
    };

    if version_id == self.metadata.current_version_id {
        return Ok(self);
    }

    let history_entry = self
        .metadata
        .versions
        .get(&version_id)
        .ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set current version to unknown version with id: {}",
                    version_id
                ),
            )
        })?
        .log();

    self.metadata.current_version_id = version_id;

    // Refer to a version added in this same change set via the sentinel.
    let recorded_id = if self.last_added_version_id == Some(version_id) {
        Self::LAST_ADDED
    } else {
        version_id
    };
    self.changes.push(ViewUpdate::SetCurrentViewVersion {
        view_version_id: recorded_id,
    });

    self.history_entry = Some(history_entry);
    Ok(self)
}
/// Adds `schema` and `view_version` (re-pointed at the id assigned to the
/// schema) and makes that version the current one.
///
/// # Errors
/// Propagates errors from adding the version or from selecting it as current.
pub fn set_current_version(
    mut self,
    view_version: ViewVersion,
    schema: Schema,
) -> Result<Self> {
    let assigned_schema_id = self.add_schema_internal(schema);
    let assigned_version_id =
        self.add_version_internal(view_version.with_schema_id(assigned_schema_id))?;
    self.set_current_version_id(assigned_version_id)
}
/// Adds a view version without making it current.
///
/// # Errors
/// Propagates any error from `add_version_internal`.
pub fn add_version(mut self, view_version: ViewVersion) -> Result<Self> {
    // The assigned id is not needed by callers of the public API.
    self.add_version_internal(view_version)
        .map(|_assigned_id| self)
}
/// Adds a view version and returns the id it was stored under.
///
/// The id is either reused from an existing version that
/// `behaves_identical_to` the new one, or freshly assigned. A schema id equal
/// to `LAST_ADDED` is replaced by the id of the schema last added through this
/// builder.
///
/// # Errors
/// `ErrorKind::DataInvalid` when `LAST_ADDED` is used without a previously
/// added schema, when the schema id is unknown, when two representations share
/// a dialect, or when the version's timestamp is more than `ONE_MINUTE_MS`
/// older than the latest version-log entry.
fn add_version_internal(&mut self, view_version: ViewVersion) -> Result<i32> {
    let version_id = self.reuse_or_create_new_view_version_id(&view_version);
    let candidate = view_version.with_version_id(version_id);

    // An equivalent version is already stored: keep the map untouched, but
    // record the change once because `last_added_version_id` moves.
    if self.metadata.versions.contains_key(&version_id) {
        let already_last_added = self.last_added_version_id == Some(version_id);
        if !already_last_added {
            // NOTE(review): the id was already applied above; re-applying it
            // here is presumably a no-op.
            self.changes.push(ViewUpdate::AddViewVersion {
                view_version: candidate.with_version_id(version_id),
            });
            self.last_added_version_id = Some(version_id);
        }
        return Ok(version_id);
    }

    // Resolve the schema sentinel to a concrete id.
    let candidate = if candidate.schema_id() == Self::LAST_ADDED {
        let Some(resolved_schema_id) = self.last_added_schema_id else {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Cannot set last added schema: no schema has been added",
            ));
        };
        candidate.with_schema_id(resolved_schema_id)
    } else {
        candidate
    };

    let schema_id = candidate.schema_id();
    if !self.metadata.schemas.contains_key(&schema_id) {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            format!("Cannot add version with unknown schema: {}", schema_id),
        ));
    }

    require_unique_dialects(&candidate)?;

    // Reject versions that are older than the latest log entry by more than
    // the one-minute tolerance.
    if let Some(latest_entry) = self.metadata.version_log.last() {
        if candidate.timestamp_ms() - latest_entry.timestamp_ms() < -ONE_MINUTE_MS {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
                    candidate.timestamp_ms(),
                    latest_entry.timestamp_ms()
                ),
            ));
        }
    }

    self.metadata
        .versions
        .insert(version_id, Arc::new(candidate.clone()));

    // In the recorded change, point at a schema added in this change set via
    // the sentinel instead of its concrete id.
    let recorded = match self.last_added_schema_id {
        Some(last_schema_id) if candidate.schema_id() == last_schema_id => {
            candidate.with_schema_id(Self::LAST_ADDED)
        }
        _ => candidate,
    };
    self.changes.push(ViewUpdate::AddViewVersion {
        view_version: recorded,
    });
    self.last_added_version_id = Some(version_id);

    Ok(version_id)
}
/// Returns the id of an existing version that `behaves_identical_to`
/// `new_view_version`, or else the next free id: highest existing id plus
/// one, or `INITIAL_VIEW_VERSION_ID` when no version exists.
fn reuse_or_create_new_view_version_id(&self, new_view_version: &ViewVersion) -> i32 {
    for (existing_id, existing_version) in &self.metadata.versions {
        if new_view_version.behaves_identical_to(existing_version) {
            return *existing_id;
        }
    }

    match self.get_highest_view_version_id() {
        Some(highest) => highest + 1,
        None => INITIAL_VIEW_VERSION_ID,
    }
}
/// Largest view-version id currently stored, or `None` when there are no versions.
fn get_highest_view_version_id(&self) -> Option<i32> {
    self.metadata.versions.keys().copied().max()
}
/// Adds a schema to the view; the id it ends up with is assigned by the builder.
pub fn add_schema(mut self, schema: Schema) -> Self {
    let _assigned_id = self.add_schema_internal(schema);
    self
}
/// Adds a schema and returns the id it is stored under.
///
/// If a schema for which `is_same_schema` holds already exists, its id is
/// reused and nothing new is stored. In that case an `AddSchema` change
/// (without a last column id) is recorded only when this schema was not
/// already the last one added.
fn add_schema_internal(&mut self, schema: Schema) -> SchemaId {
    let schema_id = self.reuse_or_create_new_schema_id(&schema);
    let schema = schema.with_schema_id(schema_id);

    if self.metadata.schemas.contains_key(&schema_id) {
        let already_last_added = self.last_added_schema_id == Some(schema_id);
        if !already_last_added {
            self.changes.push(ViewUpdate::AddSchema {
                schema,
                last_column_id: None,
            });
            self.last_added_schema_id = Some(schema_id);
        }
        return schema_id;
    }

    let highest_field_id = schema.highest_field_id();
    self.metadata
        .schemas
        .insert(schema_id, Arc::new(schema.clone()));
    self.changes.push(ViewUpdate::AddSchema {
        schema,
        last_column_id: Some(highest_field_id),
    });
    self.last_added_schema_id = Some(schema_id);

    schema_id
}
/// Returns the id of an existing schema for which `is_same_schema` holds, or
/// else the next free id: highest existing id plus one, or `DEFAULT_SCHEMA_ID`
/// when no schema exists.
fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> SchemaId {
    for (existing_id, existing_schema) in &self.metadata.schemas {
        if new_schema.is_same_schema(existing_schema) {
            return *existing_id;
        }
    }

    match self.get_highest_schema_id() {
        Some(highest) => highest + 1,
        None => DEFAULT_SCHEMA_ID,
    }
}
/// Largest schema id currently stored, or `None` when there are no schemas.
fn get_highest_schema_id(&self) -> Option<SchemaId> {
    self.metadata.schemas.keys().copied().max()
}
/// Merges `updates` into the view properties and records a `SetProperties` change.
///
/// An empty map is a no-op. A version-history-size entry that parses as a
/// negative integer is rejected. A missing or unparsable entry is treated as
/// `1` for this check and is therefore accepted.
///
/// # Errors
/// `ErrorKind::DataInvalid` when the requested history size is negative.
pub fn set_properties(mut self, updates: HashMap<String, String>) -> Result<Self> {
    if updates.is_empty() {
        return Ok(self);
    }

    let requested_history_size = match updates.get(VIEW_PROPERTY_VERSION_HISTORY_SIZE) {
        Some(raw) => raw.parse::<i64>().unwrap_or(1),
        None => 1,
    };
    if requested_history_size < 0 {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            format!(
                "{} must be positive but was {}",
                VIEW_PROPERTY_VERSION_HISTORY_SIZE, requested_history_size
            ),
        ));
    }

    for (key, value) in &updates {
        self.metadata.properties.insert(key.clone(), value.clone());
    }
    self.changes.push(ViewUpdate::SetProperties { updates });

    Ok(self)
}
/// Removes the given keys from the view properties and records a
/// `RemoveProperties` change listing all requested keys, including keys that
/// were not present. An empty slice is a no-op.
pub fn remove_properties(mut self, removals: &[String]) -> Self {
    if !removals.is_empty() {
        removals.iter().for_each(|key| {
            self.metadata.properties.remove(key);
        });
        self.changes.push(ViewUpdate::RemoveProperties {
            removals: removals.to_vec(),
        });
    }
    self
}
/// Assigns a UUID to the view; nothing is recorded when it equals the current one.
pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
    if self.metadata.view_uuid == uuid {
        return self;
    }

    self.changes.push(ViewUpdate::AssignUuid { uuid });
    self.metadata.view_uuid = uuid;
    self
}
pub fn build(mut self) -> Result<ViewMetadataBuildResult> {
if let Some(history_entry) = self.history_entry.take() {
self.metadata.version_log.push(history_entry);
}
self.metadata.validate()?;
if let Some(previous) = self.previous_view_version.take() {
if !allow_replace_drop_dialects(&self.metadata.properties) {
require_no_dialect_dropped(&previous, self.metadata.current_version())?;
}
}
let _expired_versions = self.expire_versions();
self.metadata.version_log = update_version_log(
self.metadata.version_log,
self.metadata.versions.keys().copied().collect(),
);
Ok(ViewMetadataBuildResult {
metadata: self.metadata,
changes: self.changes,
})
}
/// Drops old view versions and returns the ones that were removed.
///
/// The number of versions to keep is the configured history size (default
/// `VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT`, minimum 1), but never fewer
/// than the number of `AddViewVersion` changes recorded by this builder. The
/// versions with the highest ids are kept and the current version is always
/// retained.
fn expire_versions(&mut self) -> Vec<ViewVersionRef> {
    let configured_limit = self
        .metadata
        .properties
        .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
        .and_then(|raw| raw.parse::<usize>().ok())
        .unwrap_or(VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT)
        .max(1);

    let added_in_this_builder = self
        .changes
        .iter()
        .filter(|change| matches!(change, ViewUpdate::AddViewVersion { .. }))
        .count();
    let keep_count = configured_limit.max(added_in_this_builder);

    if self.metadata.versions.len() <= keep_count {
        return Vec::new();
    }

    // Keep the `keep_count` highest ids.
    let mut ids_descending: Vec<i32> = self.metadata.versions.keys().copied().collect();
    ids_descending.sort_unstable_by(|a, b| b.cmp(a));
    ids_descending.truncate(keep_count);
    let mut retained_ids: HashSet<i32> = ids_descending.into_iter().collect();

    let current_id = self.metadata.current_version_id;
    if !retained_ids.contains(&current_id) {
        // Make room for the current version by evicting the lowest retained
        // id, unless every slot is taken by versions added in this builder.
        if keep_count > added_in_this_builder {
            if let Some(lowest) = retained_ids.iter().min().copied() {
                retained_ids.remove(&lowest);
            }
        }
        retained_ids.insert(current_id);
    }

    let mut expired = Vec::new();
    self.metadata.versions.retain(|id, version| {
        let keep = retained_ids.contains(id);
        if !keep {
            expired.push(version.clone());
        }
        keep
    });
    expired
}
}
/// Trims the version log to the entries that follow the last entry whose
/// version id is not in `ids_to_keep`.
///
/// Hitting a dropped version discards everything logged before it, so the
/// result is always a suffix of `version_log` made up only of retained ids.
fn update_version_log(
    version_log: Vec<ViewVersionLog>,
    ids_to_keep: HashSet<i32>,
) -> Vec<ViewVersionLog> {
    // Index just past the last dropped entry; 0 when nothing is dropped.
    let first_retained = version_log
        .iter()
        .rposition(|entry| !ids_to_keep.contains(&entry.version_id()))
        .map_or(0, |last_dropped| last_dropped + 1);

    version_log.into_iter().skip(first_retained).collect()
}
/// Whether replacing a view may drop SQL dialects: the truthiness of the
/// drop-dialect property when it is set, otherwise the built-in default.
fn allow_replace_drop_dialects(properties: &HashMap<String, String>) -> bool {
    match properties.get(VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED) {
        Some(raw) => is_truthy(raw),
        None => VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT,
    }
}
fn require_no_dialect_dropped(previous: &ViewVersion, current: &ViewVersion) -> Result<()> {
let base_dialects = lowercase_sql_dialects_for(previous);
let updated_dialects = lowercase_sql_dialects_for(current);
if !updated_dialects.is_superset(&base_dialects) {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot replace view due to loss of view dialects: \nPrevious dialects: {:?}\nNew dialects: {:?}\nSet {} to true to allow dropping dialects.",
Vec::from_iter(base_dialects), Vec::from_iter(updated_dialects), VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED
),
));
}
Ok(())
}
/// Collects the lower-cased dialect names of all SQL representations of a view version.
fn lowercase_sql_dialects_for(view_version: &ViewVersion) -> HashSet<String> {
    let mut dialects = HashSet::new();
    for representation in view_version.representations().iter() {
        // Exhaustive match: a new representation kind must be handled here.
        match representation {
            ViewRepresentation::Sql(sql) => {
                dialects.insert(sql.dialect.to_lowercase());
            }
        }
    }
    dialects
}
/// Ensures a view version has at most one SQL representation per dialect
/// (compared case-insensitively).
///
/// # Errors
/// `ErrorKind::DataInvalid` naming the first dialect that occurs twice.
pub(super) fn require_unique_dialects(view_version: &ViewVersion) -> Result<()> {
    let representations = view_version.representations();
    let mut seen = HashSet::with_capacity(representations.len());

    for representation in representations.iter() {
        let dialect = match representation {
            ViewRepresentation::Sql(sql) => &sql.dialect,
        };

        let first_occurrence = seen.insert(dialect.to_lowercase());
        if !first_occurrence {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid view version: Cannot add multiple queries for dialect {}",
                    dialect
                ),
            ));
        }
    }

    Ok(())
}
#[cfg(test)]
mod test {
use super::super::view_metadata::tests::get_test_view_metadata;
use super::*;
use crate::spec::{
NestedField, PrimitiveType, SqlViewRepresentation, Type, ViewRepresentations,
};
use crate::NamespaceIdent;
/// Test helper: a view version with a single "spark" SQL representation.
fn new_view_version(id: usize, schema_id: SchemaId, sql: &str) -> ViewVersion {
    let spark_only = vec!["spark"];
    new_view_version_with_dialect(id, schema_id, sql, spark_only)
}
/// Test helper: a view version carrying the same SQL text once per given
/// dialect, with a fixed timestamp, catalog, namespace and summary.
fn new_view_version_with_dialect(
    id: usize,
    schema_id: SchemaId,
    sql: &str,
    dialects: Vec<&str>,
) -> ViewVersion {
    let mut representations = Vec::with_capacity(dialects.len());
    for dialect in &dialects {
        representations.push(ViewRepresentation::Sql(SqlViewRepresentation {
            dialect: dialect.to_string(),
            sql: sql.to_string(),
        }));
    }

    let summary = HashMap::from([("user".to_string(), "some-user".to_string())]);

    ViewVersion::builder()
        .with_version_id(id as i32)
        .with_schema_id(schema_id)
        .with_timestamp_ms(1573518431300)
        .with_default_catalog(Some("prod".to_string()))
        .with_default_namespace(NamespaceIdent::new("default".to_string()))
        .with_summary(summary)
        .with_representations(ViewRepresentations(representations))
        .build()
}
/// Test helper: a builder seeded from the `ViewMetadataV1Valid.json` fixture
/// with no recorded changes.
fn builder_without_changes() -> ViewMetadataBuilder {
    let fixture = get_test_view_metadata("ViewMetadataV1Valid.json");
    ViewMetadataBuilder::new_from_metadata(fixture)
}
/// A builder created via `new` assigns fresh ids and records one change per
/// applied setting.
#[test]
fn test_minimal_builder() {
    let expected_location = "s3://bucket/table".to_string();
    let empty_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![])
        .build()
        .unwrap();
    let input_version = new_view_version(20, 21, "select 1 as count");
    let expected_format = ViewFormatVersion::V1;
    let expected_properties = HashMap::from([("key".to_string(), "value".to_string())]);

    let ViewMetadataBuildResult { metadata, changes } = ViewMetadataBuilder::new(
        expected_location.clone(),
        empty_schema.clone(),
        input_version.clone(),
        expected_format,
        expected_properties.clone(),
    )
    .unwrap()
    .build()
    .unwrap();

    // Resulting metadata.
    assert_eq!(metadata.location, expected_location);
    assert_eq!(metadata.current_version_id, INITIAL_VIEW_VERSION_ID);
    assert_eq!(metadata.format_version, expected_format);
    assert_eq!(metadata.properties, expected_properties);
    assert_eq!(metadata.versions.len(), 1);
    assert_eq!(metadata.schemas.len(), 1);
    assert_eq!(metadata.version_log.len(), 1);
    assert_eq!(
        *metadata.versions[&INITIAL_VIEW_VERSION_ID],
        input_version
            .clone()
            .with_version_id(INITIAL_VIEW_VERSION_ID)
            .with_schema_id(0)
    );

    // Recorded changes.
    assert_eq!(changes.len(), 5);
    assert!(changes.contains(&ViewUpdate::SetLocation {
        location: expected_location
    }));
    assert!(changes.contains(&ViewUpdate::AddViewVersion {
        view_version: input_version
            .with_version_id(INITIAL_VIEW_VERSION_ID)
            .with_schema_id(-1)
    }));
    assert!(changes.contains(&ViewUpdate::SetCurrentViewVersion {
        view_version_id: -1
    }));
    assert!(changes.contains(&ViewUpdate::AddSchema {
        schema: empty_schema.with_schema_id(0),
        last_column_id: Some(0)
    }));
    assert!(changes.contains(&ViewUpdate::SetProperties {
        updates: expected_properties
    }));
}
/// Version expiry honours the history-size property but never drops versions
/// added by the builder itself or the current version.
#[test]
fn test_version_expiration() {
    let history_size = |limit: &str| {
        HashMap::from([(
            VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
            limit.to_string(),
        )])
    };
    let version_ids =
        |metadata: &ViewMetadata| metadata.versions.keys().copied().collect::<HashSet<i32>>();

    let with_three_added = builder_without_changes()
        .add_version(new_view_version(0, 1, "select 1 as count"))
        .unwrap()
        .add_version(new_view_version(0, 1, "select count(1) as count from t2"))
        .unwrap()
        .add_version(new_view_version(0, 1, "select count from t1"))
        .unwrap();
    // Same content, but reloaded so that no changes are pending.
    let reloaded = with_three_added
        .clone()
        .build()
        .unwrap()
        .metadata
        .into_builder();

    // Default history size.
    let metadata = with_three_added.clone().build().unwrap().metadata;
    assert_eq!(version_ids(&metadata), HashSet::from([1, 2, 3, 4]));

    // Limit of 2, but three versions were added in this builder.
    let metadata = with_three_added
        .clone()
        .set_properties(history_size("2"))
        .unwrap()
        .build()
        .unwrap()
        .metadata;
    assert_eq!(version_ids(&metadata), HashSet::from([1, 2, 3, 4]));
    assert_eq!(metadata.version_log.len(), 1);

    // Limit of 2 with no pending additions.
    let metadata = reloaded
        .clone()
        .set_properties(history_size("2"))
        .unwrap()
        .build()
        .unwrap()
        .metadata;
    assert_eq!(version_ids(&metadata), HashSet::from([1, 4]));

    // Limit of 0 with no pending additions.
    let metadata = reloaded
        .set_properties(history_size("0"))
        .unwrap()
        .build()
        .unwrap()
        .metadata;
    assert_eq!(version_ids(&metadata), HashSet::from([1]));
}
/// `update_version_log` keeps only the log entries after the last dropped id.
#[test]
fn test_update_version_log() {
    let first = ViewVersionLog::new(
        1,
        new_view_version(1, 1, "select 1 as count").timestamp_ms(),
    );
    let second = ViewVersionLog::new(
        2,
        new_view_version(2, 1, "select count(1) as count from t2").timestamp_ms(),
    );
    let third = ViewVersionLog::new(
        3,
        new_view_version(3, 1, "select count from t1").timestamp_ms(),
    );

    // Everything retained.
    let trimmed = update_version_log(
        vec![first.clone(), second.clone(), third.clone()],
        HashSet::from([1, 2, 3]),
    );
    assert_eq!(trimmed, vec![first.clone(), second.clone(), third.clone()]);

    // Dropped id in the middle.
    let trimmed = update_version_log(
        vec![
            third.clone(),
            second.clone(),
            first.clone(),
            second.clone(),
            third.clone(),
        ],
        HashSet::from([2, 3]),
    );
    assert_eq!(trimmed, vec![second.clone(), third.clone()]);

    // Dropped id early in the log.
    let trimmed = update_version_log(
        vec![
            first.clone(),
            second.clone(),
            third.clone(),
            first.clone(),
            third.clone(),
        ],
        HashSet::from([1, 3]),
    );
    assert_eq!(trimmed, vec![third.clone(), first.clone(), third.clone()]);
}
/// Assigning a new UUID updates the metadata and records exactly one change.
#[test]
fn test_assign_uuid() {
    let new_uuid = Uuid::now_v7();

    let ViewMetadataBuildResult { metadata, changes } = builder_without_changes()
        .assign_uuid(new_uuid)
        .build()
        .unwrap();

    assert_eq!(metadata.view_uuid, new_uuid);
    assert_eq!(changes, vec![ViewUpdate::AssignUuid { uuid: new_uuid }]);
}
/// Setting a new location updates the metadata and records exactly one change.
#[test]
fn test_set_location() {
    let new_location = "s3://bucket/table".to_string();

    let ViewMetadataBuildResult { metadata, changes } = builder_without_changes()
        .set_location(new_location.clone())
        .build()
        .unwrap();

    assert_eq!(metadata.location, new_location);
    assert_eq!(changes, vec![ViewUpdate::SetLocation {
        location: new_location
    }]);
}
/// Setting then removing properties applies both and records both changes in order.
#[test]
fn test_set_and_remove_properties() {
    let added = HashMap::from([
        ("key1".to_string(), "value1".to_string()),
        ("key2".to_string(), "value2".to_string()),
    ]);
    let removed_keys = vec!["key2".to_string(), "key3".to_string()];

    let ViewMetadataBuildResult { metadata, changes } = builder_without_changes()
        .set_properties(added.clone())
        .unwrap()
        .remove_properties(&removed_keys)
        .build()
        .unwrap();

    assert_eq!(metadata.properties.get("key1"), Some(&"value1".to_string()));
    assert_eq!(metadata.properties.get("key2"), None);
    assert_eq!(changes, vec![
        ViewUpdate::SetProperties { updates: added },
        ViewUpdate::RemoveProperties {
            removals: removed_keys
        }
    ]);
}
/// Adding a schema assigns the next free schema id; repeating the operation on
/// a fresh clone yields the same result.
#[test]
fn test_add_schema() {
    let base = builder_without_changes();
    let empty_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![])
        .build()
        .unwrap();
    let expected_changes = vec![ViewUpdate::AddSchema {
        schema: empty_schema.clone().with_schema_id(2),
        last_column_id: Some(0),
    }];

    for _attempt in 0..2 {
        let outcome = base
            .clone()
            .add_schema(empty_schema.clone())
            .build()
            .unwrap();
        assert_eq!(outcome.metadata.schemas.len(), 2);
        assert_eq!(outcome.changes, expected_changes);
    }
}
/// Adding two versions plus a schema and selecting the second version records
/// four changes and extends the version log.
#[test]
fn test_add_and_set_current_version() {
    let first = new_view_version(2, 1, "select 1 as count");
    let second = new_view_version(3, 2, "select count(1) as count from t2");
    let second_schema = Schema::builder()
        .with_schema_id(2)
        .with_fields(vec![])
        .build()
        .unwrap();

    let ViewMetadataBuildResult { metadata, changes } = builder_without_changes()
        .add_version(first.clone())
        .unwrap()
        .add_schema(second_schema.clone())
        .add_version(second.clone())
        .unwrap()
        .set_current_version_id(3)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(metadata.current_version_id, 3);
    assert_eq!(metadata.versions.len(), 3);
    assert_eq!(metadata.schemas.len(), 2);
    assert_eq!(metadata.version_log.len(), 2);
    assert_eq!(
        *metadata.versions[&2],
        first.clone().with_version_id(2).with_schema_id(1)
    );
    assert_eq!(
        *metadata.versions[&3],
        second.clone().with_version_id(3).with_schema_id(2)
    );

    let expected_changes = vec![
        ViewUpdate::AddViewVersion {
            view_version: first.with_version_id(2).with_schema_id(1),
        },
        ViewUpdate::AddSchema {
            schema: second_schema.with_schema_id(2),
            last_column_id: Some(0),
        },
        ViewUpdate::AddViewVersion {
            view_version: second.with_version_id(3).with_schema_id(-1),
        },
        ViewUpdate::SetCurrentViewVersion {
            view_version_id: -1,
        },
    ];
    assert_eq!(changes.len(), 4);
    assert_eq!(changes, expected_changes);

    let logged_ids: Vec<_> = metadata
        .version_log
        .iter()
        .map(|entry| entry.version_id())
        .collect();
    assert_eq!(logged_ids, vec![1, 3]);
}
/// Ids supplied by the caller are ignored: the builder assigns the next free
/// schema and version ids.
#[test]
fn test_schema_and_version_id_reassignment() {
    let first = new_view_version(0, 1, "select 1 as count");
    let second = new_view_version(0, 2, "select count(1) as count from t2");
    let second_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    let ViewMetadataBuildResult { metadata, changes } = builder_without_changes()
        .add_version(first.clone())
        .unwrap()
        .set_current_version(second.clone(), second_schema.clone())
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(metadata.current_version_id, 3);
    assert_eq!(metadata.versions.len(), 3);
    assert_eq!(metadata.schemas.len(), 2);
    assert_eq!(metadata.version_log.len(), 2);
    assert_eq!(
        *metadata.versions[&2],
        first.clone().with_version_id(2).with_schema_id(1)
    );
    assert_eq!(
        *metadata.versions[&3],
        second.clone().with_version_id(3).with_schema_id(2)
    );

    let expected_changes = vec![
        ViewUpdate::AddViewVersion {
            view_version: first.with_version_id(2).with_schema_id(1),
        },
        ViewUpdate::AddSchema {
            schema: second_schema.with_schema_id(2),
            last_column_id: Some(0),
        },
        ViewUpdate::AddViewVersion {
            view_version: second.with_version_id(3).with_schema_id(-1),
        },
        ViewUpdate::SetCurrentViewVersion {
            view_version_id: -1,
        },
    ];
    assert_eq!(changes.len(), 4);
    assert_eq!(changes, expected_changes);

    let logged_ids: Vec<_> = metadata
        .version_log
        .iter()
        .map(|entry| entry.version_id())
        .collect();
    assert_eq!(logged_ids, vec![1, 3]);
}
/// Adding the same version twice stores it only once.
#[test]
fn test_view_version_deduplication() {
    let base = builder_without_changes();
    let duplicate = new_view_version(0, 1, "select * from ns.tbl");
    assert_eq!(base.metadata.versions.len(), 1);

    let metadata = base
        .clone()
        .add_version(duplicate.clone())
        .unwrap()
        .add_version(duplicate)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    assert_eq!(metadata.versions.len(), 2);
    assert_eq!(metadata.schemas.len(), 1);
}
/// Re-adding equivalent schemas and versions reuses their ids instead of
/// creating duplicates.
#[test]
fn test_view_version_and_schema_deduplication() {
    // Schema with a single required long column with field id 1.
    let single_column_schema = |schema_id: SchemaId, column: &str| {
        Schema::builder()
            .with_schema_id(schema_id)
            .with_fields(vec![NestedField::required(
                1,
                column,
                Type::Primitive(PrimitiveType::Long),
            )
            .into()])
            .build()
            .unwrap()
    };
    let schema_x = single_column_schema(5, "x");
    let schema_y = single_column_schema(7, "y");
    let schema_z = single_column_schema(9, "z");

    let version_x = new_view_version(1, 5, "select * from ns.tbl");
    let version_y = new_view_version(1, 7, "select count(*) from ns.tbl");
    let version_z = new_view_version(1, 9, "select count(*) as count from ns.tbl");

    let metadata = builder_without_changes()
        .add_schema(schema_x.clone())
        .add_schema(schema_y.clone())
        .add_schema(schema_z.clone())
        .set_current_version(version_x.clone(), schema_x.clone())
        .unwrap()
        .set_current_version(version_y.clone(), schema_y.clone())
        .unwrap()
        .set_current_version(version_z.clone(), schema_z.clone())
        .unwrap()
        .set_current_version(version_z.clone(), schema_z.clone())
        .unwrap()
        .set_current_version(version_y.clone(), schema_y.clone())
        .unwrap()
        .set_current_version(version_x.clone(), schema_x.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    assert_eq!(
        **metadata.current_version(),
        version_x.clone().with_version_id(2).with_schema_id(2)
    );
    assert_eq!(metadata.versions.len(), 4);
    assert_eq!(
        metadata.versions[&2],
        Arc::new(version_x.clone().with_version_id(2).with_schema_id(2))
    );
    assert_eq!(
        metadata.versions[&3],
        Arc::new(version_y.clone().with_version_id(3).with_schema_id(3))
    );
    assert_eq!(
        metadata.versions[&4],
        Arc::new(version_z.clone().with_version_id(4).with_schema_id(4))
    );

    // Ignore the fixture's own schema (id 1) and compare the added ones in id order.
    let added_structs = metadata
        .schemas_iter()
        .filter(|schema| schema.schema_id() != 1)
        .sorted_by_key(|schema| schema.schema_id())
        .map(|schema| schema.as_struct())
        .collect::<Vec<_>>();
    assert_eq!(added_structs, vec![
        schema_x.as_struct(),
        schema_y.as_struct(),
        schema_z.as_struct()
    ]);
}
/// Adding a version fails for an unknown schema id and for the `-1` sentinel
/// when no schema has been added.
#[test]
fn test_error_on_missing_schema() {
    let base = builder_without_changes();
    let add_error = |schema_id: SchemaId| {
        base.clone()
            .add_version(new_view_version(0, schema_id, "SELECT * FROM foo"))
            .unwrap_err()
            .to_string()
    };

    assert!(add_error(10).contains("Cannot add version with unknown schema: 10"));
    assert!(add_error(-1).contains("Cannot set last added schema: no schema has been added"));
}
/// Selecting the current version fails for the `-1` sentinel when nothing was
/// added, and for an unknown id.
#[test]
fn test_error_on_missing_current_version() {
    let base = builder_without_changes();
    let select_error = |version_id: i32| {
        base.clone()
            .set_current_version_id(version_id)
            .unwrap_err()
            .to_string()
    };

    assert!(select_error(-1).contains(
        "Cannot set current version id to last added version: no version has been added."
    ));
    assert!(select_error(10).contains("Cannot set current version to unknown version with id: 10"));
}
/// A negative version-history size is rejected by `set_properties`.
#[test]
fn test_error_when_setting_negative_version_history_size() {
    let negative_history_size = HashMap::from([(
        VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
        "-1".to_string(),
    )]);

    let message = builder_without_changes()
        .set_properties(negative_history_size)
        .unwrap_err()
        .to_string();

    assert!(message.contains("version.history.num-entries must be positive but was -1"));
}
/// Each added version is recorded as its own `AddViewVersion` change, in order.
#[test]
fn test_view_version_changes() {
    let first = new_view_version(2, 1, "select 1 as count");
    let second = new_view_version(3, 1, "select count(1) as count from t2");

    let recorded = builder_without_changes()
        .add_version(first.clone())
        .unwrap()
        .add_version(second.clone())
        .unwrap()
        .build()
        .unwrap()
        .changes;

    let expected = vec![
        ViewUpdate::AddViewVersion {
            view_version: first,
        },
        ViewUpdate::AddViewVersion {
            view_version: second,
        },
    ];
    assert_eq!(recorded.len(), 2);
    assert_eq!(recorded, expected);
}
/// Replacing a spark+trino version by a spark-only one fails when dropping
/// dialects is not explicitly allowed.
#[test]
fn test_dropping_dialect_fails_by_default() {
    let empty_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_and_trino =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);

    let two_dialect_metadata = builder_without_changes()
        .set_current_version(spark_and_trino, empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    let error = two_dialect_metadata
        .into_builder()
        .set_current_version(spark_only, empty_schema)
        .unwrap()
        .build()
        .unwrap_err();

    assert!(error
        .to_string()
        .contains("Cannot replace view due to loss of view dialects"));
}
/// With the drop-dialect property set to "true", replacing a spark+trino
/// version by a spark-only one succeeds.
#[test]
fn test_dropping_dialects_does_not_fail_when_allowed() {
    let empty_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_and_trino =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
    let allow_dropping = HashMap::from([(
        VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
        "true".to_string(),
    )]);

    let two_dialect_metadata = builder_without_changes()
        .set_properties(allow_dropping)
        .unwrap()
        .set_current_version(spark_and_trino, empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    let replaced = two_dialect_metadata
        .into_builder()
        .set_current_version(spark_only.clone(), empty_schema)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    assert_eq!(
        **replaced.current_version(),
        spark_only.with_version_id(3).with_schema_id(2)
    );
}
/// Replacing a spark-only version by a spark+trino one is allowed without any
/// property being set.
#[test]
fn test_can_add_dialects_by_default() {
    let empty_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_and_trino =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);

    let spark_metadata = builder_without_changes()
        .set_current_version(spark_only, empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    let extended = spark_metadata
        .into_builder()
        .set_current_version(spark_and_trino.clone(), empty_schema)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    assert_eq!(
        **extended.current_version(),
        spark_and_trino.with_version_id(3).with_schema_id(2)
    );
}
/// Changing the SQL of an existing dialect is allowed without any property
/// being set.
#[test]
fn test_can_update_dialect_by_default() {
    let empty_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let spark_foo = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_bar = new_view_version_with_dialect(0, 0, "SELECT * FROM bar", vec!["spark"]);

    let foo_metadata = builder_without_changes()
        .set_current_version(spark_foo, empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    let updated = foo_metadata
        .into_builder()
        .set_current_version(spark_bar.clone(), empty_schema)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    assert_eq!(
        **updated.current_version(),
        spark_bar.with_version_id(3).with_schema_id(2)
    );
}
/// The drop-dialect property is evaluated at build time: "true" permits
/// swapping spark for trino, a later "false" forbids swapping back.
#[test]
fn test_dropping_dialects_allowed_and_then_disallowed() {
    let drop_dialect_setting = |value: &str| {
        HashMap::from([(
            VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
            value.to_string(),
        )])
    };
    let empty_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let trino_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["trino"]);

    let spark_metadata = builder_without_changes()
        .set_current_version(spark_only.clone(), empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Allowed: spark -> trino.
    let trino_metadata = spark_metadata
        .into_builder()
        .set_current_version(trino_only.clone(), empty_schema.clone())
        .unwrap()
        .set_properties(drop_dialect_setting("true"))
        .unwrap()
        .build()
        .unwrap()
        .metadata;
    assert_eq!(
        **trino_metadata.current_version(),
        trino_only.with_version_id(3).with_schema_id(2)
    );

    // Disallowed again: trino -> spark.
    let error = trino_metadata
        .into_builder()
        .set_current_version(spark_only, empty_schema)
        .unwrap()
        .set_properties(drop_dialect_setting("false"))
        .unwrap()
        .build()
        .unwrap_err();
    assert!(error
        .to_string()
        .contains("Cannot replace view due to loss of view dialects"));
}
/// `require_no_dialect_dropped` fails when a dialect of the previous version is
/// missing from the current one, and passes when all are present regardless
/// of order.
#[test]
fn test_require_no_dialect_dropped() {
    // Builds a version from (dialect, sql) pairs with otherwise fixed fields.
    let version_with = |representations: Vec<(&str, &str)>| {
        ViewVersion::builder()
            .with_version_id(0)
            .with_schema_id(0)
            .with_timestamp_ms(0)
            .with_representations(ViewRepresentations(
                representations
                    .into_iter()
                    .map(|(dialect, sql)| {
                        ViewRepresentation::Sql(SqlViewRepresentation {
                            dialect: dialect.to_string(),
                            sql: sql.to_string(),
                        })
                    })
                    .collect(),
            ))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build()
    };

    let previous = version_with(vec![
        ("trino", "SELECT * FROM foo"),
        ("spark", "SELECT * FROM bar"),
    ]);

    // "spark" is missing.
    let current = version_with(vec![("trino", "SELECT * FROM foo")]);
    assert!(require_no_dialect_dropped(&previous, &current).is_err());

    // Same dialects in a different order.
    let current = version_with(vec![
        ("spark", "SELECT * FROM bar"),
        ("trino", "SELECT * FROM foo"),
    ]);
    assert!(require_no_dialect_dropped(&previous, &current).is_ok());
}
/// The drop-dialect property defaults to "not allowed" and is parsed
/// case-insensitively.
#[test]
fn test_allow_replace_drop_dialects() {
    let mut properties = HashMap::new();
    assert!(!allow_replace_drop_dialects(&properties));

    let cases = [
        ("true", true),
        ("false", false),
        ("TRUE", true),
        ("FALSE", false),
    ];
    for (raw_value, expected) in cases {
        properties.insert(
            "replace.drop-dialect.allowed".to_string(),
            raw_value.to_string(),
        );
        assert_eq!(allow_replace_drop_dialects(&properties), expected);
    }
}
/// Dialect names are collected in lower case whatever their original casing.
#[test]
fn test_lowercase_sql_dialects_for() {
    let mixed_case = vec![
        ("STARROCKS", "SELECT * FROM foo"),
        ("trino", "SELECT * FROM bar"),
        ("Spark", "SELECT * FROM bar"),
    ];
    let representations = mixed_case
        .into_iter()
        .map(|(dialect, sql)| {
            ViewRepresentation::Sql(SqlViewRepresentation {
                dialect: dialect.to_string(),
                sql: sql.to_string(),
            })
        })
        .collect();
    let view_version = ViewVersion::builder()
        .with_version_id(0)
        .with_schema_id(0)
        .with_timestamp_ms(0)
        .with_representations(ViewRepresentations(representations))
        .with_default_namespace(NamespaceIdent::new("default".to_string()))
        .build();

    let dialects = lowercase_sql_dialects_for(&view_version);

    assert_eq!(dialects.len(), 3);
    for expected in ["trino", "spark", "starrocks"] {
        assert!(dialects.contains(expected));
    }
}
/// Two representations for the same dialect are rejected; distinct dialects pass.
#[test]
fn test_require_unique_dialects() {
    // Builds a version from (dialect, sql) pairs with otherwise fixed fields.
    let version_with = |representations: Vec<(&str, &str)>| {
        ViewVersion::builder()
            .with_version_id(0)
            .with_schema_id(0)
            .with_timestamp_ms(0)
            .with_representations(ViewRepresentations(
                representations
                    .into_iter()
                    .map(|(dialect, sql)| {
                        ViewRepresentation::Sql(SqlViewRepresentation {
                            dialect: dialect.to_string(),
                            sql: sql.to_string(),
                        })
                    })
                    .collect(),
            ))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build()
    };

    let duplicated = version_with(vec![
        ("trino", "SELECT * FROM foo"),
        ("trino", "SELECT * FROM bar"),
    ]);
    assert!(require_unique_dialects(&duplicated).is_err());

    let distinct = version_with(vec![
        ("trino", "SELECT * FROM foo"),
        ("spark", "SELECT * FROM bar"),
    ]);
    assert!(require_unique_dialects(&distinct).is_ok());
}
}