iceberg/spec/
view_version.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
/*!
 * View versions: the definition of an Iceberg view at a specific point in time.
 */
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use _serde::ViewVersionV1;
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27use typed_builder::TypedBuilder;
28
29use super::INITIAL_VIEW_VERSION_ID;
30use super::view_metadata::ViewVersionLog;
31use crate::catalog::NamespaceIdent;
32use crate::error::{Result, timestamp_ms_to_utc};
33use crate::spec::{SchemaId, SchemaRef, ViewMetadata};
34use crate::{Error, ErrorKind};
35
/// Shared, reference-counted ([`Arc`]) handle to a [`ViewVersion`].
pub type ViewVersionRef = Arc<ViewVersion>;

/// Alias for the integer type (`i32`) used for view version ids.
pub type ViewVersionId = i32;
41
42#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, TypedBuilder)]
43#[serde(from = "ViewVersionV1", into = "ViewVersionV1")]
44#[builder(field_defaults(setter(prefix = "with_")))]
45/// A view versions represents the definition of a view at a specific point in time.
46pub struct ViewVersion {
47    /// A unique long ID
48    #[builder(default = INITIAL_VIEW_VERSION_ID)]
49    version_id: ViewVersionId,
50    /// ID of the schema for the view version
51    schema_id: SchemaId,
52    /// Timestamp when the version was created (ms from epoch)
53    timestamp_ms: i64,
54    /// A string to string map of summary metadata about the version
55    #[builder(default = HashMap::new())]
56    summary: HashMap<String, String>,
57    /// A list of representations for the view definition.
58    representations: ViewRepresentations,
59    /// Catalog name to use when a reference in the SELECT does not contain a catalog
60    #[builder(default = None)]
61    default_catalog: Option<String>,
62    /// Namespace to use when a reference in the SELECT is a single identifier
63    default_namespace: NamespaceIdent,
64}
65
66impl ViewVersion {
67    /// Get the version id of this view version.
68    #[inline]
69    pub fn version_id(&self) -> ViewVersionId {
70        self.version_id
71    }
72
73    /// Get the schema id of this view version.
74    #[inline]
75    pub fn schema_id(&self) -> SchemaId {
76        self.schema_id
77    }
78
79    /// Get the timestamp of when the view version was created
80    #[inline]
81    pub fn timestamp(&self) -> Result<DateTime<Utc>> {
82        timestamp_ms_to_utc(self.timestamp_ms)
83    }
84
85    /// Get the timestamp of when the view version was created in milliseconds since epoch
86    #[inline]
87    pub fn timestamp_ms(&self) -> i64 {
88        self.timestamp_ms
89    }
90
91    /// Get summary of the view version
92    #[inline]
93    pub fn summary(&self) -> &HashMap<String, String> {
94        &self.summary
95    }
96
97    /// Get this views representations
98    #[inline]
99    pub fn representations(&self) -> &ViewRepresentations {
100        &self.representations
101    }
102
103    /// Get the default catalog for this view version
104    #[inline]
105    pub fn default_catalog(&self) -> Option<&String> {
106        self.default_catalog.as_ref()
107    }
108
109    /// Get the default namespace to use when a reference in the SELECT is a single identifier
110    #[inline]
111    pub fn default_namespace(&self) -> &NamespaceIdent {
112        &self.default_namespace
113    }
114
115    /// Get the schema of this snapshot.
116    pub fn schema(&self, view_metadata: &ViewMetadata) -> Result<SchemaRef> {
117        view_metadata
118            .schema_by_id(self.schema_id())
119            .ok_or_else(|| {
120                Error::new(
121                    ErrorKind::DataInvalid,
122                    format!("Schema with id {} not found", self.schema_id()),
123                )
124            })
125            .cloned()
126    }
127
128    /// Retrieve the history log entry for this view version.
129    pub(crate) fn log(&self) -> ViewVersionLog {
130        ViewVersionLog::new(self.version_id, self.timestamp_ms)
131    }
132
133    /// Check if this view version behaves the same as another view spec.
134    ///
135    /// Returns true if the view version is equal to the other view version
136    /// with `timestamp_ms` and `version_id` ignored. The following must be identical:
137    /// * Summary (all of them)
138    /// * Representations
139    /// * Default Catalog
140    /// * Default Namespace
141    /// * The Schema ID
142    pub(crate) fn behaves_identical_to(&self, other: &Self) -> bool {
143        self.summary == other.summary
144            && self.representations == other.representations
145            && self.default_catalog == other.default_catalog
146            && self.default_namespace == other.default_namespace
147            && self.schema_id == other.schema_id
148    }
149
150    /// Change the version id of this view version.
151    pub fn with_version_id(self, version_id: i32) -> Self {
152        Self { version_id, ..self }
153    }
154
155    /// Change the schema id of this view version.
156    pub fn with_schema_id(self, schema_id: SchemaId) -> Self {
157        Self { schema_id, ..self }
158    }
159}
160
161/// A list of view representations.
162#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
163pub struct ViewRepresentations(pub(crate) Vec<ViewRepresentation>);
164
165impl ViewRepresentations {
166    #[inline]
167    /// Get the number of representations
168    pub fn len(&self) -> usize {
169        self.0.len()
170    }
171
172    #[inline]
173    /// Check if there are no representations
174    pub fn is_empty(&self) -> bool {
175        self.0.is_empty()
176    }
177
178    /// Get an iterator over the representations
179    pub fn iter(&self) -> impl ExactSizeIterator<Item = &'_ ViewRepresentation> {
180        self.0.iter()
181    }
182}
183
184// Iterator for ViewRepresentations
185impl IntoIterator for ViewRepresentations {
186    type Item = ViewRepresentation;
187    type IntoIter = std::vec::IntoIter<Self::Item>;
188
189    fn into_iter(self) -> Self::IntoIter {
190        self.0.into_iter()
191    }
192}
193
194#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
195#[serde(tag = "type")]
196/// View definitions can be represented in multiple ways.
197/// Representations are documented ways to express a view definition.
198// ToDo: Make unique per Dialect
199pub enum ViewRepresentation {
200    #[serde(rename = "sql")]
201    /// The SQL representation stores the view definition as a SQL SELECT,
202    Sql(SqlViewRepresentation),
203}
204
205#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
206#[serde(rename_all = "kebab-case")]
207/// The SQL representation stores the view definition as a SQL SELECT,
208/// with metadata such as the SQL dialect.
209pub struct SqlViewRepresentation {
210    #[serde(rename = "sql")]
211    /// The SQL SELECT statement that defines the view.
212    pub sql: String,
213    #[serde(rename = "dialect")]
214    /// The dialect of the sql SELECT statement (e.g., "trino" or "spark")
215    pub dialect: String,
216}
217
218pub(super) mod _serde {
219    /// This is a helper module that defines types to help with serialization/deserialization.
220    /// For deserialization the input first gets read into the [`ViewVersionV1`] struct.
221    /// and then converted into the [Snapshot] struct. Serialization works the other way around.
222    /// [ViewVersionV1] are internal struct that are only used for serialization and deserialization.
223    use serde::{Deserialize, Serialize};
224
225    use super::{ViewRepresentation, ViewRepresentations, ViewVersion};
226    use crate::catalog::NamespaceIdent;
227
228    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
229    #[serde(rename_all = "kebab-case")]
230    /// Defines the structure of a v1 view version for serialization/deserialization
231    pub(crate) struct ViewVersionV1 {
232        pub version_id: i32,
233        pub schema_id: i32,
234        pub timestamp_ms: i64,
235        pub summary: std::collections::HashMap<String, String>,
236        pub representations: Vec<ViewRepresentation>,
237        #[serde(skip_serializing_if = "Option::is_none")]
238        pub default_catalog: Option<String>,
239        pub default_namespace: NamespaceIdent,
240    }
241
242    impl From<ViewVersionV1> for ViewVersion {
243        fn from(v1: ViewVersionV1) -> Self {
244            ViewVersion {
245                version_id: v1.version_id,
246                schema_id: v1.schema_id,
247                timestamp_ms: v1.timestamp_ms,
248                summary: v1.summary,
249                representations: ViewRepresentations(v1.representations),
250                default_catalog: v1.default_catalog,
251                default_namespace: v1.default_namespace,
252            }
253        }
254    }
255
256    impl From<ViewVersion> for ViewVersionV1 {
257        fn from(v1: ViewVersion) -> Self {
258            ViewVersionV1 {
259                version_id: v1.version_id,
260                schema_id: v1.schema_id,
261                timestamp_ms: v1.timestamp_ms,
262                summary: v1.summary,
263                representations: v1.representations.0,
264                default_catalog: v1.default_catalog,
265                default_namespace: v1.default_namespace,
266            }
267        }
268    }
269}
270
271impl From<SqlViewRepresentation> for ViewRepresentation {
272    fn from(sql: SqlViewRepresentation) -> Self {
273        ViewRepresentation::Sql(sql)
274    }
275}
276
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{TimeZone, Utc};

    use super::{SqlViewRepresentation, ViewRepresentation};
    use crate::NamespaceIdent;
    use crate::spec::ViewRepresentations;
    use crate::spec::view_version::_serde::ViewVersionV1;
    use crate::spec::view_version::ViewVersion;

    /// SQL text shared by the fixtures below.
    const EVENTS_SQL: &str =
        "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2";

    /// Summary map used by the fixtures below.
    fn spark_summary() -> HashMap<String, String> {
        HashMap::from([
            ("engine-name".to_string(), "Spark".to_string()),
            ("engineVersion".to_string(), "3.3.2".to_string()),
        ])
    }

    /// A representation list holding a single "spark"-dialect SQL entry.
    fn spark_representations(sql: &str) -> ViewRepresentations {
        ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
            sql: sql.to_string(),
            dialect: "spark".to_string(),
        })])
    }

    /// Deserializes a JSON view version, checks the serialization roundtrip and
    /// every accessor.
    #[test]
    fn view_version() {
        let record = serde_json::json!(
        {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292i64,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
              "engine-name" : "Spark",
              "engineVersion" : "3.3.2"
            },
            "representations" : [ {
              "type" : "sql",
              "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
              "dialect" : "spark"
            } ]
          }
        );

        let parsed = serde_json::from_value::<ViewVersionV1>(record.clone()).unwrap();
        let result = ViewVersion::from(parsed);

        // Roundtrip
        let reserialized = serde_json::to_value(result.clone()).unwrap();
        assert_eq!(reserialized, record);

        assert_eq!(result.version_id(), 1);
        let expected_timestamp = Utc.timestamp_millis_opt(1573518431292).unwrap();
        assert_eq!(result.timestamp().unwrap(), expected_timestamp);
        assert_eq!(result.schema_id(), 1);
        assert_eq!(result.default_catalog, Some("prod".to_string()));
        assert_eq!(result.summary(), &spark_summary());
        assert_eq!(
            result.representations().to_owned(),
            spark_representations(EVENTS_SQL)
        );
        // Kept last: `inner()` is called on the field by value.
        assert_eq!(result.default_namespace.inner(), vec![
            "default".to_string()
        ]);
    }

    /// `behaves_identical_to` must ignore version id and timestamp but notice a
    /// change in the representations.
    #[test]
    fn test_behaves_identical_to() {
        let view_version = ViewVersion::builder()
            .with_version_id(1)
            .with_schema_id(1)
            .with_timestamp_ms(1573518431292)
            .with_summary(spark_summary())
            .with_representations(spark_representations(EVENTS_SQL))
            .with_default_catalog(Some("prod".to_string()))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build();

        // Same content, only the ignored fields differ.
        let identical_view_version = ViewVersion {
            version_id: 2,
            timestamp_ms: 1573518431293,
            ..view_version.clone()
        };

        // Same ids and timestamp, but a different SQL definition.
        let different_view_version = ViewVersion::builder()
            .with_version_id(view_version.version_id())
            .with_schema_id(view_version.schema_id())
            .with_timestamp_ms(view_version.timestamp_ms())
            .with_summary(view_version.summary().clone())
            .with_representations(spark_representations("SELECT * from events"))
            .with_default_catalog(view_version.default_catalog().cloned())
            .with_default_namespace(view_version.default_namespace().clone())
            .build();

        assert!(view_version.behaves_identical_to(&identical_view_version));
        assert!(!view_version.behaves_identical_to(&different_view_version));
    }
}