iceberg/catalog/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Catalog API for Apache Iceberg
19
20pub mod memory;
21mod metadata_location;
22pub(crate) mod utils;
23
24use std::collections::HashMap;
25use std::fmt::{Debug, Display};
26use std::future::Future;
27use std::mem::take;
28use std::ops::Deref;
29use std::str::FromStr;
30use std::sync::Arc;
31
32use _serde::{deserialize_snapshot, serialize_snapshot};
33use async_trait::async_trait;
34pub use memory::MemoryCatalog;
35pub use metadata_location::*;
36#[cfg(test)]
37use mockall::automock;
38use serde_derive::{Deserialize, Serialize};
39use typed_builder::TypedBuilder;
40use uuid::Uuid;
41
42use crate::io::StorageFactory;
43use crate::runtime::Runtime;
44use crate::spec::{
45    EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
46    SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
47    UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
48};
49use crate::table::Table;
50use crate::{Error, ErrorKind, Result};
51
52/// The catalog API for Iceberg Rust.
53#[async_trait]
54#[cfg_attr(test, automock)]
55pub trait Catalog: Debug + Sync + Send {
56    /// List namespaces inside the catalog.
57    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
58    -> Result<Vec<NamespaceIdent>>;
59
60    /// Create a new namespace inside the catalog.
61    async fn create_namespace(
62        &self,
63        namespace: &NamespaceIdent,
64        properties: HashMap<String, String>,
65    ) -> Result<Namespace>;
66
67    /// Get a namespace information from the catalog.
68    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;
69
70    /// Check if namespace exists in catalog.
71    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;
72
73    /// Update a namespace inside the catalog.
74    ///
75    /// # Behavior
76    ///
77    /// The properties must be the full set of namespace.
78    async fn update_namespace(
79        &self,
80        namespace: &NamespaceIdent,
81        properties: HashMap<String, String>,
82    ) -> Result<()>;
83
84    /// Drop a namespace from the catalog, or returns error if it doesn't exist.
85    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;
86
87    /// List tables from namespace.
88    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;
89
90    /// Create a new table inside the namespace.
91    async fn create_table(
92        &self,
93        namespace: &NamespaceIdent,
94        creation: TableCreation,
95    ) -> Result<Table>;
96
97    /// Load table from the catalog.
98    async fn load_table(&self, table: &TableIdent) -> Result<Table>;
99
100    /// Drop a table from the catalog, or returns error if it doesn't exist.
101    async fn drop_table(&self, table: &TableIdent) -> Result<()>;
102
103    /// Drop a table from the catalog and delete the underlying table data.
104    ///
105    /// Implementations should load the table metadata, drop the table
106    /// from the catalog, then delete all associated data and metadata files.
107    /// The [`drop_table_data`](utils::drop_table_data) utility function can
108    /// be used for the file cleanup step.
109    async fn purge_table(&self, table: &TableIdent) -> Result<()>;
110
111    /// Check if a table exists in the catalog.
112    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;
113
114    /// Rename a table in the catalog.
115    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;
116
117    /// Register an existing table to the catalog.
118    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;
119
120    /// Update a table to the catalog.
121    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
122}
123
124/// Common interface for all catalog builders.
125pub trait CatalogBuilder: Default + Debug + Send + Sync {
126    /// The catalog type that this builder creates.
127    type C: Catalog;
128
129    /// Set a custom StorageFactory to use for storage operations.
130    ///
131    /// When a StorageFactory is provided, the catalog will use it to build FileIO
132    /// instances for all storage operations instead of using the default factory.
133    ///
134    /// # Arguments
135    ///
136    /// * `storage_factory` - The StorageFactory to use for creating storage instances
137    ///
138    /// # Example
139    ///
140    /// ```rust,ignore
141    /// use iceberg::CatalogBuilder;
142    /// use iceberg::io::StorageFactory;
143    /// use iceberg_storage_opendal::OpenDalStorageFactory;
144    /// use std::sync::Arc;
145    ///
146    /// let catalog = MyCatalogBuilder::default()
147    ///     .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 {
148    ///         customized_credential_load: None,
149    ///     }))
150    ///     .load("my_catalog", props)
151    ///     .await?;
152    /// ```
153    fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> Self;
154
155    /// Set a custom tokio Runtime to use for spawning async tasks.
156    ///
157    /// When a Runtime is provided, the catalog will propagate it to all tables
158    /// it creates. Tasks such as scan planning and delete file processing
159    /// will be spawned on this runtime.
160    fn with_runtime(self, runtime: Runtime) -> Self;
161
162    /// Create a new catalog instance.
163    fn load(
164        self,
165        name: impl Into<String>,
166        props: HashMap<String, String>,
167    ) -> impl Future<Output = Result<Self::C>> + Send;
168}
169
/// NamespaceIdent represents the identifier of a namespace in the catalog.
///
/// The namespace identifier is a list of strings, where each string is a
/// component of the namespace. It's the catalog implementer's responsibility to
/// handle the namespace identifier correctly.
///
/// The derived ordering compares identifiers component by component. The
/// constructors ([`NamespaceIdent::new`], [`NamespaceIdent::from_vec`],
/// [`NamespaceIdent::from_strs`]) never produce an empty identifier, but the
/// derived `Deserialize` impl does not enforce that.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
177
178impl NamespaceIdent {
179    /// Create a new namespace identifier with only one level.
180    pub fn new(name: String) -> Self {
181        Self(vec![name])
182    }
183
184    /// Create a multi-level namespace identifier from vector.
185    pub fn from_vec(names: Vec<String>) -> Result<Self> {
186        if names.is_empty() {
187            return Err(Error::new(
188                ErrorKind::DataInvalid,
189                "Namespace identifier can't be empty!",
190            ));
191        }
192        Ok(Self(names))
193    }
194
195    /// Try to create namespace identifier from an iterator of string.
196    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
197        Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
198    }
199
200    /// Returns a string for used in url.
201    pub fn to_url_string(&self) -> String {
202        self.as_ref().join("\u{001f}")
203    }
204
205    /// Returns inner strings.
206    pub fn inner(self) -> Vec<String> {
207        self.0
208    }
209
210    /// Get the parent of this namespace.
211    /// Returns None if this namespace only has a single element and thus has no parent.
212    pub fn parent(&self) -> Option<Self> {
213        self.0.split_last().and_then(|(_, parent)| {
214            if parent.is_empty() {
215                None
216            } else {
217                Some(Self(parent.to_vec()))
218            }
219        })
220    }
221}
222
223impl AsRef<Vec<String>> for NamespaceIdent {
224    fn as_ref(&self) -> &Vec<String> {
225        &self.0
226    }
227}
228
229impl Deref for NamespaceIdent {
230    type Target = [String];
231
232    fn deref(&self) -> &Self::Target {
233        &self.0
234    }
235}
236
/// Namespace represents a namespace in the catalog.
///
/// Pairs a [`NamespaceIdent`] with the namespace's string key/value properties.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
    /// Identifier of the namespace.
    name: NamespaceIdent,
    /// Properties attached to the namespace.
    properties: HashMap<String, String>,
}
243
244impl Namespace {
245    /// Create a new namespace.
246    pub fn new(name: NamespaceIdent) -> Self {
247        Self::with_properties(name, HashMap::default())
248    }
249
250    /// Create a new namespace with properties.
251    pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
252        Self { name, properties }
253    }
254
255    /// Get the name of the namespace.
256    pub fn name(&self) -> &NamespaceIdent {
257        &self.name
258    }
259
260    /// Get the properties of the namespace.
261    pub fn properties(&self) -> &HashMap<String, String> {
262        &self.properties
263    }
264}
265
266impl Display for NamespaceIdent {
267    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268        write!(f, "{}", self.0.join("."))
269    }
270}
271
/// TableIdent represents the identifier of a table in the catalog.
///
/// It combines the namespace containing the table with the table's name. The
/// derived ordering compares the namespace first, then the name.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace of the table.
    pub namespace: NamespaceIdent,
    /// Table name.
    pub name: String,
}
280
281impl TableIdent {
282    /// Create a new table identifier.
283    pub fn new(namespace: NamespaceIdent, name: String) -> Self {
284        Self { namespace, name }
285    }
286
287    /// Get the namespace of the table.
288    pub fn namespace(&self) -> &NamespaceIdent {
289        &self.namespace
290    }
291
292    /// Get the name of the table.
293    pub fn name(&self) -> &str {
294        &self.name
295    }
296
297    /// Try to create table identifier from an iterator of string.
298    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
299        let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
300        let table_name = vec.pop().ok_or_else(|| {
301            Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
302        })?;
303        let namespace_ident = NamespaceIdent::from_vec(vec)?;
304
305        Ok(Self {
306            namespace: namespace_ident,
307            name: table_name,
308        })
309    }
310}
311
312impl Display for TableIdent {
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        write!(f, "{}.{}", self.namespace, self.name)
315    }
316}
317
/// TableCreation represents the creation of a table in the catalog.
///
/// Built through the `TableCreation::builder()` generated by `TypedBuilder`:
/// `name` and `schema` must be set; every other field has a default.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// The name of the table.
    pub name: String,
    /// The location of the table, if given explicitly.
    ///
    /// Builder setters: `location` takes the value directly; `location_opt`
    /// takes an `Option`.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// The schema of the table.
    pub schema: Schema,
    /// The partition spec of the table, could be None.
    ///
    /// The `partition_spec` setter accepts anything convertible into an
    /// [`UnboundPartitionSpec`]; `partition_spec_opt` sets an optional value.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// The sort order of the table, if any.
    ///
    /// Builder setters: `sort_order` takes the value directly; `sort_order_opt`
    /// takes an `Option`.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// The properties of the table.
    ///
    /// The builder setter accepts any iterator of `(String, String)` pairs and
    /// collects them into the map.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Format version of the table. Defaults to V2.
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
343
/// TableCommit represents the commit of a table in the catalog.
///
/// The builder's `build` method is only `pub(crate)`, since it's dangerous and
/// error-prone to construct a [`TableCommit`] directly.
/// Users are supposed to use [`crate::transaction::Transaction`] to update a table.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    /// The identifier of the table being committed to.
    ident: TableIdent,
    /// The requirements of the table.
    ///
    /// Commit will fail if the requirements are not met.
    requirements: Vec<TableRequirement>,
    /// The updates to apply to the table, in order.
    updates: Vec<TableUpdate>,
}
361
362impl TableCommit {
363    /// Return the table identifier.
364    pub fn identifier(&self) -> &TableIdent {
365        &self.ident
366    }
367
368    /// Take all requirements.
369    pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
370        take(&mut self.requirements)
371    }
372
373    /// Take all updates.
374    pub fn take_updates(&mut self) -> Vec<TableUpdate> {
375        take(&mut self.updates)
376    }
377
378    /// Applies this [`TableCommit`] to the given [`Table`] as part of a catalog update.
379    /// Typically used by [`Catalog::update_table`] to validate requirements and apply metadata updates.
380    ///
381    /// Returns a new [`Table`] with updated metadata,
382    /// or an error if validation or application fails.
383    pub fn apply(self, table: Table) -> Result<Table> {
384        // check requirements
385        for requirement in self.requirements {
386            requirement.check(Some(table.metadata()))?;
387        }
388
389        // get current metadata location
390        let current_metadata_location = table.metadata_location_result()?;
391
392        // apply updates to metadata builder
393        let mut metadata_builder = table
394            .metadata()
395            .clone()
396            .into_builder(Some(current_metadata_location.to_string()));
397        for update in self.updates {
398            metadata_builder = update.apply(metadata_builder)?;
399        }
400
401        // Build the new metadata
402        let new_metadata = metadata_builder.build()?.metadata;
403
404        let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
405            .with_next_version()
406            .with_new_metadata(&new_metadata)
407            .to_string();
408
409        Ok(table
410            .with_metadata(Arc::new(new_metadata))
411            .with_metadata_location(new_metadata_location))
412    }
413}
414
/// TableRequirement represents a requirement for a table in the catalog.
///
/// Requirements are preconditions attached to a commit and verified against
/// the table's current metadata by [`TableRequirement::check`]. They serialize
/// as internally tagged objects whose `type` field holds the `assert-*` name
/// given on each variant.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not already exist; used for create transactions
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must match the requirement.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Uuid of original table.
        uuid: Uuid,
    },
    /// The table branch or tag identified by the requirement's `ref` must
    /// reference the requirement's `snapshot-id`.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// The name of the branch or tag to assert.
        r#ref: String,
        /// The snapshot id of the table to assert.
        /// If the id is `None`, the ref must not already exist.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's last assigned column id must match the requirement.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// The last assigned field id of the table to assert.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must match the requirement.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Current schema id of the table to assert.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's last assigned partition id must match the
    /// requirement.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Last assigned partition id of the table to assert.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default spec id must match the requirement.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Default spec id of the table to assert.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The table's default sort order id must match the requirement.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Default sort order id of the table to assert.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
476
/// TableUpdate represents an update to a table in the catalog.
///
/// Each variant is applied to a [`TableMetadataBuilder`] by [`TableUpdate::apply`].
/// Updates serialize as internally tagged objects whose `action` field is the
/// kebab-case variant name (e.g. `add-snapshot`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
// NOTE(review): presumably allowed because some variants (e.g. `AddSnapshot`,
// `AddSchema`) hold large spec values inline; boxing them would change the
// public shape of those variants.
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrade table's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: FormatVersion,
    },
    /// Assign a new UUID to the table
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Add a new schema to the table
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
    },
    /// Set table's current schema
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Schema ID to set as current, or -1 to set last added schema
        schema_id: i32,
    },
    /// Add a new partition spec to the table
    AddSpec {
        /// The partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Set table's default spec
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Partition spec ID to set as the default, or -1 to set last added spec
        spec_id: i32,
    },
    /// Add sort order to table.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Set table's default sort order
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Sort order ID to set as the default, or -1 to set last added sort order
        sort_order_id: i64,
    },
    /// Add snapshot to table.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add.
        ///
        /// (De)serialized through the catalog snapshot representation in the
        /// `_serde` module rather than `Snapshot`'s own serde impls.
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Set table's snapshot ref.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of snapshot reference to set.
        ref_name: String,
        /// Snapshot reference to set.
        ///
        /// Its fields are flattened into the same object as `ref-name`.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Remove table's snapshots
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Snapshot ids to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Remove snapshot reference
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of snapshot reference to remove.
        ref_name: String,
    },
    /// Update table's location
    SetLocation {
        /// New location for table.
        location: String,
    },
    /// Update table's properties
    SetProperties {
        /// Properties to update for table.
        updates: HashMap<String, String>,
    },
    /// Remove table's properties
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Remove partition specs
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Partition spec ids to remove.
        spec_ids: Vec<i32>,
    },
    /// Set statistics for a snapshot
    ///
    /// (De)serialized through the `_serde_set_statistics` helper module instead
    /// of the derived layout.
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// File containing the statistics
        statistics: StatisticsFile,
    },
    /// Remove statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot id to remove statistics for.
        snapshot_id: i64,
    },
    /// Set partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// File containing the partition statistics
        partition_statistics: PartitionStatisticsFile,
    },
    /// Remove partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot id to remove partition statistics for.
        snapshot_id: i64,
    },
    /// Remove schemas
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Schema IDs to remove.
        schema_ids: Vec<i32>,
    },
    /// Add an encryption key
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// The encryption key to add.
        encryption_key: EncryptedKey,
    },
    /// Remove an encryption key
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// The id of the encryption key to remove.
        key_id: String,
    },
}
624
625impl TableUpdate {
626    /// Applies the update to the table metadata builder.
627    pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
628        match self {
629            TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
630            TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
631            TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
632            TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
633            TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
634            TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
635            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
636                builder.set_default_sort_order(sort_order_id)
637            }
638            TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
639            TableUpdate::SetSnapshotRef {
640                ref_name,
641                reference,
642            } => builder.set_ref(&ref_name, reference),
643            TableUpdate::RemoveSnapshots { snapshot_ids } => {
644                Ok(builder.remove_snapshots(&snapshot_ids))
645            }
646            TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
647            TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
648            TableUpdate::SetProperties { updates } => builder.set_properties(updates),
649            TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
650            TableUpdate::UpgradeFormatVersion { format_version } => {
651                builder.upgrade_format_version(format_version)
652            }
653            TableUpdate::RemovePartitionSpecs { spec_ids } => {
654                builder.remove_partition_specs(&spec_ids)
655            }
656            TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
657            TableUpdate::RemoveStatistics { snapshot_id } => {
658                Ok(builder.remove_statistics(snapshot_id))
659            }
660            TableUpdate::SetPartitionStatistics {
661                partition_statistics,
662            } => Ok(builder.set_partition_statistics(partition_statistics)),
663            TableUpdate::RemovePartitionStatistics { snapshot_id } => {
664                Ok(builder.remove_partition_statistics(snapshot_id))
665            }
666            TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
667            TableUpdate::AddEncryptionKey { encryption_key } => {
668                Ok(builder.add_encryption_key(encryption_key))
669            }
670            TableUpdate::RemoveEncryptionKey { key_id } => {
671                Ok(builder.remove_encryption_key(&key_id))
672            }
673        }
674    }
675}
676
677impl TableRequirement {
678    /// Check that the requirement is met by the table metadata.
679    /// If the requirement is not met, an appropriate error is returned.
680    ///
681    /// Provide metadata as `None` if the table does not exist.
682    pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
683        if let Some(metadata) = metadata {
684            match self {
685                TableRequirement::NotExist => {
686                    return Err(Error::new(
687                        ErrorKind::CatalogCommitConflicts,
688                        format!(
689                            "Requirement failed: Table with id {} already exists",
690                            metadata.uuid()
691                        ),
692                    )
693                    .with_retryable(true));
694                }
695                TableRequirement::UuidMatch { uuid } => {
696                    if &metadata.uuid() != uuid {
697                        return Err(Error::new(
698                            ErrorKind::CatalogCommitConflicts,
699                            "Requirement failed: Table UUID does not match",
700                        )
701                        .with_context("expected", *uuid)
702                        .with_context("found", metadata.uuid())
703                        .with_retryable(true));
704                    }
705                }
706                TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
707                    // ToDo: Harmonize the types of current_schema_id
708                    if metadata.current_schema_id != *current_schema_id {
709                        return Err(Error::new(
710                            ErrorKind::CatalogCommitConflicts,
711                            "Requirement failed: Current schema id does not match",
712                        )
713                        .with_context("expected", current_schema_id.to_string())
714                        .with_context("found", metadata.current_schema_id.to_string())
715                        .with_retryable(true));
716                    }
717                }
718                TableRequirement::DefaultSortOrderIdMatch {
719                    default_sort_order_id,
720                } => {
721                    if metadata.default_sort_order().order_id != *default_sort_order_id {
722                        return Err(Error::new(
723                            ErrorKind::CatalogCommitConflicts,
724                            "Requirement failed: Default sort order id does not match",
725                        )
726                        .with_context("expected", default_sort_order_id.to_string())
727                        .with_context("found", metadata.default_sort_order().order_id.to_string())
728                        .with_retryable(true));
729                    }
730                }
731                TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
732                    let snapshot_ref = metadata.snapshot_for_ref(r#ref);
733                    if let Some(snapshot_id) = snapshot_id {
734                        let snapshot_ref = snapshot_ref.ok_or(
735                            Error::new(
736                                ErrorKind::CatalogCommitConflicts,
737                                format!("Requirement failed: Branch or tag `{ref}` not found"),
738                            )
739                            .with_retryable(true),
740                        )?;
741                        if snapshot_ref.snapshot_id() != *snapshot_id {
742                            return Err(Error::new(
743                                ErrorKind::CatalogCommitConflicts,
744                                format!(
745                                    "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
746                                ),
747                            )
748                            .with_context("expected", snapshot_id.to_string())
749                            .with_context("found", snapshot_ref.snapshot_id().to_string())
750                            .with_retryable(true));
751                        }
752                    } else if snapshot_ref.is_some() {
753                        // a null snapshot ID means the ref should not exist already
754                        return Err(Error::new(
755                            ErrorKind::CatalogCommitConflicts,
756                            format!("Requirement failed: Branch or tag `{ref}` already exists"),
757                        )
758                        .with_retryable(true));
759                    }
760                }
761                TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
762                    // ToDo: Harmonize the types of default_spec_id
763                    if metadata.default_partition_spec_id() != *default_spec_id {
764                        return Err(Error::new(
765                            ErrorKind::CatalogCommitConflicts,
766                            "Requirement failed: Default partition spec id does not match",
767                        )
768                        .with_context("expected", default_spec_id.to_string())
769                        .with_context("found", metadata.default_partition_spec_id().to_string())
770                        .with_retryable(true));
771                    }
772                }
773                TableRequirement::LastAssignedPartitionIdMatch {
774                    last_assigned_partition_id,
775                } => {
776                    if metadata.last_partition_id != *last_assigned_partition_id {
777                        return Err(Error::new(
778                            ErrorKind::CatalogCommitConflicts,
779                            "Requirement failed: Last assigned partition id does not match",
780                        )
781                        .with_context("expected", last_assigned_partition_id.to_string())
782                        .with_context("found", metadata.last_partition_id.to_string())
783                        .with_retryable(true));
784                    }
785                }
786                TableRequirement::LastAssignedFieldIdMatch {
787                    last_assigned_field_id,
788                } => {
789                    if &metadata.last_column_id != last_assigned_field_id {
790                        return Err(Error::new(
791                            ErrorKind::CatalogCommitConflicts,
792                            "Requirement failed: Last assigned field id does not match",
793                        )
794                        .with_context("expected", last_assigned_field_id.to_string())
795                        .with_context("found", metadata.last_column_id.to_string())
796                        .with_retryable(true));
797                    }
798                }
799            };
800        } else {
801            match self {
802                TableRequirement::NotExist => {}
803                _ => {
804                    return Err(Error::new(
805                        ErrorKind::TableNotFound,
806                        "Requirement failed: Table does not exist",
807                    ));
808                }
809            }
810        }
811
812        Ok(())
813    }
814}
815
816pub(super) mod _serde {
817    use serde::{Deserialize as _, Deserializer, Serialize as _};
818
819    use super::*;
820    use crate::spec::{SchemaId, Summary};
821
822    pub(super) fn deserialize_snapshot<'de, D>(
823        deserializer: D,
824    ) -> std::result::Result<Snapshot, D::Error>
825    where D: Deserializer<'de> {
826        let buf = CatalogSnapshot::deserialize(deserializer)?;
827        Ok(buf.into())
828    }
829
830    pub(super) fn serialize_snapshot<S>(
831        snapshot: &Snapshot,
832        serializer: S,
833    ) -> std::result::Result<S::Ok, S::Error>
834    where
835        S: serde::Serializer,
836    {
837        let buf: CatalogSnapshot = snapshot.clone().into();
838        buf.serialize(serializer)
839    }
840
841    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
842    #[serde(rename_all = "kebab-case")]
843    /// Defines the structure of a v2 snapshot for the catalog.
844    /// Main difference to SnapshotV2 is that sequence-number is optional
845    /// in the rest catalog spec to allow for backwards compatibility with v1.
846    struct CatalogSnapshot {
847        snapshot_id: i64,
848        #[serde(skip_serializing_if = "Option::is_none")]
849        parent_snapshot_id: Option<i64>,
850        #[serde(default)]
851        sequence_number: i64,
852        timestamp_ms: i64,
853        manifest_list: String,
854        summary: Summary,
855        #[serde(skip_serializing_if = "Option::is_none")]
856        schema_id: Option<SchemaId>,
857        #[serde(skip_serializing_if = "Option::is_none")]
858        first_row_id: Option<u64>,
859        #[serde(skip_serializing_if = "Option::is_none")]
860        added_rows: Option<u64>,
861        #[serde(skip_serializing_if = "Option::is_none")]
862        key_id: Option<String>,
863    }
864
865    impl From<CatalogSnapshot> for Snapshot {
866        fn from(snapshot: CatalogSnapshot) -> Self {
867            let CatalogSnapshot {
868                snapshot_id,
869                parent_snapshot_id,
870                sequence_number,
871                timestamp_ms,
872                manifest_list,
873                schema_id,
874                summary,
875                first_row_id,
876                added_rows,
877                key_id,
878            } = snapshot;
879            let builder = Snapshot::builder()
880                .with_snapshot_id(snapshot_id)
881                .with_parent_snapshot_id(parent_snapshot_id)
882                .with_sequence_number(sequence_number)
883                .with_timestamp_ms(timestamp_ms)
884                .with_manifest_list(manifest_list)
885                .with_summary(summary)
886                .with_encryption_key_id(key_id);
887            let row_range = first_row_id.zip(added_rows);
888            match (schema_id, row_range) {
889                (None, None) => builder.build(),
890                (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
891                (None, Some((first_row_id, last_row_id))) => {
892                    builder.with_row_range(first_row_id, last_row_id).build()
893                }
894                (Some(schema_id), Some((first_row_id, last_row_id))) => builder
895                    .with_schema_id(schema_id)
896                    .with_row_range(first_row_id, last_row_id)
897                    .build(),
898            }
899        }
900    }
901
902    impl From<Snapshot> for CatalogSnapshot {
903        fn from(snapshot: Snapshot) -> Self {
904            let first_row_id = snapshot.first_row_id();
905            let added_rows = snapshot.added_rows_count();
906            let Snapshot {
907                snapshot_id,
908                parent_snapshot_id,
909                sequence_number,
910                timestamp_ms,
911                manifest_list,
912                summary,
913                schema_id,
914                row_range: _,
915                encryption_key_id: key_id,
916            } = snapshot;
917            CatalogSnapshot {
918                snapshot_id,
919                parent_snapshot_id,
920                sequence_number,
921                timestamp_ms,
922                manifest_list,
923                summary,
924                schema_id,
925                first_row_id,
926                added_rows,
927                key_id,
928            }
929        }
930    }
931}
932
933/// ViewCreation represents the creation of a view in the catalog.
934#[derive(Debug, TypedBuilder)]
935pub struct ViewCreation {
936    /// The name of the view.
937    pub name: String,
938    /// The view's base location; used to create metadata file locations
939    pub location: String,
940    /// Representations for the view.
941    pub representations: ViewRepresentations,
942    /// The schema of the view.
943    pub schema: Schema,
944    /// The properties of the view.
945    #[builder(default)]
946    pub properties: HashMap<String, String>,
947    /// The default namespace to use when a reference in the SELECT is a single identifier
948    pub default_namespace: NamespaceIdent,
949    /// Default catalog to use when a reference in the SELECT does not contain a catalog
950    #[builder(default)]
951    pub default_catalog: Option<String>,
952    /// A string to string map of summary metadata about the version
953    /// Typical keys are "engine-name" and "engine-version"
954    #[builder(default)]
955    pub summary: HashMap<String, String>,
956}
957
958/// ViewUpdate represents an update to a view in the catalog.
959#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
960#[serde(tag = "action", rename_all = "kebab-case")]
961#[allow(clippy::large_enum_variant)]
962pub enum ViewUpdate {
963    /// Assign a new UUID to the view
964    #[serde(rename_all = "kebab-case")]
965    AssignUuid {
966        /// The new UUID to assign.
967        uuid: Uuid,
968    },
969    /// Upgrade view's format version
970    #[serde(rename_all = "kebab-case")]
971    UpgradeFormatVersion {
972        /// Target format upgrade to.
973        format_version: ViewFormatVersion,
974    },
975    /// Add a new schema to the view
976    #[serde(rename_all = "kebab-case")]
977    AddSchema {
978        /// The schema to add.
979        schema: Schema,
980        /// The last column id of the view.
981        last_column_id: Option<i32>,
982    },
983    /// Set view's current schema
984    #[serde(rename_all = "kebab-case")]
985    SetLocation {
986        /// New location for view.
987        location: String,
988    },
989    /// Set view's properties
990    ///
991    /// Matching keys are updated, and non-matching keys are left unchanged.
992    #[serde(rename_all = "kebab-case")]
993    SetProperties {
994        /// Properties to update for view.
995        updates: HashMap<String, String>,
996    },
997    /// Remove view's properties
998    #[serde(rename_all = "kebab-case")]
999    RemoveProperties {
1000        /// Properties to remove
1001        removals: Vec<String>,
1002    },
1003    /// Add a new version to the view
1004    #[serde(rename_all = "kebab-case")]
1005    AddViewVersion {
1006        /// The view version to add.
1007        view_version: ViewVersion,
1008    },
1009    /// Set view's current version
1010    #[serde(rename_all = "kebab-case")]
1011    SetCurrentViewVersion {
1012        /// View version id to set as current, or -1 to set last added version
1013        view_version_id: i32,
1014    },
1015}
1016
1017mod _serde_set_statistics {
1018    // The rest spec requires an additional field `snapshot-id`
1019    // that is redundant with the `snapshot_id` field in the statistics file.
1020    use serde::{Deserialize, Deserializer, Serialize, Serializer};
1021
1022    use super::*;
1023
1024    #[derive(Debug, Serialize, Deserialize)]
1025    #[serde(rename_all = "kebab-case")]
1026    struct SetStatistics {
1027        snapshot_id: Option<i64>,
1028        statistics: StatisticsFile,
1029    }
1030
1031    pub fn serialize<S>(
1032        value: &StatisticsFile,
1033        serializer: S,
1034    ) -> std::result::Result<S::Ok, S::Error>
1035    where
1036        S: Serializer,
1037    {
1038        SetStatistics {
1039            snapshot_id: Some(value.snapshot_id),
1040            statistics: value.clone(),
1041        }
1042        .serialize(serializer)
1043    }
1044
1045    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
1046    where D: Deserializer<'de> {
1047        let SetStatistics {
1048            snapshot_id,
1049            statistics,
1050        } = SetStatistics::deserialize(deserializer)?;
1051        if let Some(snapshot_id) = snapshot_id
1052            && snapshot_id != statistics.snapshot_id
1053        {
1054            return Err(serde::de::Error::custom(format!(
1055                "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
1056                statistics.snapshot_id
1057            )));
1058        }
1059
1060        Ok(statistics)
1061    }
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066    use std::collections::HashMap;
1067    use std::fmt::Debug;
1068    use std::fs::File;
1069    use std::io::BufReader;
1070
1071    use base64::Engine as _;
1072    use serde::Serialize;
1073    use serde::de::DeserializeOwned;
1074    use uuid::uuid;
1075
1076    use super::ViewUpdate;
1077    use crate::io::FileIO;
1078    use crate::spec::{
1079        BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1080        PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1081        SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1082        StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1083        UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1084        ViewVersion,
1085    };
1086    use crate::table::Table;
1087    use crate::test_utils::test_runtime;
1088    use crate::{
1089        NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1090    };
1091
1092    #[test]
1093    fn test_parent_namespace() {
1094        let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
1095        let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
1096        let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();
1097
1098        assert_eq!(ns1.parent(), None);
1099        assert_eq!(ns2.parent(), Some(ns1.clone()));
1100        assert_eq!(ns3.parent(), Some(ns2.clone()));
1101    }
1102
1103    #[test]
1104    fn test_create_table_id() {
1105        let table_id = TableIdent {
1106            namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
1107            name: "t1".to_string(),
1108        };
1109
1110        assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
1111    }
1112
1113    #[test]
1114    fn test_table_creation_iterator_properties() {
1115        let builder = TableCreation::builder()
1116            .name("table".to_string())
1117            .schema(Schema::builder().build().unwrap());
1118
1119        fn s(k: &str, v: &str) -> (String, String) {
1120            (k.to_string(), v.to_string())
1121        }
1122
1123        let table_creation = builder
1124            .properties([s("key", "value"), s("foo", "bar")])
1125            .build();
1126
1127        assert_eq!(
1128            HashMap::from([s("key", "value"), s("foo", "bar")]),
1129            table_creation.properties
1130        );
1131    }
1132
1133    fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
1134        json: impl ToString,
1135        expected: T,
1136    ) {
1137        let json_str = json.to_string();
1138        let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
1139        assert_eq!(actual, expected, "Parsed value is not equal to expected");
1140
1141        let restored: T = serde_json::from_str(
1142            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1143        )
1144        .expect("Failed to parse from serialized json");
1145
1146        assert_eq!(
1147            restored, expected,
1148            "Parsed restored value is not equal to expected"
1149        );
1150    }
1151
1152    fn metadata() -> TableMetadata {
1153        let tbl_creation = TableCreation::builder()
1154            .name("table".to_string())
1155            .location("/path/to/table".to_string())
1156            .schema(Schema::builder().build().unwrap())
1157            .build();
1158
1159        TableMetadataBuilder::from_table_creation(tbl_creation)
1160            .unwrap()
1161            .assign_uuid(uuid::Uuid::nil())
1162            .build()
1163            .unwrap()
1164            .metadata
1165    }
1166
1167    #[test]
1168    fn test_check_requirement_not_exist() {
1169        let metadata = metadata();
1170        let requirement = TableRequirement::NotExist;
1171
1172        assert!(requirement.check(Some(&metadata)).is_err());
1173        assert!(requirement.check(None).is_ok());
1174    }
1175
1176    #[test]
1177    fn test_check_table_uuid() {
1178        let metadata = metadata();
1179
1180        let requirement = TableRequirement::UuidMatch {
1181            uuid: uuid::Uuid::now_v7(),
1182        };
1183        assert!(requirement.check(Some(&metadata)).is_err());
1184
1185        let requirement = TableRequirement::UuidMatch {
1186            uuid: uuid::Uuid::nil(),
1187        };
1188        assert!(requirement.check(Some(&metadata)).is_ok());
1189    }
1190
1191    #[test]
1192    fn test_check_ref_snapshot_id() {
1193        let metadata = metadata();
1194
1195        // Ref does not exist but should
1196        let requirement = TableRequirement::RefSnapshotIdMatch {
1197            r#ref: "my_branch".to_string(),
1198            snapshot_id: Some(1),
1199        };
1200        assert!(requirement.check(Some(&metadata)).is_err());
1201
1202        // Ref does not exist and should not
1203        let requirement = TableRequirement::RefSnapshotIdMatch {
1204            r#ref: "my_branch".to_string(),
1205            snapshot_id: None,
1206        };
1207        assert!(requirement.check(Some(&metadata)).is_ok());
1208
1209        // Add snapshot
1210        let snapshot = Snapshot::builder()
1211            .with_snapshot_id(3051729675574597004)
1212            .with_sequence_number(10)
1213            .with_timestamp_ms(9992191116217)
1214            .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
1215            .with_schema_id(0)
1216            .with_summary(Summary {
1217                operation: Operation::Append,
1218                additional_properties: HashMap::new(),
1219            })
1220            .build();
1221
1222        let builder = metadata.into_builder(None);
1223        let builder = TableUpdate::AddSnapshot {
1224            snapshot: snapshot.clone(),
1225        }
1226        .apply(builder)
1227        .unwrap();
1228        let metadata = TableUpdate::SetSnapshotRef {
1229            ref_name: MAIN_BRANCH.to_string(),
1230            reference: SnapshotReference {
1231                snapshot_id: snapshot.snapshot_id(),
1232                retention: SnapshotRetention::Branch {
1233                    min_snapshots_to_keep: Some(10),
1234                    max_snapshot_age_ms: None,
1235                    max_ref_age_ms: None,
1236                },
1237            },
1238        }
1239        .apply(builder)
1240        .unwrap()
1241        .build()
1242        .unwrap()
1243        .metadata;
1244
1245        // Ref exists and should match
1246        let requirement = TableRequirement::RefSnapshotIdMatch {
1247            r#ref: "main".to_string(),
1248            snapshot_id: Some(3051729675574597004),
1249        };
1250        assert!(requirement.check(Some(&metadata)).is_ok());
1251
1252        // Ref exists but does not match
1253        let requirement = TableRequirement::RefSnapshotIdMatch {
1254            r#ref: "main".to_string(),
1255            snapshot_id: Some(1),
1256        };
1257        assert!(requirement.check(Some(&metadata)).is_err());
1258    }
1259
1260    #[test]
1261    fn test_check_last_assigned_field_id() {
1262        let metadata = metadata();
1263
1264        let requirement = TableRequirement::LastAssignedFieldIdMatch {
1265            last_assigned_field_id: 1,
1266        };
1267        assert!(requirement.check(Some(&metadata)).is_err());
1268
1269        let requirement = TableRequirement::LastAssignedFieldIdMatch {
1270            last_assigned_field_id: 0,
1271        };
1272        assert!(requirement.check(Some(&metadata)).is_ok());
1273    }
1274
1275    #[test]
1276    fn test_check_current_schema_id() {
1277        let metadata = metadata();
1278
1279        let requirement = TableRequirement::CurrentSchemaIdMatch {
1280            current_schema_id: 1,
1281        };
1282        assert!(requirement.check(Some(&metadata)).is_err());
1283
1284        let requirement = TableRequirement::CurrentSchemaIdMatch {
1285            current_schema_id: 0,
1286        };
1287        assert!(requirement.check(Some(&metadata)).is_ok());
1288    }
1289
1290    #[test]
1291    fn test_check_last_assigned_partition_id() {
1292        let metadata = metadata();
1293        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1294            last_assigned_partition_id: 0,
1295        };
1296        assert!(requirement.check(Some(&metadata)).is_err());
1297
1298        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
1299            last_assigned_partition_id: 999,
1300        };
1301        assert!(requirement.check(Some(&metadata)).is_ok());
1302    }
1303
1304    #[test]
1305    fn test_check_default_spec_id() {
1306        let metadata = metadata();
1307
1308        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
1309        assert!(requirement.check(Some(&metadata)).is_err());
1310
1311        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
1312        assert!(requirement.check(Some(&metadata)).is_ok());
1313    }
1314
1315    #[test]
1316    fn test_check_default_sort_order_id() {
1317        let metadata = metadata();
1318
1319        let requirement = TableRequirement::DefaultSortOrderIdMatch {
1320            default_sort_order_id: 1,
1321        };
1322        assert!(requirement.check(Some(&metadata)).is_err());
1323
1324        let requirement = TableRequirement::DefaultSortOrderIdMatch {
1325            default_sort_order_id: 0,
1326        };
1327        assert!(requirement.check(Some(&metadata)).is_ok());
1328    }
1329
1330    #[test]
1331    fn test_table_uuid() {
1332        test_serde_json(
1333            r#"
1334{
1335    "type": "assert-table-uuid",
1336    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1337}
1338        "#,
1339            TableRequirement::UuidMatch {
1340                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1341            },
1342        );
1343    }
1344
1345    #[test]
1346    fn test_assert_table_not_exists() {
1347        test_serde_json(
1348            r#"
1349{
1350    "type": "assert-create"
1351}
1352        "#,
1353            TableRequirement::NotExist,
1354        );
1355    }
1356
1357    #[test]
1358    fn test_assert_ref_snapshot_id() {
1359        test_serde_json(
1360            r#"
1361{
1362    "type": "assert-ref-snapshot-id",
1363    "ref": "snapshot-name",
1364    "snapshot-id": null
1365}
1366        "#,
1367            TableRequirement::RefSnapshotIdMatch {
1368                r#ref: "snapshot-name".to_string(),
1369                snapshot_id: None,
1370            },
1371        );
1372
1373        test_serde_json(
1374            r#"
1375{
1376    "type": "assert-ref-snapshot-id",
1377    "ref": "snapshot-name",
1378    "snapshot-id": 1
1379}
1380        "#,
1381            TableRequirement::RefSnapshotIdMatch {
1382                r#ref: "snapshot-name".to_string(),
1383                snapshot_id: Some(1),
1384            },
1385        );
1386    }
1387
1388    #[test]
1389    fn test_assert_last_assigned_field_id() {
1390        test_serde_json(
1391            r#"
1392{
1393    "type": "assert-last-assigned-field-id",
1394    "last-assigned-field-id": 12
1395}
1396        "#,
1397            TableRequirement::LastAssignedFieldIdMatch {
1398                last_assigned_field_id: 12,
1399            },
1400        );
1401    }
1402
1403    #[test]
1404    fn test_assert_current_schema_id() {
1405        test_serde_json(
1406            r#"
1407{
1408    "type": "assert-current-schema-id",
1409    "current-schema-id": 4
1410}
1411        "#,
1412            TableRequirement::CurrentSchemaIdMatch {
1413                current_schema_id: 4,
1414            },
1415        );
1416    }
1417
1418    #[test]
1419    fn test_assert_last_assigned_partition_id() {
1420        test_serde_json(
1421            r#"
1422{
1423    "type": "assert-last-assigned-partition-id",
1424    "last-assigned-partition-id": 1004
1425}
1426        "#,
1427            TableRequirement::LastAssignedPartitionIdMatch {
1428                last_assigned_partition_id: 1004,
1429            },
1430        );
1431    }
1432
1433    #[test]
1434    fn test_assert_default_spec_id() {
1435        test_serde_json(
1436            r#"
1437{
1438    "type": "assert-default-spec-id",
1439    "default-spec-id": 5
1440}
1441        "#,
1442            TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
1443        );
1444    }
1445
1446    #[test]
1447    fn test_assert_default_sort_order() {
1448        let json = r#"
1449{
1450    "type": "assert-default-sort-order-id",
1451    "default-sort-order-id": 10
1452}
1453        "#;
1454
1455        let update = TableRequirement::DefaultSortOrderIdMatch {
1456            default_sort_order_id: 10,
1457        };
1458
1459        test_serde_json(json, update);
1460    }
1461
1462    #[test]
1463    fn test_parse_assert_invalid() {
1464        assert!(
1465            serde_json::from_str::<TableRequirement>(
1466                r#"
1467{
1468    "default-sort-order-id": 10
1469}
1470"#
1471            )
1472            .is_err(),
1473            "Table requirements should not be parsed without type."
1474        );
1475    }
1476
1477    #[test]
1478    fn test_assign_uuid() {
1479        test_serde_json(
1480            r#"
1481{
1482    "action": "assign-uuid",
1483    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1484}
1485        "#,
1486            TableUpdate::AssignUuid {
1487                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1488            },
1489        );
1490    }
1491
1492    #[test]
1493    fn test_upgrade_format_version() {
1494        test_serde_json(
1495            r#"
1496{
1497    "action": "upgrade-format-version",
1498    "format-version": 2
1499}
1500        "#,
1501            TableUpdate::UpgradeFormatVersion {
1502                format_version: FormatVersion::V2,
1503            },
1504        );
1505    }
1506
1507    #[test]
1508    fn test_add_schema() {
1509        let test_schema = Schema::builder()
1510            .with_schema_id(1)
1511            .with_identifier_field_ids(vec![2])
1512            .with_fields(vec![
1513                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
1514                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
1515                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
1516            ])
1517            .build()
1518            .unwrap();
1519        test_serde_json(
1520            r#"
1521{
1522    "action": "add-schema",
1523    "schema": {
1524        "type": "struct",
1525        "schema-id": 1,
1526        "fields": [
1527            {
1528                "id": 1,
1529                "name": "foo",
1530                "required": false,
1531                "type": "string"
1532            },
1533            {
1534                "id": 2,
1535                "name": "bar",
1536                "required": true,
1537                "type": "int"
1538            },
1539            {
1540                "id": 3,
1541                "name": "baz",
1542                "required": false,
1543                "type": "boolean"
1544            }
1545        ],
1546        "identifier-field-ids": [
1547            2
1548        ]
1549    },
1550    "last-column-id": 3
1551}
1552        "#,
1553            TableUpdate::AddSchema {
1554                schema: test_schema.clone(),
1555            },
1556        );
1557
1558        test_serde_json(
1559            r#"
1560{
1561    "action": "add-schema",
1562    "schema": {
1563        "type": "struct",
1564        "schema-id": 1,
1565        "fields": [
1566            {
1567                "id": 1,
1568                "name": "foo",
1569                "required": false,
1570                "type": "string"
1571            },
1572            {
1573                "id": 2,
1574                "name": "bar",
1575                "required": true,
1576                "type": "int"
1577            },
1578            {
1579                "id": 3,
1580                "name": "baz",
1581                "required": false,
1582                "type": "boolean"
1583            }
1584        ],
1585        "identifier-field-ids": [
1586            2
1587        ]
1588    }
1589}
1590        "#,
1591            TableUpdate::AddSchema {
1592                schema: test_schema.clone(),
1593            },
1594        );
1595    }
1596
1597    #[test]
1598    fn test_set_current_schema() {
1599        test_serde_json(
1600            r#"
1601{
1602   "action": "set-current-schema",
1603   "schema-id": 23
1604}
1605        "#,
1606            TableUpdate::SetCurrentSchema { schema_id: 23 },
1607        );
1608    }
1609
1610    #[test]
1611    fn test_add_spec() {
1612        test_serde_json(
1613            r#"
1614{
1615    "action": "add-spec",
1616    "spec": {
1617        "fields": [
1618            {
1619                "source-id": 4,
1620                "name": "ts_day",
1621                "transform": "day"
1622            },
1623            {
1624                "source-id": 1,
1625                "name": "id_bucket",
1626                "transform": "bucket[16]"
1627            },
1628            {
1629                "source-id": 2,
1630                "name": "id_truncate",
1631                "transform": "truncate[4]"
1632            }
1633        ]
1634    }
1635}
1636        "#,
1637            TableUpdate::AddSpec {
1638                spec: UnboundPartitionSpec::builder()
1639                    .add_partition_field(4, "ts_day".to_string(), Transform::Day)
1640                    .unwrap()
1641                    .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
1642                    .unwrap()
1643                    .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
1644                    .unwrap()
1645                    .build(),
1646            },
1647        );
1648    }
1649
1650    #[test]
1651    fn test_set_default_spec() {
1652        test_serde_json(
1653            r#"
1654{
1655    "action": "set-default-spec",
1656    "spec-id": 1
1657}
1658        "#,
1659            TableUpdate::SetDefaultSpec { spec_id: 1 },
1660        )
1661    }
1662
1663    #[test]
1664    fn test_add_sort_order() {
1665        let json = r#"
1666{
1667    "action": "add-sort-order",
1668    "sort-order": {
1669        "order-id": 1,
1670        "fields": [
1671            {
1672                "transform": "identity",
1673                "source-id": 2,
1674                "direction": "asc",
1675                "null-order": "nulls-first"
1676            },
1677            {
1678                "transform": "bucket[4]",
1679                "source-id": 3,
1680                "direction": "desc",
1681                "null-order": "nulls-last"
1682            }
1683        ]
1684    }
1685}
1686        "#;
1687
1688        let update = TableUpdate::AddSortOrder {
1689            sort_order: SortOrder::builder()
1690                .with_order_id(1)
1691                .with_sort_field(
1692                    SortField::builder()
1693                        .source_id(2)
1694                        .direction(SortDirection::Ascending)
1695                        .null_order(NullOrder::First)
1696                        .transform(Transform::Identity)
1697                        .build(),
1698                )
1699                .with_sort_field(
1700                    SortField::builder()
1701                        .source_id(3)
1702                        .direction(SortDirection::Descending)
1703                        .null_order(NullOrder::Last)
1704                        .transform(Transform::Bucket(4))
1705                        .build(),
1706                )
1707                .build_unbound()
1708                .unwrap(),
1709        };
1710
1711        test_serde_json(json, update);
1712    }
1713
1714    #[test]
1715    fn test_set_default_order() {
1716        let json = r#"
1717{
1718    "action": "set-default-sort-order",
1719    "sort-order-id": 2
1720}
1721        "#;
1722        let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };
1723
1724        test_serde_json(json, update);
1725    }
1726
1727    #[test]
1728    fn test_add_snapshot() {
1729        let json = r#"
1730{
1731    "action": "add-snapshot",
1732    "snapshot": {
1733        "snapshot-id": 3055729675574597000,
1734        "parent-snapshot-id": 3051729675574597000,
1735        "timestamp-ms": 1555100955770,
1736        "sequence-number": 1,
1737        "summary": {
1738            "operation": "append"
1739        },
1740        "manifest-list": "s3://a/b/2.avro",
1741        "schema-id": 1
1742    }
1743}
1744        "#;
1745
1746        let update = TableUpdate::AddSnapshot {
1747            snapshot: Snapshot::builder()
1748                .with_snapshot_id(3055729675574597000)
1749                .with_parent_snapshot_id(Some(3051729675574597000))
1750                .with_timestamp_ms(1555100955770)
1751                .with_sequence_number(1)
1752                .with_manifest_list("s3://a/b/2.avro")
1753                .with_schema_id(1)
1754                .with_summary(Summary {
1755                    operation: Operation::Append,
1756                    additional_properties: HashMap::default(),
1757                })
1758                .build(),
1759        };
1760
1761        test_serde_json(json, update);
1762    }
1763
1764    #[test]
1765    fn test_add_snapshot_v1() {
1766        let json = r#"
1767{
1768    "action": "add-snapshot",
1769    "snapshot": {
1770        "snapshot-id": 3055729675574597000,
1771        "parent-snapshot-id": 3051729675574597000,
1772        "timestamp-ms": 1555100955770,
1773        "summary": {
1774            "operation": "append"
1775        },
1776        "manifest-list": "s3://a/b/2.avro"
1777    }
1778}
1779    "#;
1780
1781        let update = TableUpdate::AddSnapshot {
1782            snapshot: Snapshot::builder()
1783                .with_snapshot_id(3055729675574597000)
1784                .with_parent_snapshot_id(Some(3051729675574597000))
1785                .with_timestamp_ms(1555100955770)
1786                .with_sequence_number(0)
1787                .with_manifest_list("s3://a/b/2.avro")
1788                .with_summary(Summary {
1789                    operation: Operation::Append,
1790                    additional_properties: HashMap::default(),
1791                })
1792                .build(),
1793        };
1794
1795        let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
1796        assert_eq!(actual, update, "Parsed value is not equal to expected");
1797    }
1798
1799    #[test]
1800    fn test_add_snapshot_v3() {
1801        let json = serde_json::json!(
1802        {
1803            "action": "add-snapshot",
1804            "snapshot": {
1805                "snapshot-id": 3055729675574597000i64,
1806                "parent-snapshot-id": 3051729675574597000i64,
1807                "timestamp-ms": 1555100955770i64,
1808                "first-row-id":0,
1809                "added-rows":2,
1810                "key-id":"key123",
1811                "summary": {
1812                    "operation": "append"
1813                },
1814                "manifest-list": "s3://a/b/2.avro"
1815            }
1816        });
1817
1818        let update = TableUpdate::AddSnapshot {
1819            snapshot: Snapshot::builder()
1820                .with_snapshot_id(3055729675574597000)
1821                .with_parent_snapshot_id(Some(3051729675574597000))
1822                .with_timestamp_ms(1555100955770)
1823                .with_sequence_number(0)
1824                .with_manifest_list("s3://a/b/2.avro")
1825                .with_row_range(0, 2)
1826                .with_encryption_key_id(Some("key123".to_string()))
1827                .with_summary(Summary {
1828                    operation: Operation::Append,
1829                    additional_properties: HashMap::default(),
1830                })
1831                .build(),
1832        };
1833
1834        let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
1835        assert_eq!(actual, update, "Parsed value is not equal to expected");
1836        let restored: TableUpdate = serde_json::from_str(
1837            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
1838        )
1839        .expect("Failed to parse from serialized json");
1840        assert_eq!(restored, update);
1841    }
1842
1843    #[test]
1844    fn test_remove_snapshots() {
1845        let json = r#"
1846{
1847    "action": "remove-snapshots",
1848    "snapshot-ids": [
1849        1,
1850        2
1851    ]
1852}
1853        "#;
1854
1855        let update = TableUpdate::RemoveSnapshots {
1856            snapshot_ids: vec![1, 2],
1857        };
1858        test_serde_json(json, update);
1859    }
1860
1861    #[test]
1862    fn test_remove_snapshot_ref() {
1863        let json = r#"
1864{
1865    "action": "remove-snapshot-ref",
1866    "ref-name": "snapshot-ref"
1867}
1868        "#;
1869
1870        let update = TableUpdate::RemoveSnapshotRef {
1871            ref_name: "snapshot-ref".to_string(),
1872        };
1873        test_serde_json(json, update);
1874    }
1875
1876    #[test]
1877    fn test_set_snapshot_ref_tag() {
1878        let json = r#"
1879{
1880    "action": "set-snapshot-ref",
1881    "type": "tag",
1882    "ref-name": "hank",
1883    "snapshot-id": 1,
1884    "max-ref-age-ms": 1
1885}
1886        "#;
1887
1888        let update = TableUpdate::SetSnapshotRef {
1889            ref_name: "hank".to_string(),
1890            reference: SnapshotReference {
1891                snapshot_id: 1,
1892                retention: SnapshotRetention::Tag {
1893                    max_ref_age_ms: Some(1),
1894                },
1895            },
1896        };
1897
1898        test_serde_json(json, update);
1899    }
1900
1901    #[test]
1902    fn test_set_snapshot_ref_branch() {
1903        let json = r#"
1904{
1905    "action": "set-snapshot-ref",
1906    "type": "branch",
1907    "ref-name": "hank",
1908    "snapshot-id": 1,
1909    "min-snapshots-to-keep": 2,
1910    "max-snapshot-age-ms": 3,
1911    "max-ref-age-ms": 4
1912}
1913        "#;
1914
1915        let update = TableUpdate::SetSnapshotRef {
1916            ref_name: "hank".to_string(),
1917            reference: SnapshotReference {
1918                snapshot_id: 1,
1919                retention: SnapshotRetention::Branch {
1920                    min_snapshots_to_keep: Some(2),
1921                    max_snapshot_age_ms: Some(3),
1922                    max_ref_age_ms: Some(4),
1923                },
1924            },
1925        };
1926
1927        test_serde_json(json, update);
1928    }
1929
1930    #[test]
1931    fn test_set_properties() {
1932        let json = r#"
1933{
1934    "action": "set-properties",
1935    "updates": {
1936        "prop1": "v1",
1937        "prop2": "v2"
1938    }
1939}
1940        "#;
1941
1942        let update = TableUpdate::SetProperties {
1943            updates: vec![
1944                ("prop1".to_string(), "v1".to_string()),
1945                ("prop2".to_string(), "v2".to_string()),
1946            ]
1947            .into_iter()
1948            .collect(),
1949        };
1950
1951        test_serde_json(json, update);
1952    }
1953
1954    #[test]
1955    fn test_remove_properties() {
1956        let json = r#"
1957{
1958    "action": "remove-properties",
1959    "removals": [
1960        "prop1",
1961        "prop2"
1962    ]
1963}
1964        "#;
1965
1966        let update = TableUpdate::RemoveProperties {
1967            removals: vec!["prop1".to_string(), "prop2".to_string()],
1968        };
1969
1970        test_serde_json(json, update);
1971    }
1972
1973    #[test]
1974    fn test_set_location() {
1975        let json = r#"
1976{
1977    "action": "set-location",
1978    "location": "s3://bucket/warehouse/tbl_location"
1979}
1980    "#;
1981
1982        let update = TableUpdate::SetLocation {
1983            location: "s3://bucket/warehouse/tbl_location".to_string(),
1984        };
1985
1986        test_serde_json(json, update);
1987    }
1988
1989    #[test]
1990    fn test_table_update_apply() {
1991        let table_creation = TableCreation::builder()
1992            .location("s3://db/table".to_string())
1993            .name("table".to_string())
1994            .properties(HashMap::new())
1995            .schema(Schema::builder().build().unwrap())
1996            .build();
1997        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
1998            .unwrap()
1999            .build()
2000            .unwrap()
2001            .metadata;
2002        let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
2003            table_metadata,
2004            Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
2005        );
2006
2007        let uuid = uuid::Uuid::new_v4();
2008        let update = TableUpdate::AssignUuid { uuid };
2009        let updated_metadata = update
2010            .apply(table_metadata_builder)
2011            .unwrap()
2012            .build()
2013            .unwrap()
2014            .metadata;
2015        assert_eq!(updated_metadata.uuid(), uuid);
2016    }
2017
2018    #[test]
2019    fn test_view_assign_uuid() {
2020        test_serde_json(
2021            r#"
2022{
2023    "action": "assign-uuid",
2024    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
2025}
2026        "#,
2027            ViewUpdate::AssignUuid {
2028                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
2029            },
2030        );
2031    }
2032
2033    #[test]
2034    fn test_view_upgrade_format_version() {
2035        test_serde_json(
2036            r#"
2037{
2038    "action": "upgrade-format-version",
2039    "format-version": 1
2040}
2041        "#,
2042            ViewUpdate::UpgradeFormatVersion {
2043                format_version: ViewFormatVersion::V1,
2044            },
2045        );
2046    }
2047
2048    #[test]
2049    fn test_view_add_schema() {
2050        let test_schema = Schema::builder()
2051            .with_schema_id(1)
2052            .with_identifier_field_ids(vec![2])
2053            .with_fields(vec![
2054                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
2055                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2056                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
2057            ])
2058            .build()
2059            .unwrap();
2060        test_serde_json(
2061            r#"
2062{
2063    "action": "add-schema",
2064    "schema": {
2065        "type": "struct",
2066        "schema-id": 1,
2067        "fields": [
2068            {
2069                "id": 1,
2070                "name": "foo",
2071                "required": false,
2072                "type": "string"
2073            },
2074            {
2075                "id": 2,
2076                "name": "bar",
2077                "required": true,
2078                "type": "int"
2079            },
2080            {
2081                "id": 3,
2082                "name": "baz",
2083                "required": false,
2084                "type": "boolean"
2085            }
2086        ],
2087        "identifier-field-ids": [
2088            2
2089        ]
2090    },
2091    "last-column-id": 3
2092}
2093        "#,
2094            ViewUpdate::AddSchema {
2095                schema: test_schema.clone(),
2096                last_column_id: Some(3),
2097            },
2098        );
2099    }
2100
2101    #[test]
2102    fn test_view_set_location() {
2103        test_serde_json(
2104            r#"
2105{
2106    "action": "set-location",
2107    "location": "s3://db/view"
2108}
2109        "#,
2110            ViewUpdate::SetLocation {
2111                location: "s3://db/view".to_string(),
2112            },
2113        );
2114    }
2115
2116    #[test]
2117    fn test_view_set_properties() {
2118        test_serde_json(
2119            r#"
2120{
2121    "action": "set-properties",
2122    "updates": {
2123        "prop1": "v1",
2124        "prop2": "v2"
2125    }
2126}
2127        "#,
2128            ViewUpdate::SetProperties {
2129                updates: vec![
2130                    ("prop1".to_string(), "v1".to_string()),
2131                    ("prop2".to_string(), "v2".to_string()),
2132                ]
2133                .into_iter()
2134                .collect(),
2135            },
2136        );
2137    }
2138
2139    #[test]
2140    fn test_view_remove_properties() {
2141        test_serde_json(
2142            r#"
2143{
2144    "action": "remove-properties",
2145    "removals": [
2146        "prop1",
2147        "prop2"
2148    ]
2149}
2150        "#,
2151            ViewUpdate::RemoveProperties {
2152                removals: vec!["prop1".to_string(), "prop2".to_string()],
2153            },
2154        );
2155    }
2156
2157    #[test]
2158    fn test_view_add_view_version() {
2159        test_serde_json(
2160            r#"
2161{
2162    "action": "add-view-version",
2163    "view-version": {
2164            "version-id" : 1,
2165            "timestamp-ms" : 1573518431292,
2166            "schema-id" : 1,
2167            "default-catalog" : "prod",
2168            "default-namespace" : [ "default" ],
2169            "summary" : {
2170              "engine-name" : "Spark"
2171            },
2172            "representations" : [ {
2173              "type" : "sql",
2174              "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
2175              "dialect" : "spark"
2176            } ]
2177    }
2178}
2179        "#,
2180            ViewUpdate::AddViewVersion {
2181                view_version: ViewVersion::builder()
2182                    .with_version_id(1)
2183                    .with_timestamp_ms(1573518431292)
2184                    .with_schema_id(1)
2185                    .with_default_catalog(Some("prod".to_string()))
2186                    .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
2187                    .with_summary(
2188                        vec![("engine-name".to_string(), "Spark".to_string())]
2189                            .into_iter()
2190                            .collect(),
2191                    )
2192                    .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
2193                        sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
2194                        dialect: "spark".to_string(),
2195                    })]))
2196                    .build(),
2197            },
2198        );
2199    }
2200
2201    #[test]
2202    fn test_view_set_current_view_version() {
2203        test_serde_json(
2204            r#"
2205{
2206    "action": "set-current-view-version",
2207    "view-version-id": 1
2208}
2209        "#,
2210            ViewUpdate::SetCurrentViewVersion { view_version_id: 1 },
2211        );
2212    }
2213
2214    #[test]
2215    fn test_remove_partition_specs_update() {
2216        test_serde_json(
2217            r#"
2218{
2219    "action": "remove-partition-specs",
2220    "spec-ids": [1, 2]
2221}
2222        "#,
2223            TableUpdate::RemovePartitionSpecs {
2224                spec_ids: vec![1, 2],
2225            },
2226        );
2227    }
2228
2229    #[test]
2230    fn test_set_statistics_file() {
2231        test_serde_json(
2232            r#"
2233        {
2234                "action": "set-statistics",
2235                "snapshot-id": 1940541653261589030,
2236                "statistics": {
2237                        "snapshot-id": 1940541653261589030,
2238                        "statistics-path": "s3://bucket/warehouse/stats.puffin",
2239                        "file-size-in-bytes": 124,
2240                        "file-footer-size-in-bytes": 27,
2241                        "blob-metadata": [
2242                                {
2243                                        "type": "boring-type",
2244                                        "snapshot-id": 1940541653261589030,
2245                                        "sequence-number": 2,
2246                                        "fields": [
2247                                                1
2248                                        ],
2249                                        "properties": {
2250                                                "prop-key": "prop-value"
2251                                        }
2252                                }
2253                        ]
2254                }
2255        }
2256        "#,
2257            TableUpdate::SetStatistics {
2258                statistics: StatisticsFile {
2259                    snapshot_id: 1940541653261589030,
2260                    statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
2261                    file_size_in_bytes: 124,
2262                    file_footer_size_in_bytes: 27,
2263                    key_metadata: None,
2264                    blob_metadata: vec![BlobMetadata {
2265                        r#type: "boring-type".to_string(),
2266                        snapshot_id: 1940541653261589030,
2267                        sequence_number: 2,
2268                        fields: vec![1],
2269                        properties: vec![("prop-key".to_string(), "prop-value".to_string())]
2270                            .into_iter()
2271                            .collect(),
2272                    }],
2273                },
2274            },
2275        );
2276    }
2277
2278    #[test]
2279    fn test_remove_statistics_file() {
2280        test_serde_json(
2281            r#"
2282        {
2283                "action": "remove-statistics",
2284                "snapshot-id": 1940541653261589030
2285        }
2286        "#,
2287            TableUpdate::RemoveStatistics {
2288                snapshot_id: 1940541653261589030,
2289            },
2290        );
2291    }
2292
2293    #[test]
2294    fn test_set_partition_statistics_file() {
2295        test_serde_json(
2296            r#"
2297            {
2298                "action": "set-partition-statistics",
2299                "partition-statistics": {
2300                    "snapshot-id": 1940541653261589030,
2301                    "statistics-path": "s3://bucket/warehouse/stats1.parquet",
2302                    "file-size-in-bytes": 43
2303                }
2304            }
2305            "#,
2306            TableUpdate::SetPartitionStatistics {
2307                partition_statistics: PartitionStatisticsFile {
2308                    snapshot_id: 1940541653261589030,
2309                    statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
2310                    file_size_in_bytes: 43,
2311                },
2312            },
2313        )
2314    }
2315
2316    #[test]
2317    fn test_remove_partition_statistics_file() {
2318        test_serde_json(
2319            r#"
2320            {
2321                "action": "remove-partition-statistics",
2322                "snapshot-id": 1940541653261589030
2323            }
2324            "#,
2325            TableUpdate::RemovePartitionStatistics {
2326                snapshot_id: 1940541653261589030,
2327            },
2328        )
2329    }
2330
2331    #[test]
2332    fn test_remove_schema_update() {
2333        test_serde_json(
2334            r#"
2335                {
2336                    "action": "remove-schemas",
2337                    "schema-ids": [1, 2]
2338                }        
2339            "#,
2340            TableUpdate::RemoveSchemas {
2341                schema_ids: vec![1, 2],
2342            },
2343        );
2344    }
2345
2346    #[test]
2347    fn test_add_encryption_key() {
2348        let key_bytes = "key".as_bytes();
2349        let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
2350        test_serde_json(
2351            format!(
2352                r#"
2353                {{
2354                    "action": "add-encryption-key",
2355                    "encryption-key": {{
2356                        "key-id": "a",
2357                        "encrypted-key-metadata": "{encoded_key}",
2358                        "encrypted-by-id": "b"
2359                    }}
2360                }}        
2361            "#
2362            ),
2363            TableUpdate::AddEncryptionKey {
2364                encryption_key: EncryptedKey::builder()
2365                    .key_id("a")
2366                    .encrypted_key_metadata(key_bytes.to_vec())
2367                    .encrypted_by_id("b")
2368                    .build(),
2369            },
2370        );
2371    }
2372
2373    #[test]
2374    fn test_remove_encryption_key() {
2375        test_serde_json(
2376            r#"
2377                {
2378                    "action": "remove-encryption-key",
2379                    "key-id": "a"
2380                }        
2381            "#,
2382            TableUpdate::RemoveEncryptionKey {
2383                key_id: "a".to_string(),
2384            },
2385        );
2386    }
2387
2388    #[test]
2389    fn test_table_commit() {
2390        let table = {
2391            let file = File::open(format!(
2392                "{}/testdata/table_metadata/{}",
2393                env!("CARGO_MANIFEST_DIR"),
2394                "TableMetadataV2Valid.json"
2395            ))
2396            .unwrap();
2397            let reader = BufReader::new(file);
2398            let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2399
2400            Table::builder()
2401                .metadata(resp)
2402                .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json")
2403                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2404                .file_io(FileIO::new_with_memory())
2405                .runtime(test_runtime())
2406                .build()
2407                .unwrap()
2408        };
2409
2410        let updates = vec![
2411            TableUpdate::SetLocation {
2412                location: "s3://bucket/test/new_location/data".to_string(),
2413            },
2414            TableUpdate::SetProperties {
2415                updates: vec![
2416                    ("prop1".to_string(), "v1".to_string()),
2417                    ("prop2".to_string(), "v2".to_string()),
2418                ]
2419                .into_iter()
2420                .collect(),
2421            },
2422        ];
2423
2424        let requirements = vec![TableRequirement::UuidMatch {
2425            uuid: table.metadata().table_uuid,
2426        }];
2427
2428        let table_commit = TableCommit::builder()
2429            .ident(table.identifier().to_owned())
2430            .updates(updates)
2431            .requirements(requirements)
2432            .build();
2433
2434        let updated_table = table_commit.apply(table).unwrap();
2435
2436        assert_eq!(
2437            updated_table.metadata().properties.get("prop1").unwrap(),
2438            "v1"
2439        );
2440        assert_eq!(
2441            updated_table.metadata().properties.get("prop2").unwrap(),
2442            "v2"
2443        );
2444
2445        // metadata version should be bumped
2446        assert!(
2447            updated_table
2448                .metadata_location()
2449                .unwrap()
2450                .starts_with("s3://bucket/test/location/metadata/00001-")
2451        );
2452
2453        assert_eq!(
2454            updated_table.metadata().location,
2455            "s3://bucket/test/new_location/data",
2456        );
2457    }
2458}