1use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use chrono::Utc;
22use itertools::Itertools;
23use uuid::Uuid;
24
25use super::{
26 DEFAULT_SCHEMA_ID, INITIAL_VIEW_VERSION_ID, ONE_MINUTE_MS, Schema, SchemaId,
27 TableMetadataBuilder, VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED,
28 VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VIEW_PROPERTY_VERSION_HISTORY_SIZE,
29 VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT, ViewFormatVersion, ViewMetadata,
30 ViewRepresentation, ViewVersion, ViewVersionLog, ViewVersionRef,
31};
32use crate::ViewCreation;
33use crate::catalog::ViewUpdate;
34use crate::error::{Error, ErrorKind, Result};
35use crate::io::is_truthy;
36
37#[derive(Debug, Clone)]
45pub struct ViewMetadataBuilder {
46 metadata: ViewMetadata,
47 changes: Vec<ViewUpdate>,
48 last_added_schema_id: Option<SchemaId>,
49 last_added_version_id: Option<SchemaId>,
50 history_entry: Option<ViewVersionLog>,
51 previous_view_version: Option<ViewVersionRef>,
54}
55
/// Outcome of [`ViewMetadataBuilder::build`].
#[derive(Debug, Clone, PartialEq)]
pub struct ViewMetadataBuildResult {
    /// The resulting view metadata.
    pub metadata: ViewMetadata,
    /// The updates that were applied through the builder, in application order.
    pub changes: Vec<ViewUpdate>,
}
64
65impl ViewMetadataBuilder {
    /// Placeholder id shared with [`TableMetadataBuilder`]. When passed as a
    /// version id or found as a version's schema id, it is resolved to the
    /// version / schema most recently added through this builder.
    const LAST_ADDED: i32 = TableMetadataBuilder::LAST_ADDED;
67
68 pub fn new(
70 location: String,
71 schema: Schema,
72 view_version: ViewVersion,
73 format_version: ViewFormatVersion,
74 properties: HashMap<String, String>,
75 ) -> Result<Self> {
76 let builder = Self {
77 metadata: ViewMetadata {
78 format_version,
79 view_uuid: Uuid::now_v7(),
80 location: "".to_string(), current_version_id: -1, versions: HashMap::new(), version_log: Vec::new(),
84 schemas: HashMap::new(), properties: HashMap::new(), },
87 changes: vec![],
88 last_added_schema_id: None, last_added_version_id: None, history_entry: None,
91 previous_view_version: None, };
93
94 builder
95 .set_location(location)
96 .set_current_version(view_version, schema)?
97 .set_properties(properties)
98 }
99
100 #[must_use]
102 pub fn new_from_metadata(previous: ViewMetadata) -> Self {
103 let previous_view_version = previous.current_version().clone();
104 Self {
105 metadata: previous,
106 changes: Vec::default(),
107 last_added_schema_id: None,
108 last_added_version_id: None,
109 history_entry: None,
110 previous_view_version: Some(previous_view_version),
111 }
112 }
113
114 pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
116 let ViewCreation {
117 location,
118 schema,
119 properties,
120 name: _,
121 representations,
122 default_catalog,
123 default_namespace,
124 summary,
125 } = view_creation;
126 let version = ViewVersion::builder()
127 .with_default_catalog(default_catalog)
128 .with_default_namespace(default_namespace)
129 .with_representations(representations)
130 .with_schema_id(schema.schema_id())
131 .with_summary(summary)
132 .with_timestamp_ms(Utc::now().timestamp_millis())
133 .with_version_id(INITIAL_VIEW_VERSION_ID)
134 .build();
135
136 Self::new(location, schema, version, ViewFormatVersion::V1, properties)
137 }
138
139 pub fn upgrade_format_version(self, format_version: ViewFormatVersion) -> Result<Self> {
144 if format_version < self.metadata.format_version {
145 return Err(Error::new(
146 ErrorKind::DataInvalid,
147 format!(
148 "Cannot downgrade ViewFormatVersion from {} to {}",
149 self.metadata.format_version, format_version
150 ),
151 ));
152 }
153
154 if format_version != self.metadata.format_version {
155 match format_version {
156 ViewFormatVersion::V1 => {
157 }
159 }
160 }
161
162 Ok(self)
163 }
164
165 pub fn set_location(mut self, location: String) -> Self {
167 let location = location.trim_end_matches('/').to_string();
168 if self.metadata.location != location {
169 self.changes.push(ViewUpdate::SetLocation {
170 location: location.clone(),
171 });
172 self.metadata.location = location;
173 }
174
175 self
176 }
177
178 pub fn set_current_version_id(mut self, mut version_id: i32) -> Result<Self> {
184 if version_id == Self::LAST_ADDED {
185 let Some(last_added_id) = self.last_added_version_id else {
186 return Err(Error::new(
187 ErrorKind::DataInvalid,
188 "Cannot set current version id to last added version: no version has been added.",
189 ));
190 };
191 version_id = last_added_id;
192 }
193
194 let version_id = version_id; if version_id == self.metadata.current_version_id {
197 return Ok(self);
198 }
199
200 let version = self.metadata.versions.get(&version_id).ok_or_else(|| {
201 Error::new(
202 ErrorKind::DataInvalid,
203 format!("Cannot set current version to unknown version with id: {version_id}"),
204 )
205 })?;
206
207 self.metadata.current_version_id = version_id;
208
209 if self.last_added_version_id == Some(version_id) {
210 self.changes.push(ViewUpdate::SetCurrentViewVersion {
211 view_version_id: Self::LAST_ADDED,
212 });
213 } else {
214 self.changes.push(ViewUpdate::SetCurrentViewVersion {
215 view_version_id: version_id,
216 });
217 }
218
219 let version_added_in_this_changes = self
223 .changes
224 .iter()
225 .any(|update| matches!(update, ViewUpdate::AddViewVersion { view_version } if view_version.version_id() == version_id));
226
227 let mut log = version.log();
228 if !version_added_in_this_changes {
229 log.set_timestamp_ms(Utc::now().timestamp_millis());
230 }
231
232 self.history_entry = Some(log);
233
234 Ok(self)
235 }
236
237 pub fn set_current_version(
239 mut self,
240 view_version: ViewVersion,
241 schema: Schema,
242 ) -> Result<Self> {
243 let schema_id = self.add_schema_internal(schema);
244 let view_version = view_version.with_schema_id(schema_id);
245 let view_version_id = self.add_version_internal(view_version)?;
246 self.set_current_version_id(view_version_id)
247 }
248
249 pub fn add_version(mut self, view_version: ViewVersion) -> Result<Self> {
256 self.add_version_internal(view_version)?;
257
258 Ok(self)
259 }
260
261 fn add_version_internal(&mut self, view_version: ViewVersion) -> Result<i32> {
262 let version_id = self.reuse_or_create_new_view_version_id(&view_version);
263 let view_version = view_version.with_version_id(version_id);
264
265 if self.metadata.versions.contains_key(&version_id) {
266 if self.last_added_version_id != Some(version_id) {
270 self.changes
271 .push(ViewUpdate::AddViewVersion { view_version });
272 self.last_added_version_id = Some(version_id);
273 }
274 return Ok(version_id);
275 }
276
277 let view_version = if view_version.schema_id() == Self::LAST_ADDED {
278 let last_added_schema_id = self.last_added_schema_id.ok_or_else(|| {
279 Error::new(
280 ErrorKind::DataInvalid,
281 "Cannot set last added schema: no schema has been added",
282 )
283 })?;
284 view_version.with_schema_id(last_added_schema_id)
285 } else {
286 view_version
287 };
288
289 if !self
290 .metadata
291 .schemas
292 .contains_key(&view_version.schema_id())
293 {
294 return Err(Error::new(
295 ErrorKind::DataInvalid,
296 format!(
297 "Cannot add version with unknown schema: {}",
298 view_version.schema_id()
299 ),
300 ));
301 }
302
303 require_unique_dialects(&view_version)?;
304
305 if let Some(last) = self.metadata.version_log.last() {
308 if view_version.timestamp_ms() - last.timestamp_ms() < -ONE_MINUTE_MS {
311 return Err(Error::new(
312 ErrorKind::DataInvalid,
313 format!(
314 "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
315 view_version.timestamp_ms(),
316 last.timestamp_ms()
317 ),
318 ));
319 }
320 }
321
322 self.metadata
323 .versions
324 .insert(version_id, Arc::new(view_version.clone()));
325
326 let view_version = if let Some(last_added_schema_id) = self.last_added_schema_id {
327 if view_version.schema_id() == last_added_schema_id {
328 view_version.with_schema_id(Self::LAST_ADDED)
329 } else {
330 view_version
331 }
332 } else {
333 view_version
334 };
335 self.changes
336 .push(ViewUpdate::AddViewVersion { view_version });
337
338 self.last_added_version_id = Some(version_id);
339
340 Ok(version_id)
341 }
342
343 fn reuse_or_create_new_view_version_id(&self, new_view_version: &ViewVersion) -> i32 {
344 self.metadata
345 .versions
346 .iter()
347 .find_map(|(id, other_version)| {
348 new_view_version
349 .behaves_identical_to(other_version)
350 .then_some(*id)
351 })
352 .unwrap_or_else(|| {
353 self.get_highest_view_version_id()
354 .map(|id| id + 1)
355 .unwrap_or(INITIAL_VIEW_VERSION_ID)
356 })
357 }
358
359 fn get_highest_view_version_id(&self) -> Option<i32> {
360 self.metadata.versions.keys().max().copied()
361 }
362
363 pub fn add_schema(mut self, schema: Schema) -> Self {
365 self.add_schema_internal(schema);
366
367 self
368 }
369
370 fn add_schema_internal(&mut self, schema: Schema) -> SchemaId {
371 let schema_id = self.reuse_or_create_new_schema_id(&schema);
372
373 if self.metadata.schemas.contains_key(&schema_id) {
374 if self.last_added_schema_id != Some(schema_id) {
378 self.changes.push(ViewUpdate::AddSchema {
379 schema: schema.clone().with_schema_id(schema_id),
380 last_column_id: None,
381 });
382 self.last_added_schema_id = Some(schema_id);
383 }
384 return schema_id;
385 }
386
387 let schema = schema.with_schema_id(schema_id);
388
389 self.metadata
390 .schemas
391 .insert(schema_id, Arc::new(schema.clone()));
392 let last_column_id = schema.highest_field_id();
393 self.changes.push(ViewUpdate::AddSchema {
394 schema,
395 last_column_id: Some(last_column_id),
396 });
397
398 self.last_added_schema_id = Some(schema_id);
399
400 schema_id
401 }
402
403 fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> SchemaId {
404 self.metadata
405 .schemas
406 .iter()
407 .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
408 .unwrap_or_else(|| {
409 self.get_highest_schema_id()
410 .map(|id| id + 1)
411 .unwrap_or(DEFAULT_SCHEMA_ID)
412 })
413 }
414
415 fn get_highest_schema_id(&self) -> Option<SchemaId> {
416 self.metadata.schemas.keys().max().copied()
417 }
418
419 pub fn set_properties(mut self, updates: HashMap<String, String>) -> Result<Self> {
421 if updates.is_empty() {
422 return Ok(self);
423 }
424
425 let num_versions_to_keep = updates
426 .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
427 .and_then(|v| v.parse::<i64>().ok())
428 .unwrap_or(1);
429 if num_versions_to_keep < 0 {
430 return Err(Error::new(
431 ErrorKind::DataInvalid,
432 format!(
433 "{VIEW_PROPERTY_VERSION_HISTORY_SIZE} must be positive but was {num_versions_to_keep}"
434 ),
435 ));
436 }
437
438 self.metadata.properties.extend(updates.clone());
439 self.changes.push(ViewUpdate::SetProperties { updates });
440
441 Ok(self)
442 }
443
444 pub fn remove_properties(mut self, removals: &[String]) -> Self {
446 if removals.is_empty() {
447 return self;
448 }
449
450 for property in removals {
451 self.metadata.properties.remove(property);
452 }
453
454 self.changes.push(ViewUpdate::RemoveProperties {
455 removals: removals.to_vec(),
456 });
457
458 self
459 }
460
461 pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
463 if self.metadata.view_uuid != uuid {
464 self.metadata.view_uuid = uuid;
465 self.changes.push(ViewUpdate::AssignUuid { uuid });
466 }
467
468 self
469 }
470
471 pub fn build(mut self) -> Result<ViewMetadataBuildResult> {
473 if let Some(history_entry) = self.history_entry.take() {
474 self.metadata.version_log.push(history_entry);
475 }
476
477 self.metadata.validate()?;
480
481 if let Some(previous) = self.previous_view_version.take() {
482 if !allow_replace_drop_dialects(&self.metadata.properties) {
483 require_no_dialect_dropped(&previous, self.metadata.current_version())?;
484 }
485 }
486
487 let _expired_versions = self.expire_versions();
488 self.metadata.version_log = update_version_log(
489 self.metadata.version_log,
490 self.metadata.versions.keys().copied().collect(),
491 );
492
493 Ok(ViewMetadataBuildResult {
494 metadata: self.metadata,
495 changes: self.changes,
496 })
497 }
498
499 fn expire_versions(&mut self) -> Vec<ViewVersionRef> {
501 let num_versions_to_keep = self
502 .metadata
503 .properties
504 .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
505 .and_then(|v| v.parse::<usize>().ok())
506 .unwrap_or(VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT)
507 .max(1);
508
509 let num_added_versions = self
511 .changes
512 .iter()
513 .filter(|update| matches!(update, ViewUpdate::AddViewVersion { .. }))
514 .count();
515 let num_versions_to_keep = num_added_versions.max(num_versions_to_keep);
516
517 if self.metadata.versions.len() > num_versions_to_keep {
518 let mut versions_to_keep = self
520 .metadata
521 .versions
522 .keys()
523 .copied()
524 .sorted()
525 .rev()
526 .take(num_versions_to_keep)
527 .collect::<HashSet<_>>();
528
529 if !versions_to_keep.contains(&self.metadata.current_version_id) {
531 if num_versions_to_keep > num_added_versions {
533 let lowest_id = versions_to_keep.iter().min().copied();
534 lowest_id.map(|id| versions_to_keep.remove(&id));
535 }
536 versions_to_keep.insert(self.metadata.current_version_id);
538 }
539
540 let mut expired_versions = Vec::new();
541 self.metadata.versions.retain(|id, version| {
544 if versions_to_keep.contains(id) {
545 true
546 } else {
547 expired_versions.push(version.clone());
548 false
549 }
550 });
551
552 expired_versions
553 } else {
554 Vec::new()
555 }
556 }
557}
558
559fn update_version_log(
562 version_log: Vec<ViewVersionLog>,
563 ids_to_keep: HashSet<i32>,
564) -> Vec<ViewVersionLog> {
565 let mut retained_history = Vec::new();
566 for log_entry in version_log {
567 if ids_to_keep.contains(&log_entry.version_id()) {
568 retained_history.push(log_entry);
569 } else {
570 retained_history.clear();
571 }
572 }
573 retained_history
574}
575
576fn allow_replace_drop_dialects(properties: &HashMap<String, String>) -> bool {
577 properties
578 .get(VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED)
579 .map_or(
580 VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT,
581 |value| is_truthy(value),
582 )
583}
584
585fn require_no_dialect_dropped(previous: &ViewVersion, current: &ViewVersion) -> Result<()> {
586 let base_dialects = lowercase_sql_dialects_for(previous);
587 let updated_dialects = lowercase_sql_dialects_for(current);
588
589 if !updated_dialects.is_superset(&base_dialects) {
590 return Err(Error::new(
591 ErrorKind::DataInvalid,
592 format!(
593 "Cannot replace view due to loss of view dialects: \nPrevious dialects: {:?}\nNew dialects: {:?}\nSet {} to true to allow dropping dialects.",
594 Vec::from_iter(base_dialects),
595 Vec::from_iter(updated_dialects),
596 VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED
597 ),
598 ));
599 }
600
601 Ok(())
602}
603
604fn lowercase_sql_dialects_for(view_version: &ViewVersion) -> HashSet<String> {
605 view_version
606 .representations()
607 .iter()
608 .map(|repr| match repr {
609 ViewRepresentation::Sql(sql_repr) => sql_repr.dialect.to_lowercase(),
610 })
611 .collect()
612}
613
614pub(super) fn require_unique_dialects(view_version: &ViewVersion) -> Result<()> {
615 let mut seen_dialects = HashSet::with_capacity(view_version.representations().len());
616 for repr in view_version.representations().iter() {
617 match repr {
618 ViewRepresentation::Sql(sql_repr) => {
619 if !seen_dialects.insert(sql_repr.dialect.to_lowercase()) {
620 return Err(Error::new(
621 ErrorKind::DataInvalid,
622 format!(
623 "Invalid view version: Cannot add multiple queries for dialect {}",
624 sql_repr.dialect
625 ),
626 ));
627 }
628 }
629 }
630 }
631 Ok(())
632}
633
634#[cfg(test)]
635mod test {
636 use super::super::view_metadata::tests::get_test_view_metadata;
637 use super::*;
638 use crate::NamespaceIdent;
639 use crate::spec::{
640 NestedField, PrimitiveType, SqlViewRepresentation, Type, ViewRepresentations,
641 };
642
643 fn new_view_version(id: usize, schema_id: SchemaId, sql: &str) -> ViewVersion {
644 new_view_version_with_dialect(id, schema_id, sql, vec!["spark"])
645 }
646
647 fn new_view_version_with_dialect(
648 id: usize,
649 schema_id: SchemaId,
650 sql: &str,
651 dialects: Vec<&str>,
652 ) -> ViewVersion {
653 ViewVersion::builder()
654 .with_version_id(id as i32)
655 .with_schema_id(schema_id)
656 .with_timestamp_ms(1573518431300)
657 .with_default_catalog(Some("prod".to_string()))
658 .with_summary(HashMap::from_iter(vec![(
659 "user".to_string(),
660 "some-user".to_string(),
661 )]))
662 .with_representations(ViewRepresentations(
663 dialects
664 .iter()
665 .map(|dialect| {
666 ViewRepresentation::Sql(SqlViewRepresentation {
667 dialect: dialect.to_string(),
668 sql: sql.to_string(),
669 })
670 })
671 .collect(),
672 ))
673 .with_default_namespace(NamespaceIdent::new("default".to_string()))
674 .build()
675 }
676
677 fn builder_without_changes() -> ViewMetadataBuilder {
678 ViewMetadataBuilder::new_from_metadata(get_test_view_metadata("ViewMetadataV1Valid.json"))
679 }
680
681 #[test]
682 fn test_minimal_builder() {
683 let location = "s3://bucket/table".to_string();
684 let schema = Schema::builder()
685 .with_schema_id(1)
686 .with_fields(vec![])
687 .build()
688 .unwrap();
689 let version = new_view_version(20, 21, "select 1 as count");
691 let format_version = ViewFormatVersion::V1;
692 let properties = HashMap::from_iter(vec![("key".to_string(), "value".to_string())]);
693
694 let build_result = ViewMetadataBuilder::new(
695 location.clone(),
696 schema.clone(),
697 version.clone(),
698 format_version,
699 properties.clone(),
700 )
701 .unwrap()
702 .build()
703 .unwrap();
704
705 let metadata = build_result.metadata;
706 assert_eq!(metadata.location, location);
707 assert_eq!(metadata.current_version_id, INITIAL_VIEW_VERSION_ID);
708 assert_eq!(metadata.format_version, format_version);
709 assert_eq!(metadata.properties, properties);
710 assert_eq!(metadata.versions.len(), 1);
711 assert_eq!(metadata.schemas.len(), 1);
712 assert_eq!(metadata.version_log.len(), 1);
713 assert_eq!(
714 Arc::unwrap_or_clone(metadata.versions[&INITIAL_VIEW_VERSION_ID].clone()),
715 version
716 .clone()
717 .with_version_id(INITIAL_VIEW_VERSION_ID)
718 .with_schema_id(0)
719 );
720
721 let changes = build_result.changes;
722 assert_eq!(changes.len(), 5);
723 assert!(changes.contains(&ViewUpdate::SetLocation { location }));
724 assert!(
725 changes.contains(&ViewUpdate::AddViewVersion {
726 view_version: version
727 .with_version_id(INITIAL_VIEW_VERSION_ID)
728 .with_schema_id(-1)
729 })
730 );
731 assert!(changes.contains(&ViewUpdate::SetCurrentViewVersion {
732 view_version_id: -1
733 }));
734 assert!(changes.contains(&ViewUpdate::AddSchema {
735 schema: schema.clone().with_schema_id(0),
736 last_column_id: Some(0)
737 }));
738 assert!(changes.contains(&ViewUpdate::SetProperties {
739 updates: properties
740 }));
741 }
742
743 #[test]
744 fn test_version_expiration() {
745 let v1 = new_view_version(0, 1, "select 1 as count");
746 let v2 = new_view_version(0, 1, "select count(1) as count from t2");
747 let v3 = new_view_version(0, 1, "select count from t1");
748
749 let builder = builder_without_changes()
750 .add_version(v1)
751 .unwrap()
752 .add_version(v2)
753 .unwrap()
754 .add_version(v3)
755 .unwrap();
756 let builder_without_changes = builder.clone().build().unwrap().metadata.into_builder();
757
758 let metadata = builder.clone().build().unwrap().metadata;
760 assert_eq!(
761 metadata.versions.keys().cloned().collect::<HashSet<_>>(),
762 HashSet::from_iter(vec![1, 2, 3, 4])
763 );
764
765 let metadata = builder
768 .clone()
769 .set_properties(HashMap::from_iter(vec![(
770 VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
771 "2".to_string(),
772 )]))
773 .unwrap()
774 .build()
775 .unwrap()
776 .metadata;
777 assert_eq!(
778 metadata.versions.keys().cloned().collect::<HashSet<_>>(),
779 HashSet::from_iter(vec![1, 2, 3, 4])
780 );
781 assert_eq!(metadata.version_log.len(), 1);
782
783 let metadata = builder_without_changes
786 .clone()
787 .set_properties(HashMap::from_iter(vec![(
788 VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
789 "2".to_string(),
790 )]))
791 .unwrap()
792 .build()
793 .unwrap()
794 .metadata;
795 assert_eq!(
796 metadata.versions.keys().cloned().collect::<HashSet<_>>(),
797 HashSet::from_iter(vec![1, 4])
798 );
799
800 let metadata = builder_without_changes
803 .set_properties(HashMap::from_iter(vec![(
804 VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
805 "0".to_string(),
806 )]))
807 .unwrap()
808 .build()
809 .unwrap()
810 .metadata;
811 assert_eq!(
812 metadata.versions.keys().cloned().collect::<HashSet<_>>(),
813 HashSet::from_iter(vec![1])
814 );
815 }
816
817 #[test]
818 fn test_update_version_log() {
819 let v1 = new_view_version(1, 1, "select 1 as count");
820 let v2 = new_view_version(2, 1, "select count(1) as count from t2");
821 let v3 = new_view_version(3, 1, "select count from t1");
822
823 let one = ViewVersionLog::new(1, v1.timestamp_ms());
824 let two = ViewVersionLog::new(2, v2.timestamp_ms());
825 let three = ViewVersionLog::new(3, v3.timestamp_ms());
826
827 assert_eq!(
828 update_version_log(
829 vec![one.clone(), two.clone(), three.clone()],
830 HashSet::from_iter(vec![1, 2, 3])
831 ),
832 vec![one.clone(), two.clone(), three.clone()]
833 );
834
835 assert_eq!(
837 update_version_log(
838 vec![
839 three.clone(),
840 two.clone(),
841 one.clone(),
842 two.clone(),
843 three.clone()
844 ],
845 HashSet::from_iter(vec![2, 3])
846 ),
847 vec![two.clone(), three.clone()]
848 );
849
850 assert_eq!(
852 update_version_log(
853 vec![
854 one.clone(),
855 two.clone(),
856 three.clone(),
857 one.clone(),
858 three.clone()
859 ],
860 HashSet::from_iter(vec![1, 3])
861 ),
862 vec![three.clone(), one.clone(), three.clone()]
863 );
864 }
865
866 #[test]
867 fn test_use_previously_added_version() {
868 let v2 = new_view_version(2, 1, "select 1 as count");
869 let v3 = new_view_version(3, 1, "select count(1) as count from t2");
870 let schema = Schema::builder().build().unwrap();
871
872 let log_v2 = ViewVersionLog::new(2, v2.timestamp_ms());
873 let log_v3 = ViewVersionLog::new(3, v3.timestamp_ms());
874
875 let metadata_v2 = builder_without_changes()
876 .set_current_version(v2.clone(), schema.clone())
877 .unwrap()
878 .build()
879 .unwrap()
880 .metadata;
881
882 assert_eq!(metadata_v2.version_log.last().unwrap(), &log_v2);
884
885 let metadata_v3 = metadata_v2
887 .into_builder()
888 .set_current_version(v3.clone(), schema)
889 .unwrap()
890 .build()
891 .unwrap()
892 .metadata;
893
894 assert_eq!(metadata_v3.version_log[1..], vec![
895 log_v2.clone(),
896 log_v3.clone()
897 ]);
898
899 let metadata_v4 = metadata_v3
901 .into_builder()
902 .set_current_version_id(2)
903 .unwrap()
904 .build()
905 .unwrap()
906 .metadata;
907
908 let entry = metadata_v4.version_log.last().unwrap();
910 assert_eq!(entry.version_id(), 2);
911 assert!(entry.timestamp_ms() > v2.timestamp_ms());
912 }
913
914 #[test]
915 fn test_assign_uuid() {
916 let builder = builder_without_changes();
917 let uuid = Uuid::now_v7();
918 let build_result = builder.clone().assign_uuid(uuid).build().unwrap();
919 assert_eq!(build_result.metadata.view_uuid, uuid);
920 assert_eq!(build_result.changes, vec![ViewUpdate::AssignUuid { uuid }]);
921 }
922
923 #[test]
924 fn test_set_location() {
925 let builder = builder_without_changes();
926 let location = "s3://bucket/table".to_string();
927 let build_result = builder
928 .clone()
929 .set_location(location.clone())
930 .build()
931 .unwrap();
932 assert_eq!(build_result.metadata.location, location);
933 assert_eq!(build_result.changes, vec![ViewUpdate::SetLocation {
934 location
935 }]);
936 }
937
938 #[test]
939 fn test_set_and_remove_properties() {
940 let builder = builder_without_changes();
941 let properties = HashMap::from_iter(vec![
942 ("key1".to_string(), "value1".to_string()),
943 ("key2".to_string(), "value2".to_string()),
944 ]);
945 let build_result = builder
946 .clone()
947 .set_properties(properties.clone())
948 .unwrap()
949 .remove_properties(&["key2".to_string(), "key3".to_string()])
950 .build()
951 .unwrap();
952 assert_eq!(
953 build_result.metadata.properties.get("key1"),
954 Some(&"value1".to_string())
955 );
956 assert_eq!(build_result.metadata.properties.get("key2"), None);
957 assert_eq!(build_result.changes, vec![
958 ViewUpdate::SetProperties {
959 updates: properties
960 },
961 ViewUpdate::RemoveProperties {
962 removals: vec!["key2".to_string(), "key3".to_string()]
963 }
964 ]);
965 }
966
967 #[test]
968 fn test_add_schema() {
969 let builder = builder_without_changes();
970 let schema = Schema::builder()
971 .with_schema_id(1)
972 .with_fields(vec![])
973 .build()
974 .unwrap();
975 let build_result = builder.clone().add_schema(schema.clone()).build().unwrap();
976 assert_eq!(build_result.metadata.schemas.len(), 2);
977 assert_eq!(build_result.changes, vec![ViewUpdate::AddSchema {
978 schema: schema.clone().with_schema_id(2),
979 last_column_id: Some(0)
980 }]);
981
982 let build_result = builder.clone().add_schema(schema.clone()).build().unwrap();
984 assert_eq!(build_result.metadata.schemas.len(), 2);
985 assert_eq!(build_result.changes, vec![ViewUpdate::AddSchema {
986 schema: schema.clone().with_schema_id(2),
987 last_column_id: Some(0)
988 }]);
989 }
990
991 #[test]
992 fn test_add_and_set_current_version() {
993 let builder = builder_without_changes();
994 let v1 = new_view_version(2, 1, "select 1 as count");
995 let v2 = new_view_version(3, 2, "select count(1) as count from t2");
996 let v2_schema = Schema::builder()
997 .with_schema_id(2)
998 .with_fields(vec![])
999 .build()
1000 .unwrap();
1001
1002 let build_result = builder
1003 .clone()
1004 .add_version(v1.clone())
1005 .unwrap()
1006 .add_schema(v2_schema.clone())
1007 .add_version(v2.clone())
1008 .unwrap()
1009 .set_current_version_id(3)
1010 .unwrap()
1011 .build()
1012 .unwrap();
1013
1014 assert_eq!(build_result.metadata.current_version_id, 3);
1015 assert_eq!(build_result.metadata.versions.len(), 3);
1016 assert_eq!(build_result.metadata.schemas.len(), 2);
1017 assert_eq!(build_result.metadata.version_log.len(), 2);
1018 assert_eq!(
1019 Arc::unwrap_or_clone(build_result.metadata.versions[&2].clone()),
1020 v1.clone().with_version_id(2).with_schema_id(1)
1021 );
1022 assert_eq!(
1023 Arc::unwrap_or_clone(build_result.metadata.versions[&3].clone()),
1024 v2.clone().with_version_id(3).with_schema_id(2)
1025 );
1026 assert_eq!(build_result.changes.len(), 4);
1027 assert_eq!(build_result.changes, vec![
1028 ViewUpdate::AddViewVersion {
1029 view_version: v1.clone().with_version_id(2).with_schema_id(1)
1030 },
1031 ViewUpdate::AddSchema {
1032 schema: v2_schema.clone().with_schema_id(2),
1033 last_column_id: Some(0)
1034 },
1035 ViewUpdate::AddViewVersion {
1036 view_version: v2.clone().with_version_id(3).with_schema_id(-1)
1037 },
1038 ViewUpdate::SetCurrentViewVersion {
1039 view_version_id: -1
1040 }
1041 ]);
1042 assert_eq!(
1043 build_result
1044 .metadata
1045 .version_log
1046 .iter()
1047 .map(|v| v.version_id())
1048 .collect::<Vec<_>>(),
1049 vec![1, 3]
1050 );
1051 }
1052
1053 #[test]
1054 fn test_schema_and_version_id_reassignment() {
1055 let builder = builder_without_changes();
1056 let v1 = new_view_version(0, 1, "select 1 as count");
1057 let v2 = new_view_version(0, 2, "select count(1) as count from t2");
1058 let v2_schema = Schema::builder()
1059 .with_schema_id(0)
1060 .with_fields(vec![])
1061 .build()
1062 .unwrap();
1063
1064 let build_result = builder
1065 .clone()
1066 .add_version(v1.clone())
1067 .unwrap()
1068 .set_current_version(v2.clone(), v2_schema.clone())
1069 .unwrap()
1070 .build()
1071 .unwrap();
1072
1073 assert_eq!(build_result.metadata.current_version_id, 3);
1074 assert_eq!(build_result.metadata.versions.len(), 3);
1075 assert_eq!(build_result.metadata.schemas.len(), 2);
1076 assert_eq!(build_result.metadata.version_log.len(), 2);
1077 assert_eq!(
1078 Arc::unwrap_or_clone(build_result.metadata.versions[&2].clone()),
1079 v1.clone().with_version_id(2).with_schema_id(1)
1080 );
1081 assert_eq!(
1082 Arc::unwrap_or_clone(build_result.metadata.versions[&3].clone()),
1083 v2.clone().with_version_id(3).with_schema_id(2)
1084 );
1085 assert_eq!(build_result.changes.len(), 4);
1086 assert_eq!(build_result.changes, vec![
1087 ViewUpdate::AddViewVersion {
1088 view_version: v1.clone().with_version_id(2).with_schema_id(1)
1089 },
1090 ViewUpdate::AddSchema {
1091 schema: v2_schema.clone().with_schema_id(2),
1092 last_column_id: Some(0)
1093 },
1094 ViewUpdate::AddViewVersion {
1095 view_version: v2.clone().with_version_id(3).with_schema_id(-1)
1096 },
1097 ViewUpdate::SetCurrentViewVersion {
1098 view_version_id: -1
1099 }
1100 ]);
1101 assert_eq!(
1102 build_result
1103 .metadata
1104 .version_log
1105 .iter()
1106 .map(|v| v.version_id())
1107 .collect::<Vec<_>>(),
1108 vec![1, 3]
1109 );
1110 }
1111
1112 #[test]
1113 fn test_view_version_deduplication() {
1114 let builder = builder_without_changes();
1115 let v1 = new_view_version(0, 1, "select * from ns.tbl");
1116
1117 assert_eq!(builder.metadata.versions.len(), 1);
1118 let build_result = builder
1119 .clone()
1120 .add_version(v1.clone())
1121 .unwrap()
1122 .add_version(v1)
1123 .unwrap()
1124 .build()
1125 .unwrap();
1126
1127 assert_eq!(build_result.metadata.versions.len(), 2);
1128 assert_eq!(build_result.metadata.schemas.len(), 1);
1129 }
1130
1131 #[test]
1132 fn test_view_version_and_schema_deduplication() {
1133 let schema_one = Schema::builder()
1134 .with_schema_id(5)
1135 .with_fields(vec![
1136 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1137 ])
1138 .build()
1139 .unwrap();
1140 let schema_two = Schema::builder()
1141 .with_schema_id(7)
1142 .with_fields(vec![
1143 NestedField::required(1, "y", Type::Primitive(PrimitiveType::Long)).into(),
1144 ])
1145 .build()
1146 .unwrap();
1147 let schema_three = Schema::builder()
1148 .with_schema_id(9)
1149 .with_fields(vec![
1150 NestedField::required(1, "z", Type::Primitive(PrimitiveType::Long)).into(),
1151 ])
1152 .build()
1153 .unwrap();
1154
1155 let v1 = new_view_version(1, 5, "select * from ns.tbl");
1156 let v2 = new_view_version(1, 7, "select count(*) from ns.tbl");
1157 let v3 = new_view_version(1, 9, "select count(*) as count from ns.tbl");
1158
1159 let build_result = builder_without_changes()
1160 .add_schema(schema_one.clone())
1161 .add_schema(schema_two.clone())
1162 .add_schema(schema_three.clone())
1163 .set_current_version(v1.clone(), schema_one.clone())
1164 .unwrap()
1165 .set_current_version(v2.clone(), schema_two.clone())
1166 .unwrap()
1167 .set_current_version(v3.clone(), schema_three.clone())
1168 .unwrap()
1169 .set_current_version(v3.clone(), schema_three.clone())
1170 .unwrap()
1171 .set_current_version(v2.clone(), schema_two.clone())
1172 .unwrap()
1173 .set_current_version(v1.clone(), schema_one.clone())
1174 .unwrap()
1175 .build()
1176 .unwrap();
1177
1178 assert_eq!(
1179 Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
1180 v1.clone().with_version_id(2).with_schema_id(2)
1181 );
1182 assert_eq!(build_result.metadata.versions.len(), 4);
1183 assert_eq!(
1184 build_result.metadata.versions[&2],
1185 Arc::new(v1.clone().with_version_id(2).with_schema_id(2))
1186 );
1187 assert_eq!(
1188 build_result.metadata.versions[&3],
1189 Arc::new(v2.clone().with_version_id(3).with_schema_id(3))
1190 );
1191 assert_eq!(
1192 build_result.metadata.versions[&4],
1193 Arc::new(v3.clone().with_version_id(4).with_schema_id(4))
1194 );
1195 assert_eq!(
1196 build_result
1198 .metadata
1199 .schemas_iter()
1200 .filter(|s| s.schema_id() != 1)
1201 .sorted_by_key(|s| s.schema_id())
1202 .map(|s| s.as_struct())
1203 .collect::<Vec<_>>(),
1204 vec![
1205 schema_one.as_struct(),
1206 schema_two.as_struct(),
1207 schema_three.as_struct()
1208 ]
1209 )
1210 }
1211
/// Adding a version must fail when its schema id cannot be resolved: id `10` is reported
/// as an unknown schema, and id `-1` is reported as "no schema has been added".
#[test]
fn test_error_on_missing_schema() {
    let base = builder_without_changes();

    // (schema id of the version being added, fragment expected in the error text)
    let cases = [
        (10, "Cannot add version with unknown schema: 10"),
        (-1, "Cannot set last added schema: no schema has been added"),
    ];

    for (schema_id, expected_fragment) in cases {
        let message = base
            .clone()
            .add_version(new_view_version(0, schema_id, "SELECT * FROM foo"))
            .unwrap_err()
            .to_string();
        assert!(message.contains(expected_fragment));
    }
}
1235
/// Selecting the current version by id must fail both for `-1` while no version has
/// been added and for an id (`10`) the builder does not know.
#[test]
fn test_error_on_missing_current_version() {
    let base = builder_without_changes();

    let last_added_message = base
        .clone()
        .set_current_version_id(-1)
        .unwrap_err()
        .to_string();
    assert!(last_added_message.contains(
        "Cannot set current version id to last added version: no version has been added."
    ));

    let unknown_id_message = base.set_current_version_id(10).unwrap_err().to_string();
    assert!(
        unknown_id_message.contains("Cannot set current version to unknown version with id: 10")
    );
}
1256
/// After adding versions with ids 2 and 3, passing `-1` as the current version id must
/// select the version added last (id 3).
#[test]
fn test_set_current_version_to_last_added() {
    let first = new_view_version(2, 1, "select * from ns.tbl");
    let second = new_view_version(3, 1, "select a,b from ns.tbl");

    let staged = builder_without_changes()
        .add_version(first)
        .unwrap()
        .add_version(second)
        .unwrap();

    let built = staged.set_current_version_id(-1).unwrap().build().unwrap();

    assert_eq!(built.metadata.current_version_id, 3);
}
1274
/// A version-history-size property of `-1` must be rejected by `set_properties`.
#[test]
fn test_error_when_setting_negative_version_history_size() {
    let negative_history_size = HashMap::from([(
        VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
        "-1".to_string(),
    )]);

    let message = builder_without_changes()
        .set_properties(negative_history_size)
        .unwrap_err()
        .to_string();

    assert!(message.contains("version.history.num-entries must be positive but was -1"));
}
1290
/// Adding two versions must record exactly two `AddViewVersion` changes, in the order
/// the versions were added.
#[test]
fn test_view_version_changes() {
    let first = new_view_version(2, 1, "select 1 as count");
    let second = new_view_version(3, 1, "select count(1) as count from t2");

    let build_result = builder_without_changes()
        .add_version(first.clone())
        .unwrap()
        .add_version(second.clone())
        .unwrap()
        .build()
        .unwrap();
    let recorded = build_result.changes;

    let expected = vec![
        ViewUpdate::AddViewVersion {
            view_version: first,
        },
        ViewUpdate::AddViewVersion {
            view_version: second,
        },
    ];

    assert_eq!(recorded.len(), 2);
    assert_eq!(recorded, expected);
}
1318
/// Replacing a current version that offers `spark` and `trino` with one that only offers
/// `spark` must fail at build time; this test sets no dialect-drop property itself.
#[test]
fn test_dropping_dialect_fails_by_default() {
    let empty_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let both_dialects =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);

    // First commit: the view exposes both dialects.
    let initial_metadata = builder_without_changes()
        .set_current_version(both_dialects, empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Second commit: the replacement no longer carries `trino`.
    let replace_outcome = initial_metadata
        .into_builder()
        .set_current_version(spark_only, empty_schema)
        .unwrap()
        .build();

    let message = replace_outcome.unwrap_err().to_string();
    assert!(message.contains("Cannot replace view due to loss of view dialects"));
}
1349
/// With the drop-dialect property set to `"true"`, replacing a `spark` + `trino` version
/// by a `spark`-only version must succeed and become the current version.
#[test]
fn test_dropping_dialects_does_not_fail_when_allowed() {
    let empty_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let both_dialects =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);

    let allow_drop = HashMap::from([(
        VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
        "true".to_string(),
    )]);

    // First commit: enable dialect dropping and expose both dialects.
    let initial_metadata = builder_without_changes()
        .set_properties(allow_drop)
        .unwrap()
        .set_current_version(both_dialects, empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Second commit: replace with the version that lost `trino`.
    let replaced = initial_metadata
        .into_builder()
        .set_current_version(spark_only.clone(), empty_schema)
        .unwrap()
        .build()
        .unwrap();

    let current = Arc::unwrap_or_clone(replaced.metadata.current_version().clone());
    let expected = spark_only.with_version_id(3).with_schema_id(2);
    assert_eq!(current, expected);
}
1385
/// Replacing a `spark`-only version by one that additionally offers `trino` must succeed
/// without any property being set in this test.
#[test]
fn test_can_add_dialects_by_default() {
    let empty_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let both_dialects =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);

    // First commit: only `spark` is available.
    let initial_metadata = builder_without_changes()
        .set_current_version(spark_only, empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Second commit: the replacement adds `trino`.
    let replaced = initial_metadata
        .into_builder()
        .set_current_version(both_dialects.clone(), empty_schema)
        .unwrap()
        .build()
        .unwrap();

    let current = Arc::unwrap_or_clone(replaced.metadata.current_version().clone());
    let expected = both_dialects.with_version_id(3).with_schema_id(2);
    assert_eq!(current, expected);
}
1417
/// Replacing a `spark` version by another `spark` version with different SQL must
/// succeed without any property being set in this test.
#[test]
fn test_can_update_dialect_by_default() {
    let empty_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let original_sql = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let updated_sql = new_view_version_with_dialect(0, 0, "SELECT * FROM bar", vec!["spark"]);

    // First commit: `spark` reads from `foo`.
    let initial_metadata = builder_without_changes()
        .set_current_version(original_sql, empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Second commit: same dialect, new SQL text.
    let replaced = initial_metadata
        .into_builder()
        .set_current_version(updated_sql.clone(), empty_schema)
        .unwrap()
        .build()
        .unwrap();

    let current = Arc::unwrap_or_clone(replaced.metadata.current_version().clone());
    let expected = updated_sql.with_version_id(3).with_schema_id(2);
    assert_eq!(current, expected);
}
1448
/// Swapping `spark` for `trino` must succeed when the same commit sets the drop-dialect
/// property to `"true"`, and swapping back must fail when the commit sets it to `"false"`.
#[test]
fn test_dropping_dialects_allowed_and_then_disallowed() {
    // Property map holding only the drop-dialect flag with the given raw value.
    let drop_dialect_property = |flag: &str| {
        HashMap::from([(
            VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
            flag.to_string(),
        )])
    };

    let empty_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let trino = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["trino"]);

    // Phase 1: start from a `spark`-only view.
    let spark_metadata = builder_without_changes()
        .set_current_version(spark.clone(), empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Phase 2: replace `spark` by `trino` while allowing dialects to be dropped.
    let trino_result = spark_metadata
        .into_builder()
        .set_current_version(trino.clone(), empty_schema.clone())
        .unwrap()
        .set_properties(drop_dialect_property("true"))
        .unwrap()
        .build()
        .unwrap();

    let current = Arc::unwrap_or_clone(trino_result.metadata.current_version().clone());
    assert_eq!(current, trino.with_version_id(3).with_schema_id(2));

    // Phase 3: go back to `spark` while disallowing drops again; the build must fail.
    let message = trino_result
        .metadata
        .into_builder()
        .set_current_version(spark, empty_schema)
        .unwrap()
        .set_properties(drop_dialect_property("false"))
        .unwrap()
        .build()
        .unwrap_err()
        .to_string();

    assert!(message.contains("Cannot replace view due to loss of view dialects"));
}
1502
/// `require_no_dialect_dropped` must return an error when the replacement version lacks
/// a dialect that the previous version had, and `Ok` when every previous dialect is
/// still present, even if the representations are listed in a different order.
#[test]
fn test_require_no_dialect_dropped() {
    // Builds a view version (ids and timestamp all zero, namespace "default") whose SQL
    // representations are exactly the given `(dialect, sql)` pairs, in order.
    let version_with = |representations: &[(&str, &str)]| {
        ViewVersion::builder()
            .with_version_id(0)
            .with_schema_id(0)
            .with_timestamp_ms(0)
            .with_representations(ViewRepresentations(
                representations
                    .iter()
                    .map(|&(dialect, sql)| {
                        ViewRepresentation::Sql(SqlViewRepresentation {
                            dialect: dialect.to_string(),
                            sql: sql.to_string(),
                        })
                    })
                    .collect(),
            ))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build()
    };

    let previous = version_with(&[
        ("trino", "SELECT * FROM foo"),
        ("spark", "SELECT * FROM bar"),
    ]);

    // `spark` is missing from the replacement, so the check must fail.
    let current = version_with(&[("trino", "SELECT * FROM foo")]);
    assert!(require_no_dialect_dropped(&previous, &current).is_err());

    // Same two dialects as `previous`, listed in the opposite order: nothing was dropped.
    let current = version_with(&[
        ("spark", "SELECT * FROM bar"),
        ("trino", "SELECT * FROM foo"),
    ]);
    assert!(require_no_dialect_dropped(&previous, &current).is_ok());
}
1556
/// `allow_replace_drop_dialects` must be `false` for an empty property map and must
/// follow the `replace.drop-dialect.allowed` value for `true`/`false` in either case.
#[test]
fn test_allow_replace_drop_dialects() {
    use std::collections::HashMap;

    use super::allow_replace_drop_dialects;

    let mut props = HashMap::new();
    assert!(!allow_replace_drop_dialects(&props));

    // (raw property value, expected result); each insert overwrites the previous value.
    let cases = [
        ("true", true),
        ("false", false),
        ("TRUE", true),
        ("FALSE", false),
    ];

    for (raw_value, expected) in cases {
        props.insert(
            "replace.drop-dialect.allowed".to_string(),
            raw_value.to_string(),
        );
        assert_eq!(allow_replace_drop_dialects(&props), expected);
    }
}
1590
/// `lowercase_sql_dialects_for` must return the dialect names of all SQL representations
/// in lower case, whatever casing the representations use.
#[test]
fn test_lowercase_sql_dialects_for() {
    // (dialect as written in the representation, SQL text)
    let mixed_case = [
        ("STARROCKS", "SELECT * FROM foo"),
        ("trino", "SELECT * FROM bar"),
        ("Spark", "SELECT * FROM bar"),
    ];
    let representations: Vec<_> = mixed_case
        .iter()
        .map(|&(dialect, sql)| {
            ViewRepresentation::Sql(SqlViewRepresentation {
                dialect: dialect.to_string(),
                sql: sql.to_string(),
            })
        })
        .collect();

    let view_version = ViewVersion::builder()
        .with_version_id(0)
        .with_schema_id(0)
        .with_timestamp_ms(0)
        .with_representations(ViewRepresentations(representations))
        .with_default_namespace(NamespaceIdent::new("default".to_string()))
        .build();

    let dialects = lowercase_sql_dialects_for(&view_version);

    assert_eq!(dialects.len(), 3);
    for expected in ["trino", "spark", "starrocks"] {
        assert!(dialects.contains(expected));
    }
}
1620
/// `require_unique_dialects` must fail for a version that lists `trino` twice and pass
/// for a version whose two representations use different dialects.
#[test]
fn test_require_unique_dialects() {
    // Builds a view version (ids and timestamp all zero, namespace "default") whose SQL
    // representations are exactly the given `(dialect, sql)` pairs, in order.
    let version_with = |representations: &[(&str, &str)]| {
        ViewVersion::builder()
            .with_version_id(0)
            .with_schema_id(0)
            .with_timestamp_ms(0)
            .with_representations(ViewRepresentations(
                representations
                    .iter()
                    .map(|&(dialect, sql)| {
                        ViewRepresentation::Sql(SqlViewRepresentation {
                            dialect: dialect.to_string(),
                            sql: sql.to_string(),
                        })
                    })
                    .collect(),
            ))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build()
    };

    let duplicated = version_with(&[
        ("trino", "SELECT * FROM foo"),
        ("trino", "SELECT * FROM bar"),
    ]);
    assert!(require_unique_dialects(&duplicated).is_err());

    let distinct = version_with(&[
        ("trino", "SELECT * FROM foo"),
        ("spark", "SELECT * FROM bar"),
    ]);
    assert!(require_unique_dialects(&distinct).is_ok());
}
1661}