1use std::collections::HashMap;
22use std::sync::Arc;
23
24use _serde::ViewVersionV1;
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27use typed_builder::TypedBuilder;
28
29use super::INITIAL_VIEW_VERSION_ID;
30use super::view_metadata::ViewVersionLog;
31use crate::catalog::NamespaceIdent;
32use crate::error::{Result, timestamp_ms_to_utc};
33use crate::spec::{SchemaId, SchemaRef, ViewMetadata};
34use crate::{Error, ErrorKind};
35
36pub type ViewVersionRef = Arc<ViewVersion>;
38
/// Integer type used to identify a view version.
pub type ViewVersionId = i32;
41
42#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, TypedBuilder)]
43#[serde(from = "ViewVersionV1", into = "ViewVersionV1")]
44#[builder(field_defaults(setter(prefix = "with_")))]
45pub struct ViewVersion {
47 #[builder(default = INITIAL_VIEW_VERSION_ID)]
49 version_id: ViewVersionId,
50 schema_id: SchemaId,
52 timestamp_ms: i64,
54 #[builder(default = HashMap::new())]
56 summary: HashMap<String, String>,
57 representations: ViewRepresentations,
59 #[builder(default = None)]
61 default_catalog: Option<String>,
62 default_namespace: NamespaceIdent,
64}
65
66impl ViewVersion {
67 #[inline]
69 pub fn version_id(&self) -> ViewVersionId {
70 self.version_id
71 }
72
73 #[inline]
75 pub fn schema_id(&self) -> SchemaId {
76 self.schema_id
77 }
78
79 #[inline]
81 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
82 timestamp_ms_to_utc(self.timestamp_ms)
83 }
84
85 #[inline]
87 pub fn timestamp_ms(&self) -> i64 {
88 self.timestamp_ms
89 }
90
91 #[inline]
93 pub fn summary(&self) -> &HashMap<String, String> {
94 &self.summary
95 }
96
97 #[inline]
99 pub fn representations(&self) -> &ViewRepresentations {
100 &self.representations
101 }
102
103 #[inline]
105 pub fn default_catalog(&self) -> Option<&String> {
106 self.default_catalog.as_ref()
107 }
108
109 #[inline]
111 pub fn default_namespace(&self) -> &NamespaceIdent {
112 &self.default_namespace
113 }
114
115 pub fn schema(&self, view_metadata: &ViewMetadata) -> Result<SchemaRef> {
117 view_metadata
118 .schema_by_id(self.schema_id())
119 .ok_or_else(|| {
120 Error::new(
121 ErrorKind::DataInvalid,
122 format!("Schema with id {} not found", self.schema_id()),
123 )
124 })
125 .cloned()
126 }
127
128 pub(crate) fn log(&self) -> ViewVersionLog {
130 ViewVersionLog::new(self.version_id, self.timestamp_ms)
131 }
132
133 pub(crate) fn behaves_identical_to(&self, other: &Self) -> bool {
143 self.summary == other.summary
144 && self.representations == other.representations
145 && self.default_catalog == other.default_catalog
146 && self.default_namespace == other.default_namespace
147 && self.schema_id == other.schema_id
148 }
149
150 pub fn with_version_id(self, version_id: i32) -> Self {
152 Self { version_id, ..self }
153 }
154
155 pub fn with_schema_id(self, schema_id: SchemaId) -> Self {
157 Self { schema_id, ..self }
158 }
159}
160
161#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
163pub struct ViewRepresentations(pub(crate) Vec<ViewRepresentation>);
164
165impl ViewRepresentations {
166 #[inline]
167 pub fn len(&self) -> usize {
169 self.0.len()
170 }
171
172 #[inline]
173 pub fn is_empty(&self) -> bool {
175 self.0.is_empty()
176 }
177
178 pub fn iter(&self) -> impl ExactSizeIterator<Item = &'_ ViewRepresentation> {
180 self.0.iter()
181 }
182}
183
184impl IntoIterator for ViewRepresentations {
186 type Item = ViewRepresentation;
187 type IntoIter = std::vec::IntoIter<Self::Item>;
188
189 fn into_iter(self) -> Self::IntoIter {
190 self.0.into_iter()
191 }
192}
193
194#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
195#[serde(tag = "type")]
196pub enum ViewRepresentation {
200 #[serde(rename = "sql")]
201 Sql(SqlViewRepresentation),
203}
204
205#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
206#[serde(rename_all = "kebab-case")]
207pub struct SqlViewRepresentation {
210 #[serde(rename = "sql")]
211 pub sql: String,
213 #[serde(rename = "dialect")]
214 pub dialect: String,
216}
217
218pub(super) mod _serde {
219 use serde::{Deserialize, Serialize};
224
225 use super::{ViewRepresentation, ViewRepresentations, ViewVersion};
226 use crate::catalog::NamespaceIdent;
227
228 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
229 #[serde(rename_all = "kebab-case")]
230 pub(crate) struct ViewVersionV1 {
232 pub version_id: i32,
233 pub schema_id: i32,
234 pub timestamp_ms: i64,
235 pub summary: std::collections::HashMap<String, String>,
236 pub representations: Vec<ViewRepresentation>,
237 #[serde(skip_serializing_if = "Option::is_none")]
238 pub default_catalog: Option<String>,
239 pub default_namespace: NamespaceIdent,
240 }
241
242 impl From<ViewVersionV1> for ViewVersion {
243 fn from(v1: ViewVersionV1) -> Self {
244 ViewVersion {
245 version_id: v1.version_id,
246 schema_id: v1.schema_id,
247 timestamp_ms: v1.timestamp_ms,
248 summary: v1.summary,
249 representations: ViewRepresentations(v1.representations),
250 default_catalog: v1.default_catalog,
251 default_namespace: v1.default_namespace,
252 }
253 }
254 }
255
256 impl From<ViewVersion> for ViewVersionV1 {
257 fn from(v1: ViewVersion) -> Self {
258 ViewVersionV1 {
259 version_id: v1.version_id,
260 schema_id: v1.schema_id,
261 timestamp_ms: v1.timestamp_ms,
262 summary: v1.summary,
263 representations: v1.representations.0,
264 default_catalog: v1.default_catalog,
265 default_namespace: v1.default_namespace,
266 }
267 }
268 }
269}
270
271impl From<SqlViewRepresentation> for ViewRepresentation {
272 fn from(sql: SqlViewRepresentation) -> Self {
273 ViewRepresentation::Sql(sql)
274 }
275}
276
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};

    use crate::NamespaceIdent;
    use crate::spec::ViewRepresentations;
    use crate::spec::view_version::_serde::ViewVersionV1;
    use crate::spec::view_version::ViewVersion;

    /// SQL statement shared by the fixtures in this module.
    const EVENTS_BY_DAY_SQL: &str =
        "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2";

    /// Summary map used by both tests.
    fn spark_summary() -> std::collections::HashMap<String, String> {
        std::collections::HashMap::from([
            ("engine-name".to_string(), "Spark".to_string()),
            ("engineVersion".to_string(), "3.3.2".to_string()),
        ])
    }

    /// Single-element representation list holding `sql` in the `spark` dialect.
    fn spark_sql(sql: &str) -> ViewRepresentations {
        ViewRepresentations(vec![super::ViewRepresentation::Sql(
            super::SqlViewRepresentation {
                sql: sql.to_string(),
                dialect: "spark".to_string(),
            },
        )])
    }

    /// A JSON record round-trips through `ViewVersionV1`, and every accessor
    /// reports the value that was in the record.
    #[test]
    fn view_version() {
        let record = serde_json::json!(
        {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292i64,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
                "engine-name" : "Spark",
                "engineVersion" : "3.3.2"
            },
            "representations" : [ {
                "type" : "sql",
                "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
                "dialect" : "spark"
            } ]
        }
        );

        let parsed: ViewVersionV1 = serde_json::from_value(record.clone()).unwrap();
        let result = ViewVersion::from(parsed);

        // Serializing the parsed value must reproduce the input record.
        assert_eq!(serde_json::to_value(&result).unwrap(), record);

        assert_eq!(result.version_id(), 1);
        assert_eq!(
            result.timestamp().unwrap(),
            Utc.timestamp_millis_opt(1573518431292).unwrap()
        );
        assert_eq!(result.schema_id(), 1);
        assert_eq!(result.default_catalog.as_deref(), Some("prod"));
        assert_eq!(result.summary(), &spark_summary());
        assert_eq!(result.representations(), &spark_sql(EVENTS_BY_DAY_SQL));
        // Kept last: `inner()` is called directly on the field, as in the
        // original assertion, and `result` is not used afterwards.
        assert_eq!(result.default_namespace.inner(), vec![
            "default".to_string()
        ]);
    }

    /// `behaves_identical_to` ignores the version id and timestamp but
    /// detects a change in the representations.
    #[test]
    fn test_behaves_identical_to() {
        let view_version = ViewVersion::builder()
            .with_version_id(1)
            .with_schema_id(1)
            .with_timestamp_ms(1573518431292)
            .with_summary(spark_summary())
            .with_representations(spark_sql(EVENTS_BY_DAY_SQL))
            .with_default_catalog(Some("prod".to_string()))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build();

        // Differs only in the two fields the comparison skips.
        let identical_view_version = ViewVersion {
            version_id: 2,
            timestamp_ms: 1573518431293,
            ..view_version.clone()
        };

        // Same everything, except for the SQL text of the representation.
        let different_view_version = ViewVersion::builder()
            .with_version_id(view_version.version_id())
            .with_schema_id(view_version.schema_id())
            .with_timestamp_ms(view_version.timestamp_ms())
            .with_summary(view_version.summary().clone())
            .with_representations(spark_sql("SELECT * from events"))
            .with_default_catalog(view_version.default_catalog().cloned())
            .with_default_namespace(view_version.default_namespace().clone())
            .build();

        assert!(view_version.behaves_identical_to(&identical_view_version));
        assert!(!view_version.behaves_identical_to(&different_view_version));
    }
}