iceberg/catalog/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Catalog API for Apache Iceberg
19
20pub mod memory;
21mod metadata_location;
22pub(crate) mod utils;
23
24use std::collections::HashMap;
25use std::fmt::{Debug, Display};
26use std::future::Future;
27use std::mem::take;
28use std::ops::Deref;
29use std::str::FromStr;
30use std::sync::Arc;
31
32use _serde::{deserialize_snapshot, serialize_snapshot};
33use async_trait::async_trait;
34pub use memory::MemoryCatalog;
35pub use metadata_location::*;
36#[cfg(test)]
37use mockall::automock;
38use serde_derive::{Deserialize, Serialize};
39use typed_builder::TypedBuilder;
40use uuid::Uuid;
41
42use crate::io::StorageFactory;
43use crate::spec::{
44    EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
45    SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
46    UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
47};
48use crate::table::Table;
49use crate::{Error, ErrorKind, Result};
50
/// The catalog API for Iceberg Rust.
///
/// A [`Catalog`] manages namespaces and tables. Every operation is async and
/// fallible, so implementations may be backed by in-process state (see
/// [`MemoryCatalog`]) or by remote services.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// List namespaces inside the catalog.
    ///
    /// `parent` optionally scopes the listing to namespaces nested under the
    /// given namespace; the precise nesting semantics are left to the
    /// implementation.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Create a new namespace inside the catalog with the given properties.
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Get a namespace information from the catalog.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Check if namespace exists in catalog.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Update a namespace inside the catalog.
    ///
    /// # Behavior
    ///
    /// The properties must be the full set of namespace properties, i.e. the
    /// given map replaces the namespace's properties wholesale rather than
    /// being merged into them.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drop a namespace from the catalog, or returns error if it doesn't exist.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// List tables from namespace.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Create a new table inside the namespace from the given [`TableCreation`].
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Load table from the catalog.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drop a table from the catalog, or returns error if it doesn't exist.
    ///
    /// Unlike [`Catalog::purge_table`], this does not delete the underlying
    /// table data.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Drop a table from the catalog and delete the underlying table data.
    ///
    /// Implementations should load the table metadata, drop the table
    /// from the catalog, then delete all associated data and metadata files.
    /// The [`drop_table_data`](utils::drop_table_data) utility function can
    /// be used for the file cleanup step.
    async fn purge_table(&self, table: &TableIdent) -> Result<()>;

    /// Check if a table exists in the catalog.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Rename a table in the catalog from `src` to `dest`.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Register an existing table to the catalog from its current metadata
    /// file location.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Update a table to the catalog by applying a [`TableCommit`].
    ///
    /// The commit's requirements are validated against the current metadata
    /// before its updates take effect (see [`TableCommit::apply`]).
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
122
/// Common interface for all catalog builders.
///
/// A builder is configured (optionally via [`CatalogBuilder::with_storage_factory`])
/// and then consumed by [`CatalogBuilder::load`] to produce a ready catalog.
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type that this builder creates.
    type C: Catalog;

    /// Set a custom StorageFactory to use for storage operations.
    ///
    /// When a StorageFactory is provided, the catalog will use it to build FileIO
    /// instances for all storage operations instead of using the default factory.
    ///
    /// # Arguments
    ///
    /// * `storage_factory` - The StorageFactory to use for creating storage instances
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use iceberg::CatalogBuilder;
    /// use iceberg::io::StorageFactory;
    /// use iceberg_storage_opendal::OpenDalStorageFactory;
    /// use std::sync::Arc;
    ///
    /// let catalog = MyCatalogBuilder::default()
    ///     .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 {
    ///         configured_scheme: "s3a".to_string(),
    ///         customized_credential_load: None,
    ///     }))
    ///     .load("my_catalog", props)
    ///     .await?;
    /// ```
    fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;

    /// Create a new catalog instance.
    ///
    /// `name` is the catalog name and `props` carries implementation-specific
    /// configuration. Consumes the builder and resolves to the ready catalog,
    /// or to an error if configuration is invalid.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
162
/// NamespaceIdent represents the identifier of a namespace in the catalog.
///
/// The namespace identifier is a list of strings, where each string is a
/// component of the namespace. It's the catalog implementer's responsibility to
/// handle the namespace identifier correctly.
///
/// Constructed values are expected to be non-empty: [`NamespaceIdent::new`]
/// always stores one level and [`NamespaceIdent::from_vec`] rejects an empty
/// vector. (Note: the derived `Deserialize` does not itself enforce this.)
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
170
171impl NamespaceIdent {
172    /// Create a new namespace identifier with only one level.
173    pub fn new(name: String) -> Self {
174        Self(vec![name])
175    }
176
177    /// Create a multi-level namespace identifier from vector.
178    pub fn from_vec(names: Vec<String>) -> Result<Self> {
179        if names.is_empty() {
180            return Err(Error::new(
181                ErrorKind::DataInvalid,
182                "Namespace identifier can't be empty!",
183            ));
184        }
185        Ok(Self(names))
186    }
187
188    /// Try to create namespace identifier from an iterator of string.
189    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
190        Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
191    }
192
193    /// Returns a string for used in url.
194    pub fn to_url_string(&self) -> String {
195        self.as_ref().join("\u{001f}")
196    }
197
198    /// Returns inner strings.
199    pub fn inner(self) -> Vec<String> {
200        self.0
201    }
202
203    /// Get the parent of this namespace.
204    /// Returns None if this namespace only has a single element and thus has no parent.
205    pub fn parent(&self) -> Option<Self> {
206        self.0.split_last().and_then(|(_, parent)| {
207            if parent.is_empty() {
208                None
209            } else {
210                Some(Self(parent.to_vec()))
211            }
212        })
213    }
214}
215
216impl AsRef<Vec<String>> for NamespaceIdent {
217    fn as_ref(&self) -> &Vec<String> {
218        &self.0
219    }
220}
221
222impl Deref for NamespaceIdent {
223    type Target = [String];
224
225    fn deref(&self) -> &Self::Target {
226        &self.0
227    }
228}
229
/// Namespace represents a namespace in the catalog.
///
/// A namespace pairs its identifier with a free-form string property map.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    // The (possibly multi-level) identifier of this namespace.
    name: NamespaceIdent,
    // Arbitrary key/value properties attached to the namespace.
    properties: HashMap<String, String>,
}
236
237impl Namespace {
238    /// Create a new namespace.
239    pub fn new(name: NamespaceIdent) -> Self {
240        Self::with_properties(name, HashMap::default())
241    }
242
243    /// Create a new namespace with properties.
244    pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
245        Self { name, properties }
246    }
247
248    /// Get the name of the namespace.
249    pub fn name(&self) -> &NamespaceIdent {
250        &self.name
251    }
252
253    /// Get the properties of the namespace.
254    pub fn properties(&self) -> &HashMap<String, String> {
255        &self.properties
256    }
257}
258
259impl Display for NamespaceIdent {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        write!(f, "{}", self.0.join("."))
262    }
263}
264
/// TableIdent represents the identifier of a table in the catalog.
///
/// Its `Display` implementation renders as `namespace.name`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace of the table.
    pub namespace: NamespaceIdent,
    /// Table name.
    pub name: String,
}
273
274impl TableIdent {
275    /// Create a new table identifier.
276    pub fn new(namespace: NamespaceIdent, name: String) -> Self {
277        Self { namespace, name }
278    }
279
280    /// Get the namespace of the table.
281    pub fn namespace(&self) -> &NamespaceIdent {
282        &self.namespace
283    }
284
285    /// Get the name of the table.
286    pub fn name(&self) -> &str {
287        &self.name
288    }
289
290    /// Try to create table identifier from an iterator of string.
291    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
292        let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
293        let table_name = vec.pop().ok_or_else(|| {
294            Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
295        })?;
296        let namespace_ident = NamespaceIdent::from_vec(vec)?;
297
298        Ok(Self {
299            namespace: namespace_ident,
300            name: table_name,
301        })
302    }
303}
304
305impl Display for TableIdent {
306    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307        write!(f, "{}.{}", self.namespace, self.name)
308    }
309}
310
/// TableCreation represents the creation of a table in the catalog.
///
/// Instances are built via the `TypedBuilder`-generated builder, e.g.
/// `TableCreation::builder().name(...).schema(...).build()`.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// The name of the table.
    pub name: String,
    /// The location of the table. Defaults to `None`; the generated
    /// `location(...)` setter takes a bare `String`, while the
    /// `location_opt(...)` fallback setter accepts an `Option` directly.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// The schema of the table.
    pub schema: Schema,
    /// The partition spec of the table, could be None. The setter accepts
    /// anything convertible into `UnboundPartitionSpec`.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// The sort order of the table. Defaults to `None`.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// The properties of the table. The setter accepts any iterator of
    /// `(String, String)` pairs and collects it into a map.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Format version of the table. Defaults to V2.
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
336
/// TableCommit represents the commit of a table in the catalog.
///
/// The builder is marked as private since it's dangerous and error-prone to construct
/// [`TableCommit`] directly.
/// Users are supposed to use [`crate::transaction::Transaction`] to update table.
///
/// A commit is a pair of preconditions ([`TableRequirement`]s) and changes
/// ([`TableUpdate`]s); see [`TableCommit::apply`] for how they are evaluated.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    /// The table ident.
    ident: TableIdent,
    /// The requirements of the table.
    ///
    /// Commit will fail if the requirements are not met.
    requirements: Vec<TableRequirement>,
    /// The updates of the table.
    updates: Vec<TableUpdate>,
}
354
355impl TableCommit {
356    /// Return the table identifier.
357    pub fn identifier(&self) -> &TableIdent {
358        &self.ident
359    }
360
361    /// Take all requirements.
362    pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
363        take(&mut self.requirements)
364    }
365
366    /// Take all updates.
367    pub fn take_updates(&mut self) -> Vec<TableUpdate> {
368        take(&mut self.updates)
369    }
370
371    /// Applies this [`TableCommit`] to the given [`Table`] as part of a catalog update.
372    /// Typically used by [`Catalog::update_table`] to validate requirements and apply metadata updates.
373    ///
374    /// Returns a new [`Table`] with updated metadata,
375    /// or an error if validation or application fails.
376    pub fn apply(self, table: Table) -> Result<Table> {
377        // check requirements
378        for requirement in self.requirements {
379            requirement.check(Some(table.metadata()))?;
380        }
381
382        // get current metadata location
383        let current_metadata_location = table.metadata_location_result()?;
384
385        // apply updates to metadata builder
386        let mut metadata_builder = table
387            .metadata()
388            .clone()
389            .into_builder(Some(current_metadata_location.to_string()));
390        for update in self.updates {
391            metadata_builder = update.apply(metadata_builder)?;
392        }
393
394        // Build the new metadata
395        let new_metadata = metadata_builder.build()?.metadata;
396
397        let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
398            .with_next_version()
399            .with_new_metadata(&new_metadata)
400            .to_string();
401
402        Ok(table
403            .with_metadata(Arc::new(new_metadata))
404            .with_metadata_location(new_metadata_location))
405    }
406}
407
/// TableRequirement represents a requirement for a table in the catalog.
///
/// Requirements are preconditions evaluated against the current table
/// metadata before a commit's updates take effect; see
/// [`TableRequirement::check`]. Serialized with a `type` tag using the
/// REST catalog's `assert-*` names.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not already exist; used for create transactions
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must match the requirement.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Uuid of original table.
        uuid: Uuid,
    },
    /// The table branch or tag identified by the requirement's `reference` must
    /// reference the requirement's `snapshot-id`.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// The reference of the table to assert.
        r#ref: String,
        /// The snapshot id of the table to assert.
        /// If the id is `None`, the ref must not already exist.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's last assigned column id must match the requirement.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// The last assigned field id of the table to assert.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must match the requirement.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Current schema id of the table to assert.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's last assigned partition id must match the
    /// requirement.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Last assigned partition id of the table to assert.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default spec id must match the requirement.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Default spec id of the table to assert.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The table's default sort order id must match the requirement.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Default sort order id of the table to assert.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
469
/// TableUpdate represents an update to a table in the catalog.
///
/// Each variant maps to one [`TableMetadataBuilder`] operation (see
/// [`TableUpdate::apply`]). Serialized with an `action` tag in kebab-case,
/// matching the REST catalog representation.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrade table's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: FormatVersion,
    },
    /// Assign a new UUID to the table
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Add a new schema to the table
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
    },
    /// Set table's current schema
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Schema ID to set as current, or -1 to set last added schema
        schema_id: i32,
    },
    /// Add a new partition spec to the table
    AddSpec {
        /// The partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Set table's default spec
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Partition spec ID to set as the default, or -1 to set last added spec
        spec_id: i32,
    },
    /// Add sort order to table.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Set table's default sort order
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Sort order ID to set as the default, or -1 to set last added sort order
        sort_order_id: i64,
    },
    /// Add snapshot to table.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add.
        ///
        /// Uses the catalog-specific (de)serializers so `sequence-number`
        /// may be omitted on the wire for v1 compatibility.
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Set table's snapshot ref.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of snapshot reference to set.
        ref_name: String,
        /// Snapshot reference to set; its fields are flattened into this
        /// variant's JSON object.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Remove table's snapshots
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Snapshot ids to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Remove snapshot reference
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of snapshot reference to remove.
        ref_name: String,
    },
    /// Update table's location
    SetLocation {
        /// New location for table.
        location: String,
    },
    /// Update table's properties
    SetProperties {
        /// Properties to update for table.
        updates: HashMap<String, String>,
    },
    /// Remove table's properties
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Remove partition specs
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Partition spec ids to remove.
        spec_ids: Vec<i32>,
    },
    /// Set statistics for a snapshot
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// File containing the statistics
        statistics: StatisticsFile,
    },
    /// Remove statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot id to remove statistics for.
        snapshot_id: i64,
    },
    /// Set partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// File containing the partition statistics
        partition_statistics: PartitionStatisticsFile,
    },
    /// Remove partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot id to remove partition statistics for.
        snapshot_id: i64,
    },
    /// Remove schemas
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Schema IDs to remove.
        schema_ids: Vec<i32>,
    },
    /// Add an encryption key
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// The encryption key to add.
        encryption_key: EncryptedKey,
    },
    /// Remove an encryption key
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// The id of the encryption key to remove.
        key_id: String,
    },
}
617
618impl TableUpdate {
619    /// Applies the update to the table metadata builder.
620    pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
621        match self {
622            TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
623            TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
624            TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
625            TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
626            TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
627            TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
628            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
629                builder.set_default_sort_order(sort_order_id)
630            }
631            TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
632            TableUpdate::SetSnapshotRef {
633                ref_name,
634                reference,
635            } => builder.set_ref(&ref_name, reference),
636            TableUpdate::RemoveSnapshots { snapshot_ids } => {
637                Ok(builder.remove_snapshots(&snapshot_ids))
638            }
639            TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
640            TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
641            TableUpdate::SetProperties { updates } => builder.set_properties(updates),
642            TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
643            TableUpdate::UpgradeFormatVersion { format_version } => {
644                builder.upgrade_format_version(format_version)
645            }
646            TableUpdate::RemovePartitionSpecs { spec_ids } => {
647                builder.remove_partition_specs(&spec_ids)
648            }
649            TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
650            TableUpdate::RemoveStatistics { snapshot_id } => {
651                Ok(builder.remove_statistics(snapshot_id))
652            }
653            TableUpdate::SetPartitionStatistics {
654                partition_statistics,
655            } => Ok(builder.set_partition_statistics(partition_statistics)),
656            TableUpdate::RemovePartitionStatistics { snapshot_id } => {
657                Ok(builder.remove_partition_statistics(snapshot_id))
658            }
659            TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
660            TableUpdate::AddEncryptionKey { encryption_key } => {
661                Ok(builder.add_encryption_key(encryption_key))
662            }
663            TableUpdate::RemoveEncryptionKey { key_id } => {
664                Ok(builder.remove_encryption_key(&key_id))
665            }
666        }
667    }
668}
669
impl TableRequirement {
    /// Check that the requirement is met by the table metadata.
    /// If the requirement is not met, an appropriate error is returned.
    ///
    /// Provide metadata as `None` if the table does not exist.
    ///
    /// # Errors
    ///
    /// * [`ErrorKind::CatalogCommitConflicts`] (marked retryable) when
    ///   metadata is present but violates the requirement.
    /// * [`ErrorKind::TableNotFound`] when `metadata` is `None` and the
    ///   requirement is anything other than [`TableRequirement::NotExist`].
    pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
        if let Some(metadata) = metadata {
            match self {
                TableRequirement::NotExist => {
                    // The table exists, so a create-only commit must fail.
                    return Err(Error::new(
                        ErrorKind::CatalogCommitConflicts,
                        format!(
                            "Requirement failed: Table with id {} already exists",
                            metadata.uuid()
                        ),
                    )
                    .with_retryable(true));
                }
                TableRequirement::UuidMatch { uuid } => {
                    if &metadata.uuid() != uuid {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Table UUID does not match",
                        )
                        .with_context("expected", *uuid)
                        .with_context("found", metadata.uuid())
                        .with_retryable(true));
                    }
                }
                TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
                    // ToDo: Harmonize the types of current_schema_id
                    if metadata.current_schema_id != *current_schema_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Current schema id does not match",
                        )
                        .with_context("expected", current_schema_id.to_string())
                        .with_context("found", metadata.current_schema_id.to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::DefaultSortOrderIdMatch {
                    default_sort_order_id,
                } => {
                    if metadata.default_sort_order().order_id != *default_sort_order_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Default sort order id does not match",
                        )
                        .with_context("expected", default_sort_order_id.to_string())
                        .with_context("found", metadata.default_sort_order().order_id.to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
                    let snapshot_ref = metadata.snapshot_for_ref(r#ref);
                    if let Some(snapshot_id) = snapshot_id {
                        // An expected id means the ref must exist and point
                        // at exactly that snapshot.
                        let snapshot_ref = snapshot_ref.ok_or(
                            Error::new(
                                ErrorKind::CatalogCommitConflicts,
                                format!("Requirement failed: Branch or tag `{ref}` not found"),
                            )
                            .with_retryable(true),
                        )?;
                        if snapshot_ref.snapshot_id() != *snapshot_id {
                            return Err(Error::new(
                                ErrorKind::CatalogCommitConflicts,
                                format!(
                                    "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
                                ),
                            )
                            .with_context("expected", snapshot_id.to_string())
                            .with_context("found", snapshot_ref.snapshot_id().to_string())
                            .with_retryable(true));
                        }
                    } else if snapshot_ref.is_some() {
                        // a null snapshot ID means the ref should not exist already
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            format!("Requirement failed: Branch or tag `{ref}` already exists"),
                        )
                        .with_retryable(true));
                    }
                }
                TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
                    // ToDo: Harmonize the types of default_spec_id
                    if metadata.default_partition_spec_id() != *default_spec_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Default partition spec id does not match",
                        )
                        .with_context("expected", default_spec_id.to_string())
                        .with_context("found", metadata.default_partition_spec_id().to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::LastAssignedPartitionIdMatch {
                    last_assigned_partition_id,
                } => {
                    if metadata.last_partition_id != *last_assigned_partition_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Last assigned partition id does not match",
                        )
                        .with_context("expected", last_assigned_partition_id.to_string())
                        .with_context("found", metadata.last_partition_id.to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::LastAssignedFieldIdMatch {
                    last_assigned_field_id,
                } => {
                    if &metadata.last_column_id != last_assigned_field_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Last assigned field id does not match",
                        )
                        .with_context("expected", last_assigned_field_id.to_string())
                        .with_context("found", metadata.last_column_id.to_string())
                        .with_retryable(true));
                    }
                }
            };
        } else {
            // No metadata: only the "table must not exist" requirement can
            // succeed against a missing table.
            match self {
                TableRequirement::NotExist => {}
                _ => {
                    return Err(Error::new(
                        ErrorKind::TableNotFound,
                        "Requirement failed: Table does not exist",
                    ));
                }
            }
        }

        Ok(())
    }
}
808
809pub(super) mod _serde {
810    use serde::{Deserialize as _, Deserializer, Serialize as _};
811
812    use super::*;
813    use crate::spec::{SchemaId, Summary};
814
815    pub(super) fn deserialize_snapshot<'de, D>(
816        deserializer: D,
817    ) -> std::result::Result<Snapshot, D::Error>
818    where D: Deserializer<'de> {
819        let buf = CatalogSnapshot::deserialize(deserializer)?;
820        Ok(buf.into())
821    }
822
823    pub(super) fn serialize_snapshot<S>(
824        snapshot: &Snapshot,
825        serializer: S,
826    ) -> std::result::Result<S::Ok, S::Error>
827    where
828        S: serde::Serializer,
829    {
830        let buf: CatalogSnapshot = snapshot.clone().into();
831        buf.serialize(serializer)
832    }
833
834    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
835    #[serde(rename_all = "kebab-case")]
836    /// Defines the structure of a v2 snapshot for the catalog.
837    /// Main difference to SnapshotV2 is that sequence-number is optional
838    /// in the rest catalog spec to allow for backwards compatibility with v1.
839    struct CatalogSnapshot {
840        snapshot_id: i64,
841        #[serde(skip_serializing_if = "Option::is_none")]
842        parent_snapshot_id: Option<i64>,
843        #[serde(default)]
844        sequence_number: i64,
845        timestamp_ms: i64,
846        manifest_list: String,
847        summary: Summary,
848        #[serde(skip_serializing_if = "Option::is_none")]
849        schema_id: Option<SchemaId>,
850        #[serde(skip_serializing_if = "Option::is_none")]
851        first_row_id: Option<u64>,
852        #[serde(skip_serializing_if = "Option::is_none")]
853        added_rows: Option<u64>,
854        #[serde(skip_serializing_if = "Option::is_none")]
855        key_id: Option<String>,
856    }
857
858    impl From<CatalogSnapshot> for Snapshot {
859        fn from(snapshot: CatalogSnapshot) -> Self {
860            let CatalogSnapshot {
861                snapshot_id,
862                parent_snapshot_id,
863                sequence_number,
864                timestamp_ms,
865                manifest_list,
866                schema_id,
867                summary,
868                first_row_id,
869                added_rows,
870                key_id,
871            } = snapshot;
872            let builder = Snapshot::builder()
873                .with_snapshot_id(snapshot_id)
874                .with_parent_snapshot_id(parent_snapshot_id)
875                .with_sequence_number(sequence_number)
876                .with_timestamp_ms(timestamp_ms)
877                .with_manifest_list(manifest_list)
878                .with_summary(summary)
879                .with_encryption_key_id(key_id);
880            let row_range = first_row_id.zip(added_rows);
881            match (schema_id, row_range) {
882                (None, None) => builder.build(),
883                (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
884                (None, Some((first_row_id, last_row_id))) => {
885                    builder.with_row_range(first_row_id, last_row_id).build()
886                }
887                (Some(schema_id), Some((first_row_id, last_row_id))) => builder
888                    .with_schema_id(schema_id)
889                    .with_row_range(first_row_id, last_row_id)
890                    .build(),
891            }
892        }
893    }
894
895    impl From<Snapshot> for CatalogSnapshot {
896        fn from(snapshot: Snapshot) -> Self {
897            let first_row_id = snapshot.first_row_id();
898            let added_rows = snapshot.added_rows_count();
899            let Snapshot {
900                snapshot_id,
901                parent_snapshot_id,
902                sequence_number,
903                timestamp_ms,
904                manifest_list,
905                summary,
906                schema_id,
907                row_range: _,
908                encryption_key_id: key_id,
909            } = snapshot;
910            CatalogSnapshot {
911                snapshot_id,
912                parent_snapshot_id,
913                sequence_number,
914                timestamp_ms,
915                manifest_list,
916                summary,
917                schema_id,
918                first_row_id,
919                added_rows,
920                key_id,
921            }
922        }
923    }
924}
925
/// ViewCreation represents the creation of a view in the catalog.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// The name of the view.
    pub name: String,
    /// The view's base location; used to create metadata file locations.
    pub location: String,
    /// Representations for the view.
    pub representations: ViewRepresentations,
    /// The schema of the view.
    pub schema: Schema,
    /// The properties of the view. Defaults to empty.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// The default namespace to use when a reference in the SELECT is a single identifier.
    pub default_namespace: NamespaceIdent,
    /// Default catalog to use when a reference in the SELECT does not contain a catalog.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// A string to string map of summary metadata about the version.
    /// Typical keys are "engine-name" and "engine-version".
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
950
/// ViewUpdate represents an update to a view in the catalog.
///
/// Serialized with an `action` tag per the REST catalog spec.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assign a new UUID to the view
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Upgrade view's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: ViewFormatVersion,
    },
    /// Add a new schema to the view
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
        /// The last column id of the view.
        last_column_id: Option<i32>,
    },
    /// Set view's base location
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New location for view.
        location: String,
    },
    /// Set view's properties
    ///
    /// Matching keys are updated, and non-matching keys are left unchanged.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to update for view.
        updates: HashMap<String, String>,
    },
    /// Remove view's properties
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Add a new version to the view
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// The view version to add.
        view_version: ViewVersion,
    },
    /// Set view's current version
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// View version id to set as current, or -1 to set last added version
        view_version_id: i32,
    },
}
1009
1010mod _serde_set_statistics {
1011    // The rest spec requires an additional field `snapshot-id`
1012    // that is redundant with the `snapshot_id` field in the statistics file.
1013    use serde::{Deserialize, Deserializer, Serialize, Serializer};
1014
1015    use super::*;
1016
1017    #[derive(Debug, Serialize, Deserialize)]
1018    #[serde(rename_all = "kebab-case")]
1019    struct SetStatistics {
1020        snapshot_id: Option<i64>,
1021        statistics: StatisticsFile,
1022    }
1023
1024    pub fn serialize<S>(
1025        value: &StatisticsFile,
1026        serializer: S,
1027    ) -> std::result::Result<S::Ok, S::Error>
1028    where
1029        S: Serializer,
1030    {
1031        SetStatistics {
1032            snapshot_id: Some(value.snapshot_id),
1033            statistics: value.clone(),
1034        }
1035        .serialize(serializer)
1036    }
1037
1038    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
1039    where D: Deserializer<'de> {
1040        let SetStatistics {
1041            snapshot_id,
1042            statistics,
1043        } = SetStatistics::deserialize(deserializer)?;
1044        if let Some(snapshot_id) = snapshot_id
1045            && snapshot_id != statistics.snapshot_id
1046        {
1047            return Err(serde::de::Error::custom(format!(
1048                "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1049                statistics.snapshot_id
1050            )));
1051        }
1052
1053        Ok(statistics)
1054    }
1055}
1056
1057#[cfg(test)]
1058mod tests {
1059    use std::collections::HashMap;
1060    use std::fmt::Debug;
1061    use std::fs::File;
1062    use std::io::BufReader;
1063
1064    use base64::Engine as _;
1065    use serde::Serialize;
1066    use serde::de::DeserializeOwned;
1067    use uuid::uuid;
1068
1069    use super::ViewUpdate;
1070    use crate::io::FileIO;
1071    use crate::spec::{
1072        BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1073        PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1074        SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1075        StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1076        UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1077        ViewVersion,
1078    };
1079    use crate::table::Table;
1080    use crate::{
1081        NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1082    };
1083
1084    #[test]
1085    fn test_parent_namespace() {
1086        let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
1087        let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
1088        let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
1089
1090        assert_eq!(ns1.parent(), None);
1091        assert_eq!(ns2.parent(), Some(ns1.clone()));
1092        assert_eq!(ns3.parent(), Some(ns2.clone()));
1093    }
1094
1095    #[test]
1096    fn test_create_table_id() {
1097        let table_id = TableIdent {
1098            namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
1099            name: "t1".to_string(),
1100        };
1101
1102        assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
1103    }
1104
1105    #[test]
1106    fn test_table_creation_iterator_properties() {
1107        let builder = TableCreation::builder()
1108            .name("table".to_string())
1109            .schema(Schema::builder().build().unwrap());
1110
1111        fn s(k: &str, v: &str) -> (String, String) {
1112            (k.to_string(), v.to_string())
1113        }
1114
1115        let table_creation = builder
1116            .properties([s("key", "value"), s("foo", "bar")])
1117            .build();
1118
1119        assert_eq!(
1120            HashMap::from([s("key", "value"), s("foo", "bar")]),
1121            table_creation.properties
1122        );
1123    }
1124
1125    fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1126        json: impl ToString,
1127        expected: T,
1128    ) {
1129        let json_str = json.to_string();
1130        let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1131        assert_eq!(actual, expected, "Parsed value is not equal to expected");
1132
1133        let restored: T = serde_json::from_str(
1134            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1135        )
1136        .expect("Failed to parse from serialized json");
1137
1138        assert_eq!(
1139            restored, expected,
1140            "Parsed restored value is not equal to expected"
1141        );
1142    }
1143
1144    fn metadata() -> TableMetadata {
1145        let tbl_creation = TableCreation::builder()
1146            .name("table".to_string())
1147            .location("/path/to/table".to_string())
1148            .schema(Schema::builder().build().unwrap())
1149            .build();
1150
1151        TableMetadataBuilder::from_table_creation(tbl_creation)
1152            .unwrap()
1153            .assign_uuid(uuid::Uuid::nil())
1154            .build()
1155            .unwrap()
1156            .metadata
1157    }
1158
1159    #[test]
1160    fn test_check_requirement_not_exist() {
1161        let metadata = metadata();
1162        let requirement = TableRequirement::NotExist;
1163
1164        assert!(requirement.check(Some(&metadata)).is_err());
1165        assert!(requirement.check(None).is_ok());
1166    }
1167
1168    #[test]
1169    fn test_check_table_uuid() {
1170        let metadata = metadata();
1171
1172        let requirement = TableRequirement::UuidMatch {
1173            uuid: uuid::Uuid::now_v7(),
1174        };
1175        assert!(requirement.check(Some(&metadata)).is_err());
1176
1177        let requirement = TableRequirement::UuidMatch {
1178            uuid: uuid::Uuid::nil(),
1179        };
1180        assert!(requirement.check(Some(&metadata)).is_ok());
1181    }
1182
1183    #[test]
1184    fn test_check_ref_snapshot_id() {
1185        let metadata = metadata();
1186
1187        // Ref does not exist but should
1188        let requirement = TableRequirement::RefSnapshotIdMatch {
1189            r#ref: "my_branch".to_string(),
1190            snapshot_id: Some(1),
1191        };
1192        assert!(requirement.check(Some(&metadata)).is_err());
1193
1194        // Ref does not exist and should not
1195        let requirement = TableRequirement::RefSnapshotIdMatch {
1196            r#ref: "my_branch".to_string(),
1197            snapshot_id: None,
1198        };
1199        assert!(requirement.check(Some(&metadata)).is_ok());
1200
1201        // Add snapshot
1202        let snapshot = Snapshot::builder()
1203            .with_snapshot_id(3051729675574597004)
1204            .with_sequence_number(10)
1205            .with_timestamp_ms(9992191116217)
1206            .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
1207            .with_schema_id(0)
1208            .with_summary(Summary {
1209                operation: Operation::Append,
1210                additional_properties: HashMap::new(),
1211            })
1212            .build();
1213
1214        let builder = metadata.into_builder(None);
1215        let builder = TableUpdate::AddSnapshot {
1216            snapshot: snapshot.clone(),
1217        }
1218        .apply(builder)
1219        .unwrap();
1220        let metadata = TableUpdate::SetSnapshotRef {
1221            ref_name: MAIN_BRANCH.to_string(),
1222            reference: SnapshotReference {
1223                snapshot_id: snapshot.snapshot_id(),
1224                retention: SnapshotRetention::Branch {
1225                    min_snapshots_to_keep: Some(10),
1226                    max_snapshot_age_ms: None,
1227                    max_ref_age_ms: None,
1228                },
1229            },
1230        }
1231        .apply(builder)
1232        .unwrap()
1233        .build()
1234        .unwrap()
1235        .metadata;
1236
1237        // Ref exists and should match
1238        let requirement = TableRequirement::RefSnapshotIdMatch {
1239            r#ref: "main".to_string(),
1240            snapshot_id: Some(3051729675574597004),
1241        };
1242        assert!(requirement.check(Some(&metadata)).is_ok());
1243
1244        // Ref exists but does not match
1245        let requirement = TableRequirement::RefSnapshotIdMatch {
1246            r#ref: "main".to_string(),
1247            snapshot_id: Some(1),
1248        };
1249        assert!(requirement.check(Some(&metadata)).is_err());
1250    }
1251
1252    #[test]
1253    fn test_check_last_assigned_field_id() {
1254        let metadata = metadata();
1255
1256        let requirement = TableRequirement::LastAssignedFieldIdMatch {
1257            last_assigned_field_id: 1,
1258        };
1259        assert!(requirement.check(Some(&metadata)).is_err());
1260
1261        let requirement = TableRequirement::LastAssignedFieldIdMatch {
1262            last_assigned_field_id: 0,
1263        };
1264        assert!(requirement.check(Some(&metadata)).is_ok());
1265    }
1266
1267    #[test]
1268    fn test_check_current_schema_id() {
1269        let metadata = metadata();
1270
1271        let requirement = TableRequirement::CurrentSchemaIdMatch {
1272            current_schema_id: 1,
1273        };
1274        assert!(requirement.check(Some(&metadata)).is_err());
1275
1276        let requirement = TableRequirement::CurrentSchemaIdMatch {
1277            current_schema_id: 0,
1278        };
1279        assert!(requirement.check(Some(&metadata)).is_ok());
1280    }
1281
1282    #[test]
1283    fn test_check_last_assigned_partition_id() {
1284        let metadata = metadata();
1285        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1286            last_assigned_partition_id: 0,
1287        };
1288        assert!(requirement.check(Some(&metadata)).is_err());
1289
1290        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1291            last_assigned_partition_id: 999,
1292        };
1293        assert!(requirement.check(Some(&metadata)).is_ok());
1294    }
1295
1296    #[test]
1297    fn test_check_default_spec_id() {
1298        let metadata = metadata();
1299
1300        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1301        assert!(requirement.check(Some(&metadata)).is_err());
1302
1303        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1304        assert!(requirement.check(Some(&metadata)).is_ok());
1305    }
1306
1307    #[test]
1308    fn test_check_default_sort_order_id() {
1309        let metadata = metadata();
1310
1311        let requirement = TableRequirement::DefaultSortOrderIdMatch {
1312            default_sort_order_id: 1,
1313        };
1314        assert!(requirement.check(Some(&metadata)).is_err());
1315
1316        let requirement = TableRequirement::DefaultSortOrderIdMatch {
1317            default_sort_order_id: 0,
1318        };
1319        assert!(requirement.check(Some(&metadata)).is_ok());
1320    }
1321
1322    #[test]
1323    fn test_table_uuid() {
1324        test_serde_json(
1325            r#"
1326{
1327    "type": "assert-table-uuid",
1328    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1329}
1330        "#,
1331            TableRequirement::UuidMatch {
1332                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1333            },
1334        );
1335    }
1336
1337    #[test]
1338    fn test_assert_table_not_exists() {
1339        test_serde_json(
1340            r#"
1341{
1342    "type": "assert-create"
1343}
1344        "#,
1345            TableRequirement::NotExist,
1346        );
1347    }
1348
1349    #[test]
1350    fn test_assert_ref_snapshot_id() {
1351        test_serde_json(
1352            r#"
1353{
1354    "type": "assert-ref-snapshot-id",
1355    "ref": "snapshot-name",
1356    "snapshot-id": null
1357}
1358        "#,
1359            TableRequirement::RefSnapshotIdMatch {
1360                r#ref: "snapshot-name".to_string(),
1361                snapshot_id: None,
1362            },
1363        );
1364
1365        test_serde_json(
1366            r#"
1367{
1368    "type": "assert-ref-snapshot-id",
1369    "ref": "snapshot-name",
1370    "snapshot-id": 1
1371}
1372        "#,
1373            TableRequirement::RefSnapshotIdMatch {
1374                r#ref: "snapshot-name".to_string(),
1375                snapshot_id: Some(1),
1376            },
1377        );
1378    }
1379
1380    #[test]
1381    fn test_assert_last_assigned_field_id() {
1382        test_serde_json(
1383            r#"
1384{
1385    "type": "assert-last-assigned-field-id",
1386    "last-assigned-field-id": 12
1387}
1388        "#,
1389            TableRequirement::LastAssignedFieldIdMatch {
1390                last_assigned_field_id: 12,
1391            },
1392        );
1393    }
1394
1395    #[test]
1396    fn test_assert_current_schema_id() {
1397        test_serde_json(
1398            r#"
1399{
1400    "type": "assert-current-schema-id",
1401    "current-schema-id": 4
1402}
1403        "#,
1404            TableRequirement::CurrentSchemaIdMatch {
1405                current_schema_id: 4,
1406            },
1407        );
1408    }
1409
1410    #[test]
1411    fn test_assert_last_assigned_partition_id() {
1412        test_serde_json(
1413            r#"
1414{
1415    "type": "assert-last-assigned-partition-id",
1416    "last-assigned-partition-id": 1004
1417}
1418        "#,
1419            TableRequirement::LastAssignedPartitionIdMatch {
1420                last_assigned_partition_id: 1004,
1421            },
1422        );
1423    }
1424
1425    #[test]
1426    fn test_assert_default_spec_id() {
1427        test_serde_json(
1428            r#"
1429{
1430    "type": "assert-default-spec-id",
1431    "default-spec-id": 5
1432}
1433        "#,
1434            TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
1435        );
1436    }
1437
1438    #[test]
1439    fn test_assert_default_sort_order() {
1440        let json = r#"
1441{
1442    "type": "assert-default-sort-order-id",
1443    "default-sort-order-id": 10
1444}
1445        "#;
1446
1447        let update = TableRequirement::DefaultSortOrderIdMatch {
1448            default_sort_order_id: 10,
1449        };
1450
1451        test_serde_json(json, update);
1452    }
1453
1454    #[test]
1455    fn test_parse_assert_invalid() {
1456        assert!(
1457            serde_json::from_str::<TableRequirement>(
1458                r#"
1459{
1460    "default-sort-order-id": 10
1461}
1462"#
1463            )
1464            .is_err(),
1465            "Table requirements should not be parsed without type."
1466        );
1467    }
1468
1469    #[test]
1470    fn test_assign_uuid() {
1471        test_serde_json(
1472            r#"
1473{
1474    "action": "assign-uuid",
1475    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1476}
1477        "#,
1478            TableUpdate::AssignUuid {
1479                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1480            },
1481        );
1482    }
1483
1484    #[test]
1485    fn test_upgrade_format_version() {
1486        test_serde_json(
1487            r#"
1488{
1489    "action": "upgrade-format-version",
1490    "format-version": 2
1491}
1492        "#,
1493            TableUpdate::UpgradeFormatVersion {
1494                format_version: FormatVersion::V2,
1495            },
1496        );
1497    }
1498
    #[test]
    fn test_add_schema() {
        // Expected in-memory value: `TableUpdate::AddSchema` keeps only the
        // schema itself.
        let test_schema = Schema::builder()
            .with_schema_id(1)
            .with_identifier_field_ids(vec![2])
            .with_fields(vec![
                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
            ])
            .build()
            .unwrap();
        // `last-column-id` is accepted on input but is not represented in the
        // parsed update.
        test_serde_json(
            r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
            TableUpdate::AddSchema {
                schema: test_schema.clone(),
            },
        );

        // The same update must also parse when `last-column-id` is omitted.
        test_serde_json(
            r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    }
}
        "#,
            TableUpdate::AddSchema {
                schema: test_schema.clone(),
            },
        );
    }
1588
1589    #[test]
1590    fn test_set_current_schema() {
1591        test_serde_json(
1592            r#"
1593{
1594   "action": "set-current-schema",
1595   "schema-id": 23
1596}
1597        "#,
1598            TableUpdate::SetCurrentSchema { schema_id: 23 },
1599        );
1600    }
1601
1602    #[test]
1603    fn test_add_spec() {
1604        test_serde_json(
1605            r#"
1606{
1607    "action": "add-spec",
1608    "spec": {
1609        "fields": [
1610            {
1611                "source-id": 4,
1612                "name": "ts_day",
1613                "transform": "day"
1614            },
1615            {
1616                "source-id": 1,
1617                "name": "id_bucket",
1618                "transform": "bucket[16]"
1619            },
1620            {
1621                "source-id": 2,
1622                "name": "id_truncate",
1623                "transform": "truncate[4]"
1624            }
1625        ]
1626    }
1627}
1628        "#,
1629            TableUpdate::AddSpec {
1630                spec: UnboundPartitionSpec::builder()
1631                    .add_partition_field(4, "ts_day".to_string(), Transform::Day)
1632                    .unwrap()
1633                    .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
1634                    .unwrap()
1635                    .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
1636                    .unwrap()
1637                    .build(),
1638            },
1639        );
1640    }
1641
1642    #[test]
1643    fn test_set_default_spec() {
1644        test_serde_json(
1645            r#"
1646{
1647    "action": "set-default-spec",
1648    "spec-id": 1
1649}
1650        "#,
1651            TableUpdate::SetDefaultSpec { spec_id: 1 },
1652        )
1653    }
1654
    #[test]
    fn test_add_sort_order() {
        // Round-trips an `add-sort-order` update whose two fields cover both
        // sort directions and both null orders.
        let json = r#"
{
    "action": "add-sort-order",
    "sort-order": {
        "order-id": 1,
        "fields": [
            {
                "transform": "identity",
                "source-id": 2,
                "direction": "asc",
                "null-order": "nulls-first"
            },
            {
                "transform": "bucket[4]",
                "source-id": 3,
                "direction": "desc",
                "null-order": "nulls-last"
            }
        ]
    }
}
        "#;

        let update = TableUpdate::AddSortOrder {
            sort_order: SortOrder::builder()
                .with_order_id(1)
                .with_sort_field(
                    SortField::builder()
                        .source_id(2)
                        .direction(SortDirection::Ascending)
                        .null_order(NullOrder::First)
                        .transform(Transform::Identity)
                        .build(),
                )
                .with_sort_field(
                    SortField::builder()
                        .source_id(3)
                        .direction(SortDirection::Descending)
                        .null_order(NullOrder::Last)
                        .transform(Transform::Bucket(4))
                        .build(),
                )
                // Unbound: source columns are validated against a schema later.
                .build_unbound()
                .unwrap(),
        };

        test_serde_json(json, update);
    }
1705
1706    #[test]
1707    fn test_set_default_order() {
1708        let json = r#"
1709{
1710    "action": "set-default-sort-order",
1711    "sort-order-id": 2
1712}
1713        "#;
1714        let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
1715
1716        test_serde_json(json, update);
1717    }
1718
    #[test]
    fn test_add_snapshot() {
        // v2-style snapshot JSON: `sequence-number` and `schema-id` are both
        // present and must survive a full round trip.
        let json = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "sequence-number": 1,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro",
        "schema-id": 1
    }
}
        "#;

        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                .with_sequence_number(1)
                .with_manifest_list("s3://a/b/2.avro")
                .with_schema_id(1)
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        test_serde_json(json, update);
    }
1755
    #[test]
    fn test_add_snapshot_v1() {
        // v1-style snapshot JSON omits `sequence-number`; deserialization
        // defaults it to 0.
        let json = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro"
    }
}
    "#;

        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                .with_sequence_number(0)
                .with_manifest_list("s3://a/b/2.avro")
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        // Parse-only (no `test_serde_json` round trip): re-serializing would
        // emit the defaulted sequence number, which this input lacks.
        let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
        assert_eq!(actual, update, "Parsed value is not equal to expected");
    }
1790
1791    #[test]
1792    fn test_add_snapshot_v3() {
1793        let json = serde_json::json!(
1794        {
1795            "action": "add-snapshot",
1796            "snapshot": {
1797                "snapshot-id": 3055729675574597000i64,
1798                "parent-snapshot-id": 3051729675574597000i64,
1799                "timestamp-ms": 1555100955770i64,
1800                "first-row-id":0,
1801                "added-rows":2,
1802                "key-id":"key123",
1803                "summary": {
1804                    "operation": "append"
1805                },
1806                "manifest-list": "s3://a/b/2.avro"
1807            }
1808        });
1809
1810        let update = TableUpdate::AddSnapshot {
1811            snapshot: Snapshot::builder()
1812                .with_snapshot_id(3055729675574597000)
1813                .with_parent_snapshot_id(Some(3051729675574597000))
1814                .with_timestamp_ms(1555100955770)
1815                .with_sequence_number(0)
1816                .with_manifest_list("s3://a/b/2.avro")
1817                .with_row_range(0, 2)
1818                .with_encryption_key_id(Some("key123".to_string()))
1819                .with_summary(Summary {
1820                    operation: Operation::Append,
1821                    additional_properties: HashMap::default(),
1822                })
1823                .build(),
1824        };
1825
1826        let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
1827        assert_eq!(actual, update, "Parsed value is not equal to expected");
1828        let restored: TableUpdate = serde_json::from_str(
1829            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1830        )
1831        .expect("Failed to parse from serialized json");
1832        assert_eq!(restored, update);
1833    }
1834
1835    #[test]
1836    fn test_remove_snapshots() {
1837        let json = r#"
1838{
1839    "action": "remove-snapshots",
1840    "snapshot-ids": [
1841        1,
1842        2
1843    ]
1844}
1845        "#;
1846
1847        let update = TableUpdate::RemoveSnapshots {
1848            snapshot_ids: vec![1, 2],
1849        };
1850        test_serde_json(json, update);
1851    }
1852
1853    #[test]
1854    fn test_remove_snapshot_ref() {
1855        let json = r#"
1856{
1857    "action": "remove-snapshot-ref",
1858    "ref-name": "snapshot-ref"
1859}
1860        "#;
1861
1862        let update = TableUpdate::RemoveSnapshotRef {
1863            ref_name: "snapshot-ref".to_string(),
1864        };
1865        test_serde_json(json, update);
1866    }
1867
1868    #[test]
1869    fn test_set_snapshot_ref_tag() {
1870        let json = r#"
1871{
1872    "action": "set-snapshot-ref",
1873    "type": "tag",
1874    "ref-name": "hank",
1875    "snapshot-id": 1,
1876    "max-ref-age-ms": 1
1877}
1878        "#;
1879
1880        let update = TableUpdate::SetSnapshotRef {
1881            ref_name: "hank".to_string(),
1882            reference: SnapshotReference {
1883                snapshot_id: 1,
1884                retention: SnapshotRetention::Tag {
1885                    max_ref_age_ms: Some(1),
1886                },
1887            },
1888        };
1889
1890        test_serde_json(json, update);
1891    }
1892
1893    #[test]
1894    fn test_set_snapshot_ref_branch() {
1895        let json = r#"
1896{
1897    "action": "set-snapshot-ref",
1898    "type": "branch",
1899    "ref-name": "hank",
1900    "snapshot-id": 1,
1901    "min-snapshots-to-keep": 2,
1902    "max-snapshot-age-ms": 3,
1903    "max-ref-age-ms": 4
1904}
1905        "#;
1906
1907        let update = TableUpdate::SetSnapshotRef {
1908            ref_name: "hank".to_string(),
1909            reference: SnapshotReference {
1910                snapshot_id: 1,
1911                retention: SnapshotRetention::Branch {
1912                    min_snapshots_to_keep: Some(2),
1913                    max_snapshot_age_ms: Some(3),
1914                    max_ref_age_ms: Some(4),
1915                },
1916            },
1917        };
1918
1919        test_serde_json(json, update);
1920    }
1921
1922    #[test]
1923    fn test_set_properties() {
1924        let json = r#"
1925{
1926    "action": "set-properties",
1927    "updates": {
1928        "prop1": "v1",
1929        "prop2": "v2"
1930    }
1931}
1932        "#;
1933
1934        let update = TableUpdate::SetProperties {
1935            updates: vec![
1936                ("prop1".to_string(), "v1".to_string()),
1937                ("prop2".to_string(), "v2".to_string()),
1938            ]
1939            .into_iter()
1940            .collect(),
1941        };
1942
1943        test_serde_json(json, update);
1944    }
1945
1946    #[test]
1947    fn test_remove_properties() {
1948        let json = r#"
1949{
1950    "action": "remove-properties",
1951    "removals": [
1952        "prop1",
1953        "prop2"
1954    ]
1955}
1956        "#;
1957
1958        let update = TableUpdate::RemoveProperties {
1959            removals: vec!["prop1".to_string(), "prop2".to_string()],
1960        };
1961
1962        test_serde_json(json, update);
1963    }
1964
1965    #[test]
1966    fn test_set_location() {
1967        let json = r#"
1968{
1969    "action": "set-location",
1970    "location": "s3://bucket/warehouse/tbl_location"
1971}
1972    "#;
1973
1974        let update = TableUpdate::SetLocation {
1975            location: "s3://bucket/warehouse/tbl_location".to_string(),
1976        };
1977
1978        test_serde_json(json, update);
1979    }
1980
1981    #[test]
1982    fn test_table_update_apply() {
1983        let table_creation = TableCreation::builder()
1984            .location("s3://db/table".to_string())
1985            .name("table".to_string())
1986            .properties(HashMap::new())
1987            .schema(Schema::builder().build().unwrap())
1988            .build();
1989        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
1990            .unwrap()
1991            .build()
1992            .unwrap()
1993            .metadata;
1994        let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
1995            table_metadata,
1996            Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
1997        );
1998
1999        let uuid = uuid::Uuid::new_v4();
2000        let update = TableUpdate::AssignUuid { uuid };
2001        let updated_metadata = update
2002            .apply(table_metadata_builder)
2003            .unwrap()
2004            .build()
2005            .unwrap()
2006            .metadata;
2007        assert_eq!(updated_metadata.uuid(), uuid);
2008    }
2009
2010    #[test]
2011    fn test_view_assign_uuid() {
2012        test_serde_json(
2013            r#"
2014{
2015    "action": "assign-uuid",
2016    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
2017}
2018        "#,
2019            ViewUpdate::AssignUuid {
2020                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
2021            },
2022        );
2023    }
2024
2025    #[test]
2026    fn test_view_upgrade_format_version() {
2027        test_serde_json(
2028            r#"
2029{
2030    "action": "upgrade-format-version",
2031    "format-version": 1
2032}
2033        "#,
2034            ViewUpdate::UpgradeFormatVersion {
2035                format_version: ViewFormatVersion::V1,
2036            },
2037        );
2038    }
2039
2040    #[test]
2041    fn test_view_add_schema() {
2042        let test_schema = Schema::builder()
2043            .with_schema_id(1)
2044            .with_identifier_field_ids(vec![2])
2045            .with_fields(vec![
2046                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
2047                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2048                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
2049            ])
2050            .build()
2051            .unwrap();
2052        test_serde_json(
2053            r#"
2054{
2055    "action": "add-schema",
2056    "schema": {
2057        "type": "struct",
2058        "schema-id": 1,
2059        "fields": [
2060            {
2061                "id": 1,
2062                "name": "foo",
2063                "required": false,
2064                "type": "string"
2065            },
2066            {
2067                "id": 2,
2068                "name": "bar",
2069                "required": true,
2070                "type": "int"
2071            },
2072            {
2073                "id": 3,
2074                "name": "baz",
2075                "required": false,
2076                "type": "boolean"
2077            }
2078        ],
2079        "identifier-field-ids": [
2080            2
2081        ]
2082    },
2083    "last-column-id": 3
2084}
2085        "#,
2086            ViewUpdate::AddSchema {
2087                schema: test_schema.clone(),
2088                last_column_id: Some(3),
2089            },
2090        );
2091    }
2092
2093    #[test]
2094    fn test_view_set_location() {
2095        test_serde_json(
2096            r#"
2097{
2098    "action": "set-location",
2099    "location": "s3://db/view"
2100}
2101        "#,
2102            ViewUpdate::SetLocation {
2103                location: "s3://db/view".to_string(),
2104            },
2105        );
2106    }
2107
2108    #[test]
2109    fn test_view_set_properties() {
2110        test_serde_json(
2111            r#"
2112{
2113    "action": "set-properties",
2114    "updates": {
2115        "prop1": "v1",
2116        "prop2": "v2"
2117    }
2118}
2119        "#,
2120            ViewUpdate::SetProperties {
2121                updates: vec![
2122                    ("prop1".to_string(), "v1".to_string()),
2123                    ("prop2".to_string(), "v2".to_string()),
2124                ]
2125                .into_iter()
2126                .collect(),
2127            },
2128        );
2129    }
2130
2131    #[test]
2132    fn test_view_remove_properties() {
2133        test_serde_json(
2134            r#"
2135{
2136    "action": "remove-properties",
2137    "removals": [
2138        "prop1",
2139        "prop2"
2140    ]
2141}
2142        "#,
2143            ViewUpdate::RemoveProperties {
2144                removals: vec!["prop1".to_string(), "prop2".to_string()],
2145            },
2146        );
2147    }
2148
2149    #[test]
2150    fn test_view_add_view_version() {
2151        test_serde_json(
2152            r#"
2153{
2154    "action": "add-view-version",
2155    "view-version": {
2156            "version-id" : 1,
2157            "timestamp-ms" : 1573518431292,
2158            "schema-id" : 1,
2159            "default-catalog" : "prod",
2160            "default-namespace" : [ "default" ],
2161            "summary" : {
2162              "engine-name" : "Spark"
2163            },
2164            "representations" : [ {
2165              "type" : "sql",
2166              "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
2167              "dialect" : "spark"
2168            } ]
2169    }
2170}
2171        "#,
2172            ViewUpdate::AddViewVersion {
2173                view_version: ViewVersion::builder()
2174                    .with_version_id(1)
2175                    .with_timestamp_ms(1573518431292)
2176                    .with_schema_id(1)
2177                    .with_default_catalog(Some("prod".to_string()))
2178                    .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
2179                    .with_summary(
2180                        vec![("engine-name".to_string(), "Spark".to_string())]
2181                            .into_iter()
2182                            .collect(),
2183                    )
2184                    .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
2185                        sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
2186                        dialect: "spark".to_string(),
2187                    })]))
2188                    .build(),
2189            },
2190        );
2191    }
2192
2193    #[test]
2194    fn test_view_set_current_view_version() {
2195        test_serde_json(
2196            r#"
2197{
2198    "action": "set-current-view-version",
2199    "view-version-id": 1
2200}
2201        "#,
2202            ViewUpdate::SetCurrentViewVersion { view_version_id: 1 },
2203        );
2204    }
2205
2206    #[test]
2207    fn test_remove_partition_specs_update() {
2208        test_serde_json(
2209            r#"
2210{
2211    "action": "remove-partition-specs",
2212    "spec-ids": [1, 2]
2213}
2214        "#,
2215            TableUpdate::RemovePartitionSpecs {
2216                spec_ids: vec![1, 2],
2217            },
2218        );
2219    }
2220
2221    #[test]
2222    fn test_set_statistics_file() {
2223        test_serde_json(
2224            r#"
2225        {
2226                "action": "set-statistics",
2227                "snapshot-id": 1940541653261589030,
2228                "statistics": {
2229                        "snapshot-id": 1940541653261589030,
2230                        "statistics-path": "s3://bucket/warehouse/stats.puffin",
2231                        "file-size-in-bytes": 124,
2232                        "file-footer-size-in-bytes": 27,
2233                        "blob-metadata": [
2234                                {
2235                                        "type": "boring-type",
2236                                        "snapshot-id": 1940541653261589030,
2237                                        "sequence-number": 2,
2238                                        "fields": [
2239                                                1
2240                                        ],
2241                                        "properties": {
2242                                                "prop-key": "prop-value"
2243                                        }
2244                                }
2245                        ]
2246                }
2247        }
2248        "#,
2249            TableUpdate::SetStatistics {
2250                statistics: StatisticsFile {
2251                    snapshot_id: 1940541653261589030,
2252                    statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
2253                    file_size_in_bytes: 124,
2254                    file_footer_size_in_bytes: 27,
2255                    key_metadata: None,
2256                    blob_metadata: vec![BlobMetadata {
2257                        r#type: "boring-type".to_string(),
2258                        snapshot_id: 1940541653261589030,
2259                        sequence_number: 2,
2260                        fields: vec![1],
2261                        properties: vec![("prop-key".to_string(), "prop-value".to_string())]
2262                            .into_iter()
2263                            .collect(),
2264                    }],
2265                },
2266            },
2267        );
2268    }
2269
2270    #[test]
2271    fn test_remove_statistics_file() {
2272        test_serde_json(
2273            r#"
2274        {
2275                "action": "remove-statistics",
2276                "snapshot-id": 1940541653261589030
2277        }
2278        "#,
2279            TableUpdate::RemoveStatistics {
2280                snapshot_id: 1940541653261589030,
2281            },
2282        );
2283    }
2284
2285    #[test]
2286    fn test_set_partition_statistics_file() {
2287        test_serde_json(
2288            r#"
2289            {
2290                "action": "set-partition-statistics",
2291                "partition-statistics": {
2292                    "snapshot-id": 1940541653261589030,
2293                    "statistics-path": "s3://bucket/warehouse/stats1.parquet",
2294                    "file-size-in-bytes": 43
2295                }
2296            }
2297            "#,
2298            TableUpdate::SetPartitionStatistics {
2299                partition_statistics: PartitionStatisticsFile {
2300                    snapshot_id: 1940541653261589030,
2301                    statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
2302                    file_size_in_bytes: 43,
2303                },
2304            },
2305        )
2306    }
2307
2308    #[test]
2309    fn test_remove_partition_statistics_file() {
2310        test_serde_json(
2311            r#"
2312            {
2313                "action": "remove-partition-statistics",
2314                "snapshot-id": 1940541653261589030
2315            }
2316            "#,
2317            TableUpdate::RemovePartitionStatistics {
2318                snapshot_id: 1940541653261589030,
2319            },
2320        )
2321    }
2322
2323    #[test]
2324    fn test_remove_schema_update() {
2325        test_serde_json(
2326            r#"
2327                {
2328                    "action": "remove-schemas",
2329                    "schema-ids": [1, 2]
2330                }        
2331            "#,
2332            TableUpdate::RemoveSchemas {
2333                schema_ids: vec![1, 2],
2334            },
2335        );
2336    }
2337
2338    #[test]
2339    fn test_add_encryption_key() {
2340        let key_bytes = "key".as_bytes();
2341        let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
2342        test_serde_json(
2343            format!(
2344                r#"
2345                {{
2346                    "action": "add-encryption-key",
2347                    "encryption-key": {{
2348                        "key-id": "a",
2349                        "encrypted-key-metadata": "{encoded_key}",
2350                        "encrypted-by-id": "b"
2351                    }}
2352                }}        
2353            "#
2354            ),
2355            TableUpdate::AddEncryptionKey {
2356                encryption_key: EncryptedKey::builder()
2357                    .key_id("a")
2358                    .encrypted_key_metadata(key_bytes.to_vec())
2359                    .encrypted_by_id("b")
2360                    .build(),
2361            },
2362        );
2363    }
2364
2365    #[test]
2366    fn test_remove_encryption_key() {
2367        test_serde_json(
2368            r#"
2369                {
2370                    "action": "remove-encryption-key",
2371                    "key-id": "a"
2372                }        
2373            "#,
2374            TableUpdate::RemoveEncryptionKey {
2375                key_id: "a".to_string(),
2376            },
2377        );
2378    }
2379
2380    #[test]
2381    fn test_table_commit() {
2382        let table = {
2383            let file = File::open(format!(
2384                "{}/testdata/table_metadata/{}",
2385                env!("CARGO_MANIFEST_DIR"),
2386                "TableMetadataV2Valid.json"
2387            ))
2388            .unwrap();
2389            let reader = BufReader::new(file);
2390            let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2391
2392            Table::builder()
2393                .metadata(resp)
2394                .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
2395                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2396                .file_io(FileIO::new_with_memory())
2397                .build()
2398                .unwrap()
2399        };
2400
2401        let updates = vec![
2402            TableUpdate::SetLocation {
2403                location: "s3://bucket/test/new_location/data".to_string(),
2404            },
2405            TableUpdate::SetProperties {
2406                updates: vec![
2407                    ("prop1".to_string(), "v1".to_string()),
2408                    ("prop2".to_string(), "v2".to_string()),
2409                ]
2410                .into_iter()
2411                .collect(),
2412            },
2413        ];
2414
2415        let requirements = vec![TableRequirement::UuidMatch {
2416            uuid: table.metadata().table_uuid,
2417        }];
2418
2419        let table_commit = TableCommit::builder()
2420            .ident(table.identifier().to_owned())
2421            .updates(updates)
2422            .requirements(requirements)
2423            .build();
2424
2425        let updated_table = table_commit.apply(table).unwrap();
2426
2427        assert_eq!(
2428            updated_table.metadata().properties.get("prop1").unwrap(),
2429            "v1"
2430        );
2431        assert_eq!(
2432            updated_table.metadata().properties.get("prop2").unwrap(),
2433            "v2"
2434        );
2435
2436        // metadata version should be bumped
2437        assert!(
2438            updated_table
2439                .metadata_location()
2440                .unwrap()
2441                .starts_with("s3://bucket/test/location/metadata/00001-")
2442        );
2443
2444        assert_eq!(
2445            updated_table.metadata().location,
2446            "s3://bucket/test/new_location/data",
2447        );
2448    }
2449}