1use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::sync::Arc;
25
26use _serde::ViewMetadataEnum;
27use chrono::{DateTime, Utc};
28use serde::{Deserialize, Serialize};
29use serde_repr::{Deserialize_repr, Serialize_repr};
30use uuid::Uuid;
31
32pub use super::view_metadata_builder::ViewMetadataBuilder;
33use super::view_version::{ViewVersionId, ViewVersionRef};
34use super::{SchemaId, SchemaRef};
35use crate::error::{Result, timestamp_ms_to_utc};
36use crate::{Error, ErrorKind};
37
/// Shared, reference-counted handle to a [`ViewMetadata`].
pub type ViewMetadataRef = Arc<ViewMetadata>;

/// Version id of the initial view version. Metadata built from a `ViewCreation`
/// reports this value as its current version id.
pub(crate) static INITIAL_VIEW_VERSION_ID: i32 = 1;

/// View property key `replace.drop-dialect.allowed`.
// NOTE(review): presumably controls whether replacing a view may drop an SQL
// dialect; the property is not read anywhere in this file — confirm at the use site.
pub const VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED: &str = "replace.drop-dialect.allowed";
/// Default value for [`VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED`].
pub const VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT: bool = false;
/// View property key `version.history.num-entries`.
// NOTE(review): presumably the number of version-log entries to retain; the
// property is not read anywhere in this file — confirm at the use site.
pub const VIEW_PROPERTY_VERSION_HISTORY_SIZE: &str = "version.history.num-entries";
/// Default value for [`VIEW_PROPERTY_VERSION_HISTORY_SIZE`].
pub const VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT: usize = 10;
52
/// In-memory view metadata.
///
/// Deserialization goes through the versioned `ViewMetadataEnum` (`try_from`) and
/// validates the result. Serialization is implemented by hand in the `_serde` module.
// NOTE(review): serde's `into` container attribute only affects a *derived*
// `Serialize`; this struct derives only `Deserialize`, so `into` is inert here.
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "ViewMetadataEnum", into = "ViewMetadataEnum")]
pub struct ViewMetadata {
    /// Format version of this metadata.
    pub(crate) format_version: ViewFormatVersion,
    /// UUID identifying the view.
    pub(crate) view_uuid: Uuid,
    /// Location of the view.
    pub(crate) location: String,
    /// Id of the current version; must be a key of `versions` (checked by `validate`).
    pub(crate) current_version_id: ViewVersionId,
    /// All known versions, keyed by version id.
    pub(crate) versions: HashMap<ViewVersionId, ViewVersionRef>,
    /// Version log entries, exposed through `history()`.
    pub(crate) version_log: Vec<ViewVersionLog>,
    /// Schemas keyed by schema id. The current version's schema id must be present
    /// (checked by `validate`).
    pub(crate) schemas: HashMap<SchemaId, SchemaRef>,
    /// String key/value properties of the view.
    pub(crate) properties: HashMap<String, String>,
}
80
81impl ViewMetadata {
82 #[must_use]
84 pub fn into_builder(self) -> ViewMetadataBuilder {
85 ViewMetadataBuilder::new_from_metadata(self)
86 }
87
88 #[inline]
90 pub fn format_version(&self) -> ViewFormatVersion {
91 self.format_version
92 }
93
94 #[inline]
96 pub fn uuid(&self) -> Uuid {
97 self.view_uuid
98 }
99
100 #[inline]
102 pub fn location(&self) -> &str {
103 self.location.as_str()
104 }
105
106 #[inline]
108 pub fn current_version_id(&self) -> ViewVersionId {
109 self.current_version_id
110 }
111
112 #[inline]
114 pub fn versions(&self) -> impl ExactSizeIterator<Item = &ViewVersionRef> {
115 self.versions.values()
116 }
117
118 #[inline]
120 pub fn version_by_id(&self, version_id: ViewVersionId) -> Option<&ViewVersionRef> {
121 self.versions.get(&version_id)
122 }
123
124 #[inline]
126 pub fn current_version(&self) -> &ViewVersionRef {
127 self.versions
128 .get(&self.current_version_id)
129 .expect("Current version id set, but not found in view versions")
130 }
131
132 #[inline]
134 pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
135 self.schemas.values()
136 }
137
138 #[inline]
140 pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
141 self.schemas.get(&schema_id)
142 }
143
144 #[inline]
146 pub fn current_schema(&self) -> &SchemaRef {
147 let schema_id = self.current_version().schema_id();
148 self.schema_by_id(schema_id)
149 .expect("Current schema id set, but not found in view metadata")
150 }
151
152 #[inline]
154 pub fn properties(&self) -> &HashMap<String, String> {
155 &self.properties
156 }
157
158 #[inline]
160 pub fn history(&self) -> &[ViewVersionLog] {
161 &self.version_log
162 }
163
164 pub(super) fn validate(&self) -> Result<()> {
166 self.validate_current_version_id()?;
167 self.validate_current_schema_id()?;
168 Ok(())
169 }
170
171 fn validate_current_version_id(&self) -> Result<()> {
172 if !self.versions.contains_key(&self.current_version_id) {
173 return Err(Error::new(
174 ErrorKind::DataInvalid,
175 format!(
176 "No version exists with the current version id {}.",
177 self.current_version_id
178 ),
179 ));
180 }
181 Ok(())
182 }
183
184 fn validate_current_schema_id(&self) -> Result<()> {
185 let schema_id = self.current_version().schema_id();
186 if !self.schemas.contains_key(&schema_id) {
187 return Err(Error::new(
188 ErrorKind::DataInvalid,
189 format!("No schema exists with the schema id {schema_id}."),
190 ));
191 }
192 Ok(())
193 }
194}
195
/// One entry of a view's version log: a version id paired with a millisecond
/// timestamp.
///
/// Serialized with kebab-case keys, i.e. `version-id` and `timestamp-ms`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct ViewVersionLog {
    /// Id of the logged view version.
    version_id: ViewVersionId,
    /// Timestamp of the entry, in milliseconds.
    timestamp_ms: i64,
}
205
206impl ViewVersionLog {
207 #[inline]
208 pub fn new(version_id: ViewVersionId, timestamp: i64) -> Self {
210 Self {
211 version_id,
212 timestamp_ms: timestamp,
213 }
214 }
215
216 #[inline]
218 pub fn version_id(&self) -> ViewVersionId {
219 self.version_id
220 }
221
222 #[inline]
224 pub fn timestamp_ms(&self) -> i64 {
225 self.timestamp_ms
226 }
227
228 pub fn timestamp(&self) -> Result<DateTime<Utc>> {
230 timestamp_ms_to_utc(self.timestamp_ms)
231 }
232
233 pub(crate) fn set_timestamp_ms(&mut self, timestamp_ms: i64) -> &mut Self {
235 self.timestamp_ms = timestamp_ms;
236 self
237 }
238}
239
240pub(super) mod _serde {
241 use std::{collections::HashMap, sync::Arc};
246
247 use serde::{Deserialize, Serialize};
248 use uuid::Uuid;
249
250 use super::{ViewFormatVersion, ViewVersionId, ViewVersionLog};
251 use crate::Error;
252 use crate::spec::schema::_serde::SchemaV2;
253 use crate::spec::table_metadata::_serde::VersionNumber;
254 use crate::spec::view_version::_serde::ViewVersionV1;
255 use crate::spec::{ViewMetadata, ViewVersion};
256
257 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
258 #[serde(untagged)]
259 pub(super) enum ViewMetadataEnum {
260 V1(ViewMetadataV1),
261 }
262
263 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
264 #[serde(rename_all = "kebab-case")]
265 pub(super) struct ViewMetadataV1 {
267 pub format_version: VersionNumber<1>,
268 pub(super) view_uuid: Uuid,
269 pub(super) location: String,
270 pub(super) current_version_id: ViewVersionId,
271 pub(super) versions: Vec<ViewVersionV1>,
272 pub(super) version_log: Vec<ViewVersionLog>,
273 pub(super) schemas: Vec<SchemaV2>,
274 pub(super) properties: Option<std::collections::HashMap<String, String>>,
275 }
276
277 impl Serialize for ViewMetadata {
278 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
279 where S: serde::Serializer {
280 let metadata_enum: ViewMetadataEnum =
282 self.clone().try_into().map_err(serde::ser::Error::custom)?;
283
284 metadata_enum.serialize(serializer)
285 }
286 }
287
288 impl TryFrom<ViewMetadataEnum> for ViewMetadata {
289 type Error = Error;
290 fn try_from(value: ViewMetadataEnum) -> Result<Self, Error> {
291 match value {
292 ViewMetadataEnum::V1(value) => value.try_into(),
293 }
294 }
295 }
296
297 impl TryFrom<ViewMetadata> for ViewMetadataEnum {
298 type Error = Error;
299 fn try_from(value: ViewMetadata) -> Result<Self, Error> {
300 Ok(match value.format_version {
301 ViewFormatVersion::V1 => ViewMetadataEnum::V1(value.into()),
302 })
303 }
304 }
305
306 impl TryFrom<ViewMetadataV1> for ViewMetadata {
307 type Error = Error;
308 fn try_from(value: ViewMetadataV1) -> Result<Self, self::Error> {
309 let schemas = HashMap::from_iter(
310 value
311 .schemas
312 .into_iter()
313 .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
314 .collect::<Result<Vec<_>, Error>>()?,
315 );
316 let versions = HashMap::from_iter(
317 value
318 .versions
319 .into_iter()
320 .map(|x| Ok((x.version_id, Arc::new(ViewVersion::from(x)))))
321 .collect::<Result<Vec<_>, Error>>()?,
322 );
323
324 let view_metadata = ViewMetadata {
325 format_version: ViewFormatVersion::V1,
326 view_uuid: value.view_uuid,
327 location: value.location,
328 schemas,
329 properties: value.properties.unwrap_or_default(),
330 current_version_id: value.current_version_id,
331 versions,
332 version_log: value.version_log,
333 };
334 view_metadata.validate()?;
335 Ok(view_metadata)
336 }
337 }
338
339 impl From<ViewMetadata> for ViewMetadataV1 {
340 fn from(v: ViewMetadata) -> Self {
341 let schemas = v
342 .schemas
343 .into_values()
344 .map(|x| {
345 Arc::try_unwrap(x)
346 .unwrap_or_else(|schema| schema.as_ref().clone())
347 .into()
348 })
349 .collect();
350 let versions = v
351 .versions
352 .into_values()
353 .map(|x| {
354 Arc::try_unwrap(x)
355 .unwrap_or_else(|version| version.as_ref().clone())
356 .into()
357 })
358 .collect();
359 ViewMetadataV1 {
360 format_version: VersionNumber::<1>,
361 view_uuid: v.view_uuid,
362 location: v.location,
363 schemas,
364 properties: Some(v.properties),
365 current_version_id: v.current_version_id,
366 versions,
367 version_log: v.version_log,
368 }
369 }
370 }
371}
372
/// Format version of view metadata.
///
/// Serialized and deserialized as its integer discriminant (`serde_repr` with
/// `repr(u8)`), e.g. `"format-version": 1`.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
pub enum ViewFormatVersion {
    /// View metadata format version 1; currently the only variant.
    V1 = 1u8,
}
380
381impl PartialOrd for ViewFormatVersion {
382 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
383 Some(self.cmp(other))
384 }
385}
386
387impl Ord for ViewFormatVersion {
388 fn cmp(&self, other: &Self) -> Ordering {
389 (*self as u8).cmp(&(*other as u8))
390 }
391}
392
393impl Display for ViewFormatVersion {
394 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
395 match self {
396 ViewFormatVersion::V1 => write!(f, "v1"),
397 }
398 }
399}
400
// Unit tests for view metadata serde and validation. Several tests read fixtures
// from `testdata/view_metadata/`, relative to the working directory of the test run.
#[cfg(test)]
pub(crate) mod tests {
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;

    use anyhow::Result;
    use pretty_assertions::assert_eq;
    use uuid::Uuid;

    use super::{ViewFormatVersion, ViewMetadataBuilder, ViewVersionLog};
    use crate::spec::{
        INITIAL_VIEW_VERSION_ID, NestedField, PrimitiveType, Schema, SqlViewRepresentation, Type,
        ViewMetadata, ViewRepresentations, ViewVersion,
    };
    use crate::{NamespaceIdent, ViewCreation};

    /// Asserts two things:
    /// - `json` deserializes to `expected_type`;
    /// - serializing `expected_type` and parsing the output yields the same value again.
    fn check_view_metadata_serde(json: &str, expected_type: ViewMetadata) {
        let desered_type: ViewMetadata = serde_json::from_str(json).unwrap();
        assert_eq!(desered_type, expected_type);

        let sered_json = serde_json::to_string(&expected_type).unwrap();
        let parsed_json_value = serde_json::from_str::<ViewMetadata>(&sered_json).unwrap();

        assert_eq!(parsed_json_value, desered_type);
    }

    /// Reads and deserializes `testdata/view_metadata/{file_name}`.
    /// Panics if the file cannot be read or parsed.
    pub(crate) fn get_test_view_metadata(file_name: &str) -> ViewMetadata {
        let path = format!("testdata/view_metadata/{file_name}");
        let metadata: String = fs::read_to_string(path).unwrap();

        serde_json::from_str(&metadata).unwrap()
    }

    /// Round-trips an inline V1 document against a hand-built expected value.
    #[test]
    fn test_view_data_v1() {
        let data = r#"
        {
            "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
            "format-version" : 1,
            "location" : "s3://bucket/warehouse/default.db/event_agg",
            "current-version-id" : 1,
            "properties" : {
                "comment" : "Daily event counts"
            },
            "versions" : [ {
                "version-id" : 1,
                "timestamp-ms" : 1573518431292,
                "schema-id" : 1,
                "default-catalog" : "prod",
                "default-namespace" : [ "default" ],
                "summary" : {
                    "engine-name" : "Spark",
                    "engineVersion" : "3.3.2"
                },
                "representations" : [ {
                    "type" : "sql",
                    "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
                    "dialect" : "spark"
                } ]
            } ],
            "schemas": [ {
                "schema-id": 1,
                "type" : "struct",
                "fields" : [ {
                    "id" : 1,
                    "name" : "event_count",
                    "required" : false,
                    "type" : "int",
                    "doc" : "Count of events"
                } ]
            } ],
            "version-log" : [ {
                "timestamp-ms" : 1573518431292,
                "version-id" : 1
            } ]
        }
        "#;

        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(
                NestedField::optional(1, "event_count", Type::Primitive(PrimitiveType::Int))
                    .with_doc("Count of events"),
            )])
            .build()
            .unwrap();
        let version = ViewVersion::builder()
            .with_version_id(1)
            .with_timestamp_ms(1573518431292)
            .with_schema_id(1)
            .with_default_catalog("prod".to_string().into())
            .with_default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
            .with_summary(HashMap::from_iter(vec![
                ("engineVersion".to_string(), "3.3.2".to_string()),
                ("engine-name".to_string(), "Spark".to_string()),
            ]))
            .with_representations(ViewRepresentations(vec![
                SqlViewRepresentation {
                    sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2"
                        .to_string(),
                    dialect: "spark".to_string(),
                }
                .into(),
            ]))
            .build();

        let expected = ViewMetadata {
            format_version: ViewFormatVersion::V1,
            view_uuid: Uuid::parse_str("fa6506c3-7681-40c8-86dc-e36561f83385").unwrap(),
            location: "s3://bucket/warehouse/default.db/event_agg".to_string(),
            current_version_id: 1,
            versions: HashMap::from_iter(vec![(1, Arc::new(version))]),
            version_log: vec![ViewVersionLog {
                timestamp_ms: 1573518431292,
                version_id: 1,
            }],
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            properties: HashMap::from_iter(vec![(
                "comment".to_string(),
                "Daily event counts".to_string(),
            )]),
        };

        check_view_metadata_serde(data, expected);
    }

    /// An unparsable `view-uuid` in an otherwise incomplete document must not deserialize.
    #[test]
    fn test_invalid_view_uuid() -> Result<()> {
        let data = r#"
        {
            "format-version" : 1,
            "view-uuid": "xxxx"
        }
        "#;
        assert!(serde_json::from_str::<ViewMetadata>(data).is_err());
        Ok(())
    }

    /// Building from a `ViewCreation` yields one version (the initial id), one schema
    /// and no properties.
    #[test]
    fn test_view_builder_from_view_creation() {
        let representations = ViewRepresentations(vec![
            SqlViewRepresentation {
                sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2"
                    .to_string(),
                dialect: "spark".to_string(),
            }
            .into(),
        ]);
        let creation = ViewCreation::builder()
            .location("s3://bucket/warehouse/default.db/event_agg".to_string())
            .name("view".to_string())
            .schema(Schema::builder().build().unwrap())
            .default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
            .representations(representations)
            .build();

        let metadata = ViewMetadataBuilder::from_view_creation(creation)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        assert_eq!(
            metadata.location(),
            "s3://bucket/warehouse/default.db/event_agg"
        );
        assert_eq!(metadata.current_version_id(), INITIAL_VIEW_VERSION_ID);
        assert_eq!(metadata.versions().count(), 1);
        assert_eq!(metadata.schemas_iter().count(), 1);
        assert_eq!(metadata.properties().len(), 0);
    }

    /// Round-trips the `ViewMetadataV1Valid.json` fixture against a hand-built expected value.
    #[test]
    fn test_view_metadata_v1_file_valid() {
        let metadata =
            fs::read_to_string("testdata/view_metadata/ViewMetadataV1Valid.json").unwrap();

        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(
                    NestedField::optional(1, "event_count", Type::Primitive(PrimitiveType::Int))
                        .with_doc("Count of events"),
                ),
                Arc::new(NestedField::optional(
                    2,
                    "event_date",
                    Type::Primitive(PrimitiveType::Date),
                )),
            ])
            .build()
            .unwrap();

        let version = ViewVersion::builder()
            .with_version_id(1)
            .with_timestamp_ms(1573518431292)
            .with_schema_id(1)
            .with_default_catalog("prod".to_string().into())
            .with_default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
            .with_summary(HashMap::from_iter(vec![
                ("engineVersion".to_string(), "3.3.2".to_string()),
                ("engine-name".to_string(), "Spark".to_string()),
            ]))
            .with_representations(ViewRepresentations(vec![
                SqlViewRepresentation {
                    sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2"
                        .to_string(),
                    dialect: "spark".to_string(),
                }
                .into(),
            ]))
            .build();

        let expected = ViewMetadata {
            format_version: ViewFormatVersion::V1,
            view_uuid: Uuid::parse_str("fa6506c3-7681-40c8-86dc-e36561f83385").unwrap(),
            location: "s3://bucket/warehouse/default.db/event_agg".to_string(),
            current_version_id: 1,
            versions: HashMap::from_iter(vec![(1, Arc::new(version))]),
            version_log: vec![ViewVersionLog {
                timestamp_ms: 1573518431292,
                version_id: 1,
            }],
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            properties: HashMap::from_iter(vec![(
                "comment".to_string(),
                "Daily event counts".to_string(),
            )]),
        };

        check_view_metadata_serde(&metadata, expected);
    }

    /// `assign_uuid` on a builder replaces the view UUID.
    #[test]
    fn test_view_builder_assign_uuid() {
        let metadata = get_test_view_metadata("ViewMetadataV1Valid.json");
        let metadata_builder = metadata.into_builder();
        let uuid = Uuid::new_v4();
        let metadata = metadata_builder.assign_uuid(uuid).build().unwrap().metadata;
        assert_eq!(metadata.uuid(), uuid);
    }

    /// A document with an unsupported format version matches no `ViewMetadataEnum` variant.
    #[test]
    fn test_view_metadata_v1_unsupported_version() {
        let metadata =
            fs::read_to_string("testdata/view_metadata/ViewMetadataUnsupportedVersion.json")
                .unwrap();

        let desered: Result<ViewMetadata, serde_json::Error> = serde_json::from_str(&metadata);

        // The exact message comes from serde's untagged-enum handling.
        assert_eq!(
            desered.unwrap_err().to_string(),
            "data did not match any variant of untagged enum ViewMetadataEnum"
        )
    }

    /// A `current-version-id` with no matching version is rejected by validation.
    #[test]
    fn test_view_metadata_v1_version_not_found() {
        let metadata =
            fs::read_to_string("testdata/view_metadata/ViewMetadataV1CurrentVersionNotFound.json")
                .unwrap();

        let desered: Result<ViewMetadata, serde_json::Error> = serde_json::from_str(&metadata);

        assert_eq!(
            desered.unwrap_err().to_string(),
            "DataInvalid => No version exists with the current version id 2."
        )
    }

    /// A current version whose schema id is not among `schemas` is rejected by validation.
    #[test]
    fn test_view_metadata_v1_schema_not_found() {
        let metadata =
            fs::read_to_string("testdata/view_metadata/ViewMetadataV1SchemaNotFound.json").unwrap();

        let desered: Result<ViewMetadata, serde_json::Error> = serde_json::from_str(&metadata);

        assert_eq!(
            desered.unwrap_err().to_string(),
            "DataInvalid => No schema exists with the schema id 2."
        )
    }

    /// The `ViewMetadataV1MissingSchema.json` fixture fails at the untagged-enum
    /// stage rather than in validation.
    #[test]
    fn test_view_metadata_v1_missing_schema_for_version() {
        let metadata =
            fs::read_to_string("testdata/view_metadata/ViewMetadataV1MissingSchema.json").unwrap();

        let desered: Result<ViewMetadata, serde_json::Error> = serde_json::from_str(&metadata);

        assert_eq!(
            desered.unwrap_err().to_string(),
            "data did not match any variant of untagged enum ViewMetadataEnum"
        )
    }

    /// The `ViewMetadataV1MissingCurrentVersion.json` fixture fails at the
    /// untagged-enum stage rather than in validation.
    #[test]
    fn test_view_metadata_v1_missing_current_version() {
        let metadata =
            fs::read_to_string("testdata/view_metadata/ViewMetadataV1MissingCurrentVersion.json")
                .unwrap();

        let desered: Result<ViewMetadata, serde_json::Error> = serde_json::from_str(&metadata);

        assert_eq!(
            desered.unwrap_err().to_string(),
            "data did not match any variant of untagged enum ViewMetadataEnum"
        )
    }
}