iceberg/spec/
view_metadata_builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use chrono::Utc;
22use itertools::Itertools;
23use uuid::Uuid;
24
25use super::{
26    DEFAULT_SCHEMA_ID, INITIAL_VIEW_VERSION_ID, ONE_MINUTE_MS, Schema, SchemaId,
27    TableMetadataBuilder, VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED,
28    VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VIEW_PROPERTY_VERSION_HISTORY_SIZE,
29    VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT, ViewFormatVersion, ViewMetadata,
30    ViewRepresentation, ViewVersion, ViewVersionLog, ViewVersionRef,
31};
32use crate::ViewCreation;
33use crate::catalog::ViewUpdate;
34use crate::error::{Error, ErrorKind, Result};
35use crate::io::is_truthy;
36
37/// Manipulating view metadata.
38///
39/// For this builder the order of called functions matters.
40/// All operations applied to the `ViewMetadata` are tracked in `changes` as  a chronologically
41/// ordered vec of `ViewUpdate`.
42/// If an operation does not lead to a change of the `ViewMetadata`, the corresponding update
43/// is omitted from `changes`.
44#[derive(Debug, Clone)]
45pub struct ViewMetadataBuilder {
46    metadata: ViewMetadata,
47    changes: Vec<ViewUpdate>,
48    last_added_schema_id: Option<SchemaId>,
49    last_added_version_id: Option<SchemaId>,
50    history_entry: Option<ViewVersionLog>,
51    // Previous view version is only used during build to check
52    // weather dialects are dropped or not.
53    previous_view_version: Option<ViewVersionRef>,
54}
55
56#[derive(Debug, Clone, PartialEq)]
57/// Result of modifying or creating a `ViewMetadata`.
58pub struct ViewMetadataBuildResult {
59    /// The new `ViewMetadata`.
60    pub metadata: ViewMetadata,
61    /// The changes that were applied to the metadata.
62    pub changes: Vec<ViewUpdate>,
63}
64
65impl ViewMetadataBuilder {
66    const LAST_ADDED: i32 = TableMetadataBuilder::LAST_ADDED;
67
68    /// Creates a new view metadata builder.
69    pub fn new(
70        location: String,
71        schema: Schema,
72        view_version: ViewVersion,
73        format_version: ViewFormatVersion,
74        properties: HashMap<String, String>,
75    ) -> Result<Self> {
76        let builder = Self {
77            metadata: ViewMetadata {
78                format_version,
79                view_uuid: Uuid::now_v7(),
80                location: "".to_string(), // Overwritten immediately by set_location
81                current_version_id: -1,   // Overwritten immediately by set_current_version,
82                versions: HashMap::new(), // Overwritten immediately by set_current_version
83                version_log: Vec::new(),
84                schemas: HashMap::new(), // Overwritten immediately by set_current_version
85                properties: HashMap::new(), // Overwritten immediately by set_properties
86            },
87            changes: vec![],
88            last_added_schema_id: None, // Overwritten immediately by set_current_version
89            last_added_version_id: None, // Overwritten immediately by set_current_version
90            history_entry: None,
91            previous_view_version: None, // This is a new view
92        };
93
94        builder
95            .set_location(location)
96            .set_current_version(view_version, schema)?
97            .set_properties(properties)
98    }
99
100    /// Creates a new view metadata builder from the given metadata to modify it.
101    #[must_use]
102    pub fn new_from_metadata(previous: ViewMetadata) -> Self {
103        let previous_view_version = previous.current_version().clone();
104        Self {
105            metadata: previous,
106            changes: Vec::default(),
107            last_added_schema_id: None,
108            last_added_version_id: None,
109            history_entry: None,
110            previous_view_version: Some(previous_view_version),
111        }
112    }
113
114    /// Creates a new view metadata builder from the given view creation.
115    pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
116        let ViewCreation {
117            location,
118            schema,
119            properties,
120            name: _,
121            representations,
122            default_catalog,
123            default_namespace,
124            summary,
125        } = view_creation;
126        let version = ViewVersion::builder()
127            .with_default_catalog(default_catalog)
128            .with_default_namespace(default_namespace)
129            .with_representations(representations)
130            .with_schema_id(schema.schema_id())
131            .with_summary(summary)
132            .with_timestamp_ms(Utc::now().timestamp_millis())
133            .with_version_id(INITIAL_VIEW_VERSION_ID)
134            .build();
135
136        Self::new(location, schema, version, ViewFormatVersion::V1, properties)
137    }
138
139    /// Upgrade `FormatVersion`. Downgrades are not allowed.
140    ///
141    /// # Errors
142    /// - Cannot downgrade to older format versions.
143    pub fn upgrade_format_version(self, format_version: ViewFormatVersion) -> Result<Self> {
144        if format_version < self.metadata.format_version {
145            return Err(Error::new(
146                ErrorKind::DataInvalid,
147                format!(
148                    "Cannot downgrade ViewFormatVersion from {} to {}",
149                    self.metadata.format_version, format_version
150                ),
151            ));
152        }
153
154        if format_version != self.metadata.format_version {
155            match format_version {
156                ViewFormatVersion::V1 => {
157                    // No changes needed for V1
158                }
159            }
160        }
161
162        Ok(self)
163    }
164
165    /// Set the location of the view, stripping any trailing slashes.
166    pub fn set_location(mut self, location: String) -> Self {
167        let location = location.trim_end_matches('/').to_string();
168        if self.metadata.location != location {
169            self.changes.push(ViewUpdate::SetLocation {
170                location: location.clone(),
171            });
172            self.metadata.location = location;
173        }
174
175        self
176    }
177
178    /// Set an existing view version as the current version.
179    ///
180    /// # Errors
181    /// - The specified `version_id` does not exist.
182    /// - The specified `version_id` is `-1` but no version has been added.
183    pub fn set_current_version_id(mut self, mut version_id: i32) -> Result<Self> {
184        if version_id == Self::LAST_ADDED {
185            let Some(last_added_id) = self.last_added_version_id else {
186                return Err(Error::new(
187                    ErrorKind::DataInvalid,
188                    "Cannot set current version id to last added version: no version has been added.",
189                ));
190            };
191            version_id = last_added_id;
192        }
193
194        let version_id = version_id; // make immutable
195
196        if version_id == self.metadata.current_version_id {
197            return Ok(self);
198        }
199
200        let version = self.metadata.versions.get(&version_id).ok_or_else(|| {
201            Error::new(
202                ErrorKind::DataInvalid,
203                format!("Cannot set current version to unknown version with id: {version_id}"),
204            )
205        })?;
206
207        self.metadata.current_version_id = version_id;
208
209        if self.last_added_version_id == Some(version_id) {
210            self.changes.push(ViewUpdate::SetCurrentViewVersion {
211                view_version_id: Self::LAST_ADDED,
212            });
213        } else {
214            self.changes.push(ViewUpdate::SetCurrentViewVersion {
215                view_version_id: version_id,
216            });
217        }
218
219        // Use the timestamp of the snapshot if it was added in this set of changes,
220        // otherwise use a current timestamp for the log. The view version was added
221        // by a past transaction.
222        let version_added_in_this_changes = self
223            .changes
224            .iter()
225            .any(|update| matches!(update, ViewUpdate::AddViewVersion { view_version } if view_version.version_id() == version_id));
226
227        let mut log = version.log();
228        if !version_added_in_this_changes {
229            log.set_timestamp_ms(Utc::now().timestamp_millis());
230        }
231
232        self.history_entry = Some(log);
233
234        Ok(self)
235    }
236
237    /// Add a new view version and set it as current.
238    pub fn set_current_version(
239        mut self,
240        view_version: ViewVersion,
241        schema: Schema,
242    ) -> Result<Self> {
243        let schema_id = self.add_schema_internal(schema);
244        let view_version = view_version.with_schema_id(schema_id);
245        let view_version_id = self.add_version_internal(view_version)?;
246        self.set_current_version_id(view_version_id)
247    }
248
249    /// Add a new version to the view.
250    ///
251    /// # Errors
252    /// - The schema ID of the version is set to `-1`, but no schema has been added.
253    /// - The schema ID of the specified version is unknown.
254    /// - Multiple queries for the same dialect are added.
255    pub fn add_version(mut self, view_version: ViewVersion) -> Result<Self> {
256        self.add_version_internal(view_version)?;
257
258        Ok(self)
259    }
260
261    fn add_version_internal(&mut self, view_version: ViewVersion) -> Result<i32> {
262        let version_id = self.reuse_or_create_new_view_version_id(&view_version);
263        let view_version = view_version.with_version_id(version_id);
264
265        if self.metadata.versions.contains_key(&version_id) {
266            // ToDo Discuss: Similar to TableMetadata sort-order, Java does not add changes
267            // in this case. I prefer to add changes as the state of the builder is
268            // potentially mutated (`last_added_version_id`), thus we should record the change.
269            if self.last_added_version_id != Some(version_id) {
270                self.changes
271                    .push(ViewUpdate::AddViewVersion { view_version });
272                self.last_added_version_id = Some(version_id);
273            }
274            return Ok(version_id);
275        }
276
277        let view_version = if view_version.schema_id() == Self::LAST_ADDED {
278            let last_added_schema_id = self.last_added_schema_id.ok_or_else(|| {
279                Error::new(
280                    ErrorKind::DataInvalid,
281                    "Cannot set last added schema: no schema has been added",
282                )
283            })?;
284            view_version.with_schema_id(last_added_schema_id)
285        } else {
286            view_version
287        };
288
289        if !self
290            .metadata
291            .schemas
292            .contains_key(&view_version.schema_id())
293        {
294            return Err(Error::new(
295                ErrorKind::DataInvalid,
296                format!(
297                    "Cannot add version with unknown schema: {}",
298                    view_version.schema_id()
299                ),
300            ));
301        }
302
303        require_unique_dialects(&view_version)?;
304
305        // The `TableMetadataBuilder` uses these checks in multiple places - also in Java.
306        // If we think delayed requests are a problem, I think we should also add it here.
307        if let Some(last) = self.metadata.version_log.last() {
308            // commits can happen concurrently from different machines.
309            // A tolerance helps us avoid failure for small clock skew
310            if view_version.timestamp_ms() - last.timestamp_ms() < -ONE_MINUTE_MS {
311                return Err(Error::new(
312                    ErrorKind::DataInvalid,
313                    format!(
314                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
315                        view_version.timestamp_ms(),
316                        last.timestamp_ms()
317                    ),
318                ));
319            }
320        }
321
322        self.metadata
323            .versions
324            .insert(version_id, Arc::new(view_version.clone()));
325
326        let view_version = if let Some(last_added_schema_id) = self.last_added_schema_id {
327            if view_version.schema_id() == last_added_schema_id {
328                view_version.with_schema_id(Self::LAST_ADDED)
329            } else {
330                view_version
331            }
332        } else {
333            view_version
334        };
335        self.changes
336            .push(ViewUpdate::AddViewVersion { view_version });
337
338        self.last_added_version_id = Some(version_id);
339
340        Ok(version_id)
341    }
342
343    fn reuse_or_create_new_view_version_id(&self, new_view_version: &ViewVersion) -> i32 {
344        self.metadata
345            .versions
346            .iter()
347            .find_map(|(id, other_version)| {
348                new_view_version
349                    .behaves_identical_to(other_version)
350                    .then_some(*id)
351            })
352            .unwrap_or_else(|| {
353                self.get_highest_view_version_id()
354                    .map(|id| id + 1)
355                    .unwrap_or(INITIAL_VIEW_VERSION_ID)
356            })
357    }
358
359    fn get_highest_view_version_id(&self) -> Option<i32> {
360        self.metadata.versions.keys().max().copied()
361    }
362
363    /// Add a new schema to the view.
364    pub fn add_schema(mut self, schema: Schema) -> Self {
365        self.add_schema_internal(schema);
366
367        self
368    }
369
370    fn add_schema_internal(&mut self, schema: Schema) -> SchemaId {
371        let schema_id = self.reuse_or_create_new_schema_id(&schema);
372
373        if self.metadata.schemas.contains_key(&schema_id) {
374            // ToDo Discuss: Java does not add changes in this case. I prefer to add changes
375            // as the state of the builder is potentially mutated (`last_added_schema_id`),
376            // thus we should record the change.
377            if self.last_added_schema_id != Some(schema_id) {
378                self.changes.push(ViewUpdate::AddSchema {
379                    schema: schema.clone().with_schema_id(schema_id),
380                    last_column_id: None,
381                });
382                self.last_added_schema_id = Some(schema_id);
383            }
384            return schema_id;
385        }
386
387        let schema = schema.with_schema_id(schema_id);
388
389        self.metadata
390            .schemas
391            .insert(schema_id, Arc::new(schema.clone()));
392        let last_column_id = schema.highest_field_id();
393        self.changes.push(ViewUpdate::AddSchema {
394            schema,
395            last_column_id: Some(last_column_id),
396        });
397
398        self.last_added_schema_id = Some(schema_id);
399
400        schema_id
401    }
402
403    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> SchemaId {
404        self.metadata
405            .schemas
406            .iter()
407            .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
408            .unwrap_or_else(|| {
409                self.get_highest_schema_id()
410                    .map(|id| id + 1)
411                    .unwrap_or(DEFAULT_SCHEMA_ID)
412            })
413    }
414
415    fn get_highest_schema_id(&self) -> Option<SchemaId> {
416        self.metadata.schemas.keys().max().copied()
417    }
418
419    /// Update properties of the view.
420    pub fn set_properties(mut self, updates: HashMap<String, String>) -> Result<Self> {
421        if updates.is_empty() {
422            return Ok(self);
423        }
424
425        let num_versions_to_keep = updates
426            .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
427            .and_then(|v| v.parse::<i64>().ok())
428            .unwrap_or(1);
429        if num_versions_to_keep < 0 {
430            return Err(Error::new(
431                ErrorKind::DataInvalid,
432                format!(
433                    "{VIEW_PROPERTY_VERSION_HISTORY_SIZE} must be positive but was {num_versions_to_keep}"
434                ),
435            ));
436        }
437
438        self.metadata.properties.extend(updates.clone());
439        self.changes.push(ViewUpdate::SetProperties { updates });
440
441        Ok(self)
442    }
443
444    /// Remove properties from the view
445    pub fn remove_properties(mut self, removals: &[String]) -> Self {
446        if removals.is_empty() {
447            return self;
448        }
449
450        for property in removals {
451            self.metadata.properties.remove(property);
452        }
453
454        self.changes.push(ViewUpdate::RemoveProperties {
455            removals: removals.to_vec(),
456        });
457
458        self
459    }
460
461    /// Assign a new UUID to the view.
462    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
463        if self.metadata.view_uuid != uuid {
464            self.metadata.view_uuid = uuid;
465            self.changes.push(ViewUpdate::AssignUuid { uuid });
466        }
467
468        self
469    }
470
471    /// Build the `ViewMetadata` from the changes.
472    pub fn build(mut self) -> Result<ViewMetadataBuildResult> {
473        if let Some(history_entry) = self.history_entry.take() {
474            self.metadata.version_log.push(history_entry);
475        }
476
477        // We should run validate before `self.metadata.current_version()` below,
478        // as it might panic if the metadata is invalid.
479        self.metadata.validate()?;
480
481        if let Some(previous) = self.previous_view_version.take() {
482            if !allow_replace_drop_dialects(&self.metadata.properties) {
483                require_no_dialect_dropped(&previous, self.metadata.current_version())?;
484            }
485        }
486
487        let _expired_versions = self.expire_versions();
488        self.metadata.version_log = update_version_log(
489            self.metadata.version_log,
490            self.metadata.versions.keys().copied().collect(),
491        );
492
493        Ok(ViewMetadataBuildResult {
494            metadata: self.metadata,
495            changes: self.changes,
496        })
497    }
498
499    /// Removes expired versions from the view and returns them.
500    fn expire_versions(&mut self) -> Vec<ViewVersionRef> {
501        let num_versions_to_keep = self
502            .metadata
503            .properties
504            .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
505            .and_then(|v| v.parse::<usize>().ok())
506            .unwrap_or(VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT)
507            .max(1);
508
509        // expire old versions, but keep at least the versions added in this builder
510        let num_added_versions = self
511            .changes
512            .iter()
513            .filter(|update| matches!(update, ViewUpdate::AddViewVersion { .. }))
514            .count();
515        let num_versions_to_keep = num_added_versions.max(num_versions_to_keep);
516
517        if self.metadata.versions.len() > num_versions_to_keep {
518            // version ids are assigned sequentially. keep the latest versions by ID.
519            let mut versions_to_keep = self
520                .metadata
521                .versions
522                .keys()
523                .copied()
524                .sorted()
525                .rev()
526                .take(num_versions_to_keep)
527                .collect::<HashSet<_>>();
528
529            // always retain current version
530            if !versions_to_keep.contains(&self.metadata.current_version_id) {
531                // Remove the lowest ID
532                if num_versions_to_keep > num_added_versions {
533                    let lowest_id = versions_to_keep.iter().min().copied();
534                    lowest_id.map(|id| versions_to_keep.remove(&id));
535                }
536                // Add the current version ID
537                versions_to_keep.insert(self.metadata.current_version_id);
538            }
539
540            let mut expired_versions = Vec::new();
541            // remove all versions which are not in versions_to_keep from the metadata
542            // and add them to the expired_versions list
543            self.metadata.versions.retain(|id, version| {
544                if versions_to_keep.contains(id) {
545                    true
546                } else {
547                    expired_versions.push(version.clone());
548                    false
549                }
550            });
551
552            expired_versions
553        } else {
554            Vec::new()
555        }
556    }
557}
558
559/// Expire version log entries that are no longer relevant.
560/// Returns the history entries to retain.
561fn update_version_log(
562    version_log: Vec<ViewVersionLog>,
563    ids_to_keep: HashSet<i32>,
564) -> Vec<ViewVersionLog> {
565    let mut retained_history = Vec::new();
566    for log_entry in version_log {
567        if ids_to_keep.contains(&log_entry.version_id()) {
568            retained_history.push(log_entry);
569        } else {
570            retained_history.clear();
571        }
572    }
573    retained_history
574}
575
576fn allow_replace_drop_dialects(properties: &HashMap<String, String>) -> bool {
577    properties
578        .get(VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED)
579        .map_or(
580            VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT,
581            |value| is_truthy(value),
582        )
583}
584
585fn require_no_dialect_dropped(previous: &ViewVersion, current: &ViewVersion) -> Result<()> {
586    let base_dialects = lowercase_sql_dialects_for(previous);
587    let updated_dialects = lowercase_sql_dialects_for(current);
588
589    if !updated_dialects.is_superset(&base_dialects) {
590        return Err(Error::new(
591            ErrorKind::DataInvalid,
592            format!(
593                "Cannot replace view due to loss of view dialects: \nPrevious dialects: {:?}\nNew dialects: {:?}\nSet {} to true to allow dropping dialects.",
594                Vec::from_iter(base_dialects),
595                Vec::from_iter(updated_dialects),
596                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED
597            ),
598        ));
599    }
600
601    Ok(())
602}
603
604fn lowercase_sql_dialects_for(view_version: &ViewVersion) -> HashSet<String> {
605    view_version
606        .representations()
607        .iter()
608        .map(|repr| match repr {
609            ViewRepresentation::Sql(sql_repr) => sql_repr.dialect.to_lowercase(),
610        })
611        .collect()
612}
613
614pub(super) fn require_unique_dialects(view_version: &ViewVersion) -> Result<()> {
615    let mut seen_dialects = HashSet::with_capacity(view_version.representations().len());
616    for repr in view_version.representations().iter() {
617        match repr {
618            ViewRepresentation::Sql(sql_repr) => {
619                if !seen_dialects.insert(sql_repr.dialect.to_lowercase()) {
620                    return Err(Error::new(
621                        ErrorKind::DataInvalid,
622                        format!(
623                            "Invalid view version: Cannot add multiple queries for dialect {}",
624                            sql_repr.dialect
625                        ),
626                    ));
627                }
628            }
629        }
630    }
631    Ok(())
632}
633
634#[cfg(test)]
635mod test {
636    use super::super::view_metadata::tests::get_test_view_metadata;
637    use super::*;
638    use crate::NamespaceIdent;
639    use crate::spec::{
640        NestedField, PrimitiveType, SqlViewRepresentation, Type, ViewRepresentations,
641    };
642
643    fn new_view_version(id: usize, schema_id: SchemaId, sql: &str) -> ViewVersion {
644        new_view_version_with_dialect(id, schema_id, sql, vec!["spark"])
645    }
646
647    fn new_view_version_with_dialect(
648        id: usize,
649        schema_id: SchemaId,
650        sql: &str,
651        dialects: Vec<&str>,
652    ) -> ViewVersion {
653        ViewVersion::builder()
654            .with_version_id(id as i32)
655            .with_schema_id(schema_id)
656            .with_timestamp_ms(1573518431300)
657            .with_default_catalog(Some("prod".to_string()))
658            .with_summary(HashMap::from_iter(vec![(
659                "user".to_string(),
660                "some-user".to_string(),
661            )]))
662            .with_representations(ViewRepresentations(
663                dialects
664                    .iter()
665                    .map(|dialect| {
666                        ViewRepresentation::Sql(SqlViewRepresentation {
667                            dialect: dialect.to_string(),
668                            sql: sql.to_string(),
669                        })
670                    })
671                    .collect(),
672            ))
673            .with_default_namespace(NamespaceIdent::new("default".to_string()))
674            .build()
675    }
676
677    fn builder_without_changes() -> ViewMetadataBuilder {
678        ViewMetadataBuilder::new_from_metadata(get_test_view_metadata("ViewMetadataV1Valid.json"))
679    }
680
681    #[test]
682    fn test_minimal_builder() {
683        let location = "s3://bucket/table".to_string();
684        let schema = Schema::builder()
685            .with_schema_id(1)
686            .with_fields(vec![])
687            .build()
688            .unwrap();
689        // Version ID and schema should be re-assigned
690        let version = new_view_version(20, 21, "select 1 as count");
691        let format_version = ViewFormatVersion::V1;
692        let properties = HashMap::from_iter(vec![("key".to_string(), "value".to_string())]);
693
694        let build_result = ViewMetadataBuilder::new(
695            location.clone(),
696            schema.clone(),
697            version.clone(),
698            format_version,
699            properties.clone(),
700        )
701        .unwrap()
702        .build()
703        .unwrap();
704
705        let metadata = build_result.metadata;
706        assert_eq!(metadata.location, location);
707        assert_eq!(metadata.current_version_id, INITIAL_VIEW_VERSION_ID);
708        assert_eq!(metadata.format_version, format_version);
709        assert_eq!(metadata.properties, properties);
710        assert_eq!(metadata.versions.len(), 1);
711        assert_eq!(metadata.schemas.len(), 1);
712        assert_eq!(metadata.version_log.len(), 1);
713        assert_eq!(
714            Arc::unwrap_or_clone(metadata.versions[&INITIAL_VIEW_VERSION_ID].clone()),
715            version
716                .clone()
717                .with_version_id(INITIAL_VIEW_VERSION_ID)
718                .with_schema_id(0)
719        );
720
721        let changes = build_result.changes;
722        assert_eq!(changes.len(), 5);
723        assert!(changes.contains(&ViewUpdate::SetLocation { location }));
724        assert!(
725            changes.contains(&ViewUpdate::AddViewVersion {
726                view_version: version
727                    .with_version_id(INITIAL_VIEW_VERSION_ID)
728                    .with_schema_id(-1)
729            })
730        );
731        assert!(changes.contains(&ViewUpdate::SetCurrentViewVersion {
732            view_version_id: -1
733        }));
734        assert!(changes.contains(&ViewUpdate::AddSchema {
735            schema: schema.clone().with_schema_id(0),
736            last_column_id: Some(0)
737        }));
738        assert!(changes.contains(&ViewUpdate::SetProperties {
739            updates: properties
740        }));
741    }
742
743    #[test]
744    fn test_version_expiration() {
745        let v1 = new_view_version(0, 1, "select 1 as count");
746        let v2 = new_view_version(0, 1, "select count(1) as count from t2");
747        let v3 = new_view_version(0, 1, "select count from t1");
748
749        let builder = builder_without_changes()
750            .add_version(v1)
751            .unwrap()
752            .add_version(v2)
753            .unwrap()
754            .add_version(v3)
755            .unwrap();
756        let builder_without_changes = builder.clone().build().unwrap().metadata.into_builder();
757
758        // No limit on versions
759        let metadata = builder.clone().build().unwrap().metadata;
760        assert_eq!(
761            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
762            HashSet::from_iter(vec![1, 2, 3, 4])
763        );
764
765        // Limit to 2 versions, we still want to keep 3 versions as 3 where added during this build
766        // Plus the current version
767        let metadata = builder
768            .clone()
769            .set_properties(HashMap::from_iter(vec![(
770                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
771                "2".to_string(),
772            )]))
773            .unwrap()
774            .build()
775            .unwrap()
776            .metadata;
777        assert_eq!(
778            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
779            HashSet::from_iter(vec![1, 2, 3, 4])
780        );
781        assert_eq!(metadata.version_log.len(), 1);
782
783        // Limit to 2 versions in new build, only keep 2.
784        // One of them should be the current
785        let metadata = builder_without_changes
786            .clone()
787            .set_properties(HashMap::from_iter(vec![(
788                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
789                "2".to_string(),
790            )]))
791            .unwrap()
792            .build()
793            .unwrap()
794            .metadata;
795        assert_eq!(
796            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
797            HashSet::from_iter(vec![1, 4])
798        );
799
800        // Keep at least 1 version irrespective of the limit.
801        // This is the current version
802        let metadata = builder_without_changes
803            .set_properties(HashMap::from_iter(vec![(
804                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
805                "0".to_string(),
806            )]))
807            .unwrap()
808            .build()
809            .unwrap()
810            .metadata;
811        assert_eq!(
812            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
813            HashSet::from_iter(vec![1])
814        );
815    }
816
817    #[test]
818    fn test_update_version_log() {
819        let v1 = new_view_version(1, 1, "select 1 as count");
820        let v2 = new_view_version(2, 1, "select count(1) as count from t2");
821        let v3 = new_view_version(3, 1, "select count from t1");
822
823        let one = ViewVersionLog::new(1, v1.timestamp_ms());
824        let two = ViewVersionLog::new(2, v2.timestamp_ms());
825        let three = ViewVersionLog::new(3, v3.timestamp_ms());
826
827        assert_eq!(
828            update_version_log(
829                vec![one.clone(), two.clone(), three.clone()],
830                HashSet::from_iter(vec![1, 2, 3])
831            ),
832            vec![one.clone(), two.clone(), three.clone()]
833        );
834
835        // one was an invalid entry in the history, so all previous elements are removed
836        assert_eq!(
837            update_version_log(
838                vec![
839                    three.clone(),
840                    two.clone(),
841                    one.clone(),
842                    two.clone(),
843                    three.clone()
844                ],
845                HashSet::from_iter(vec![2, 3])
846            ),
847            vec![two.clone(), three.clone()]
848        );
849
850        // two was an invalid entry in the history, so all previous elements are removed
851        assert_eq!(
852            update_version_log(
853                vec![
854                    one.clone(),
855                    two.clone(),
856                    three.clone(),
857                    one.clone(),
858                    three.clone()
859                ],
860                HashSet::from_iter(vec![1, 3])
861            ),
862            vec![three.clone(), one.clone(), three.clone()]
863        );
864    }
865
866    #[test]
867    fn test_use_previously_added_version() {
868        let v2 = new_view_version(2, 1, "select 1 as count");
869        let v3 = new_view_version(3, 1, "select count(1) as count from t2");
870        let schema = Schema::builder().build().unwrap();
871
872        let log_v2 = ViewVersionLog::new(2, v2.timestamp_ms());
873        let log_v3 = ViewVersionLog::new(3, v3.timestamp_ms());
874
875        let metadata_v2 = builder_without_changes()
876            .set_current_version(v2.clone(), schema.clone())
877            .unwrap()
878            .build()
879            .unwrap()
880            .metadata;
881
882        // Log should use the exact timestamp of v1
883        assert_eq!(metadata_v2.version_log.last().unwrap(), &log_v2);
884
885        // Add second version, should use exact timestamp of v2
886        let metadata_v3 = metadata_v2
887            .into_builder()
888            .set_current_version(v3.clone(), schema)
889            .unwrap()
890            .build()
891            .unwrap()
892            .metadata;
893
894        assert_eq!(metadata_v3.version_log[1..], vec![
895            log_v2.clone(),
896            log_v3.clone()
897        ]);
898
899        // Re-use Version 1, add a new log entry with a new timestamp
900        let metadata_v4 = metadata_v3
901            .into_builder()
902            .set_current_version_id(2)
903            .unwrap()
904            .build()
905            .unwrap()
906            .metadata;
907
908        // Last entry should be equal to v2 but with an updated timestamp
909        let entry = metadata_v4.version_log.last().unwrap();
910        assert_eq!(entry.version_id(), 2);
911        assert!(entry.timestamp_ms() > v2.timestamp_ms());
912    }
913
914    #[test]
915    fn test_assign_uuid() {
916        let builder = builder_without_changes();
917        let uuid = Uuid::now_v7();
918        let build_result = builder.clone().assign_uuid(uuid).build().unwrap();
919        assert_eq!(build_result.metadata.view_uuid, uuid);
920        assert_eq!(build_result.changes, vec![ViewUpdate::AssignUuid { uuid }]);
921    }
922
923    #[test]
924    fn test_set_location() {
925        let builder = builder_without_changes();
926        let location = "s3://bucket/table".to_string();
927        let build_result = builder
928            .clone()
929            .set_location(location.clone())
930            .build()
931            .unwrap();
932        assert_eq!(build_result.metadata.location, location);
933        assert_eq!(build_result.changes, vec![ViewUpdate::SetLocation {
934            location
935        }]);
936    }
937
938    #[test]
939    fn test_set_and_remove_properties() {
940        let builder = builder_without_changes();
941        let properties = HashMap::from_iter(vec![
942            ("key1".to_string(), "value1".to_string()),
943            ("key2".to_string(), "value2".to_string()),
944        ]);
945        let build_result = builder
946            .clone()
947            .set_properties(properties.clone())
948            .unwrap()
949            .remove_properties(&["key2".to_string(), "key3".to_string()])
950            .build()
951            .unwrap();
952        assert_eq!(
953            build_result.metadata.properties.get("key1"),
954            Some(&"value1".to_string())
955        );
956        assert_eq!(build_result.metadata.properties.get("key2"), None);
957        assert_eq!(build_result.changes, vec![
958            ViewUpdate::SetProperties {
959                updates: properties
960            },
961            ViewUpdate::RemoveProperties {
962                removals: vec!["key2".to_string(), "key3".to_string()]
963            }
964        ]);
965    }
966
967    #[test]
968    fn test_add_schema() {
969        let builder = builder_without_changes();
970        let schema = Schema::builder()
971            .with_schema_id(1)
972            .with_fields(vec![])
973            .build()
974            .unwrap();
975        let build_result = builder.clone().add_schema(schema.clone()).build().unwrap();
976        assert_eq!(build_result.metadata.schemas.len(), 2);
977        assert_eq!(build_result.changes, vec![ViewUpdate::AddSchema {
978            schema: schema.clone().with_schema_id(2),
979            last_column_id: Some(0)
980        }]);
981
982        // Add schema again - id is reused
983        let build_result = builder.clone().add_schema(schema.clone()).build().unwrap();
984        assert_eq!(build_result.metadata.schemas.len(), 2);
985        assert_eq!(build_result.changes, vec![ViewUpdate::AddSchema {
986            schema: schema.clone().with_schema_id(2),
987            last_column_id: Some(0)
988        }]);
989    }
990
991    #[test]
992    fn test_add_and_set_current_version() {
993        let builder = builder_without_changes();
994        let v1 = new_view_version(2, 1, "select 1 as count");
995        let v2 = new_view_version(3, 2, "select count(1) as count from t2");
996        let v2_schema = Schema::builder()
997            .with_schema_id(2)
998            .with_fields(vec![])
999            .build()
1000            .unwrap();
1001
1002        let build_result = builder
1003            .clone()
1004            .add_version(v1.clone())
1005            .unwrap()
1006            .add_schema(v2_schema.clone())
1007            .add_version(v2.clone())
1008            .unwrap()
1009            .set_current_version_id(3)
1010            .unwrap()
1011            .build()
1012            .unwrap();
1013
1014        assert_eq!(build_result.metadata.current_version_id, 3);
1015        assert_eq!(build_result.metadata.versions.len(), 3);
1016        assert_eq!(build_result.metadata.schemas.len(), 2);
1017        assert_eq!(build_result.metadata.version_log.len(), 2);
1018        assert_eq!(
1019            Arc::unwrap_or_clone(build_result.metadata.versions[&2].clone()),
1020            v1.clone().with_version_id(2).with_schema_id(1)
1021        );
1022        assert_eq!(
1023            Arc::unwrap_or_clone(build_result.metadata.versions[&3].clone()),
1024            v2.clone().with_version_id(3).with_schema_id(2)
1025        );
1026        assert_eq!(build_result.changes.len(), 4);
1027        assert_eq!(build_result.changes, vec![
1028            ViewUpdate::AddViewVersion {
1029                view_version: v1.clone().with_version_id(2).with_schema_id(1)
1030            },
1031            ViewUpdate::AddSchema {
1032                schema: v2_schema.clone().with_schema_id(2),
1033                last_column_id: Some(0)
1034            },
1035            ViewUpdate::AddViewVersion {
1036                view_version: v2.clone().with_version_id(3).with_schema_id(-1)
1037            },
1038            ViewUpdate::SetCurrentViewVersion {
1039                view_version_id: -1
1040            }
1041        ]);
1042        assert_eq!(
1043            build_result
1044                .metadata
1045                .version_log
1046                .iter()
1047                .map(|v| v.version_id())
1048                .collect::<Vec<_>>(),
1049            vec![1, 3]
1050        );
1051    }
1052
1053    #[test]
1054    fn test_schema_and_version_id_reassignment() {
1055        let builder = builder_without_changes();
1056        let v1 = new_view_version(0, 1, "select 1 as count");
1057        let v2 = new_view_version(0, 2, "select count(1) as count from t2");
1058        let v2_schema = Schema::builder()
1059            .with_schema_id(0)
1060            .with_fields(vec![])
1061            .build()
1062            .unwrap();
1063
1064        let build_result = builder
1065            .clone()
1066            .add_version(v1.clone())
1067            .unwrap()
1068            .set_current_version(v2.clone(), v2_schema.clone())
1069            .unwrap()
1070            .build()
1071            .unwrap();
1072
1073        assert_eq!(build_result.metadata.current_version_id, 3);
1074        assert_eq!(build_result.metadata.versions.len(), 3);
1075        assert_eq!(build_result.metadata.schemas.len(), 2);
1076        assert_eq!(build_result.metadata.version_log.len(), 2);
1077        assert_eq!(
1078            Arc::unwrap_or_clone(build_result.metadata.versions[&2].clone()),
1079            v1.clone().with_version_id(2).with_schema_id(1)
1080        );
1081        assert_eq!(
1082            Arc::unwrap_or_clone(build_result.metadata.versions[&3].clone()),
1083            v2.clone().with_version_id(3).with_schema_id(2)
1084        );
1085        assert_eq!(build_result.changes.len(), 4);
1086        assert_eq!(build_result.changes, vec![
1087            ViewUpdate::AddViewVersion {
1088                view_version: v1.clone().with_version_id(2).with_schema_id(1)
1089            },
1090            ViewUpdate::AddSchema {
1091                schema: v2_schema.clone().with_schema_id(2),
1092                last_column_id: Some(0)
1093            },
1094            ViewUpdate::AddViewVersion {
1095                view_version: v2.clone().with_version_id(3).with_schema_id(-1)
1096            },
1097            ViewUpdate::SetCurrentViewVersion {
1098                view_version_id: -1
1099            }
1100        ]);
1101        assert_eq!(
1102            build_result
1103                .metadata
1104                .version_log
1105                .iter()
1106                .map(|v| v.version_id())
1107                .collect::<Vec<_>>(),
1108            vec![1, 3]
1109        );
1110    }
1111
1112    #[test]
1113    fn test_view_version_deduplication() {
1114        let builder = builder_without_changes();
1115        let v1 = new_view_version(0, 1, "select * from ns.tbl");
1116
1117        assert_eq!(builder.metadata.versions.len(), 1);
1118        let build_result = builder
1119            .clone()
1120            .add_version(v1.clone())
1121            .unwrap()
1122            .add_version(v1)
1123            .unwrap()
1124            .build()
1125            .unwrap();
1126
1127        assert_eq!(build_result.metadata.versions.len(), 2);
1128        assert_eq!(build_result.metadata.schemas.len(), 1);
1129    }
1130
1131    #[test]
1132    fn test_view_version_and_schema_deduplication() {
1133        let schema_one = Schema::builder()
1134            .with_schema_id(5)
1135            .with_fields(vec![
1136                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1137            ])
1138            .build()
1139            .unwrap();
1140        let schema_two = Schema::builder()
1141            .with_schema_id(7)
1142            .with_fields(vec![
1143                NestedField::required(1, "y", Type::Primitive(PrimitiveType::Long)).into(),
1144            ])
1145            .build()
1146            .unwrap();
1147        let schema_three = Schema::builder()
1148            .with_schema_id(9)
1149            .with_fields(vec![
1150                NestedField::required(1, "z", Type::Primitive(PrimitiveType::Long)).into(),
1151            ])
1152            .build()
1153            .unwrap();
1154
1155        let v1 = new_view_version(1, 5, "select * from ns.tbl");
1156        let v2 = new_view_version(1, 7, "select count(*) from ns.tbl");
1157        let v3 = new_view_version(1, 9, "select count(*) as count from ns.tbl");
1158
1159        let build_result = builder_without_changes()
1160            .add_schema(schema_one.clone())
1161            .add_schema(schema_two.clone())
1162            .add_schema(schema_three.clone())
1163            .set_current_version(v1.clone(), schema_one.clone())
1164            .unwrap()
1165            .set_current_version(v2.clone(), schema_two.clone())
1166            .unwrap()
1167            .set_current_version(v3.clone(), schema_three.clone())
1168            .unwrap()
1169            .set_current_version(v3.clone(), schema_three.clone())
1170            .unwrap()
1171            .set_current_version(v2.clone(), schema_two.clone())
1172            .unwrap()
1173            .set_current_version(v1.clone(), schema_one.clone())
1174            .unwrap()
1175            .build()
1176            .unwrap();
1177
1178        assert_eq!(
1179            Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
1180            v1.clone().with_version_id(2).with_schema_id(2)
1181        );
1182        assert_eq!(build_result.metadata.versions.len(), 4);
1183        assert_eq!(
1184            build_result.metadata.versions[&2],
1185            Arc::new(v1.clone().with_version_id(2).with_schema_id(2))
1186        );
1187        assert_eq!(
1188            build_result.metadata.versions[&3],
1189            Arc::new(v2.clone().with_version_id(3).with_schema_id(3))
1190        );
1191        assert_eq!(
1192            build_result.metadata.versions[&4],
1193            Arc::new(v3.clone().with_version_id(4).with_schema_id(4))
1194        );
1195        assert_eq!(
1196            // Remove schema_id 1 and get struct only
1197            build_result
1198                .metadata
1199                .schemas_iter()
1200                .filter(|s| s.schema_id() != 1)
1201                .sorted_by_key(|s| s.schema_id())
1202                .map(|s| s.as_struct())
1203                .collect::<Vec<_>>(),
1204            vec![
1205                schema_one.as_struct(),
1206                schema_two.as_struct(),
1207                schema_three.as_struct()
1208            ]
1209        )
1210    }
1211
1212    #[test]
1213    fn test_error_on_missing_schema() {
1214        let builder = builder_without_changes();
1215        // Missing schema
1216        assert!(
1217            builder
1218                .clone()
1219                .add_version(new_view_version(0, 10, "SELECT * FROM foo"))
1220                .unwrap_err()
1221                .to_string()
1222                .contains("Cannot add version with unknown schema: 10")
1223        );
1224
1225        // Missing last added schema
1226        assert!(
1227            builder
1228                .clone()
1229                .add_version(new_view_version(0, -1, "SELECT * FROM foo"))
1230                .unwrap_err()
1231                .to_string()
1232                .contains("Cannot set last added schema: no schema has been added")
1233        );
1234    }
1235
1236    #[test]
1237    fn test_error_on_missing_current_version() {
1238        let builder = builder_without_changes();
1239        assert!(builder
1240            .clone()
1241            .set_current_version_id(-1)
1242            .unwrap_err()
1243            .to_string()
1244            .contains(
1245                "Cannot set current version id to last added version: no version has been added."
1246            ));
1247        assert!(
1248            builder
1249                .clone()
1250                .set_current_version_id(10)
1251                .unwrap_err()
1252                .to_string()
1253                .contains("Cannot set current version to unknown version with id: 10")
1254        );
1255    }
1256
1257    #[test]
1258    fn test_set_current_version_to_last_added() {
1259        let builder = builder_without_changes();
1260        let v1 = new_view_version(2, 1, "select * from ns.tbl");
1261        let v2 = new_view_version(3, 1, "select a,b from ns.tbl");
1262        let meta = builder
1263            .clone()
1264            .add_version(v1)
1265            .unwrap()
1266            .add_version(v2)
1267            .unwrap()
1268            .set_current_version_id(-1)
1269            .unwrap()
1270            .build()
1271            .unwrap();
1272        assert_eq!(meta.metadata.current_version_id, 3);
1273    }
1274
1275    #[test]
1276    fn test_error_when_setting_negative_version_history_size() {
1277        let builder = builder_without_changes();
1278        assert!(
1279            builder
1280                .clone()
1281                .set_properties(HashMap::from_iter(vec![(
1282                    VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
1283                    "-1".to_string(),
1284                )]))
1285                .unwrap_err()
1286                .to_string()
1287                .contains("version.history.num-entries must be positive but was -1")
1288        );
1289    }
1290
1291    #[test]
1292    fn test_view_version_changes() {
1293        let builder = builder_without_changes();
1294
1295        let v1 = new_view_version(2, 1, "select 1 as count");
1296        let v2 = new_view_version(3, 1, "select count(1) as count from t2");
1297
1298        let changes = builder
1299            .clone()
1300            .add_version(v1.clone())
1301            .unwrap()
1302            .add_version(v2.clone())
1303            .unwrap()
1304            .build()
1305            .unwrap()
1306            .changes;
1307
1308        assert_eq!(changes.len(), 2);
1309        assert_eq!(changes, vec![
1310            ViewUpdate::AddViewVersion {
1311                view_version: v1.clone()
1312            },
1313            ViewUpdate::AddViewVersion {
1314                view_version: v2.clone()
1315            }
1316        ]);
1317    }
1318
1319    #[test]
1320    fn test_dropping_dialect_fails_by_default() {
1321        let builder = builder_without_changes();
1322
1323        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
1324        let spark_trino =
1325            new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
1326        let schema = Schema::builder()
1327            .with_schema_id(0)
1328            .with_fields(vec![])
1329            .build()
1330            .unwrap();
1331
1332        let err = builder
1333            .set_current_version(spark_trino, schema.clone())
1334            .unwrap()
1335            .build()
1336            .unwrap()
1337            .metadata
1338            .into_builder()
1339            .set_current_version(spark, schema)
1340            .unwrap()
1341            .build()
1342            .unwrap_err();
1343
1344        assert!(
1345            err.to_string()
1346                .contains("Cannot replace view due to loss of view dialects")
1347        );
1348    }
1349
1350    #[test]
1351    fn test_dropping_dialects_does_not_fail_when_allowed() {
1352        let builder = builder_without_changes();
1353
1354        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
1355        let spark_trino =
1356            new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
1357        let schema = Schema::builder()
1358            .with_schema_id(0)
1359            .with_fields(vec![])
1360            .build()
1361            .unwrap();
1362
1363        let build_result = builder
1364            .set_properties(HashMap::from_iter(vec![(
1365                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
1366                "true".to_string(),
1367            )]))
1368            .unwrap()
1369            .set_current_version(spark_trino, schema.clone())
1370            .unwrap()
1371            .build()
1372            .unwrap()
1373            .metadata
1374            .into_builder()
1375            .set_current_version(spark.clone(), schema)
1376            .unwrap()
1377            .build()
1378            .unwrap();
1379
1380        assert_eq!(
1381            Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
1382            spark.with_version_id(3).with_schema_id(2)
1383        );
1384    }
1385
1386    #[test]
1387    fn test_can_add_dialects_by_default() {
1388        let builder = builder_without_changes();
1389
1390        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
1391        let spark_trino =
1392            new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
1393
1394        let schema = Schema::builder()
1395            .with_schema_id(0)
1396            .with_fields(vec![])
1397            .build()
1398            .unwrap();
1399
1400        let build_result = builder
1401            .set_current_version(spark.clone(), schema.clone())
1402            .unwrap()
1403            .build()
1404            .unwrap()
1405            .metadata
1406            .into_builder()
1407            .set_current_version(spark_trino.clone(), schema.clone())
1408            .unwrap()
1409            .build()
1410            .unwrap();
1411
1412        assert_eq!(
1413            Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
1414            spark_trino.with_version_id(3).with_schema_id(2)
1415        );
1416    }
1417
1418    #[test]
1419    fn test_can_update_dialect_by_default() {
1420        let builder = builder_without_changes();
1421
1422        let spark_v1 = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
1423        let spark_v2 = new_view_version_with_dialect(0, 0, "SELECT * FROM bar", vec!["spark"]);
1424
1425        let schema = Schema::builder()
1426            .with_schema_id(0)
1427            .with_fields(vec![])
1428            .build()
1429            .unwrap();
1430
1431        let build_result = builder
1432            .set_current_version(spark_v1.clone(), schema.clone())
1433            .unwrap()
1434            .build()
1435            .unwrap()
1436            .metadata
1437            .into_builder()
1438            .set_current_version(spark_v2.clone(), schema.clone())
1439            .unwrap()
1440            .build()
1441            .unwrap();
1442
1443        assert_eq!(
1444            Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
1445            spark_v2.with_version_id(3).with_schema_id(2)
1446        );
1447    }
1448
1449    #[test]
1450    fn test_dropping_dialects_allowed_and_then_disallowed() {
1451        let builder = builder_without_changes();
1452
1453        let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
1454        let trino = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["trino"]);
1455
1456        let schema = Schema::builder()
1457            .with_schema_id(0)
1458            .with_fields(vec![])
1459            .build()
1460            .unwrap();
1461
1462        let updated = builder
1463            .set_current_version(spark.clone(), schema.clone())
1464            .unwrap()
1465            .build()
1466            .unwrap()
1467            .metadata
1468            .into_builder()
1469            .set_current_version(trino.clone(), schema.clone())
1470            .unwrap()
1471            .set_properties(HashMap::from_iter(vec![(
1472                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
1473                "true".to_string(),
1474            )]))
1475            .unwrap()
1476            .build()
1477            .unwrap();
1478
1479        assert_eq!(
1480            Arc::unwrap_or_clone(updated.metadata.current_version().clone()),
1481            trino.with_version_id(3).with_schema_id(2)
1482        );
1483
1484        let err = updated
1485            .metadata
1486            .into_builder()
1487            .set_current_version(spark.clone(), schema.clone())
1488            .unwrap()
1489            .set_properties(HashMap::from_iter(vec![(
1490                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
1491                "false".to_string(),
1492            )]))
1493            .unwrap()
1494            .build()
1495            .unwrap_err();
1496
1497        assert!(
1498            err.to_string()
1499                .contains("Cannot replace view due to loss of view dialects")
1500        );
1501    }
1502
1503    #[test]
1504    fn test_require_no_dialect_dropped() {
1505        let previous = ViewVersion::builder()
1506            .with_version_id(0)
1507            .with_schema_id(0)
1508            .with_timestamp_ms(0)
1509            .with_representations(ViewRepresentations(vec![
1510                ViewRepresentation::Sql(SqlViewRepresentation {
1511                    dialect: "trino".to_string(),
1512                    sql: "SELECT * FROM foo".to_string(),
1513                }),
1514                ViewRepresentation::Sql(SqlViewRepresentation {
1515                    dialect: "spark".to_string(),
1516                    sql: "SELECT * FROM bar".to_string(),
1517                }),
1518            ]))
1519            .with_default_namespace(NamespaceIdent::new("default".to_string()))
1520            .build();
1521
1522        let current = ViewVersion::builder()
1523            .with_version_id(0)
1524            .with_schema_id(0)
1525            .with_timestamp_ms(0)
1526            .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(
1527                SqlViewRepresentation {
1528                    dialect: "trino".to_string(),
1529                    sql: "SELECT * FROM foo".to_string(),
1530                },
1531            )]))
1532            .with_default_namespace(NamespaceIdent::new("default".to_string()))
1533            .build();
1534
1535        assert!(require_no_dialect_dropped(&previous, &current).is_err());
1536
1537        let current = ViewVersion::builder()
1538            .with_version_id(0)
1539            .with_schema_id(0)
1540            .with_timestamp_ms(0)
1541            .with_representations(ViewRepresentations(vec![
1542                ViewRepresentation::Sql(SqlViewRepresentation {
1543                    dialect: "spark".to_string(),
1544                    sql: "SELECT * FROM bar".to_string(),
1545                }),
1546                ViewRepresentation::Sql(SqlViewRepresentation {
1547                    dialect: "trino".to_string(),
1548                    sql: "SELECT * FROM foo".to_string(),
1549                }),
1550            ]))
1551            .with_default_namespace(NamespaceIdent::new("default".to_string()))
1552            .build();
1553
1554        assert!(require_no_dialect_dropped(&previous, &current).is_ok());
1555    }
1556
1557    #[test]
1558    fn test_allow_replace_drop_dialects() {
1559        use std::collections::HashMap;
1560
1561        use super::allow_replace_drop_dialects;
1562
1563        let mut properties = HashMap::new();
1564        assert!(!allow_replace_drop_dialects(&properties));
1565
1566        properties.insert(
1567            "replace.drop-dialect.allowed".to_string(),
1568            "true".to_string(),
1569        );
1570        assert!(allow_replace_drop_dialects(&properties));
1571
1572        properties.insert(
1573            "replace.drop-dialect.allowed".to_string(),
1574            "false".to_string(),
1575        );
1576        assert!(!allow_replace_drop_dialects(&properties));
1577
1578        properties.insert(
1579            "replace.drop-dialect.allowed".to_string(),
1580            "TRUE".to_string(),
1581        );
1582        assert!(allow_replace_drop_dialects(&properties));
1583
1584        properties.insert(
1585            "replace.drop-dialect.allowed".to_string(),
1586            "FALSE".to_string(),
1587        );
1588        assert!(!allow_replace_drop_dialects(&properties));
1589    }
1590
1591    #[test]
1592    fn test_lowercase_sql_dialects_for() {
1593        let view_version = ViewVersion::builder()
1594            .with_version_id(0)
1595            .with_schema_id(0)
1596            .with_timestamp_ms(0)
1597            .with_representations(ViewRepresentations(vec![
1598                ViewRepresentation::Sql(SqlViewRepresentation {
1599                    dialect: "STARROCKS".to_string(),
1600                    sql: "SELECT * FROM foo".to_string(),
1601                }),
1602                ViewRepresentation::Sql(SqlViewRepresentation {
1603                    dialect: "trino".to_string(),
1604                    sql: "SELECT * FROM bar".to_string(),
1605                }),
1606                ViewRepresentation::Sql(SqlViewRepresentation {
1607                    dialect: "Spark".to_string(),
1608                    sql: "SELECT * FROM bar".to_string(),
1609                }),
1610            ]))
1611            .with_default_namespace(NamespaceIdent::new("default".to_string()))
1612            .build();
1613
1614        let dialects = lowercase_sql_dialects_for(&view_version);
1615        assert_eq!(dialects.len(), 3);
1616        assert!(dialects.contains("trino"));
1617        assert!(dialects.contains("spark"));
1618        assert!(dialects.contains("starrocks"));
1619    }
1620
1621    #[test]
1622    fn test_require_unique_dialects() {
1623        let view_version = ViewVersion::builder()
1624            .with_version_id(0)
1625            .with_schema_id(0)
1626            .with_timestamp_ms(0)
1627            .with_representations(ViewRepresentations(vec![
1628                ViewRepresentation::Sql(SqlViewRepresentation {
1629                    dialect: "trino".to_string(),
1630                    sql: "SELECT * FROM foo".to_string(),
1631                }),
1632                ViewRepresentation::Sql(SqlViewRepresentation {
1633                    dialect: "trino".to_string(),
1634                    sql: "SELECT * FROM bar".to_string(),
1635                }),
1636            ]))
1637            .with_default_namespace(NamespaceIdent::new("default".to_string()))
1638            .build();
1639
1640        assert!(require_unique_dialects(&view_version).is_err());
1641
1642        let view_version = ViewVersion::builder()
1643            .with_version_id(0)
1644            .with_schema_id(0)
1645            .with_timestamp_ms(0)
1646            .with_representations(ViewRepresentations(vec![
1647                ViewRepresentation::Sql(SqlViewRepresentation {
1648                    dialect: "trino".to_string(),
1649                    sql: "SELECT * FROM foo".to_string(),
1650                }),
1651                ViewRepresentation::Sql(SqlViewRepresentation {
1652                    dialect: "spark".to_string(),
1653                    sql: "SELECT * FROM bar".to_string(),
1654                }),
1655            ]))
1656            .with_default_namespace(NamespaceIdent::new("default".to_string()))
1657            .build();
1658
1659        assert!(require_unique_dialects(&view_version).is_ok());
1660    }
1661}