iceberg/catalog/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Catalog API for Apache Iceberg
19
20pub mod memory;
21mod metadata_location;
22
23use std::collections::HashMap;
24use std::fmt::{Debug, Display};
25use std::future::Future;
26use std::mem::take;
27use std::ops::Deref;
28use std::str::FromStr;
29use std::sync::Arc;
30
31use _serde::{deserialize_snapshot, serialize_snapshot};
32use async_trait::async_trait;
33pub use memory::MemoryCatalog;
34pub use metadata_location::*;
35#[cfg(test)]
36use mockall::automock;
37use serde_derive::{Deserialize, Serialize};
38use typed_builder::TypedBuilder;
39use uuid::Uuid;
40
41use crate::io::StorageFactory;
42use crate::spec::{
43    EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
44    SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
45    UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
46};
47use crate::table::Table;
48use crate::{Error, ErrorKind, Result};
49
/// The catalog API for Iceberg Rust.
///
/// A catalog manages namespaces (possibly nested) and the tables inside
/// them. All operations are asynchronous and return the crate's [`Result`].
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// List namespaces inside the catalog.
    ///
    /// When `parent` is `Some`, only namespaces nested under that parent are
    /// listed; `None` lists namespaces from the catalog root.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Create a new namespace inside the catalog with the given properties.
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Get a namespace information from the catalog.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Check if namespace exists in catalog.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Update a namespace inside the catalog.
    ///
    /// # Behavior
    ///
    /// The given `properties` must be the full set of properties for the
    /// namespace, not a partial update.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drop a namespace from the catalog, or returns error if it doesn't exist.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// List tables from namespace.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Create a new table inside the namespace from a [`TableCreation`].
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Load table from the catalog.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drop a table from the catalog, or returns error if it doesn't exist.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Check if a table exists in the catalog.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Rename a table in the catalog.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Register an existing table to the catalog, pointing it at an
    /// already-written metadata file.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Commit a [`TableCommit`] (requirements plus updates) to the catalog,
    /// returning the updated table.
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
113
/// Common interface for all catalog builders.
///
/// Builders are created via [`Default`], configured with `with_*` methods,
/// and finalized with [`CatalogBuilder::load`].
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type that this builder creates.
    type C: Catalog;

    /// Set a custom StorageFactory to use for storage operations.
    ///
    /// When a StorageFactory is provided, the catalog will use it to build FileIO
    /// instances for all storage operations instead of using the default factory.
    ///
    /// # Arguments
    ///
    /// * `storage_factory` - The StorageFactory to use for creating storage instances
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use iceberg::CatalogBuilder;
    /// use iceberg::io::StorageFactory;
    /// use iceberg_storage_opendal::OpenDalStorageFactory;
    /// use std::sync::Arc;
    ///
    /// let catalog = MyCatalogBuilder::default()
    ///     .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 {
    ///         configured_scheme: "s3a".to_string(),
    ///         customized_credential_load: None,
    ///     }))
    ///     .load("my_catalog", props)
    ///     .await?;
    /// ```
    fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;

    /// Create a new catalog instance with the given name and configuration
    /// properties.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
153
/// NamespaceIdent represents the identifier of a namespace in the catalog.
///
/// The namespace identifier is a list of strings, where each string is a
/// component of the namespace. It's the catalog implementer's responsibility to
/// handle the namespace identifier correctly.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(
    // Ordered namespace levels, outermost first.
    Vec<String>
);
161
162impl NamespaceIdent {
163    /// Create a new namespace identifier with only one level.
164    pub fn new(name: String) -> Self {
165        Self(vec![name])
166    }
167
168    /// Create a multi-level namespace identifier from vector.
169    pub fn from_vec(names: Vec<String>) -> Result<Self> {
170        if names.is_empty() {
171            return Err(Error::new(
172                ErrorKind::DataInvalid,
173                "Namespace identifier can't be empty!",
174            ));
175        }
176        Ok(Self(names))
177    }
178
179    /// Try to create namespace identifier from an iterator of string.
180    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
181        Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
182    }
183
184    /// Returns a string for used in url.
185    pub fn to_url_string(&self) -> String {
186        self.as_ref().join("\u{001f}")
187    }
188
189    /// Returns inner strings.
190    pub fn inner(self) -> Vec<String> {
191        self.0
192    }
193
194    /// Get the parent of this namespace.
195    /// Returns None if this namespace only has a single element and thus has no parent.
196    pub fn parent(&self) -> Option<Self> {
197        self.0.split_last().and_then(|(_, parent)| {
198            if parent.is_empty() {
199                None
200            } else {
201                Some(Self(parent.to_vec()))
202            }
203        })
204    }
205}
206
/// Borrow the namespace levels as a `Vec<String>`.
impl AsRef<Vec<String>> for NamespaceIdent {
    fn as_ref(&self) -> &Vec<String> {
        &self.0
    }
}

/// Let a `NamespaceIdent` be used wherever a `&[String]` slice is expected
/// (iteration, indexing, `join`, ...).
impl Deref for NamespaceIdent {
    type Target = [String];

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
220
/// Namespace represents a namespace in the catalog.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    // Identifier of this namespace.
    name: NamespaceIdent,
    // Free-form key/value properties attached to the namespace.
    properties: HashMap<String, String>,
}
227
228impl Namespace {
229    /// Create a new namespace.
230    pub fn new(name: NamespaceIdent) -> Self {
231        Self::with_properties(name, HashMap::default())
232    }
233
234    /// Create a new namespace with properties.
235    pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
236        Self { name, properties }
237    }
238
239    /// Get the name of the namespace.
240    pub fn name(&self) -> &NamespaceIdent {
241        &self.name
242    }
243
244    /// Get the properties of the namespace.
245    pub fn properties(&self) -> &HashMap<String, String> {
246        &self.properties
247    }
248}
249
250impl Display for NamespaceIdent {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        write!(f, "{}", self.0.join("."))
253    }
254}
255
/// TableIdent represents the identifier of a table in the catalog.
///
/// It pairs the (possibly multi-level) namespace with the table's name.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace of the table.
    pub namespace: NamespaceIdent,
    /// Table name.
    pub name: String,
}
264
265impl TableIdent {
266    /// Create a new table identifier.
267    pub fn new(namespace: NamespaceIdent, name: String) -> Self {
268        Self { namespace, name }
269    }
270
271    /// Get the namespace of the table.
272    pub fn namespace(&self) -> &NamespaceIdent {
273        &self.namespace
274    }
275
276    /// Get the name of the table.
277    pub fn name(&self) -> &str {
278        &self.name
279    }
280
281    /// Try to create table identifier from an iterator of string.
282    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
283        let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
284        let table_name = vec.pop().ok_or_else(|| {
285            Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
286        })?;
287        let namespace_ident = NamespaceIdent::from_vec(vec)?;
288
289        Ok(Self {
290            namespace: namespace_ident,
291            name: table_name,
292        })
293    }
294}
295
296impl Display for TableIdent {
297    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298        write!(f, "{}.{}", self.namespace, self.name)
299    }
300}
301
/// TableCreation represents the creation of a table in the catalog.
///
/// Construct instances through the generated [`TypedBuilder`] builder.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// The name of the table.
    pub name: String,
    /// The location of the table.
    ///
    /// `None` when no explicit location is requested. The builder also
    /// exposes a `location_opt` setter accepting an `Option` directly.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// The schema of the table.
    pub schema: Schema,
    /// The partition spec of the table, could be None.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// The sort order of the table.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// The properties of the table.
    ///
    /// The builder setter accepts any iterator of key/value pairs.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Format version of the table. Defaults to V2.
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
327
/// TableCommit represents the commit of a table in the catalog.
///
/// The builder is marked as private since it's dangerous and error-prone to construct
/// [`TableCommit`] directly.
/// Users are supposed to use [`crate::transaction::Transaction`] to update table.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    /// The table ident.
    ident: TableIdent,
    /// The requirements of the table.
    ///
    /// Commit will fail if the requirements are not met.
    requirements: Vec<TableRequirement>,
    /// The updates of the table.
    ///
    /// Applied in order once all requirements pass.
    updates: Vec<TableUpdate>,
}
345
impl TableCommit {
    /// Return the table identifier.
    pub fn identifier(&self) -> &TableIdent {
        &self.ident
    }

    /// Take all requirements, leaving an empty list behind.
    pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
        take(&mut self.requirements)
    }

    /// Take all updates, leaving an empty list behind.
    pub fn take_updates(&mut self) -> Vec<TableUpdate> {
        take(&mut self.updates)
    }

    /// Applies this [`TableCommit`] to the given [`Table`] as part of a catalog update.
    /// Typically used by [`Catalog::update_table`] to validate requirements and apply metadata updates.
    ///
    /// Returns a new [`Table`] with updated metadata,
    /// or an error if validation or application fails.
    pub fn apply(self, table: Table) -> Result<Table> {
        // check requirements; any unmet requirement aborts the whole commit
        for requirement in self.requirements {
            requirement.check(Some(table.metadata()))?;
        }

        // get current metadata location
        let current_metadata_location = table.metadata_location_result()?;

        // apply updates to metadata builder, threading the builder through
        // each update in order
        let mut metadata_builder = table
            .metadata()
            .clone()
            .into_builder(Some(current_metadata_location.to_string()));
        for update in self.updates {
            metadata_builder = update.apply(metadata_builder)?;
        }

        // Build the new metadata
        let new_metadata = metadata_builder.build()?.metadata;

        // Derive the location for the new metadata file from the current one
        // (see `MetadataLocation::with_next_version`).
        let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
            .with_next_version()
            .with_new_metadata(&new_metadata)
            .to_string();

        Ok(table
            .with_metadata(Arc::new(new_metadata))
            .with_metadata_location(new_metadata_location))
    }
}
398
/// TableRequirement represents a requirement for a table in the catalog.
///
/// Requirements are the optimistic-concurrency preconditions of a commit:
/// each variant is checked against the table's current metadata via
/// [`TableRequirement::check`] before updates are applied. Serialized with
/// a `type` tag and `assert-*` variant names.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not already exist; used for create transactions
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must match the requirement.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Uuid of original table.
        uuid: Uuid,
    },
    /// The table branch or tag identified by the requirement's `reference` must
    /// reference the requirement's `snapshot-id`.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// The reference of the table to assert.
        r#ref: String,
        /// The snapshot id of the table to assert.
        /// If the id is `None`, the ref must not already exist.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's last assigned column id must match the requirement.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// The last assigned field id of the table to assert.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must match the requirement.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Current schema id of the table to assert.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's last assigned partition id must match the
    /// requirement.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Last assigned partition id of the table to assert.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default spec id must match the requirement.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Default spec id of the table to assert.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The table's default sort order id must match the requirement.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Default sort order id of the table to assert.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
460
/// TableUpdate represents an update to a table in the catalog.
///
/// Each variant maps onto one [`TableMetadataBuilder`] operation; see
/// [`TableUpdate::apply`]. Serialized with an `action` tag and kebab-case
/// variant and field names.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrade table's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: FormatVersion,
    },
    /// Assign a new UUID to the table
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Add a new schema to the table
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
    },
    /// Set table's current schema
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Schema ID to set as current, or -1 to set last added schema
        schema_id: i32,
    },
    /// Add a new partition spec to the table
    AddSpec {
        /// The partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Set table's default spec
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Partition spec ID to set as the default, or -1 to set last added spec
        spec_id: i32,
    },
    /// Add sort order to table.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Set table's default sort order
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Sort order ID to set as the default, or -1 to set last added sort order
        sort_order_id: i64,
    },
    /// Add snapshot to table.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add.
        ///
        /// Uses the catalog wire format (see `_serde::CatalogSnapshot`),
        /// where `sequence-number` may be absent for v1 compatibility.
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Set table's snapshot ref.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of snapshot reference to set.
        ref_name: String,
        /// Snapshot reference to set.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Remove table's snapshots
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Snapshot ids to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Remove snapshot reference
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of snapshot reference to remove.
        ref_name: String,
    },
    /// Update table's location
    SetLocation {
        /// New location for table.
        location: String,
    },
    /// Update table's properties
    SetProperties {
        /// Properties to update for table.
        updates: HashMap<String, String>,
    },
    /// Remove table's properties
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Remove partition specs
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Partition spec ids to remove.
        spec_ids: Vec<i32>,
    },
    /// Set statistics for a snapshot
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// File containing the statistics
        statistics: StatisticsFile,
    },
    /// Remove statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot id to remove statistics for.
        snapshot_id: i64,
    },
    /// Set partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// File containing the partition statistics
        partition_statistics: PartitionStatisticsFile,
    },
    /// Remove partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot id to remove partition statistics for.
        snapshot_id: i64,
    },
    /// Remove schemas
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Schema IDs to remove.
        schema_ids: Vec<i32>,
    },
    /// Add an encryption key
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// The encryption key to add.
        encryption_key: EncryptedKey,
    },
    /// Remove an encryption key
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// The id of the encryption key to remove.
        key_id: String,
    },
}
608
609impl TableUpdate {
610    /// Applies the update to the table metadata builder.
611    pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
612        match self {
613            TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
614            TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
615            TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
616            TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
617            TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
618            TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
619            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
620                builder.set_default_sort_order(sort_order_id)
621            }
622            TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
623            TableUpdate::SetSnapshotRef {
624                ref_name,
625                reference,
626            } => builder.set_ref(&ref_name, reference),
627            TableUpdate::RemoveSnapshots { snapshot_ids } => {
628                Ok(builder.remove_snapshots(&snapshot_ids))
629            }
630            TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
631            TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
632            TableUpdate::SetProperties { updates } => builder.set_properties(updates),
633            TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
634            TableUpdate::UpgradeFormatVersion { format_version } => {
635                builder.upgrade_format_version(format_version)
636            }
637            TableUpdate::RemovePartitionSpecs { spec_ids } => {
638                builder.remove_partition_specs(&spec_ids)
639            }
640            TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
641            TableUpdate::RemoveStatistics { snapshot_id } => {
642                Ok(builder.remove_statistics(snapshot_id))
643            }
644            TableUpdate::SetPartitionStatistics {
645                partition_statistics,
646            } => Ok(builder.set_partition_statistics(partition_statistics)),
647            TableUpdate::RemovePartitionStatistics { snapshot_id } => {
648                Ok(builder.remove_partition_statistics(snapshot_id))
649            }
650            TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
651            TableUpdate::AddEncryptionKey { encryption_key } => {
652                Ok(builder.add_encryption_key(encryption_key))
653            }
654            TableUpdate::RemoveEncryptionKey { key_id } => {
655                Ok(builder.remove_encryption_key(&key_id))
656            }
657        }
658    }
659}
660
impl TableRequirement {
    /// Check that the requirement is met by the table metadata.
    /// If the requirement is not met, an appropriate error is returned.
    ///
    /// Provide metadata as `None` if the table does not exist.
    ///
    /// Unmet requirements on an existing table surface as
    /// [`ErrorKind::CatalogCommitConflicts`] errors marked retryable, so
    /// callers may refresh their view of the table and retry the commit.
    pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
        if let Some(metadata) = metadata {
            // Table exists: validate this requirement against its metadata.
            match self {
                TableRequirement::NotExist => {
                    return Err(Error::new(
                        ErrorKind::CatalogCommitConflicts,
                        format!(
                            "Requirement failed: Table with id {} already exists",
                            metadata.uuid()
                        ),
                    )
                    .with_retryable(true));
                }
                TableRequirement::UuidMatch { uuid } => {
                    if &metadata.uuid() != uuid {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Table UUID does not match",
                        )
                        .with_context("expected", *uuid)
                        .with_context("found", metadata.uuid())
                        .with_retryable(true));
                    }
                }
                TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
                    // ToDo: Harmonize the types of current_schema_id
                    if metadata.current_schema_id != *current_schema_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Current schema id does not match",
                        )
                        .with_context("expected", current_schema_id.to_string())
                        .with_context("found", metadata.current_schema_id.to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::DefaultSortOrderIdMatch {
                    default_sort_order_id,
                } => {
                    if metadata.default_sort_order().order_id != *default_sort_order_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Default sort order id does not match",
                        )
                        .with_context("expected", default_sort_order_id.to_string())
                        .with_context("found", metadata.default_sort_order().order_id.to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
                    let snapshot_ref = metadata.snapshot_for_ref(r#ref);
                    if let Some(snapshot_id) = snapshot_id {
                        // An expected id means the ref must exist and point
                        // at exactly that snapshot.
                        let snapshot_ref = snapshot_ref.ok_or(
                            Error::new(
                                ErrorKind::CatalogCommitConflicts,
                                format!("Requirement failed: Branch or tag `{ref}` not found"),
                            )
                            .with_retryable(true),
                        )?;
                        if snapshot_ref.snapshot_id() != *snapshot_id {
                            return Err(Error::new(
                                ErrorKind::CatalogCommitConflicts,
                                format!(
                                    "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
                                ),
                            )
                            .with_context("expected", snapshot_id.to_string())
                            .with_context("found", snapshot_ref.snapshot_id().to_string())
                            .with_retryable(true));
                        }
                    } else if snapshot_ref.is_some() {
                        // a null snapshot ID means the ref should not exist already
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            format!("Requirement failed: Branch or tag `{ref}` already exists"),
                        )
                        .with_retryable(true));
                    }
                }
                TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
                    // ToDo: Harmonize the types of default_spec_id
                    if metadata.default_partition_spec_id() != *default_spec_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Default partition spec id does not match",
                        )
                        .with_context("expected", default_spec_id.to_string())
                        .with_context("found", metadata.default_partition_spec_id().to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::LastAssignedPartitionIdMatch {
                    last_assigned_partition_id,
                } => {
                    if metadata.last_partition_id != *last_assigned_partition_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Last assigned partition id does not match",
                        )
                        .with_context("expected", last_assigned_partition_id.to_string())
                        .with_context("found", metadata.last_partition_id.to_string())
                        .with_retryable(true));
                    }
                }
                TableRequirement::LastAssignedFieldIdMatch {
                    last_assigned_field_id,
                } => {
                    if &metadata.last_column_id != last_assigned_field_id {
                        return Err(Error::new(
                            ErrorKind::CatalogCommitConflicts,
                            "Requirement failed: Last assigned field id does not match",
                        )
                        .with_context("expected", last_assigned_field_id.to_string())
                        .with_context("found", metadata.last_column_id.to_string())
                        .with_retryable(true));
                    }
                }
            };
        } else {
            // Table does not exist: only `NotExist` can succeed; everything
            // else needs metadata to check against. This error is not
            // retryable, unlike the commit conflicts above.
            match self {
                TableRequirement::NotExist => {}
                _ => {
                    return Err(Error::new(
                        ErrorKind::TableNotFound,
                        "Requirement failed: Table does not exist",
                    ));
                }
            }
        }

        Ok(())
    }
}
799
800pub(super) mod _serde {
801    use serde::{Deserialize as _, Deserializer, Serialize as _};
802
803    use super::*;
804    use crate::spec::{SchemaId, Summary};
805
806    pub(super) fn deserialize_snapshot<'de, D>(
807        deserializer: D,
808    ) -> std::result::Result<Snapshot, D::Error>
809    where D: Deserializer<'de> {
810        let buf = CatalogSnapshot::deserialize(deserializer)?;
811        Ok(buf.into())
812    }
813
814    pub(super) fn serialize_snapshot<S>(
815        snapshot: &Snapshot,
816        serializer: S,
817    ) -> std::result::Result<S::Ok, S::Error>
818    where
819        S: serde::Serializer,
820    {
821        let buf: CatalogSnapshot = snapshot.clone().into();
822        buf.serialize(serializer)
823    }
824
825    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
826    #[serde(rename_all = "kebab-case")]
827    /// Defines the structure of a v2 snapshot for the catalog.
828    /// Main difference to SnapshotV2 is that sequence-number is optional
829    /// in the rest catalog spec to allow for backwards compatibility with v1.
830    struct CatalogSnapshot {
831        snapshot_id: i64,
832        #[serde(skip_serializing_if = "Option::is_none")]
833        parent_snapshot_id: Option<i64>,
834        #[serde(default)]
835        sequence_number: i64,
836        timestamp_ms: i64,
837        manifest_list: String,
838        summary: Summary,
839        #[serde(skip_serializing_if = "Option::is_none")]
840        schema_id: Option<SchemaId>,
841        #[serde(skip_serializing_if = "Option::is_none")]
842        first_row_id: Option<u64>,
843        #[serde(skip_serializing_if = "Option::is_none")]
844        added_rows: Option<u64>,
845        #[serde(skip_serializing_if = "Option::is_none")]
846        key_id: Option<String>,
847    }
848
849    impl From<CatalogSnapshot> for Snapshot {
850        fn from(snapshot: CatalogSnapshot) -> Self {
851            let CatalogSnapshot {
852                snapshot_id,
853                parent_snapshot_id,
854                sequence_number,
855                timestamp_ms,
856                manifest_list,
857                schema_id,
858                summary,
859                first_row_id,
860                added_rows,
861                key_id,
862            } = snapshot;
863            let builder = Snapshot::builder()
864                .with_snapshot_id(snapshot_id)
865                .with_parent_snapshot_id(parent_snapshot_id)
866                .with_sequence_number(sequence_number)
867                .with_timestamp_ms(timestamp_ms)
868                .with_manifest_list(manifest_list)
869                .with_summary(summary)
870                .with_encryption_key_id(key_id);
871            let row_range = first_row_id.zip(added_rows);
872            match (schema_id, row_range) {
873                (None, None) => builder.build(),
874                (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
875                (None, Some((first_row_id, last_row_id))) => {
876                    builder.with_row_range(first_row_id, last_row_id).build()
877                }
878                (Some(schema_id), Some((first_row_id, last_row_id))) => builder
879                    .with_schema_id(schema_id)
880                    .with_row_range(first_row_id, last_row_id)
881                    .build(),
882            }
883        }
884    }
885
886    impl From<Snapshot> for CatalogSnapshot {
887        fn from(snapshot: Snapshot) -> Self {
888            let first_row_id = snapshot.first_row_id();
889            let added_rows = snapshot.added_rows_count();
890            let Snapshot {
891                snapshot_id,
892                parent_snapshot_id,
893                sequence_number,
894                timestamp_ms,
895                manifest_list,
896                summary,
897                schema_id,
898                row_range: _,
899                encryption_key_id: key_id,
900            } = snapshot;
901            CatalogSnapshot {
902                snapshot_id,
903                parent_snapshot_id,
904                sequence_number,
905                timestamp_ms,
906                manifest_list,
907                summary,
908                schema_id,
909                first_row_id,
910                added_rows,
911                key_id,
912            }
913        }
914    }
915}
916
/// ViewCreation represents the creation of a view in the catalog.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// The name of the view.
    pub name: String,
    /// The view's base location; used to create metadata file locations
    pub location: String,
    /// Representations for the view.
    pub representations: ViewRepresentations,
    /// The schema of the view.
    pub schema: Schema,
    /// The properties of the view.
    ///
    /// Optional on the builder; defaults to an empty map.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// The default namespace to use when a reference in the SELECT is a single identifier
    pub default_namespace: NamespaceIdent,
    /// Default catalog to use when a reference in the SELECT does not contain a catalog
    ///
    /// Optional on the builder; defaults to `None`.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// A string to string map of summary metadata about the version
    /// Typical keys are "engine-name" and "engine-version"
    ///
    /// Optional on the builder; defaults to an empty map.
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
941
/// ViewUpdate represents an update to a view in the catalog.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assign a new UUID to the view
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Upgrade view's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: ViewFormatVersion,
    },
    /// Add a new schema to the view
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
        /// The last column id of the view.
        last_column_id: Option<i32>,
    },
    /// Set view's base location
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New location for view.
        location: String,
    },
    /// Set view's properties
    ///
    /// Matching keys are updated, and non-matching keys are left unchanged.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to update for view.
        updates: HashMap<String, String>,
    },
    /// Remove view's properties
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Add a new version to the view
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// The view version to add.
        view_version: ViewVersion,
    },
    /// Set view's current version
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// View version id to set as current, or -1 to set last added version
        view_version_id: i32,
    },
}
1000
1001mod _serde_set_statistics {
1002    // The rest spec requires an additional field `snapshot-id`
1003    // that is redundant with the `snapshot_id` field in the statistics file.
1004    use serde::{Deserialize, Deserializer, Serialize, Serializer};
1005
1006    use super::*;
1007
1008    #[derive(Debug, Serialize, Deserialize)]
1009    #[serde(rename_all = "kebab-case")]
1010    struct SetStatistics {
1011        snapshot_id: Option<i64>,
1012        statistics: StatisticsFile,
1013    }
1014
1015    pub fn serialize<S>(
1016        value: &StatisticsFile,
1017        serializer: S,
1018    ) -> std::result::Result<S::Ok, S::Error>
1019    where
1020        S: Serializer,
1021    {
1022        SetStatistics {
1023            snapshot_id: Some(value.snapshot_id),
1024            statistics: value.clone(),
1025        }
1026        .serialize(serializer)
1027    }
1028
1029    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
1030    where D: Deserializer<'de> {
1031        let SetStatistics {
1032            snapshot_id,
1033            statistics,
1034        } = SetStatistics::deserialize(deserializer)?;
1035        if let Some(snapshot_id) = snapshot_id
1036            && snapshot_id != statistics.snapshot_id
1037        {
1038            return Err(serde::de::Error::custom(format!(
1039                "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1040                statistics.snapshot_id
1041            )));
1042        }
1043
1044        Ok(statistics)
1045    }
1046}
1047
1048#[cfg(test)]
1049mod tests {
1050    use std::collections::HashMap;
1051    use std::fmt::Debug;
1052    use std::fs::File;
1053    use std::io::BufReader;
1054
1055    use base64::Engine as _;
1056    use serde::Serialize;
1057    use serde::de::DeserializeOwned;
1058    use uuid::uuid;
1059
1060    use super::ViewUpdate;
1061    use crate::io::FileIO;
1062    use crate::spec::{
1063        BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1064        PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1065        SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1066        StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1067        UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1068        ViewVersion,
1069    };
1070    use crate::table::Table;
1071    use crate::{
1072        NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1073    };
1074
    #[test]
    fn test_parent_namespace() {
        let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
        let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
        let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();

        // A single-level namespace has no parent; deeper ones drop the last level.
        assert_eq!(ns1.parent(), None);
        assert_eq!(ns2.parent(), Some(ns1.clone()));
        assert_eq!(ns3.parent(), Some(ns2.clone()));
    }

    #[test]
    fn test_create_table_id() {
        // `TableIdent::from_strs` takes the last element as the table name
        // and everything before it as the namespace.
        let table_id = TableIdent {
            namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
            name: "t1".to_string(),
        };

        assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
    }
1095
    #[test]
    fn test_table_creation_iterator_properties() {
        let builder = TableCreation::builder()
            .name("table".to_string())
            .schema(Schema::builder().build().unwrap());

        // Shorthand for building owned (key, value) pairs.
        fn s(k: &str, v: &str) -> (String, String) {
            (k.to_string(), v.to_string())
        }

        // `properties` accepts any iterator of pairs, not just a HashMap.
        let table_creation = builder
            .properties([s("key", "value"), s("foo", "bar")])
            .build();

        assert_eq!(
            HashMap::from([s("key", "value"), s("foo", "bar")]),
            table_creation.properties
        );
    }

    /// Asserts that `json` parses into `expected`, then serializes the parsed
    /// value and parses it again, asserting the round-trip also yields
    /// `expected`.
    fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
        json: impl ToString,
        expected: T,
    ) {
        let json_str = json.to_string();
        let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
        assert_eq!(actual, expected, "Parsed value is not equal to expected");

        let restored: T = serde_json::from_str(
            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
        )
        .expect("Failed to parse from serialized json");

        assert_eq!(
            restored, expected,
            "Parsed restored value is not equal to expected"
        );
    }
1134
    /// Builds minimal table metadata (empty schema, nil table UUID) used as
    /// the fixture for the `TableRequirement::check` tests below.
    fn metadata() -> TableMetadata {
        let tbl_creation = TableCreation::builder()
            .name("table".to_string())
            .location("/path/to/table".to_string())
            .schema(Schema::builder().build().unwrap())
            .build();

        TableMetadataBuilder::from_table_creation(tbl_creation)
            .unwrap()
            // Nil UUID so tests can assert against a known value.
            .assign_uuid(uuid::Uuid::nil())
            .build()
            .unwrap()
            .metadata
    }
1149
    #[test]
    fn test_check_requirement_not_exist() {
        let metadata = metadata();
        let requirement = TableRequirement::NotExist;

        // `NotExist` must fail when metadata is present and pass when absent.
        assert!(requirement.check(Some(&metadata)).is_err());
        assert!(requirement.check(None).is_ok());
    }

    #[test]
    fn test_check_table_uuid() {
        let metadata = metadata();

        // The fixture's UUID is nil, so a random v7 UUID must not match.
        let requirement = TableRequirement::UuidMatch {
            uuid: uuid::Uuid::now_v7(),
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::UuidMatch {
            uuid: uuid::Uuid::nil(),
        };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }
1173
    #[test]
    fn test_check_ref_snapshot_id() {
        let metadata = metadata();

        // Ref does not exist but should
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "my_branch".to_string(),
            snapshot_id: Some(1),
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        // Ref does not exist and should not
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "my_branch".to_string(),
            snapshot_id: None,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());

        // Add a snapshot and point the "main" branch at it.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3051729675574597004)
            .with_sequence_number(10)
            .with_timestamp_ms(9992191116217)
            .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let builder = metadata.into_builder(None);
        let builder = TableUpdate::AddSnapshot {
            snapshot: snapshot.clone(),
        }
        .apply(builder)
        .unwrap();
        let metadata = TableUpdate::SetSnapshotRef {
            ref_name: MAIN_BRANCH.to_string(),
            reference: SnapshotReference {
                snapshot_id: snapshot.snapshot_id(),
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            },
        }
        .apply(builder)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Ref exists and should match
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "main".to_string(),
            snapshot_id: Some(3051729675574597004),
        };
        assert!(requirement.check(Some(&metadata)).is_ok());

        // Ref exists but does not match
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "main".to_string(),
            snapshot_id: Some(1),
        };
        assert!(requirement.check(Some(&metadata)).is_err());
    }
1242
    #[test]
    fn test_check_last_assigned_field_id() {
        let metadata = metadata();

        // The fixture's schema is empty, so its last assigned field id is 0.
        let requirement = TableRequirement::LastAssignedFieldIdMatch {
            last_assigned_field_id: 1,
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::LastAssignedFieldIdMatch {
            last_assigned_field_id: 0,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }

    #[test]
    fn test_check_current_schema_id() {
        let metadata = metadata();

        // The fixture's only schema carries id 0.
        let requirement = TableRequirement::CurrentSchemaIdMatch {
            current_schema_id: 1,
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::CurrentSchemaIdMatch {
            current_schema_id: 0,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }

    #[test]
    fn test_check_last_assigned_partition_id() {
        let metadata = metadata();
        // The fixture reports 999 as its last assigned partition id (the
        // value used for tables without partition fields), so 0 must fail.
        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
            last_assigned_partition_id: 0,
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
            last_assigned_partition_id: 999,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }

    #[test]
    fn test_check_default_spec_id() {
        let metadata = metadata();

        // The fixture's default partition spec has id 0.
        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }

    #[test]
    fn test_check_default_sort_order_id() {
        let metadata = metadata();

        // The fixture's default sort order has id 0.
        let requirement = TableRequirement::DefaultSortOrderIdMatch {
            default_sort_order_id: 1,
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::DefaultSortOrderIdMatch {
            default_sort_order_id: 0,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }
1312
    // Wire-format round-trips for `TableRequirement` variants, one fixture
    // per `type` tag defined by the REST spec.
    #[test]
    fn test_table_uuid() {
        test_serde_json(
            r#"
{
    "type": "assert-table-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#,
            TableRequirement::UuidMatch {
                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
            },
        );
    }

    #[test]
    fn test_assert_table_not_exists() {
        // `NotExist` carries no payload besides its tag.
        test_serde_json(
            r#"
{
    "type": "assert-create"
}
        "#,
            TableRequirement::NotExist,
        );
    }

    #[test]
    fn test_assert_ref_snapshot_id() {
        // An explicit JSON `null` snapshot-id maps to `None`.
        test_serde_json(
            r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": null
}
        "#,
            TableRequirement::RefSnapshotIdMatch {
                r#ref: "snapshot-name".to_string(),
                snapshot_id: None,
            },
        );

        test_serde_json(
            r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": 1
}
        "#,
            TableRequirement::RefSnapshotIdMatch {
                r#ref: "snapshot-name".to_string(),
                snapshot_id: Some(1),
            },
        );
    }
1370
    #[test]
    fn test_assert_last_assigned_field_id() {
        test_serde_json(
            r#"
{
    "type": "assert-last-assigned-field-id",
    "last-assigned-field-id": 12
}
        "#,
            TableRequirement::LastAssignedFieldIdMatch {
                last_assigned_field_id: 12,
            },
        );
    }

    #[test]
    fn test_assert_current_schema_id() {
        test_serde_json(
            r#"
{
    "type": "assert-current-schema-id",
    "current-schema-id": 4
}
        "#,
            TableRequirement::CurrentSchemaIdMatch {
                current_schema_id: 4,
            },
        );
    }

    #[test]
    fn test_assert_last_assigned_partition_id() {
        test_serde_json(
            r#"
{
    "type": "assert-last-assigned-partition-id",
    "last-assigned-partition-id": 1004
}
        "#,
            TableRequirement::LastAssignedPartitionIdMatch {
                last_assigned_partition_id: 1004,
            },
        );
    }

    #[test]
    fn test_assert_default_spec_id() {
        test_serde_json(
            r#"
{
    "type": "assert-default-spec-id",
    "default-spec-id": 5
}
        "#,
            TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
        );
    }

    #[test]
    fn test_assert_default_sort_order() {
        let json = r#"
{
    "type": "assert-default-sort-order-id",
    "default-sort-order-id": 10
}
        "#;

        let update = TableRequirement::DefaultSortOrderIdMatch {
            default_sort_order_id: 10,
        };

        test_serde_json(json, update);
    }

    #[test]
    fn test_parse_assert_invalid() {
        // The `type` tag is mandatory; a body without it must be rejected.
        assert!(
            serde_json::from_str::<TableRequirement>(
                r#"
{
    "default-sort-order-id": 10
}
"#
            )
            .is_err(),
            "Table requirements should not be parsed without type."
        );
    }
1459
    // Wire-format round-trips for `TableUpdate` variants (tagged by "action").
    #[test]
    fn test_assign_uuid() {
        test_serde_json(
            r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#,
            TableUpdate::AssignUuid {
                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
            },
        );
    }

    #[test]
    fn test_upgrade_format_version() {
        // The numeric literal 2 maps to `FormatVersion::V2`.
        test_serde_json(
            r#"
{
    "action": "upgrade-format-version",
    "format-version": 2
}
        "#,
            TableUpdate::UpgradeFormatVersion {
                format_version: FormatVersion::V2,
            },
        );
    }
1489
    #[test]
    fn test_add_schema() {
        let test_schema = Schema::builder()
            .with_schema_id(1)
            .with_identifier_field_ids(vec![2])
            .with_fields(vec![
                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
            ])
            .build()
            .unwrap();
        // `last-column-id` may appear on the wire but has no counterpart in
        // `TableUpdate::AddSchema`; both fixtures parse to the same value.
        test_serde_json(
            r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
            TableUpdate::AddSchema {
                schema: test_schema.clone(),
            },
        );

        // Same fixture without the optional `last-column-id` field.
        test_serde_json(
            r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    }
}
        "#,
            TableUpdate::AddSchema {
                schema: test_schema.clone(),
            },
        );
    }
1579
    #[test]
    fn test_set_current_schema() {
        test_serde_json(
            r#"
{
   "action": "set-current-schema",
   "schema-id": 23
}
        "#,
            TableUpdate::SetCurrentSchema { schema_id: 23 },
        );
    }

    #[test]
    fn test_add_spec() {
        // Parameterized transform strings such as "bucket[16]" and
        // "truncate[4]" parse into `Transform::Bucket(16)` / `Truncate(4)`.
        test_serde_json(
            r#"
{
    "action": "add-spec",
    "spec": {
        "fields": [
            {
                "source-id": 4,
                "name": "ts_day",
                "transform": "day"
            },
            {
                "source-id": 1,
                "name": "id_bucket",
                "transform": "bucket[16]"
            },
            {
                "source-id": 2,
                "name": "id_truncate",
                "transform": "truncate[4]"
            }
        ]
    }
}
        "#,
            TableUpdate::AddSpec {
                spec: UnboundPartitionSpec::builder()
                    .add_partition_field(4, "ts_day".to_string(), Transform::Day)
                    .unwrap()
                    .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
                    .unwrap()
                    .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
                    .unwrap()
                    .build(),
            },
        );
    }

    #[test]
    fn test_set_default_spec() {
        test_serde_json(
            r#"
{
    "action": "set-default-spec",
    "spec-id": 1
}
        "#,
            TableUpdate::SetDefaultSpec { spec_id: 1 },
        )
    }
1645
    #[test]
    fn test_add_sort_order() {
        // "asc"/"desc" and "nulls-first"/"nulls-last" map to the
        // `SortDirection` and `NullOrder` enums respectively.
        let json = r#"
{
    "action": "add-sort-order",
    "sort-order": {
        "order-id": 1,
        "fields": [
            {
                "transform": "identity",
                "source-id": 2,
                "direction": "asc",
                "null-order": "nulls-first"
            },
            {
                "transform": "bucket[4]",
                "source-id": 3,
                "direction": "desc",
                "null-order": "nulls-last"
            }
        ]
    }
}
        "#;

        let update = TableUpdate::AddSortOrder {
            sort_order: SortOrder::builder()
                .with_order_id(1)
                .with_sort_field(
                    SortField::builder()
                        .source_id(2)
                        .direction(SortDirection::Ascending)
                        .null_order(NullOrder::First)
                        .transform(Transform::Identity)
                        .build(),
                )
                .with_sort_field(
                    SortField::builder()
                        .source_id(3)
                        .direction(SortDirection::Descending)
                        .null_order(NullOrder::Last)
                        .transform(Transform::Bucket(4))
                        .build(),
                )
                .build_unbound()
                .unwrap(),
        };

        test_serde_json(json, update);
    }

    #[test]
    fn test_set_default_order() {
        let json = r#"
{
    "action": "set-default-sort-order",
    "sort-order-id": 2
}
        "#;
        let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };

        test_serde_json(json, update);
    }
1709
    #[test]
    fn test_add_snapshot() {
        // Full round-trip: the v2 fixture carries an explicit sequence-number.
        let json = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "sequence-number": 1,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro",
        "schema-id": 1
    }
}
        "#;

        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                .with_sequence_number(1)
                .with_manifest_list("s3://a/b/2.avro")
                .with_schema_id(1)
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        test_serde_json(json, update);
    }

    #[test]
    fn test_add_snapshot_v1() {
        // Parse-only check (no round-trip): a v1 snapshot omits
        // sequence-number, which must default to 0 on deserialization.
        let json = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro"
    }
}
    "#;

        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                .with_sequence_number(0)
                .with_manifest_list("s3://a/b/2.avro")
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
        assert_eq!(actual, update, "Parsed value is not equal to expected");
    }
1781
    #[test]
    fn test_add_snapshot_v3() {
        // V3-style snapshot payload: adds row lineage (`first-row-id` /
        // `added-rows`) and an encryption `key-id` on top of the v2 fields.
        // Built as a `serde_json::Value` rather than a raw string.
        let json = serde_json::json!(
        {
            "action": "add-snapshot",
            "snapshot": {
                "snapshot-id": 3055729675574597000i64,
                "parent-snapshot-id": 3051729675574597000i64,
                "timestamp-ms": 1555100955770i64,
                "first-row-id":0,
                "added-rows":2,
                "key-id":"key123",
                "summary": {
                    "operation": "append"
                },
                "manifest-list": "s3://a/b/2.avro"
            }
        });

        // Expected in-memory form of the payload above; `sequence-number` is
        // absent from the JSON, so it defaults to 0.
        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                .with_sequence_number(0)
                .with_manifest_list("s3://a/b/2.avro")
                .with_row_range(0, 2)
                .with_encryption_key_id(Some("key123".to_string()))
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        // Deserialize and compare against the expected update.
        let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
        assert_eq!(actual, update, "Parsed value is not equal to expected");
        // Manual round-trip: serializing and re-parsing must preserve the
        // v3-only fields (row range and encryption key id).
        let restored: TableUpdate = serde_json::from_str(
            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
        )
        .expect("Failed to parse from serialized json");
        assert_eq!(restored, update);
    }
1825
1826    #[test]
1827    fn test_remove_snapshots() {
1828        let json = r#"
1829{
1830    "action": "remove-snapshots",
1831    "snapshot-ids": [
1832        1,
1833        2
1834    ]
1835}
1836        "#;
1837
1838        let update = TableUpdate::RemoveSnapshots {
1839            snapshot_ids: vec![1, 2],
1840        };
1841        test_serde_json(json, update);
1842    }
1843
1844    #[test]
1845    fn test_remove_snapshot_ref() {
1846        let json = r#"
1847{
1848    "action": "remove-snapshot-ref",
1849    "ref-name": "snapshot-ref"
1850}
1851        "#;
1852
1853        let update = TableUpdate::RemoveSnapshotRef {
1854            ref_name: "snapshot-ref".to_string(),
1855        };
1856        test_serde_json(json, update);
1857    }
1858
1859    #[test]
1860    fn test_set_snapshot_ref_tag() {
1861        let json = r#"
1862{
1863    "action": "set-snapshot-ref",
1864    "type": "tag",
1865    "ref-name": "hank",
1866    "snapshot-id": 1,
1867    "max-ref-age-ms": 1
1868}
1869        "#;
1870
1871        let update = TableUpdate::SetSnapshotRef {
1872            ref_name: "hank".to_string(),
1873            reference: SnapshotReference {
1874                snapshot_id: 1,
1875                retention: SnapshotRetention::Tag {
1876                    max_ref_age_ms: Some(1),
1877                },
1878            },
1879        };
1880
1881        test_serde_json(json, update);
1882    }
1883
1884    #[test]
1885    fn test_set_snapshot_ref_branch() {
1886        let json = r#"
1887{
1888    "action": "set-snapshot-ref",
1889    "type": "branch",
1890    "ref-name": "hank",
1891    "snapshot-id": 1,
1892    "min-snapshots-to-keep": 2,
1893    "max-snapshot-age-ms": 3,
1894    "max-ref-age-ms": 4
1895}
1896        "#;
1897
1898        let update = TableUpdate::SetSnapshotRef {
1899            ref_name: "hank".to_string(),
1900            reference: SnapshotReference {
1901                snapshot_id: 1,
1902                retention: SnapshotRetention::Branch {
1903                    min_snapshots_to_keep: Some(2),
1904                    max_snapshot_age_ms: Some(3),
1905                    max_ref_age_ms: Some(4),
1906                },
1907            },
1908        };
1909
1910        test_serde_json(json, update);
1911    }
1912
1913    #[test]
1914    fn test_set_properties() {
1915        let json = r#"
1916{
1917    "action": "set-properties",
1918    "updates": {
1919        "prop1": "v1",
1920        "prop2": "v2"
1921    }
1922}
1923        "#;
1924
1925        let update = TableUpdate::SetProperties {
1926            updates: vec![
1927                ("prop1".to_string(), "v1".to_string()),
1928                ("prop2".to_string(), "v2".to_string()),
1929            ]
1930            .into_iter()
1931            .collect(),
1932        };
1933
1934        test_serde_json(json, update);
1935    }
1936
1937    #[test]
1938    fn test_remove_properties() {
1939        let json = r#"
1940{
1941    "action": "remove-properties",
1942    "removals": [
1943        "prop1",
1944        "prop2"
1945    ]
1946}
1947        "#;
1948
1949        let update = TableUpdate::RemoveProperties {
1950            removals: vec!["prop1".to_string(), "prop2".to_string()],
1951        };
1952
1953        test_serde_json(json, update);
1954    }
1955
1956    #[test]
1957    fn test_set_location() {
1958        let json = r#"
1959{
1960    "action": "set-location",
1961    "location": "s3://bucket/warehouse/tbl_location"
1962}
1963    "#;
1964
1965        let update = TableUpdate::SetLocation {
1966            location: "s3://bucket/warehouse/tbl_location".to_string(),
1967        };
1968
1969        test_serde_json(json, update);
1970    }
1971
1972    #[test]
1973    fn test_table_update_apply() {
1974        let table_creation = TableCreation::builder()
1975            .location("s3://db/table".to_string())
1976            .name("table".to_string())
1977            .properties(HashMap::new())
1978            .schema(Schema::builder().build().unwrap())
1979            .build();
1980        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
1981            .unwrap()
1982            .build()
1983            .unwrap()
1984            .metadata;
1985        let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
1986            table_metadata,
1987            Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
1988        );
1989
1990        let uuid = uuid::Uuid::new_v4();
1991        let update = TableUpdate::AssignUuid { uuid };
1992        let updated_metadata = update
1993            .apply(table_metadata_builder)
1994            .unwrap()
1995            .build()
1996            .unwrap()
1997            .metadata;
1998        assert_eq!(updated_metadata.uuid(), uuid);
1999    }
2000
2001    #[test]
2002    fn test_view_assign_uuid() {
2003        test_serde_json(
2004            r#"
2005{
2006    "action": "assign-uuid",
2007    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
2008}
2009        "#,
2010            ViewUpdate::AssignUuid {
2011                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
2012            },
2013        );
2014    }
2015
2016    #[test]
2017    fn test_view_upgrade_format_version() {
2018        test_serde_json(
2019            r#"
2020{
2021    "action": "upgrade-format-version",
2022    "format-version": 1
2023}
2024        "#,
2025            ViewUpdate::UpgradeFormatVersion {
2026                format_version: ViewFormatVersion::V1,
2027            },
2028        );
2029    }
2030
    #[test]
    fn test_view_add_schema() {
        // Schema with three primitive columns, of which `bar` (id 2) is the
        // identifier field; must round-trip through the `add-schema` view
        // update together with `last-column-id`.
        let test_schema = Schema::builder()
            .with_schema_id(1)
            .with_identifier_field_ids(vec![2])
            .with_fields(vec![
                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
            ])
            .build()
            .unwrap();
        test_serde_json(
            r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
            ViewUpdate::AddSchema {
                schema: test_schema.clone(),
                last_column_id: Some(3),
            },
        );
    }
2083
2084    #[test]
2085    fn test_view_set_location() {
2086        test_serde_json(
2087            r#"
2088{
2089    "action": "set-location",
2090    "location": "s3://db/view"
2091}
2092        "#,
2093            ViewUpdate::SetLocation {
2094                location: "s3://db/view".to_string(),
2095            },
2096        );
2097    }
2098
2099    #[test]
2100    fn test_view_set_properties() {
2101        test_serde_json(
2102            r#"
2103{
2104    "action": "set-properties",
2105    "updates": {
2106        "prop1": "v1",
2107        "prop2": "v2"
2108    }
2109}
2110        "#,
2111            ViewUpdate::SetProperties {
2112                updates: vec![
2113                    ("prop1".to_string(), "v1".to_string()),
2114                    ("prop2".to_string(), "v2".to_string()),
2115                ]
2116                .into_iter()
2117                .collect(),
2118            },
2119        );
2120    }
2121
2122    #[test]
2123    fn test_view_remove_properties() {
2124        test_serde_json(
2125            r#"
2126{
2127    "action": "remove-properties",
2128    "removals": [
2129        "prop1",
2130        "prop2"
2131    ]
2132}
2133        "#,
2134            ViewUpdate::RemoveProperties {
2135                removals: vec!["prop1".to_string(), "prop2".to_string()],
2136            },
2137        );
2138    }
2139
    #[test]
    fn test_view_add_view_version() {
        // `add-view-version` payload with the full set of version fields:
        // default catalog/namespace, an engine summary, and one SQL
        // representation. The SQL text contains embedded newlines, which must
        // survive the round-trip.
        test_serde_json(
            r#"
{
    "action": "add-view-version",
    "view-version": {
            "version-id" : 1,
            "timestamp-ms" : 1573518431292,
            "schema-id" : 1,
            "default-catalog" : "prod",
            "default-namespace" : [ "default" ],
            "summary" : {
              "engine-name" : "Spark"
            },
            "representations" : [ {
              "type" : "sql",
              "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
              "dialect" : "spark"
            } ]
    }
}
        "#,
            ViewUpdate::AddViewVersion {
                view_version: ViewVersion::builder()
                    .with_version_id(1)
                    .with_timestamp_ms(1573518431292)
                    .with_schema_id(1)
                    .with_default_catalog(Some("prod".to_string()))
                    .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
                    .with_summary(
                        vec![("engine-name".to_string(), "Spark".to_string())]
                            .into_iter()
                            .collect(),
                    )
                    .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
                        sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
                        dialect: "spark".to_string(),
                    })]))
                    .build(),
            },
        );
    }
2183
2184    #[test]
2185    fn test_view_set_current_view_version() {
2186        test_serde_json(
2187            r#"
2188{
2189    "action": "set-current-view-version",
2190    "view-version-id": 1
2191}
2192        "#,
2193            ViewUpdate::SetCurrentViewVersion { view_version_id: 1 },
2194        );
2195    }
2196
2197    #[test]
2198    fn test_remove_partition_specs_update() {
2199        test_serde_json(
2200            r#"
2201{
2202    "action": "remove-partition-specs",
2203    "spec-ids": [1, 2]
2204}
2205        "#,
2206            TableUpdate::RemovePartitionSpecs {
2207                spec_ids: vec![1, 2],
2208            },
2209        );
2210    }
2211
    #[test]
    fn test_set_statistics_file() {
        // `set-statistics` payload with a full statistics file, including one
        // blob-metadata entry. The JSON has no `key-metadata` field, so the
        // expected value carries `key_metadata: None`.
        test_serde_json(
            r#"
        {
                "action": "set-statistics",
                "snapshot-id": 1940541653261589030,
                "statistics": {
                        "snapshot-id": 1940541653261589030,
                        "statistics-path": "s3://bucket/warehouse/stats.puffin",
                        "file-size-in-bytes": 124,
                        "file-footer-size-in-bytes": 27,
                        "blob-metadata": [
                                {
                                        "type": "boring-type",
                                        "snapshot-id": 1940541653261589030,
                                        "sequence-number": 2,
                                        "fields": [
                                                1
                                        ],
                                        "properties": {
                                                "prop-key": "prop-value"
                                        }
                                }
                        ]
                }
        }
        "#,
            TableUpdate::SetStatistics {
                statistics: StatisticsFile {
                    snapshot_id: 1940541653261589030,
                    statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
                    file_size_in_bytes: 124,
                    file_footer_size_in_bytes: 27,
                    key_metadata: None,
                    blob_metadata: vec![BlobMetadata {
                        r#type: "boring-type".to_string(),
                        snapshot_id: 1940541653261589030,
                        sequence_number: 2,
                        fields: vec![1],
                        properties: vec![("prop-key".to_string(), "prop-value".to_string())]
                            .into_iter()
                            .collect(),
                    }],
                },
            },
        );
    }
2260
2261    #[test]
2262    fn test_remove_statistics_file() {
2263        test_serde_json(
2264            r#"
2265        {
2266                "action": "remove-statistics",
2267                "snapshot-id": 1940541653261589030
2268        }
2269        "#,
2270            TableUpdate::RemoveStatistics {
2271                snapshot_id: 1940541653261589030,
2272            },
2273        );
2274    }
2275
2276    #[test]
2277    fn test_set_partition_statistics_file() {
2278        test_serde_json(
2279            r#"
2280            {
2281                "action": "set-partition-statistics",
2282                "partition-statistics": {
2283                    "snapshot-id": 1940541653261589030,
2284                    "statistics-path": "s3://bucket/warehouse/stats1.parquet",
2285                    "file-size-in-bytes": 43
2286                }
2287            }
2288            "#,
2289            TableUpdate::SetPartitionStatistics {
2290                partition_statistics: PartitionStatisticsFile {
2291                    snapshot_id: 1940541653261589030,
2292                    statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
2293                    file_size_in_bytes: 43,
2294                },
2295            },
2296        )
2297    }
2298
2299    #[test]
2300    fn test_remove_partition_statistics_file() {
2301        test_serde_json(
2302            r#"
2303            {
2304                "action": "remove-partition-statistics",
2305                "snapshot-id": 1940541653261589030
2306            }
2307            "#,
2308            TableUpdate::RemovePartitionStatistics {
2309                snapshot_id: 1940541653261589030,
2310            },
2311        )
2312    }
2313
2314    #[test]
2315    fn test_remove_schema_update() {
2316        test_serde_json(
2317            r#"
2318                {
2319                    "action": "remove-schemas",
2320                    "schema-ids": [1, 2]
2321                }        
2322            "#,
2323            TableUpdate::RemoveSchemas {
2324                schema_ids: vec![1, 2],
2325            },
2326        );
2327    }
2328
    #[test]
    fn test_add_encryption_key() {
        // `add-encryption-key`: the JSON carries the key material
        // base64-encoded in `encrypted-key-metadata`, while the expected
        // in-memory value holds the raw bytes. The payload is built with
        // `format!`, hence the doubled `{{`/`}}` braces.
        let key_bytes = "key".as_bytes();
        let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
        test_serde_json(
            format!(
                r#"
                {{
                    "action": "add-encryption-key",
                    "encryption-key": {{
                        "key-id": "a",
                        "encrypted-key-metadata": "{encoded_key}",
                        "encrypted-by-id": "b"
                    }}
                }}        
            "#
            ),
            TableUpdate::AddEncryptionKey {
                encryption_key: EncryptedKey::builder()
                    .key_id("a")
                    .encrypted_key_metadata(key_bytes.to_vec())
                    .encrypted_by_id("b")
                    .build(),
            },
        );
    }
2355
2356    #[test]
2357    fn test_remove_encryption_key() {
2358        test_serde_json(
2359            r#"
2360                {
2361                    "action": "remove-encryption-key",
2362                    "key-id": "a"
2363                }        
2364            "#,
2365            TableUpdate::RemoveEncryptionKey {
2366                key_id: "a".to_string(),
2367            },
2368        );
2369    }
2370
    #[test]
    fn test_table_commit() {
        // End-to-end commit: load a valid V2 metadata fixture into a `Table`,
        // apply a `TableCommit` (location + property updates guarded by a
        // uuid requirement), and verify the resulting metadata.
        let table = {
            let file = File::open(format!(
                "{}/testdata/table_metadata/{}",
                env!("CARGO_MANIFEST_DIR"),
                "TableMetadataV2Valid.json"
            ))
            .unwrap();
            let reader = BufReader::new(file);
            let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();

            Table::builder()
                .metadata(resp)
                .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
                .file_io(FileIO::new_with_memory())
                .build()
                .unwrap()
        };

        // Two updates: relocate the table root and set two properties.
        let updates = vec![
            TableUpdate::SetLocation {
                location: "s3://bucket/test/new_location/data".to_string(),
            },
            TableUpdate::SetProperties {
                updates: vec![
                    ("prop1".to_string(), "v1".to_string()),
                    ("prop2".to_string(), "v2".to_string()),
                ]
                .into_iter()
                .collect(),
            },
        ];

        // Requirement taken from the table itself, so the commit must pass.
        let requirements = vec![TableRequirement::UuidMatch {
            uuid: table.metadata().table_uuid,
        }];

        let table_commit = TableCommit::builder()
            .ident(table.identifier().to_owned())
            .updates(updates)
            .requirements(requirements)
            .build();

        let updated_table = table_commit.apply(table).unwrap();

        // Property updates must be reflected in the new metadata.
        assert_eq!(
            updated_table.metadata().properties.get("prop1").unwrap(),
            "v1"
        );
        assert_eq!(
            updated_table.metadata().properties.get("prop2").unwrap(),
            "v2"
        );

        // metadata version should be bumped
        assert!(
            updated_table
                .metadata_location()
                .unwrap()
                .starts_with("s3://bucket/test/location/metadata/00001-")
        );

        // The SetLocation update must be reflected in the new metadata.
        assert_eq!(
            updated_table.metadata().location,
            "s3://bucket/test/new_location/data",
        );
    }
2440}