iceberg/catalog/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Catalog API for Apache Iceberg
19
20pub mod memory;
21mod metadata_location;
22pub(crate) mod utils;
23
24use std::collections::HashMap;
25use std::fmt::{Debug, Display};
26use std::future::Future;
27use std::mem::take;
28use std::ops::Deref;
29use std::str::FromStr;
30use std::sync::Arc;
31
32use _serde::{deserialize_snapshot, serialize_snapshot};
33use async_trait::async_trait;
34pub use memory::MemoryCatalog;
35pub use metadata_location::*;
36#[cfg(test)]
37use mockall::automock;
38use serde_derive::{Deserialize, Serialize};
39use typed_builder::TypedBuilder;
40use uuid::Uuid;
41
42use crate::io::StorageFactory;
43use crate::spec::{
44    EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
45    SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
46    UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
47};
48use crate::table::Table;
49use crate::{Error, ErrorKind, Result};
50
/// The catalog API for Iceberg Rust.
///
/// A catalog tracks namespaces and the tables inside them. All operations
/// are async and fallible, so implementations may be backed by remote
/// services.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// List namespaces inside the catalog.
    ///
    /// When `parent` is provided, only the namespaces nested under it are
    /// expected to be returned.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Create a new namespace inside the catalog.
    ///
    /// Returns the created [`Namespace`] together with its properties.
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Get a namespace information from the catalog.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Check if namespace exists in catalog.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Update a namespace inside the catalog.
    ///
    /// # Behavior
    ///
    /// The properties must be the full set of properties for the namespace.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drop a namespace from the catalog, or returns error if it doesn't exist.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// List tables from namespace.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Create a new table inside the namespace.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Load table from the catalog.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drop a table from the catalog, or returns error if it doesn't exist.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Drop a table from the catalog and delete the underlying table data.
    ///
    /// Implementations should load the table metadata, drop the table
    /// from the catalog, then delete all associated data and metadata files.
    /// The [`drop_table_data`](utils::drop_table_data) utility function can
    /// be used for the file cleanup step.
    async fn purge_table(&self, table: &TableIdent) -> Result<()>;

    /// Check if a table exists in the catalog.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Rename a table in the catalog.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Register an existing table to the catalog.
    ///
    /// `metadata_location` is the location of an existing table metadata file
    /// that the catalog entry should point at.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Update a table in the catalog by applying the given [`TableCommit`].
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
122
/// Common interface for all catalog builders.
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type that this builder creates.
    type C: Catalog;

    /// Set a custom StorageFactory to use for storage operations.
    ///
    /// When a StorageFactory is provided, the catalog will use it to build FileIO
    /// instances for all storage operations instead of using the default factory.
    ///
    /// # Arguments
    ///
    /// * `storage_factory` - The StorageFactory to use for creating storage instances
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use iceberg::CatalogBuilder;
    /// use iceberg::io::StorageFactory;
    /// use iceberg_storage_opendal::OpenDalStorageFactory;
    /// use std::sync::Arc;
    ///
    /// let catalog = MyCatalogBuilder::default()
    ///     .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 {
    ///         customized_credential_load: None,
    ///     }))
    ///     .load("my_catalog", props)
    ///     .await?;
    /// ```
    fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;

    /// Create a new catalog instance.
    ///
    /// `name` is the catalog's name, and `props` carries
    /// implementation-specific configuration properties.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
161
/// NamespaceIdent represents the identifier of a namespace in the catalog.
///
/// The namespace identifier is a list of strings, where each string is a
/// component of the namespace. It's the catalog implementer's responsibility to
/// handle the namespace identifier correctly.
// Ord/Hash derives allow identifiers to be used as keys in sorted or hashed
// collections; Serialize/Deserialize make them usable in catalog payloads.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
169
170impl NamespaceIdent {
171    /// Create a new namespace identifier with only one level.
172    pub fn new(name: String) -> Self {
173        Self(vec![name])
174    }
175
176    /// Create a multi-level namespace identifier from vector.
177    pub fn from_vec(names: Vec<String>) -> Result<Self> {
178        if names.is_empty() {
179            return Err(Error::new(
180                ErrorKind::DataInvalid,
181                "Namespace identifier can't be empty!",
182            ));
183        }
184        Ok(Self(names))
185    }
186
187    /// Try to create namespace identifier from an iterator of string.
188    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
189        Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
190    }
191
192    /// Returns a string for used in url.
193    pub fn to_url_string(&self) -> String {
194        self.as_ref().join("\u{001f}")
195    }
196
197    /// Returns inner strings.
198    pub fn inner(self) -> Vec<String> {
199        self.0
200    }
201
202    /// Get the parent of this namespace.
203    /// Returns None if this namespace only has a single element and thus has no parent.
204    pub fn parent(&self) -> Option<Self> {
205        self.0.split_last().and_then(|(_, parent)| {
206            if parent.is_empty() {
207                None
208            } else {
209                Some(Self(parent.to_vec()))
210            }
211        })
212    }
213}
214
impl AsRef<Vec<String>> for NamespaceIdent {
    /// Borrows the namespace levels as a vector of strings.
    fn as_ref(&self) -> &Vec<String> {
        &self.0
    }
}
220
impl Deref for NamespaceIdent {
    type Target = [String];

    /// Dereferences to the slice of namespace levels, so slice methods
    /// (e.g. `join`, `iter`, indexing) work directly on the identifier.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
228
/// Namespace represents a namespace in the catalog.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    /// The identifier of this namespace.
    name: NamespaceIdent,
    /// Free-form key/value properties attached to the namespace.
    properties: HashMap<String, String>,
}
235
236impl Namespace {
237    /// Create a new namespace.
238    pub fn new(name: NamespaceIdent) -> Self {
239        Self::with_properties(name, HashMap::default())
240    }
241
242    /// Create a new namespace with properties.
243    pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
244        Self { name, properties }
245    }
246
247    /// Get the name of the namespace.
248    pub fn name(&self) -> &NamespaceIdent {
249        &self.name
250    }
251
252    /// Get the properties of the namespace.
253    pub fn properties(&self) -> &HashMap<String, String> {
254        &self.properties
255    }
256}
257
258impl Display for NamespaceIdent {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        write!(f, "{}", self.0.join("."))
261    }
262}
263
/// TableIdent represents the identifier of a table in the catalog.
///
/// It pairs a (possibly multi-level) namespace with a table name.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace of the table.
    pub namespace: NamespaceIdent,
    /// Table name.
    pub name: String,
}
272
273impl TableIdent {
274    /// Create a new table identifier.
275    pub fn new(namespace: NamespaceIdent, name: String) -> Self {
276        Self { namespace, name }
277    }
278
279    /// Get the namespace of the table.
280    pub fn namespace(&self) -> &NamespaceIdent {
281        &self.namespace
282    }
283
284    /// Get the name of the table.
285    pub fn name(&self) -> &str {
286        &self.name
287    }
288
289    /// Try to create table identifier from an iterator of string.
290    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
291        let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
292        let table_name = vec.pop().ok_or_else(|| {
293            Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
294        })?;
295        let namespace_ident = NamespaceIdent::from_vec(vec)?;
296
297        Ok(Self {
298            namespace: namespace_ident,
299            name: table_name,
300        })
301    }
302}
303
304impl Display for TableIdent {
305    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
306        write!(f, "{}.{}", self.namespace, self.name)
307    }
308}
309
/// TableCreation represents the creation of a table in the catalog.
///
/// Instances are constructed through the `TypedBuilder`-generated builder,
/// e.g. `TableCreation::builder().name(...).schema(...).build()`. The
/// `*_opt` fallback setters accept an `Option` value directly.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// The name of the table.
    pub name: String,
    /// The location of the table.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// The schema of the table.
    pub schema: Schema,
    /// The partition spec of the table, could be None.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// The sort order of the table.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// The properties of the table.
    // The transform setter accepts any iterator of key/value pairs.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Format version of the table. Defaults to V2.
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
335
/// TableCommit represents the commit of a table in the catalog.
///
/// The builder is marked as private since it's dangerous and error-prone to construct
/// [`TableCommit`] directly.
/// Users are supposed to use [`crate::transaction::Transaction`] to update table.
// The generated `build()` is crate-private (`pub(crate)`), so commits can only
// be produced from inside this crate.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    /// The table ident.
    ident: TableIdent,
    /// The requirements of the table.
    ///
    /// Commit will fail if the requirements are not met.
    requirements: Vec<TableRequirement>,
    /// The updates of the table.
    updates: Vec<TableUpdate>,
}
353
354impl TableCommit {
355    /// Return the table identifier.
356    pub fn identifier(&self) -> &TableIdent {
357        &self.ident
358    }
359
360    /// Take all requirements.
361    pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
362        take(&mut self.requirements)
363    }
364
365    /// Take all updates.
366    pub fn take_updates(&mut self) -> Vec<TableUpdate> {
367        take(&mut self.updates)
368    }
369
370    /// Applies this [`TableCommit`] to the given [`Table`] as part of a catalog update.
371    /// Typically used by [`Catalog::update_table`] to validate requirements and apply metadata updates.
372    ///
373    /// Returns a new [`Table`] with updated metadata,
374    /// or an error if validation or application fails.
375    pub fn apply(self, table: Table) -> Result<Table> {
376        // check requirements
377        for requirement in self.requirements {
378            requirement.check(Some(table.metadata()))?;
379        }
380
381        // get current metadata location
382        let current_metadata_location = table.metadata_location_result()?;
383
384        // apply updates to metadata builder
385        let mut metadata_builder = table
386            .metadata()
387            .clone()
388            .into_builder(Some(current_metadata_location.to_string()));
389        for update in self.updates {
390            metadata_builder = update.apply(metadata_builder)?;
391        }
392
393        // Build the new metadata
394        let new_metadata = metadata_builder.build()?.metadata;
395
396        let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
397            .with_next_version()
398            .with_new_metadata(&new_metadata)
399            .to_string();
400
401        Ok(table
402            .with_metadata(Arc::new(new_metadata))
403            .with_metadata_location(new_metadata_location))
404    }
405}
406
/// TableRequirement represents a requirement for a table in the catalog.
///
/// Requirements are serialized with a `type` tag (e.g. `assert-table-uuid`)
/// and are checked against the current table metadata before a commit is
/// applied; see [`TableRequirement::check`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not already exist; used for create transactions
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must match the requirement.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Uuid of original table.
        uuid: Uuid,
    },
    /// The table branch or tag identified by the requirement's `reference` must
    /// reference the requirement's `snapshot-id`.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// The reference of the table to assert.
        r#ref: String,
        /// The snapshot id of the table to assert.
        /// If the id is `None`, the ref must not already exist.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's last assigned column id must match the requirement.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// The last assigned field id of the table to assert.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must match the requirement.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Current schema id of the table to assert.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's last assigned partition id must match the
    /// requirement.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Last assigned partition id of the table to assert.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default spec id must match the requirement.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Default spec id of the table to assert.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The table's default sort order id must match the requirement.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Default sort order id of the table to assert.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
468
/// TableUpdate represents an update to a table in the catalog.
///
/// Updates are serialized with a kebab-case `action` tag and are applied to
/// a [`TableMetadataBuilder`] via [`TableUpdate::apply`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrade table's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: FormatVersion,
    },
    /// Assign a new UUID to the table
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Add a new schema to the table
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
    },
    /// Set table's current schema
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Schema ID to set as current, or -1 to set last added schema
        schema_id: i32,
    },
    /// Add a new partition spec to the table
    AddSpec {
        /// The partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Set table's default spec
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Partition spec ID to set as the default, or -1 to set last added spec
        spec_id: i32,
    },
    /// Add sort order to table.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Set table's default sort order
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Sort order ID to set as the default, or -1 to set last added sort order
        sort_order_id: i64,
    },
    /// Add snapshot to table.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add.
        // Uses the catalog-specific snapshot (de)serializers defined in the
        // `_serde` module below.
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Set table's snapshot ref.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of snapshot reference to set.
        ref_name: String,
        /// Snapshot reference to set.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Remove table's snapshots
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Snapshot ids to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Remove snapshot reference
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of snapshot reference to remove.
        ref_name: String,
    },
    /// Update table's location
    SetLocation {
        /// New location for table.
        location: String,
    },
    /// Update table's properties
    SetProperties {
        /// Properties to update for table.
        updates: HashMap<String, String>,
    },
    /// Remove table's properties
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Remove partition specs
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Partition spec ids to remove.
        spec_ids: Vec<i32>,
    },
    /// Set statistics for a snapshot
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// File containing the statistics
        statistics: StatisticsFile,
    },
    /// Remove statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot id to remove statistics for.
        snapshot_id: i64,
    },
    /// Set partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// File containing the partition statistics
        partition_statistics: PartitionStatisticsFile,
    },
    /// Remove partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot id to remove partition statistics for.
        snapshot_id: i64,
    },
    /// Remove schemas
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Schema IDs to remove.
        schema_ids: Vec<i32>,
    },
    /// Add an encryption key
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// The encryption key to add.
        encryption_key: EncryptedKey,
    },
    /// Remove an encryption key
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// The id of the encryption key to remove.
        key_id: String,
    },
}
616
617impl TableUpdate {
618    /// Applies the update to the table metadata builder.
619    pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
620        match self {
621            TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
622            TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
623            TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
624            TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
625            TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
626            TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
627            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
628                builder.set_default_sort_order(sort_order_id)
629            }
630            TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
631            TableUpdate::SetSnapshotRef {
632                ref_name,
633                reference,
634            } => builder.set_ref(&ref_name, reference),
635            TableUpdate::RemoveSnapshots { snapshot_ids } => {
636                Ok(builder.remove_snapshots(&snapshot_ids))
637            }
638            TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
639            TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
640            TableUpdate::SetProperties { updates } => builder.set_properties(updates),
641            TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
642            TableUpdate::UpgradeFormatVersion { format_version } => {
643                builder.upgrade_format_version(format_version)
644            }
645            TableUpdate::RemovePartitionSpecs { spec_ids } => {
646                builder.remove_partition_specs(&spec_ids)
647            }
648            TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
649            TableUpdate::RemoveStatistics { snapshot_id } => {
650                Ok(builder.remove_statistics(snapshot_id))
651            }
652            TableUpdate::SetPartitionStatistics {
653                partition_statistics,
654            } => Ok(builder.set_partition_statistics(partition_statistics)),
655            TableUpdate::RemovePartitionStatistics { snapshot_id } => {
656                Ok(builder.remove_partition_statistics(snapshot_id))
657            }
658            TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
659            TableUpdate::AddEncryptionKey { encryption_key } => {
660                Ok(builder.add_encryption_key(encryption_key))
661            }
662            TableUpdate::RemoveEncryptionKey { key_id } => {
663                Ok(builder.remove_encryption_key(&key_id))
664            }
665        }
666    }
667}
668
impl TableRequirement {
    /// Check that the requirement is met by the table metadata.
    /// If the requirement is not met, an appropriate error is returned.
    ///
    /// Provide metadata as `None` if the table does not exist.
    ///
    /// Mismatch errors use [`ErrorKind::CatalogCommitConflicts`] and are
    /// marked retryable, so callers may refresh the table and re-attempt
    /// the commit.
    pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
        if let Some(metadata) = metadata {
            // Table exists: verify the requirement against its metadata.
            match self {
                TableRequirement::NotExist => {
                    // The table exists, so an `assert-create` requirement fails.
                    return Err(Error::new(
                        ErrorKind::CatalogCommitConflicts,
                        format!(
                            "Requirement failed: Table with id {} already exists",
                            metadata.uuid()
                        ),
                    )
                    .with_retryable(true));
                }
                TableRequirement::UuidMatch { uuid } => {
                    if &metadata.uuid() != uuid {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Table UUID does not match",
                        )
                        .with_context("expected", *uuid)
                        .with_context("found", metadata.uuid())
                        .with_retryable(true));
                    }
                }
                TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
                    // ToDo: Harmonize the types of current_schema_id
                    if metadata.current_schema_id != *current_schema_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Current schema id does not match",
                        )
                        .with_context("expected", current_schema_id.to_string())
                        .with_context("found", metadata.current_schema_id.to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::DefaultSortOrderIdMatch {
                    default_sort_order_id,
                } => {
                    if metadata.default_sort_order().order_id != *default_sort_order_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Default sort order id does not match",
                        )
                        .with_context("expected", default_sort_order_id.to_string())
                        .with_context("found", metadata.default_sort_order().order_id.to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
                    let snapshot_ref = metadata.snapshot_for_ref(r#ref);
                    if let Some(snapshot_id) = snapshot_id {
                        // A concrete id is asserted: the ref must exist and
                        // point at exactly that snapshot.
                        let snapshot_ref = snapshot_ref.ok_or(
                            Error::new(
                                ErrorKind::CatalogCommitConflicts,
                                format!("Requirement failed: Branch or tag `{ref}` not found"),
                            )
                            .with_retryable(true),
                        )?;
                        if snapshot_ref.snapshot_id() != *snapshot_id {
                            return Err(Error::new(
                                ErrorKind::CatalogCommitConflicts,
                                format!(
                                    "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
                                ),
                            )
                            .with_context("expected", snapshot_id.to_string())
                            .with_context("found", snapshot_ref.snapshot_id().to_string())
                            .with_retryable(true));
                        }
                    } else if snapshot_ref.is_some() {
                        // a null snapshot ID means the ref should not exist already
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            format!("Requirement failed: Branch or tag `{ref}` already exists"),
                        )
                        .with_retryable(true));
                    }
                }
                TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
                    // ToDo: Harmonize the types of default_spec_id
                    if metadata.default_partition_spec_id() != *default_spec_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Default partition spec id does not match",
                        )
                        .with_context("expected", default_spec_id.to_string())
                        .with_context("found", metadata.default_partition_spec_id().to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::LastAssignedPartitionIdMatch {
                    last_assigned_partition_id,
                } => {
                    if metadata.last_partition_id != *last_assigned_partition_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Last assigned partition id does not match",
                        )
                        .with_context("expected", last_assigned_partition_id.to_string())
                        .with_context("found", metadata.last_partition_id.to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::LastAssignedFieldIdMatch {
                    last_assigned_field_id,
                } => {
                    if &metadata.last_column_id != last_assigned_field_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Last assigned field id does not match",
                        )
                        .with_context("expected", last_assigned_field_id.to_string())
                        .with_context("found", metadata.last_column_id.to_string())
                        .with_retryable(true));
                    }
                }
            };
        } else {
            // Table does not exist: only `assert-create` can be satisfied.
            match self {
                TableRequirement::NotExist => {}
                _ => {
                    return Err(Error::new(
                        ErrorKind::TableNotFound,
                        "Requirement failed: Table does not exist",
                    ));
                }
            }
        }

        Ok(())
    }
}
807
808pub(super) mod _serde {
809    use serde::{Deserialize as _, Deserializer, Serialize as _};
810
811    use super::*;
812    use crate::spec::{SchemaId, Summary};
813
814    pub(super) fn deserialize_snapshot<'de, D>(
815        deserializer: D,
816    ) -> std::result::Result<Snapshot, D::Error>
817    where D: Deserializer<'de> {
818        let buf = CatalogSnapshot::deserialize(deserializer)?;
819        Ok(buf.into())
820    }
821
822    pub(super) fn serialize_snapshot<S>(
823        snapshot: &Snapshot,
824        serializer: S,
825    ) -> std::result::Result<S::Ok, S::Error>
826    where
827        S: serde::Serializer,
828    {
829        let buf: CatalogSnapshot = snapshot.clone().into();
830        buf.serialize(serializer)
831    }
832
833    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
834    #[serde(rename_all = "kebab-case")]
835    /// Defines the structure of a v2 snapshot for the catalog.
836    /// Main difference to SnapshotV2 is that sequence-number is optional
837    /// in the rest catalog spec to allow for backwards compatibility with v1.
838    struct CatalogSnapshot {
839        snapshot_id: i64,
840        #[serde(skip_serializing_if = "Option::is_none")]
841        parent_snapshot_id: Option<i64>,
842        #[serde(default)]
843        sequence_number: i64,
844        timestamp_ms: i64,
845        manifest_list: String,
846        summary: Summary,
847        #[serde(skip_serializing_if = "Option::is_none")]
848        schema_id: Option<SchemaId>,
849        #[serde(skip_serializing_if = "Option::is_none")]
850        first_row_id: Option<u64>,
851        #[serde(skip_serializing_if = "Option::is_none")]
852        added_rows: Option<u64>,
853        #[serde(skip_serializing_if = "Option::is_none")]
854        key_id: Option<String>,
855    }
856
857    impl From<CatalogSnapshot> for Snapshot {
858        fn from(snapshot: CatalogSnapshot) -> Self {
859            let CatalogSnapshot {
860                snapshot_id,
861                parent_snapshot_id,
862                sequence_number,
863                timestamp_ms,
864                manifest_list,
865                schema_id,
866                summary,
867                first_row_id,
868                added_rows,
869                key_id,
870            } = snapshot;
871            let builder = Snapshot::builder()
872                .with_snapshot_id(snapshot_id)
873                .with_parent_snapshot_id(parent_snapshot_id)
874                .with_sequence_number(sequence_number)
875                .with_timestamp_ms(timestamp_ms)
876                .with_manifest_list(manifest_list)
877                .with_summary(summary)
878                .with_encryption_key_id(key_id);
879            let row_range = first_row_id.zip(added_rows);
880            match (schema_id, row_range) {
881                (None, None) => builder.build(),
882                (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
883                (None, Some((first_row_id, last_row_id))) => {
884                    builder.with_row_range(first_row_id, last_row_id).build()
885                }
886                (Some(schema_id), Some((first_row_id, last_row_id))) => builder
887                    .with_schema_id(schema_id)
888                    .with_row_range(first_row_id, last_row_id)
889                    .build(),
890            }
891        }
892    }
893
894    impl From<Snapshot> for CatalogSnapshot {
895        fn from(snapshot: Snapshot) -> Self {
896            let first_row_id = snapshot.first_row_id();
897            let added_rows = snapshot.added_rows_count();
898            let Snapshot {
899                snapshot_id,
900                parent_snapshot_id,
901                sequence_number,
902                timestamp_ms,
903                manifest_list,
904                summary,
905                schema_id,
906                row_range: _,
907                encryption_key_id: key_id,
908            } = snapshot;
909            CatalogSnapshot {
910                snapshot_id,
911                parent_snapshot_id,
912                sequence_number,
913                timestamp_ms,
914                manifest_list,
915                summary,
916                schema_id,
917                first_row_id,
918                added_rows,
919                key_id,
920            }
921        }
922    }
923}
924
/// ViewCreation represents the creation of a view in the catalog.
///
/// Built via the generated [`TypedBuilder`]; fields marked `#[builder(default)]`
/// may be omitted when constructing.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// The name of the view.
    pub name: String,
    /// The view's base location; used to create metadata file locations.
    pub location: String,
    /// Representations (e.g. SQL dialects) for the view.
    pub representations: ViewRepresentations,
    /// The schema of the view.
    pub schema: Schema,
    /// The properties of the view. Defaults to empty.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// The default namespace to use when a reference in the SELECT is a single identifier.
    pub default_namespace: NamespaceIdent,
    /// Default catalog to use when a reference in the SELECT does not contain a catalog.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// A string to string map of summary metadata about the version.
    /// Typical keys are "engine-name" and "engine-version".
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
949
/// ViewUpdate represents an update to a view in the catalog.
///
/// Serialized with an `action` tag and kebab-case fields, matching the
/// REST catalog wire format.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assign a new UUID to the view
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Upgrade view's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format version to upgrade to.
        format_version: ViewFormatVersion,
    },
    /// Add a new schema to the view
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
        /// The last column id of the view.
        last_column_id: Option<i32>,
    },
    /// Set view's location
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New location for view.
        location: String,
    },
    /// Set view's properties
    ///
    /// Matching keys are updated, and non-matching keys are left unchanged.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to update for view.
        updates: HashMap<String, String>,
    },
    /// Remove view's properties
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Add a new version to the view
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// The view version to add.
        view_version: ViewVersion,
    },
    /// Set view's current version
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// View version id to set as current, or -1 to set last added version
        view_version_id: i32,
    },
}
1008
1009mod _serde_set_statistics {
1010    // The rest spec requires an additional field `snapshot-id`
1011    // that is redundant with the `snapshot_id` field in the statistics file.
1012    use serde::{Deserialize, Deserializer, Serialize, Serializer};
1013
1014    use super::*;
1015
1016    #[derive(Debug, Serialize, Deserialize)]
1017    #[serde(rename_all = "kebab-case")]
1018    struct SetStatistics {
1019        snapshot_id: Option<i64>,
1020        statistics: StatisticsFile,
1021    }
1022
1023    pub fn serialize<S>(
1024        value: &StatisticsFile,
1025        serializer: S,
1026    ) -> std::result::Result<S::Ok, S::Error>
1027    where
1028        S: Serializer,
1029    {
1030        SetStatistics {
1031            snapshot_id: Some(value.snapshot_id),
1032            statistics: value.clone(),
1033        }
1034        .serialize(serializer)
1035    }
1036
1037    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
1038    where D: Deserializer<'de> {
1039        let SetStatistics {
1040            snapshot_id,
1041            statistics,
1042        } = SetStatistics::deserialize(deserializer)?;
1043        if let Some(snapshot_id) = snapshot_id
1044            && snapshot_id != statistics.snapshot_id
1045        {
1046            return Err(serde::de::Error::custom(format!(
1047                "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1048                statistics.snapshot_id
1049            )));
1050        }
1051
1052        Ok(statistics)
1053    }
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058    use std::collections::HashMap;
1059    use std::fmt::Debug;
1060    use std::fs::File;
1061    use std::io::BufReader;
1062
1063    use base64::Engine as _;
1064    use serde::Serialize;
1065    use serde::de::DeserializeOwned;
1066    use uuid::uuid;
1067
1068    use super::ViewUpdate;
1069    use crate::io::FileIO;
1070    use crate::spec::{
1071        BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1072        PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1073        SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1074        StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1075        UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1076        ViewVersion,
1077    };
1078    use crate::table::Table;
1079    use crate::{
1080        NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1081    };
1082
1083    #[test]
1084    fn test_parent_namespace() {
1085        let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
1086        let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
1087        let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
1088
1089        assert_eq!(ns1.parent(), None);
1090        assert_eq!(ns2.parent(), Some(ns1.clone()));
1091        assert_eq!(ns3.parent(), Some(ns2.clone()));
1092    }
1093
1094    #[test]
1095    fn test_create_table_id() {
1096        let table_id = TableIdent {
1097            namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
1098            name: "t1".to_string(),
1099        };
1100
1101        assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
1102    }
1103
1104    #[test]
1105    fn test_table_creation_iterator_properties() {
1106        let builder = TableCreation::builder()
1107            .name("table".to_string())
1108            .schema(Schema::builder().build().unwrap());
1109
1110        fn s(k: &str, v: &str) -> (String, String) {
1111            (k.to_string(), v.to_string())
1112        }
1113
1114        let table_creation = builder
1115            .properties([s("key", "value"), s("foo", "bar")])
1116            .build();
1117
1118        assert_eq!(
1119            HashMap::from([s("key", "value"), s("foo", "bar")]),
1120            table_creation.properties
1121        );
1122    }
1123
1124    fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1125        json: impl ToString,
1126        expected: T,
1127    ) {
1128        let json_str = json.to_string();
1129        let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1130        assert_eq!(actual, expected, "Parsed value is not equal to expected");
1131
1132        let restored: T = serde_json::from_str(
1133            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1134        )
1135        .expect("Failed to parse from serialized json");
1136
1137        assert_eq!(
1138            restored, expected,
1139            "Parsed restored value is not equal to expected"
1140        );
1141    }
1142
1143    fn metadata() -> TableMetadata {
1144        let tbl_creation = TableCreation::builder()
1145            .name("table".to_string())
1146            .location("/path/to/table".to_string())
1147            .schema(Schema::builder().build().unwrap())
1148            .build();
1149
1150        TableMetadataBuilder::from_table_creation(tbl_creation)
1151            .unwrap()
1152            .assign_uuid(uuid::Uuid::nil())
1153            .build()
1154            .unwrap()
1155            .metadata
1156    }
1157
1158    #[test]
1159    fn test_check_requirement_not_exist() {
1160        let metadata = metadata();
1161        let requirement = TableRequirement::NotExist;
1162
1163        assert!(requirement.check(Some(&metadata)).is_err());
1164        assert!(requirement.check(None).is_ok());
1165    }
1166
1167    #[test]
1168    fn test_check_table_uuid() {
1169        let metadata = metadata();
1170
1171        let requirement = TableRequirement::UuidMatch {
1172            uuid: uuid::Uuid::now_v7(),
1173        };
1174        assert!(requirement.check(Some(&metadata)).is_err());
1175
1176        let requirement = TableRequirement::UuidMatch {
1177            uuid: uuid::Uuid::nil(),
1178        };
1179        assert!(requirement.check(Some(&metadata)).is_ok());
1180    }
1181
    #[test]
    fn test_check_ref_snapshot_id() {
        // Covers all four combinations of ref existence vs. expectation.
        let metadata = metadata();

        // Ref does not exist but should
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "my_branch".to_string(),
            snapshot_id: Some(1),
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        // Ref does not exist and should not
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "my_branch".to_string(),
            snapshot_id: None,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());

        // Add snapshot and point the "main" branch at it so the
        // ref-exists cases below can be exercised.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3051729675574597004)
            .with_sequence_number(10)
            .with_timestamp_ms(9992191116217)
            .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let builder = metadata.into_builder(None);
        let builder = TableUpdate::AddSnapshot {
            snapshot: snapshot.clone(),
        }
        .apply(builder)
        .unwrap();
        let metadata = TableUpdate::SetSnapshotRef {
            ref_name: MAIN_BRANCH.to_string(),
            reference: SnapshotReference {
                snapshot_id: snapshot.snapshot_id(),
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            },
        }
        .apply(builder)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Ref exists and should match
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "main".to_string(),
            snapshot_id: Some(3051729675574597004),
        };
        assert!(requirement.check(Some(&metadata)).is_ok());

        // Ref exists but does not match
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "main".to_string(),
            snapshot_id: Some(1),
        };
        assert!(requirement.check(Some(&metadata)).is_err());
    }
1250
1251    #[test]
1252    fn test_check_last_assigned_field_id() {
1253        let metadata = metadata();
1254
1255        let requirement = TableRequirement::LastAssignedFieldIdMatch {
1256            last_assigned_field_id: 1,
1257        };
1258        assert!(requirement.check(Some(&metadata)).is_err());
1259
1260        let requirement = TableRequirement::LastAssignedFieldIdMatch {
1261            last_assigned_field_id: 0,
1262        };
1263        assert!(requirement.check(Some(&metadata)).is_ok());
1264    }
1265
1266    #[test]
1267    fn test_check_current_schema_id() {
1268        let metadata = metadata();
1269
1270        let requirement = TableRequirement::CurrentSchemaIdMatch {
1271            current_schema_id: 1,
1272        };
1273        assert!(requirement.check(Some(&metadata)).is_err());
1274
1275        let requirement = TableRequirement::CurrentSchemaIdMatch {
1276            current_schema_id: 0,
1277        };
1278        assert!(requirement.check(Some(&metadata)).is_ok());
1279    }
1280
1281    #[test]
1282    fn test_check_last_assigned_partition_id() {
1283        let metadata = metadata();
1284        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1285            last_assigned_partition_id: 0,
1286        };
1287        assert!(requirement.check(Some(&metadata)).is_err());
1288
1289        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1290            last_assigned_partition_id: 999,
1291        };
1292        assert!(requirement.check(Some(&metadata)).is_ok());
1293    }
1294
1295    #[test]
1296    fn test_check_default_spec_id() {
1297        let metadata = metadata();
1298
1299        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1300        assert!(requirement.check(Some(&metadata)).is_err());
1301
1302        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1303        assert!(requirement.check(Some(&metadata)).is_ok());
1304    }
1305
1306    #[test]
1307    fn test_check_default_sort_order_id() {
1308        let metadata = metadata();
1309
1310        let requirement = TableRequirement::DefaultSortOrderIdMatch {
1311            default_sort_order_id: 1,
1312        };
1313        assert!(requirement.check(Some(&metadata)).is_err());
1314
1315        let requirement = TableRequirement::DefaultSortOrderIdMatch {
1316            default_sort_order_id: 0,
1317        };
1318        assert!(requirement.check(Some(&metadata)).is_ok());
1319    }
1320
1321    #[test]
1322    fn test_table_uuid() {
1323        test_serde_json(
1324            r#"
1325{
1326    "type": "assert-table-uuid",
1327    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1328}
1329        "#,
1330            TableRequirement::UuidMatch {
1331                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1332            },
1333        );
1334    }
1335
1336    #[test]
1337    fn test_assert_table_not_exists() {
1338        test_serde_json(
1339            r#"
1340{
1341    "type": "assert-create"
1342}
1343        "#,
1344            TableRequirement::NotExist,
1345        );
1346    }
1347
1348    #[test]
1349    fn test_assert_ref_snapshot_id() {
1350        test_serde_json(
1351            r#"
1352{
1353    "type": "assert-ref-snapshot-id",
1354    "ref": "snapshot-name",
1355    "snapshot-id": null
1356}
1357        "#,
1358            TableRequirement::RefSnapshotIdMatch {
1359                r#ref: "snapshot-name".to_string(),
1360                snapshot_id: None,
1361            },
1362        );
1363
1364        test_serde_json(
1365            r#"
1366{
1367    "type": "assert-ref-snapshot-id",
1368    "ref": "snapshot-name",
1369    "snapshot-id": 1
1370}
1371        "#,
1372            TableRequirement::RefSnapshotIdMatch {
1373                r#ref: "snapshot-name".to_string(),
1374                snapshot_id: Some(1),
1375            },
1376        );
1377    }
1378
1379    #[test]
1380    fn test_assert_last_assigned_field_id() {
1381        test_serde_json(
1382            r#"
1383{
1384    "type": "assert-last-assigned-field-id",
1385    "last-assigned-field-id": 12
1386}
1387        "#,
1388            TableRequirement::LastAssignedFieldIdMatch {
1389                last_assigned_field_id: 12,
1390            },
1391        );
1392    }
1393
1394    #[test]
1395    fn test_assert_current_schema_id() {
1396        test_serde_json(
1397            r#"
1398{
1399    "type": "assert-current-schema-id",
1400    "current-schema-id": 4
1401}
1402        "#,
1403            TableRequirement::CurrentSchemaIdMatch {
1404                current_schema_id: 4,
1405            },
1406        );
1407    }
1408
1409    #[test]
1410    fn test_assert_last_assigned_partition_id() {
1411        test_serde_json(
1412            r#"
1413{
1414    "type": "assert-last-assigned-partition-id",
1415    "last-assigned-partition-id": 1004
1416}
1417        "#,
1418            TableRequirement::LastAssignedPartitionIdMatch {
1419                last_assigned_partition_id: 1004,
1420            },
1421        );
1422    }
1423
1424    #[test]
1425    fn test_assert_default_spec_id() {
1426        test_serde_json(
1427            r#"
1428{
1429    "type": "assert-default-spec-id",
1430    "default-spec-id": 5
1431}
1432        "#,
1433            TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
1434        );
1435    }
1436
1437    #[test]
1438    fn test_assert_default_sort_order() {
1439        let json = r#"
1440{
1441    "type": "assert-default-sort-order-id",
1442    "default-sort-order-id": 10
1443}
1444        "#;
1445
1446        let update = TableRequirement::DefaultSortOrderIdMatch {
1447            default_sort_order_id: 10,
1448        };
1449
1450        test_serde_json(json, update);
1451    }
1452
1453    #[test]
1454    fn test_parse_assert_invalid() {
1455        assert!(
1456            serde_json::from_str::<TableRequirement>(
1457                r#"
1458{
1459    "default-sort-order-id": 10
1460}
1461"#
1462            )
1463            .is_err(),
1464            "Table requirements should not be parsed without type."
1465        );
1466    }
1467
1468    #[test]
1469    fn test_assign_uuid() {
1470        test_serde_json(
1471            r#"
1472{
1473    "action": "assign-uuid",
1474    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1475}
1476        "#,
1477            TableUpdate::AssignUuid {
1478                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1479            },
1480        );
1481    }
1482
1483    #[test]
1484    fn test_upgrade_format_version() {
1485        test_serde_json(
1486            r#"
1487{
1488    "action": "upgrade-format-version",
1489    "format-version": 2
1490}
1491        "#,
1492            TableUpdate::UpgradeFormatVersion {
1493                format_version: FormatVersion::V2,
1494            },
1495        );
1496    }
1497
    #[test]
    fn test_add_schema() {
        let test_schema = Schema::builder()
            .with_schema_id(1)
            .with_identifier_field_ids(vec![2])
            .with_fields(vec![
                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
            ])
            .build()
            .unwrap();
        // `last-column-id` is accepted on input but is not part of the
        // `TableUpdate::AddSchema` variant here, so it is dropped on parse.
        test_serde_json(
            r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
            TableUpdate::AddSchema {
                schema: test_schema.clone(),
            },
        );

        // The same update parses identically without `last-column-id`.
        test_serde_json(
            r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    }
}
        "#,
            TableUpdate::AddSchema {
                schema: test_schema.clone(),
            },
        );
    }
1587
1588    #[test]
1589    fn test_set_current_schema() {
1590        test_serde_json(
1591            r#"
1592{
1593   "action": "set-current-schema",
1594   "schema-id": 23
1595}
1596        "#,
1597            TableUpdate::SetCurrentSchema { schema_id: 23 },
1598        );
1599    }
1600
    #[test]
    fn test_add_spec() {
        // Round-trips an unbound partition spec; parameterized transforms
        // (`bucket[16]`, `truncate[4]`) use their bracketed string encoding.
        test_serde_json(
            r#"
{
    "action": "add-spec",
    "spec": {
        "fields": [
            {
                "source-id": 4,
                "name": "ts_day",
                "transform": "day"
            },
            {
                "source-id": 1,
                "name": "id_bucket",
                "transform": "bucket[16]"
            },
            {
                "source-id": 2,
                "name": "id_truncate",
                "transform": "truncate[4]"
            }
        ]
    }
}
        "#,
            TableUpdate::AddSpec {
                spec: UnboundPartitionSpec::builder()
                    .add_partition_field(4, "ts_day".to_string(), Transform::Day)
                    .unwrap()
                    .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
                    .unwrap()
                    .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
                    .unwrap()
                    .build(),
            },
        );
    }
1640
1641    #[test]
1642    fn test_set_default_spec() {
1643        test_serde_json(
1644            r#"
1645{
1646    "action": "set-default-spec",
1647    "spec-id": 1
1648}
1649        "#,
1650            TableUpdate::SetDefaultSpec { spec_id: 1 },
1651        )
1652    }
1653
    #[test]
    fn test_add_sort_order() {
        // Round-trips a two-field sort order covering both sort directions
        // and both null orderings.
        let json = r#"
{
    "action": "add-sort-order",
    "sort-order": {
        "order-id": 1,
        "fields": [
            {
                "transform": "identity",
                "source-id": 2,
                "direction": "asc",
                "null-order": "nulls-first"
            },
            {
                "transform": "bucket[4]",
                "source-id": 3,
                "direction": "desc",
                "null-order": "nulls-last"
            }
        ]
    }
}
        "#;

        let update = TableUpdate::AddSortOrder {
            sort_order: SortOrder::builder()
                .with_order_id(1)
                .with_sort_field(
                    SortField::builder()
                        .source_id(2)
                        .direction(SortDirection::Ascending)
                        .null_order(NullOrder::First)
                        .transform(Transform::Identity)
                        .build(),
                )
                .with_sort_field(
                    SortField::builder()
                        .source_id(3)
                        .direction(SortDirection::Descending)
                        .null_order(NullOrder::Last)
                        .transform(Transform::Bucket(4))
                        .build(),
                )
                .build_unbound()
                .unwrap(),
        };

        test_serde_json(json, update);
    }
1704
1705    #[test]
1706    fn test_set_default_order() {
1707        let json = r#"
1708{
1709    "action": "set-default-sort-order",
1710    "sort-order-id": 2
1711}
1712        "#;
1713        let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
1714
1715        test_serde_json(json, update);
1716    }
1717
    #[test]
    fn test_add_snapshot() {
        // Round-trips a v2 snapshot, which carries an explicit
        // sequence number and schema id.
        let json = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "sequence-number": 1,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro",
        "schema-id": 1
    }
}
        "#;

        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                .with_sequence_number(1)
                .with_manifest_list("s3://a/b/2.avro")
                .with_schema_id(1)
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        test_serde_json(json, update);
    }
1754
    #[test]
    fn test_add_snapshot_v1() {
        // v1 snapshots omit `sequence-number`; deserialization must
        // default it to 0.
        let json = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro"
    }
}
    "#;

        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                .with_sequence_number(0)
                .with_manifest_list("s3://a/b/2.avro")
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        // Parse-only check (no round trip): serializing would emit the
        // defaulted sequence-number, which this v1 JSON lacks.
        let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
        assert_eq!(actual, update, "Parsed value is not equal to expected");
    }
1789
    /// Format v3 snapshots add `first-row-id`, `added-rows` and `key-id`.
    /// Checks both that the JSON parses into the expected value and that the
    /// value survives a full serialize/deserialize round trip.
    #[test]
    fn test_add_snapshot_v3() {
        let json = serde_json::json!(
        {
            "action": "add-snapshot",
            "snapshot": {
                "snapshot-id": 3055729675574597000i64,
                "parent-snapshot-id": 3051729675574597000i64,
                "timestamp-ms": 1555100955770i64,
                "first-row-id":0,
                "added-rows":2,
                "key-id":"key123",
                "summary": {
                    "operation": "append"
                },
                "manifest-list": "s3://a/b/2.avro"
            }
        });

        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                // `sequence-number` is absent from the payload above, so the
                // expected value uses the default of 0.
                .with_sequence_number(0)
                .with_manifest_list("s3://a/b/2.avro")
                // `first-row-id` / `added-rows` map to the snapshot's row range.
                .with_row_range(0, 2)
                .with_encryption_key_id(Some("key123".to_string()))
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
        assert_eq!(actual, update, "Parsed value is not equal to expected");
        // Round trip: serializing and re-parsing must preserve the v3 fields.
        let restored: TableUpdate = serde_json::from_str(
            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
        )
        .expect("Failed to parse from serialized json");
        assert_eq!(restored, update);
    }
1833
1834    #[test]
1835    fn test_remove_snapshots() {
1836        let json = r#"
1837{
1838    "action": "remove-snapshots",
1839    "snapshot-ids": [
1840        1,
1841        2
1842    ]
1843}
1844        "#;
1845
1846        let update = TableUpdate::RemoveSnapshots {
1847            snapshot_ids: vec![1, 2],
1848        };
1849        test_serde_json(json, update);
1850    }
1851
1852    #[test]
1853    fn test_remove_snapshot_ref() {
1854        let json = r#"
1855{
1856    "action": "remove-snapshot-ref",
1857    "ref-name": "snapshot-ref"
1858}
1859        "#;
1860
1861        let update = TableUpdate::RemoveSnapshotRef {
1862            ref_name: "snapshot-ref".to_string(),
1863        };
1864        test_serde_json(json, update);
1865    }
1866
1867    #[test]
1868    fn test_set_snapshot_ref_tag() {
1869        let json = r#"
1870{
1871    "action": "set-snapshot-ref",
1872    "type": "tag",
1873    "ref-name": "hank",
1874    "snapshot-id": 1,
1875    "max-ref-age-ms": 1
1876}
1877        "#;
1878
1879        let update = TableUpdate::SetSnapshotRef {
1880            ref_name: "hank".to_string(),
1881            reference: SnapshotReference {
1882                snapshot_id: 1,
1883                retention: SnapshotRetention::Tag {
1884                    max_ref_age_ms: Some(1),
1885                },
1886            },
1887        };
1888
1889        test_serde_json(json, update);
1890    }
1891
1892    #[test]
1893    fn test_set_snapshot_ref_branch() {
1894        let json = r#"
1895{
1896    "action": "set-snapshot-ref",
1897    "type": "branch",
1898    "ref-name": "hank",
1899    "snapshot-id": 1,
1900    "min-snapshots-to-keep": 2,
1901    "max-snapshot-age-ms": 3,
1902    "max-ref-age-ms": 4
1903}
1904        "#;
1905
1906        let update = TableUpdate::SetSnapshotRef {
1907            ref_name: "hank".to_string(),
1908            reference: SnapshotReference {
1909                snapshot_id: 1,
1910                retention: SnapshotRetention::Branch {
1911                    min_snapshots_to_keep: Some(2),
1912                    max_snapshot_age_ms: Some(3),
1913                    max_ref_age_ms: Some(4),
1914                },
1915            },
1916        };
1917
1918        test_serde_json(json, update);
1919    }
1920
1921    #[test]
1922    fn test_set_properties() {
1923        let json = r#"
1924{
1925    "action": "set-properties",
1926    "updates": {
1927        "prop1": "v1",
1928        "prop2": "v2"
1929    }
1930}
1931        "#;
1932
1933        let update = TableUpdate::SetProperties {
1934            updates: vec![
1935                ("prop1".to_string(), "v1".to_string()),
1936                ("prop2".to_string(), "v2".to_string()),
1937            ]
1938            .into_iter()
1939            .collect(),
1940        };
1941
1942        test_serde_json(json, update);
1943    }
1944
1945    #[test]
1946    fn test_remove_properties() {
1947        let json = r#"
1948{
1949    "action": "remove-properties",
1950    "removals": [
1951        "prop1",
1952        "prop2"
1953    ]
1954}
1955        "#;
1956
1957        let update = TableUpdate::RemoveProperties {
1958            removals: vec!["prop1".to_string(), "prop2".to_string()],
1959        };
1960
1961        test_serde_json(json, update);
1962    }
1963
1964    #[test]
1965    fn test_set_location() {
1966        let json = r#"
1967{
1968    "action": "set-location",
1969    "location": "s3://bucket/warehouse/tbl_location"
1970}
1971    "#;
1972
1973        let update = TableUpdate::SetLocation {
1974            location: "s3://bucket/warehouse/tbl_location".to_string(),
1975        };
1976
1977        test_serde_json(json, update);
1978    }
1979
1980    #[test]
1981    fn test_table_update_apply() {
1982        let table_creation = TableCreation::builder()
1983            .location("s3://db/table".to_string())
1984            .name("table".to_string())
1985            .properties(HashMap::new())
1986            .schema(Schema::builder().build().unwrap())
1987            .build();
1988        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
1989            .unwrap()
1990            .build()
1991            .unwrap()
1992            .metadata;
1993        let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
1994            table_metadata,
1995            Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
1996        );
1997
1998        let uuid = uuid::Uuid::new_v4();
1999        let update = TableUpdate::AssignUuid { uuid };
2000        let updated_metadata = update
2001            .apply(table_metadata_builder)
2002            .unwrap()
2003            .build()
2004            .unwrap()
2005            .metadata;
2006        assert_eq!(updated_metadata.uuid(), uuid);
2007    }
2008
2009    #[test]
2010    fn test_view_assign_uuid() {
2011        test_serde_json(
2012            r#"
2013{
2014    "action": "assign-uuid",
2015    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
2016}
2017        "#,
2018            ViewUpdate::AssignUuid {
2019                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
2020            },
2021        );
2022    }
2023
2024    #[test]
2025    fn test_view_upgrade_format_version() {
2026        test_serde_json(
2027            r#"
2028{
2029    "action": "upgrade-format-version",
2030    "format-version": 1
2031}
2032        "#,
2033            ViewUpdate::UpgradeFormatVersion {
2034                format_version: ViewFormatVersion::V1,
2035            },
2036        );
2037    }
2038
2039    #[test]
2040    fn test_view_add_schema() {
2041        let test_schema = Schema::builder()
2042            .with_schema_id(1)
2043            .with_identifier_field_ids(vec![2])
2044            .with_fields(vec![
2045                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
2046                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2047                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
2048            ])
2049            .build()
2050            .unwrap();
2051        test_serde_json(
2052            r#"
2053{
2054    "action": "add-schema",
2055    "schema": {
2056        "type": "struct",
2057        "schema-id": 1,
2058        "fields": [
2059            {
2060                "id": 1,
2061                "name": "foo",
2062                "required": false,
2063                "type": "string"
2064            },
2065            {
2066                "id": 2,
2067                "name": "bar",
2068                "required": true,
2069                "type": "int"
2070            },
2071            {
2072                "id": 3,
2073                "name": "baz",
2074                "required": false,
2075                "type": "boolean"
2076            }
2077        ],
2078        "identifier-field-ids": [
2079            2
2080        ]
2081    },
2082    "last-column-id": 3
2083}
2084        "#,
2085            ViewUpdate::AddSchema {
2086                schema: test_schema.clone(),
2087                last_column_id: Some(3),
2088            },
2089        );
2090    }
2091
2092    #[test]
2093    fn test_view_set_location() {
2094        test_serde_json(
2095            r#"
2096{
2097    "action": "set-location",
2098    "location": "s3://db/view"
2099}
2100        "#,
2101            ViewUpdate::SetLocation {
2102                location: "s3://db/view".to_string(),
2103            },
2104        );
2105    }
2106
2107    #[test]
2108    fn test_view_set_properties() {
2109        test_serde_json(
2110            r#"
2111{
2112    "action": "set-properties",
2113    "updates": {
2114        "prop1": "v1",
2115        "prop2": "v2"
2116    }
2117}
2118        "#,
2119            ViewUpdate::SetProperties {
2120                updates: vec![
2121                    ("prop1".to_string(), "v1".to_string()),
2122                    ("prop2".to_string(), "v2".to_string()),
2123                ]
2124                .into_iter()
2125                .collect(),
2126            },
2127        );
2128    }
2129
2130    #[test]
2131    fn test_view_remove_properties() {
2132        test_serde_json(
2133            r#"
2134{
2135    "action": "remove-properties",
2136    "removals": [
2137        "prop1",
2138        "prop2"
2139    ]
2140}
2141        "#,
2142            ViewUpdate::RemoveProperties {
2143                removals: vec!["prop1".to_string(), "prop2".to_string()],
2144            },
2145        );
2146    }
2147
    /// Serde round-trip of the view `add-view-version` action with a SQL
    /// representation and an engine-name summary entry.
    #[test]
    fn test_view_add_view_version() {
        test_serde_json(
            r#"
{
    "action": "add-view-version",
    "view-version": {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
              "engine-name" : "Spark"
            },
            "representations" : [ {
              "type" : "sql",
              "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
              "dialect" : "spark"
            } ]
    }
}
        "#,
            ViewUpdate::AddViewVersion {
                view_version: ViewVersion::builder()
                    .with_version_id(1)
                    .with_timestamp_ms(1573518431292)
                    .with_schema_id(1)
                    .with_default_catalog(Some("prod".to_string()))
                    .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
                    .with_summary(
                        vec![("engine-name".to_string(), "Spark".to_string())]
                            .into_iter()
                            .collect(),
                    )
                    // Mirrors the single "sql" entry in the payload's
                    // "representations" array, SQL text included verbatim.
                    .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
                        sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
                        dialect: "spark".to_string(),
                    })]))
                    .build(),
            },
        );
    }
2191
2192    #[test]
2193    fn test_view_set_current_view_version() {
2194        test_serde_json(
2195            r#"
2196{
2197    "action": "set-current-view-version",
2198    "view-version-id": 1
2199}
2200        "#,
2201            ViewUpdate::SetCurrentViewVersion { view_version_id: 1 },
2202        );
2203    }
2204
2205    #[test]
2206    fn test_remove_partition_specs_update() {
2207        test_serde_json(
2208            r#"
2209{
2210    "action": "remove-partition-specs",
2211    "spec-ids": [1, 2]
2212}
2213        "#,
2214            TableUpdate::RemovePartitionSpecs {
2215                spec_ids: vec![1, 2],
2216            },
2217        );
2218    }
2219
2220    #[test]
2221    fn test_set_statistics_file() {
2222        test_serde_json(
2223            r#"
2224        {
2225                "action": "set-statistics",
2226                "snapshot-id": 1940541653261589030,
2227                "statistics": {
2228                        "snapshot-id": 1940541653261589030,
2229                        "statistics-path": "s3://bucket/warehouse/stats.puffin",
2230                        "file-size-in-bytes": 124,
2231                        "file-footer-size-in-bytes": 27,
2232                        "blob-metadata": [
2233                                {
2234                                        "type": "boring-type",
2235                                        "snapshot-id": 1940541653261589030,
2236                                        "sequence-number": 2,
2237                                        "fields": [
2238                                                1
2239                                        ],
2240                                        "properties": {
2241                                                "prop-key": "prop-value"
2242                                        }
2243                                }
2244                        ]
2245                }
2246        }
2247        "#,
2248            TableUpdate::SetStatistics {
2249                statistics: StatisticsFile {
2250                    snapshot_id: 1940541653261589030,
2251                    statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
2252                    file_size_in_bytes: 124,
2253                    file_footer_size_in_bytes: 27,
2254                    key_metadata: None,
2255                    blob_metadata: vec![BlobMetadata {
2256                        r#type: "boring-type".to_string(),
2257                        snapshot_id: 1940541653261589030,
2258                        sequence_number: 2,
2259                        fields: vec![1],
2260                        properties: vec![("prop-key".to_string(), "prop-value".to_string())]
2261                            .into_iter()
2262                            .collect(),
2263                    }],
2264                },
2265            },
2266        );
2267    }
2268
2269    #[test]
2270    fn test_remove_statistics_file() {
2271        test_serde_json(
2272            r#"
2273        {
2274                "action": "remove-statistics",
2275                "snapshot-id": 1940541653261589030
2276        }
2277        "#,
2278            TableUpdate::RemoveStatistics {
2279                snapshot_id: 1940541653261589030,
2280            },
2281        );
2282    }
2283
2284    #[test]
2285    fn test_set_partition_statistics_file() {
2286        test_serde_json(
2287            r#"
2288            {
2289                "action": "set-partition-statistics",
2290                "partition-statistics": {
2291                    "snapshot-id": 1940541653261589030,
2292                    "statistics-path": "s3://bucket/warehouse/stats1.parquet",
2293                    "file-size-in-bytes": 43
2294                }
2295            }
2296            "#,
2297            TableUpdate::SetPartitionStatistics {
2298                partition_statistics: PartitionStatisticsFile {
2299                    snapshot_id: 1940541653261589030,
2300                    statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
2301                    file_size_in_bytes: 43,
2302                },
2303            },
2304        )
2305    }
2306
2307    #[test]
2308    fn test_remove_partition_statistics_file() {
2309        test_serde_json(
2310            r#"
2311            {
2312                "action": "remove-partition-statistics",
2313                "snapshot-id": 1940541653261589030
2314            }
2315            "#,
2316            TableUpdate::RemovePartitionStatistics {
2317                snapshot_id: 1940541653261589030,
2318            },
2319        )
2320    }
2321
2322    #[test]
2323    fn test_remove_schema_update() {
2324        test_serde_json(
2325            r#"
2326                {
2327                    "action": "remove-schemas",
2328                    "schema-ids": [1, 2]
2329                }        
2330            "#,
2331            TableUpdate::RemoveSchemas {
2332                schema_ids: vec![1, 2],
2333            },
2334        );
2335    }
2336
2337    #[test]
2338    fn test_add_encryption_key() {
2339        let key_bytes = "key".as_bytes();
2340        let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
2341        test_serde_json(
2342            format!(
2343                r#"
2344                {{
2345                    "action": "add-encryption-key",
2346                    "encryption-key": {{
2347                        "key-id": "a",
2348                        "encrypted-key-metadata": "{encoded_key}",
2349                        "encrypted-by-id": "b"
2350                    }}
2351                }}        
2352            "#
2353            ),
2354            TableUpdate::AddEncryptionKey {
2355                encryption_key: EncryptedKey::builder()
2356                    .key_id("a")
2357                    .encrypted_key_metadata(key_bytes.to_vec())
2358                    .encrypted_by_id("b")
2359                    .build(),
2360            },
2361        );
2362    }
2363
2364    #[test]
2365    fn test_remove_encryption_key() {
2366        test_serde_json(
2367            r#"
2368                {
2369                    "action": "remove-encryption-key",
2370                    "key-id": "a"
2371                }        
2372            "#,
2373            TableUpdate::RemoveEncryptionKey {
2374                key_id: "a".to_string(),
2375            },
2376        );
2377    }
2378
    /// End-to-end check of `TableCommit::apply`: loads a v2 table-metadata
    /// fixture, applies a `SetLocation` and `SetProperties` update guarded by
    /// a UUID requirement, then verifies the resulting metadata and that the
    /// metadata-file version number was bumped.
    #[test]
    fn test_table_commit() {
        // Build a Table from a known-good metadata fixture on disk.
        let table = {
            let file = File::open(format!(
                "{}/testdata/table_metadata/{}",
                env!("CARGO_MANIFEST_DIR"),
                "TableMetadataV2Valid.json"
            ))
            .unwrap();
            let reader = BufReader::new(file);
            let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();

            Table::builder()
                .metadata(resp)
                .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
                .file_io(FileIO::new_with_memory())
                .build()
                .unwrap()
        };

        let updates = vec![
            TableUpdate::SetLocation {
                location: "s3://bucket/test/new_location/data".to_string(),
            },
            TableUpdate::SetProperties {
                updates: vec![
                    ("prop1".to_string(), "v1".to_string()),
                    ("prop2".to_string(), "v2".to_string()),
                ]
                .into_iter()
                .collect(),
            },
        ];

        // Requirement matches the fixture's UUID, so the commit must succeed.
        let requirements = vec![TableRequirement::UuidMatch {
            uuid: table.metadata().table_uuid,
        }];

        let table_commit = TableCommit::builder()
            .ident(table.identifier().to_owned())
            .updates(updates)
            .requirements(requirements)
            .build();

        let updated_table = table_commit.apply(table).unwrap();

        // Both property updates must be visible in the new metadata.
        assert_eq!(
            updated_table.metadata().properties.get("prop1").unwrap(),
            "v1"
        );
        assert_eq!(
            updated_table.metadata().properties.get("prop2").unwrap(),
            "v2"
        );

        // metadata version should be bumped
        assert!(
            updated_table
                .metadata_location()
                .unwrap()
                .starts_with("s3://bucket/test/location/metadata/00001-")
        );

        assert_eq!(
            updated_table.metadata().location,
            "s3://bucket/test/new_location/data",
        );
    }
2448}