iceberg/catalog/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Catalog API for Apache Iceberg
19
20pub mod memory;
21mod metadata_location;
22
23use std::collections::HashMap;
24use std::fmt::{Debug, Display};
25use std::future::Future;
26use std::mem::take;
27use std::ops::Deref;
28use std::str::FromStr;
29use std::sync::Arc;
30
31use _serde::{deserialize_snapshot, serialize_snapshot};
32use async_trait::async_trait;
33pub use memory::MemoryCatalog;
34pub use metadata_location::*;
35#[cfg(test)]
36use mockall::automock;
37use serde_derive::{Deserialize, Serialize};
38use typed_builder::TypedBuilder;
39use uuid::Uuid;
40
41use crate::spec::{
42    EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot,
43    SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder,
44    UnboundPartitionSpec, ViewFormatVersion, ViewRepresentations, ViewVersion,
45};
46use crate::table::Table;
47use crate::{Error, ErrorKind, Result};
48
/// The catalog API for Iceberg Rust.
///
/// A catalog manages a hierarchy of namespaces and the tables inside them.
/// This trait only defines the operations; implementations (e.g.
/// [`MemoryCatalog`]) supply the storage and consistency semantics.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
    /// List namespaces inside the catalog.
    ///
    /// When `parent` is `Some`, only namespaces nested under it are listed;
    /// when `None`, top-level namespaces are listed.
    async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
    -> Result<Vec<NamespaceIdent>>;

    /// Create a new namespace inside the catalog with the given properties,
    /// returning the created [`Namespace`].
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace>;

    /// Get a namespace information (identifier and properties) from the catalog.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace>;

    /// Check if namespace exists in catalog.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;

    /// Update a namespace inside the catalog.
    ///
    /// # Behavior
    ///
    /// The properties must be the full set of properties for the namespace —
    /// this is a replacement, not a partial update.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()>;

    /// Drop a namespace from the catalog, or returns error if it doesn't exist.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()>;

    /// List tables from namespace.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>>;

    /// Create a new table inside the namespace from the given [`TableCreation`].
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table>;

    /// Load table from the catalog.
    async fn load_table(&self, table: &TableIdent) -> Result<Table>;

    /// Drop a table from the catalog, or returns error if it doesn't exist.
    async fn drop_table(&self, table: &TableIdent) -> Result<()>;

    /// Check if a table exists in the catalog.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

    /// Rename a table in the catalog from `src` to `dest`.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>;

    /// Register an existing table to the catalog, using the metadata file
    /// at `metadata_location`.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table>;

    /// Update a table to the catalog by applying a [`TableCommit`]
    /// (its requirements are validated, then its updates applied).
    async fn update_table(&self, commit: TableCommit) -> Result<Table>;
}
112
/// Common interface for all catalog builders.
///
/// A builder is configured with a catalog name plus a map of string
/// properties and asynchronously produces a ready-to-use [`Catalog`].
pub trait CatalogBuilder: Default + Debug + Send + Sync {
    /// The catalog type that this builder creates.
    type C: Catalog;
    /// Create a new catalog instance.
    ///
    /// `name` identifies the catalog; `props` carries
    /// implementation-specific configuration.
    fn load(
        self,
        name: impl Into<String>,
        props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send;
}
124
125/// NamespaceIdent represents the identifier of a namespace in the catalog.
126///
127/// The namespace identifier is a list of strings, where each string is a
128/// component of the namespace. It's the catalog implementer's responsibility to
129/// handle the namespace identifier correctly.
130#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
131pub struct NamespaceIdent(Vec<String>);
132
133impl NamespaceIdent {
134    /// Create a new namespace identifier with only one level.
135    pub fn new(name: String) -> Self {
136        Self(vec![name])
137    }
138
139    /// Create a multi-level namespace identifier from vector.
140    pub fn from_vec(names: Vec<String>) -> Result<Self> {
141        if names.is_empty() {
142            return Err(Error::new(
143                ErrorKind::DataInvalid,
144                "Namespace identifier can't be empty!",
145            ));
146        }
147        Ok(Self(names))
148    }
149
150    /// Try to create namespace identifier from an iterator of string.
151    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
152        Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
153    }
154
155    /// Returns a string for used in url.
156    pub fn to_url_string(&self) -> String {
157        self.as_ref().join("\u{001f}")
158    }
159
160    /// Returns inner strings.
161    pub fn inner(self) -> Vec<String> {
162        self.0
163    }
164
165    /// Get the parent of this namespace.
166    /// Returns None if this namespace only has a single element and thus has no parent.
167    pub fn parent(&self) -> Option<Self> {
168        self.0.split_last().and_then(|(_, parent)| {
169            if parent.is_empty() {
170                None
171            } else {
172                Some(Self(parent.to_vec()))
173            }
174        })
175    }
176}
177
178impl AsRef<Vec<String>> for NamespaceIdent {
179    fn as_ref(&self) -> &Vec<String> {
180        &self.0
181    }
182}
183
184impl Deref for NamespaceIdent {
185    type Target = [String];
186
187    fn deref(&self) -> &Self::Target {
188        &self.0
189    }
190}
191
192/// Namespace represents a namespace in the catalog.
193#[derive(Debug, Clone, PartialEq, Eq)]
194pub struct Namespace {
195    name: NamespaceIdent,
196    properties: HashMap<String, String>,
197}
198
199impl Namespace {
200    /// Create a new namespace.
201    pub fn new(name: NamespaceIdent) -> Self {
202        Self::with_properties(name, HashMap::default())
203    }
204
205    /// Create a new namespace with properties.
206    pub fn with_properties(name: NamespaceIdent, properties: HashMap<String, String>) -> Self {
207        Self { name, properties }
208    }
209
210    /// Get the name of the namespace.
211    pub fn name(&self) -> &NamespaceIdent {
212        &self.name
213    }
214
215    /// Get the properties of the namespace.
216    pub fn properties(&self) -> &HashMap<String, String> {
217        &self.properties
218    }
219}
220
221impl Display for NamespaceIdent {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        write!(f, "{}", self.0.join("."))
224    }
225}
226
/// TableIdent represents the identifier of a table in the catalog.
///
/// It pairs a (possibly multi-level) [`NamespaceIdent`] with a table name.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableIdent {
    /// Namespace of the table.
    pub namespace: NamespaceIdent,
    /// Table name.
    pub name: String,
}
235
236impl TableIdent {
237    /// Create a new table identifier.
238    pub fn new(namespace: NamespaceIdent, name: String) -> Self {
239        Self { namespace, name }
240    }
241
242    /// Get the namespace of the table.
243    pub fn namespace(&self) -> &NamespaceIdent {
244        &self.namespace
245    }
246
247    /// Get the name of the table.
248    pub fn name(&self) -> &str {
249        &self.name
250    }
251
252    /// Try to create table identifier from an iterator of string.
253    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
254        let mut vec: Vec<String> = iter.into_iter().map(|s| s.to_string()).collect();
255        let table_name = vec.pop().ok_or_else(|| {
256            Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!")
257        })?;
258        let namespace_ident = NamespaceIdent::from_vec(vec)?;
259
260        Ok(Self {
261            namespace: namespace_ident,
262            name: table_name,
263        })
264    }
265}
266
267impl Display for TableIdent {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        write!(f, "{}.{}", self.namespace, self.name)
270    }
271}
272
/// TableCreation represents the creation of a table in the catalog.
///
/// Construct with the generated `TableCreation::builder()` (from
/// [`TypedBuilder`]); only `name` and `schema` are required.
#[derive(Debug, TypedBuilder)]
pub struct TableCreation {
    /// The name of the table.
    pub name: String,
    /// The location of the table.
    ///
    /// The builder's `location` setter takes a `String`; the generated
    /// `location_opt` fallback setter accepts an `Option<String>` directly.
    #[builder(default, setter(strip_option(fallback = location_opt)))]
    pub location: Option<String>,
    /// The schema of the table.
    pub schema: Schema,
    /// The partition spec of the table, could be None.
    ///
    /// `partition_spec` accepts anything convertible into
    /// `UnboundPartitionSpec`; `partition_spec_opt` takes an `Option`.
    #[builder(default, setter(strip_option(fallback = partition_spec_opt), into))]
    pub partition_spec: Option<UnboundPartitionSpec>,
    /// The sort order of the table.
    ///
    /// `sort_order` takes a `SortOrder`; `sort_order_opt` takes an `Option`.
    #[builder(default, setter(strip_option(fallback = sort_order_opt)))]
    pub sort_order: Option<SortOrder>,
    /// The properties of the table.
    ///
    /// The builder setter accepts any iterator of `(String, String)` pairs
    /// and collects it into the map.
    #[builder(default, setter(transform = |props: impl IntoIterator<Item=(String, String)>| {
        props.into_iter().collect()
    }))]
    pub properties: HashMap<String, String>,
    /// Format version of the table. Defaults to V2.
    #[builder(default = FormatVersion::V2)]
    pub format_version: FormatVersion,
}
298
/// TableCommit represents the commit of a table in the catalog.
///
/// The builder is marked as private since it's dangerous and error-prone to construct
/// [`TableCommit`] directly.
/// Users are supposed to use [`crate::transaction::Transaction`] to update table.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
    /// The table ident.
    ident: TableIdent,
    /// The requirements of the table.
    ///
    /// Commit will fail if the requirements are not met.
    requirements: Vec<TableRequirement>,
    /// The updates of the table.
    updates: Vec<TableUpdate>,
}
316
impl TableCommit {
    /// Return the table identifier.
    pub fn identifier(&self) -> &TableIdent {
        &self.ident
    }

    /// Take all requirements, leaving an empty vector behind
    /// (uses [`std::mem::take`]).
    pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
        take(&mut self.requirements)
    }

    /// Take all updates, leaving an empty vector behind
    /// (uses [`std::mem::take`]).
    pub fn take_updates(&mut self) -> Vec<TableUpdate> {
        take(&mut self.updates)
    }

    /// Applies this [`TableCommit`] to the given [`Table`] as part of a catalog update.
    /// Typically used by [`Catalog::update_table`] to validate requirements and apply metadata updates.
    ///
    /// Returns a new [`Table`] with updated metadata,
    /// or an error if validation or application fails.
    pub fn apply(self, table: Table) -> Result<Table> {
        // Check requirements against the current metadata first; any violation
        // aborts the commit before the metadata is touched.
        for requirement in self.requirements {
            requirement.check(Some(table.metadata()))?;
        }

        // Get current metadata location.
        // NOTE(review): `metadata_location_result` appears to error when the
        // table has no metadata location on record — confirm for staged tables.
        let current_metadata_location = table.metadata_location_result()?;

        // Apply the updates one by one to a metadata builder seeded with the
        // current metadata and its location.
        let mut metadata_builder = table
            .metadata()
            .clone()
            .into_builder(Some(current_metadata_location.to_string()));
        for update in self.updates {
            metadata_builder = update.apply(metadata_builder)?;
        }

        // Derive the new metadata location by bumping the version component of
        // the current one (see `MetadataLocation::with_next_version`).
        let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
            .with_next_version()
            .to_string();

        Ok(table
            .with_metadata(Arc::new(metadata_builder.build()?.metadata))
            .with_metadata_location(new_metadata_location))
    }
}
366
/// TableRequirement represents a requirement for a table in the catalog.
///
/// Serialized with a `type` tag (e.g. `assert-table-uuid`), matching the
/// Iceberg REST catalog commit-requirement representation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
    /// The table must not already exist; used for create transactions
    #[serde(rename = "assert-create")]
    NotExist,
    /// The table UUID must match the requirement.
    #[serde(rename = "assert-table-uuid")]
    UuidMatch {
        /// Uuid of original table.
        uuid: Uuid,
    },
    /// The table branch or tag identified by the requirement's `reference` must
    /// reference the requirement's `snapshot-id`.
    #[serde(rename = "assert-ref-snapshot-id")]
    RefSnapshotIdMatch {
        /// The reference of the table to assert.
        r#ref: String,
        /// The snapshot id of the table to assert.
        /// If the id is `None`, the ref must not already exist.
        #[serde(rename = "snapshot-id")]
        snapshot_id: Option<i64>,
    },
    /// The table's last assigned column id must match the requirement.
    #[serde(rename = "assert-last-assigned-field-id")]
    LastAssignedFieldIdMatch {
        /// The last assigned field id of the table to assert.
        #[serde(rename = "last-assigned-field-id")]
        last_assigned_field_id: i32,
    },
    /// The table's current schema id must match the requirement.
    #[serde(rename = "assert-current-schema-id")]
    CurrentSchemaIdMatch {
        /// Current schema id of the table to assert.
        #[serde(rename = "current-schema-id")]
        current_schema_id: SchemaId,
    },
    /// The table's last assigned partition id must match the
    /// requirement.
    #[serde(rename = "assert-last-assigned-partition-id")]
    LastAssignedPartitionIdMatch {
        /// Last assigned partition id of the table to assert.
        #[serde(rename = "last-assigned-partition-id")]
        last_assigned_partition_id: i32,
    },
    /// The table's default spec id must match the requirement.
    #[serde(rename = "assert-default-spec-id")]
    DefaultSpecIdMatch {
        /// Default spec id of the table to assert.
        #[serde(rename = "default-spec-id")]
        default_spec_id: i32,
    },
    /// The table's default sort order id must match the requirement.
    #[serde(rename = "assert-default-sort-order-id")]
    DefaultSortOrderIdMatch {
        /// Default sort order id of the table to assert.
        #[serde(rename = "default-sort-order-id")]
        default_sort_order_id: i64,
    },
}
428
/// TableUpdate represents an update to a table in the catalog.
///
/// Serialized with a kebab-case `action` tag (e.g. `upgrade-format-version`),
/// matching the Iceberg REST catalog commit-update representation.
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum TableUpdate {
    /// Upgrade table's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: FormatVersion,
    },
    /// Assign a new UUID to the table
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Add a new schema to the table
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
    },
    /// Set table's current schema
    #[serde(rename_all = "kebab-case")]
    SetCurrentSchema {
        /// Schema ID to set as current, or -1 to set last added schema
        schema_id: i32,
    },
    /// Add a new partition spec to the table
    AddSpec {
        /// The partition spec to add.
        spec: UnboundPartitionSpec,
    },
    /// Set table's default spec
    #[serde(rename_all = "kebab-case")]
    SetDefaultSpec {
        /// Partition spec ID to set as the default, or -1 to set last added spec
        spec_id: i32,
    },
    /// Add sort order to table.
    #[serde(rename_all = "kebab-case")]
    AddSortOrder {
        /// Sort order to add.
        sort_order: SortOrder,
    },
    /// Set table's default sort order
    #[serde(rename_all = "kebab-case")]
    SetDefaultSortOrder {
        /// Sort order ID to set as the default, or -1 to set last added sort order
        sort_order_id: i64,
    },
    /// Add snapshot to table.
    #[serde(rename_all = "kebab-case")]
    AddSnapshot {
        /// Snapshot to add.
        ///
        /// Uses the custom `_serde` helpers so `sequence-number` may be
        /// omitted on the wire (backwards compatibility with v1).
        #[serde(
            deserialize_with = "deserialize_snapshot",
            serialize_with = "serialize_snapshot"
        )]
        snapshot: Snapshot,
    },
    /// Set table's snapshot ref.
    #[serde(rename_all = "kebab-case")]
    SetSnapshotRef {
        /// Name of snapshot reference to set.
        ref_name: String,
        /// Snapshot reference to set; its fields are flattened into this
        /// variant's serialized form.
        #[serde(flatten)]
        reference: SnapshotReference,
    },
    /// Remove table's snapshots
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshots {
        /// Snapshot ids to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Remove snapshot reference
    #[serde(rename_all = "kebab-case")]
    RemoveSnapshotRef {
        /// Name of snapshot reference to remove.
        ref_name: String,
    },
    /// Update table's location
    SetLocation {
        /// New location for table.
        location: String,
    },
    /// Update table's properties
    SetProperties {
        /// Properties to update for table.
        updates: HashMap<String, String>,
    },
    /// Remove table's properties
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Remove partition specs
    #[serde(rename_all = "kebab-case")]
    RemovePartitionSpecs {
        /// Partition spec ids to remove.
        spec_ids: Vec<i32>,
    },
    /// Set statistics for a snapshot
    #[serde(with = "_serde_set_statistics")]
    SetStatistics {
        /// File containing the statistics
        statistics: StatisticsFile,
    },
    /// Remove statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemoveStatistics {
        /// Snapshot id to remove statistics for.
        snapshot_id: i64,
    },
    /// Set partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    SetPartitionStatistics {
        /// File containing the partition statistics
        partition_statistics: PartitionStatisticsFile,
    },
    /// Remove partition statistics for a snapshot
    #[serde(rename_all = "kebab-case")]
    RemovePartitionStatistics {
        /// Snapshot id to remove partition statistics for.
        snapshot_id: i64,
    },
    /// Remove schemas
    #[serde(rename_all = "kebab-case")]
    RemoveSchemas {
        /// Schema IDs to remove.
        schema_ids: Vec<i32>,
    },
    /// Add an encryption key
    #[serde(rename_all = "kebab-case")]
    AddEncryptionKey {
        /// The encryption key to add.
        encryption_key: EncryptedKey,
    },
    /// Remove an encryption key
    #[serde(rename_all = "kebab-case")]
    RemoveEncryptionKey {
        /// The id of the encryption key to remove.
        key_id: String,
    },
}
576
577impl TableUpdate {
578    /// Applies the update to the table metadata builder.
579    pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
580        match self {
581            TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
582            TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
583            TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
584            TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
585            TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
586            TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
587            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
588                builder.set_default_sort_order(sort_order_id)
589            }
590            TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
591            TableUpdate::SetSnapshotRef {
592                ref_name,
593                reference,
594            } => builder.set_ref(&ref_name, reference),
595            TableUpdate::RemoveSnapshots { snapshot_ids } => {
596                Ok(builder.remove_snapshots(&snapshot_ids))
597            }
598            TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
599            TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
600            TableUpdate::SetProperties { updates } => builder.set_properties(updates),
601            TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
602            TableUpdate::UpgradeFormatVersion { format_version } => {
603                builder.upgrade_format_version(format_version)
604            }
605            TableUpdate::RemovePartitionSpecs { spec_ids } => {
606                builder.remove_partition_specs(&spec_ids)
607            }
608            TableUpdate::SetStatistics { statistics } => Ok(builder.set_statistics(statistics)),
609            TableUpdate::RemoveStatistics { snapshot_id } => {
610                Ok(builder.remove_statistics(snapshot_id))
611            }
612            TableUpdate::SetPartitionStatistics {
613                partition_statistics,
614            } => Ok(builder.set_partition_statistics(partition_statistics)),
615            TableUpdate::RemovePartitionStatistics { snapshot_id } => {
616                Ok(builder.remove_partition_statistics(snapshot_id))
617            }
618            TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
619            TableUpdate::AddEncryptionKey { encryption_key } => {
620                Ok(builder.add_encryption_key(encryption_key))
621            }
622            TableUpdate::RemoveEncryptionKey { key_id } => {
623                Ok(builder.remove_encryption_key(&key_id))
624            }
625        }
626    }
627}
628
629impl TableRequirement {
630    /// Check that the requirement is met by the table metadata.
631    /// If the requirement is not met, an appropriate error is returned.
632    ///
633    /// Provide metadata as `None` if the table does not exist.
634    pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
635        if let Some(metadata) = metadata {
636            match self {
637                TableRequirement::NotExist => {
638                    return Err(Error::new(
639                        ErrorKind::CatalogCommitConflicts,
640                        format!(
641                            "Requirement failed: Table with id {} already exists",
642                            metadata.uuid()
643                        ),
644                    )
645                    .with_retryable(true));
646                }
647                TableRequirement::UuidMatch { uuid } => {
648                    if &metadata.uuid() != uuid {
649                        return Err(Error::new(
650                            ErrorKind::CatalogCommitConflicts,
651                            "Requirement failed: Table UUID does not match",
652                        )
653                        .with_context("expected", *uuid)
654                        .with_context("found", metadata.uuid())
655                        .with_retryable(true));
656                    }
657                }
658                TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
659                    // ToDo: Harmonize the types of current_schema_id
660                    if metadata.current_schema_id != *current_schema_id {
661                        return Err(Error::new(
662                            ErrorKind::CatalogCommitConflicts,
663                            "Requirement failed: Current schema id does not match",
664                        )
665                        .with_context("expected", current_schema_id.to_string())
666                        .with_context("found", metadata.current_schema_id.to_string())
667                        .with_retryable(true));
668                    }
669                }
670                TableRequirement::DefaultSortOrderIdMatch {
671                    default_sort_order_id,
672                } => {
673                    if metadata.default_sort_order().order_id != *default_sort_order_id {
674                        return Err(Error::new(
675                            ErrorKind::CatalogCommitConflicts,
676                            "Requirement failed: Default sort order id does not match",
677                        )
678                        .with_context("expected", default_sort_order_id.to_string())
679                        .with_context("found", metadata.default_sort_order().order_id.to_string())
680                        .with_retryable(true));
681                    }
682                }
683                TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
684                    let snapshot_ref = metadata.snapshot_for_ref(r#ref);
685                    if let Some(snapshot_id) = snapshot_id {
686                        let snapshot_ref = snapshot_ref.ok_or(
687                            Error::new(
688                                ErrorKind::CatalogCommitConflicts,
689                                format!("Requirement failed: Branch or tag `{ref}` not found"),
690                            )
691                            .with_retryable(true),
692                        )?;
693                        if snapshot_ref.snapshot_id() != *snapshot_id {
694                            return Err(Error::new(
695                                ErrorKind::CatalogCommitConflicts,
696                                format!(
697                                    "Requirement failed: Branch or tag `{ref}`'s snapshot has changed"
698                                ),
699                            )
700                            .with_context("expected", snapshot_id.to_string())
701                            .with_context("found", snapshot_ref.snapshot_id().to_string())
702                            .with_retryable(true));
703                        }
704                    } else if snapshot_ref.is_some() {
705                        // a null snapshot ID means the ref should not exist already
706                        return Err(Error::new(
707                            ErrorKind::CatalogCommitConflicts,
708                            format!("Requirement failed: Branch or tag `{ref}` already exists"),
709                        )
710                        .with_retryable(true));
711                    }
712                }
713                TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
714                    // ToDo: Harmonize the types of default_spec_id
715                    if metadata.default_partition_spec_id() != *default_spec_id {
716                        return Err(Error::new(
717                            ErrorKind::CatalogCommitConflicts,
718                            "Requirement failed: Default partition spec id does not match",
719                        )
720                        .with_context("expected", default_spec_id.to_string())
721                        .with_context("found", metadata.default_partition_spec_id().to_string())
722                        .with_retryable(true));
723                    }
724                }
725                TableRequirement::LastAssignedPartitionIdMatch {
726                    last_assigned_partition_id,
727                } => {
728                    if metadata.last_partition_id != *last_assigned_partition_id {
729                        return Err(Error::new(
730                            ErrorKind::CatalogCommitConflicts,
731                            "Requirement failed: Last assigned partition id does not match",
732                        )
733                        .with_context("expected", last_assigned_partition_id.to_string())
734                        .with_context("found", metadata.last_partition_id.to_string())
735                        .with_retryable(true));
736                    }
737                }
738                TableRequirement::LastAssignedFieldIdMatch {
739                    last_assigned_field_id,
740                } => {
741                    if &metadata.last_column_id != last_assigned_field_id {
742                        return Err(Error::new(
743                            ErrorKind::CatalogCommitConflicts,
744                            "Requirement failed: Last assigned field id does not match",
745                        )
746                        .with_context("expected", last_assigned_field_id.to_string())
747                        .with_context("found", metadata.last_column_id.to_string())
748                        .with_retryable(true));
749                    }
750                }
751            };
752        } else {
753            match self {
754                TableRequirement::NotExist => {}
755                _ => {
756                    return Err(Error::new(
757                        ErrorKind::TableNotFound,
758                        "Requirement failed: Table does not exist",
759                    ));
760                }
761            }
762        }
763
764        Ok(())
765    }
766}
767
768pub(super) mod _serde {
769    use serde::{Deserialize as _, Deserializer, Serialize as _};
770
771    use super::*;
772    use crate::spec::{SchemaId, Summary};
773
774    pub(super) fn deserialize_snapshot<'de, D>(
775        deserializer: D,
776    ) -> std::result::Result<Snapshot, D::Error>
777    where D: Deserializer<'de> {
778        let buf = CatalogSnapshot::deserialize(deserializer)?;
779        Ok(buf.into())
780    }
781
782    pub(super) fn serialize_snapshot<S>(
783        snapshot: &Snapshot,
784        serializer: S,
785    ) -> std::result::Result<S::Ok, S::Error>
786    where
787        S: serde::Serializer,
788    {
789        let buf: CatalogSnapshot = snapshot.clone().into();
790        buf.serialize(serializer)
791    }
792
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v2 snapshot for the catalog.
    /// Main difference to SnapshotV2 is that sequence-number is optional
    /// in the rest catalog spec to allow for backwards compatibility with v1.
    struct CatalogSnapshot {
        snapshot_id: i64,
        // Optional fields are omitted from the serialized form when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        parent_snapshot_id: Option<i64>,
        // Defaults to 0 when absent (v1 compatibility).
        #[serde(default)]
        sequence_number: i64,
        timestamp_ms: i64,
        manifest_list: String,
        summary: Summary,
        #[serde(skip_serializing_if = "Option::is_none")]
        schema_id: Option<SchemaId>,
        #[serde(skip_serializing_if = "Option::is_none")]
        first_row_id: Option<u64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        added_rows: Option<u64>,
        // Encryption key id, if the snapshot is encrypted.
        #[serde(skip_serializing_if = "Option::is_none")]
        key_id: Option<String>,
    }
816
    impl From<CatalogSnapshot> for Snapshot {
        fn from(snapshot: CatalogSnapshot) -> Self {
            let CatalogSnapshot {
                snapshot_id,
                parent_snapshot_id,
                sequence_number,
                timestamp_ms,
                manifest_list,
                schema_id,
                summary,
                first_row_id,
                added_rows,
                key_id,
            } = snapshot;
            // Required fields are set unconditionally in one chain.
            let builder = Snapshot::builder()
                .with_snapshot_id(snapshot_id)
                .with_parent_snapshot_id(parent_snapshot_id)
                .with_sequence_number(sequence_number)
                .with_timestamp_ms(timestamp_ms)
                .with_manifest_list(manifest_list)
                .with_summary(summary)
                .with_encryption_key_id(key_id);
            // A row range is only meaningful when BOTH first-row-id and
            // added-rows are present, hence the `zip`.
            let row_range = first_row_id.zip(added_rows);
            // The builder's optional setters appear to be type-state
            // (each call changes the builder's type), so the optional
            // fields cannot be applied conditionally in a loop-free chain;
            // every combination needs its own branch — TODO confirm.
            match (schema_id, row_range) {
                (None, None) => builder.build(),
                (Some(schema_id), None) => builder.with_schema_id(schema_id).build(),
                // NOTE(review): the second tuple element is the
                // `added_rows` count from the wire, not a last-row id;
                // the binding name `last_row_id` looks misleading —
                // confirm the semantics of `with_row_range`'s second
                // parameter.
                (None, Some((first_row_id, last_row_id))) => {
                    builder.with_row_range(first_row_id, last_row_id).build()
                }
                (Some(schema_id), Some((first_row_id, last_row_id))) => builder
                    .with_schema_id(schema_id)
                    .with_row_range(first_row_id, last_row_id)
                    .build(),
            }
        }
    }
853
    impl From<Snapshot> for CatalogSnapshot {
        fn from(snapshot: Snapshot) -> Self {
            // The wire shape carries `first-row-id` / `added-rows` rather
            // than the internal `row_range`, so read both through the
            // accessors before the destructure below consumes `snapshot`.
            let first_row_id = snapshot.first_row_id();
            let added_rows = snapshot.added_rows_count();
            // Full destructure (instead of field reads) so that adding a
            // field to `Snapshot` is a compile error here and this
            // conversion must be revisited.
            let Snapshot {
                snapshot_id,
                parent_snapshot_id,
                sequence_number,
                timestamp_ms,
                manifest_list,
                summary,
                schema_id,
                row_range: _,
                encryption_key_id: key_id,
            } = snapshot;
            CatalogSnapshot {
                snapshot_id,
                parent_snapshot_id,
                sequence_number,
                timestamp_ms,
                manifest_list,
                summary,
                schema_id,
                first_row_id,
                added_rows,
                key_id,
            }
        }
    }
883}
884
/// ViewCreation represents the creation of a view in the catalog.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
    /// The name of the view.
    pub name: String,
    /// The view's base location; used to create metadata file locations.
    pub location: String,
    /// Representations for the view.
    pub representations: ViewRepresentations,
    /// The schema of the view.
    pub schema: Schema,
    /// The properties of the view.
    /// Optional in the builder; defaults to an empty map.
    #[builder(default)]
    pub properties: HashMap<String, String>,
    /// The default namespace to use when a reference in the SELECT is a single identifier.
    pub default_namespace: NamespaceIdent,
    /// Default catalog to use when a reference in the SELECT does not contain a catalog.
    /// Optional in the builder; defaults to `None`.
    #[builder(default)]
    pub default_catalog: Option<String>,
    /// A string to string map of summary metadata about the version.
    /// Typical keys are "engine-name" and "engine-version".
    /// Optional in the builder; defaults to an empty map.
    #[builder(default)]
    pub summary: HashMap<String, String>,
}
909
/// ViewUpdate represents an update to a view in the catalog.
///
/// Serialized with an `action` tag in kebab-case, matching the REST
/// catalog wire format.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "kebab-case")]
#[allow(clippy::large_enum_variant)]
pub enum ViewUpdate {
    /// Assign a new UUID to the view
    #[serde(rename_all = "kebab-case")]
    AssignUuid {
        /// The new UUID to assign.
        uuid: Uuid,
    },
    /// Upgrade view's format version
    #[serde(rename_all = "kebab-case")]
    UpgradeFormatVersion {
        /// Target format upgrade to.
        format_version: ViewFormatVersion,
    },
    /// Add a new schema to the view
    #[serde(rename_all = "kebab-case")]
    AddSchema {
        /// The schema to add.
        schema: Schema,
        /// The last column id of the view.
        last_column_id: Option<i32>,
    },
    /// Set view's base location
    #[serde(rename_all = "kebab-case")]
    SetLocation {
        /// New location for view.
        location: String,
    },
    /// Set view's properties
    ///
    /// Matching keys are updated, and non-matching keys are left unchanged.
    #[serde(rename_all = "kebab-case")]
    SetProperties {
        /// Properties to update for view.
        updates: HashMap<String, String>,
    },
    /// Remove view's properties
    #[serde(rename_all = "kebab-case")]
    RemoveProperties {
        /// Properties to remove
        removals: Vec<String>,
    },
    /// Add a new version to the view
    #[serde(rename_all = "kebab-case")]
    AddViewVersion {
        /// The view version to add.
        view_version: ViewVersion,
    },
    /// Set view's current version
    #[serde(rename_all = "kebab-case")]
    SetCurrentViewVersion {
        /// View version id to set as current, or -1 to set last added version
        view_version_id: i32,
    },
}
968
mod _serde_set_statistics {
    // The rest spec requires an additional field `snapshot-id`
    // that is redundant with the `snapshot_id` field in the statistics file.
    use serde::{Deserialize, Deserializer, Serialize, Serializer};

    use super::*;

    // Wire shape: the statistics file plus the spec-mandated (redundant)
    // top-level snapshot id.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(rename_all = "kebab-case")]
    struct SetStatistics {
        snapshot_id: Option<i64>,
        statistics: StatisticsFile,
    }

    /// Serialize a [`StatisticsFile`], always emitting the redundant
    /// top-level `snapshot-id` copied from the file itself.
    pub fn serialize<S>(
        value: &StatisticsFile,
        serializer: S,
    ) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        SetStatistics {
            snapshot_id: Some(value.snapshot_id),
            statistics: value.clone(),
        }
        .serialize(serializer)
    }

    /// Deserialize a [`StatisticsFile`], rejecting payloads whose
    /// top-level `snapshot-id` (when present) disagrees with the id
    /// embedded in the statistics file.
    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<StatisticsFile, D::Error>
    where D: Deserializer<'de> {
        let SetStatistics {
            snapshot_id,
            statistics,
        } = SetStatistics::deserialize(deserializer)?;
        if let Some(snapshot_id) = snapshot_id {
            if snapshot_id != statistics.snapshot_id {
                return Err(serde::de::Error::custom(format!(
                    "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
                    statistics.snapshot_id
                )));
            }
        }

        Ok(statistics)
    }
}
1015
1016#[cfg(test)]
1017mod tests {
1018    use std::collections::HashMap;
1019    use std::fmt::Debug;
1020    use std::fs::File;
1021    use std::io::BufReader;
1022
1023    use base64::Engine as _;
1024    use serde::Serialize;
1025    use serde::de::DeserializeOwned;
1026    use uuid::uuid;
1027
1028    use super::ViewUpdate;
1029    use crate::io::FileIOBuilder;
1030    use crate::spec::{
1031        BlobMetadata, EncryptedKey, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
1032        PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
1033        SnapshotRetention, SortDirection, SortField, SortOrder, SqlViewRepresentation,
1034        StatisticsFile, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
1035        UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
1036        ViewVersion,
1037    };
1038    use crate::table::Table;
1039    use crate::{
1040        NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
1041    };
1042
    // `parent()` drops the last namespace segment; a single-level
    // namespace has no parent.
    #[test]
    fn test_parent_namespace() {
        let ns1 = NamespaceIdent::from_strs(vec!["ns1"]).unwrap();
        let ns2 = NamespaceIdent::from_strs(vec!["ns1", "ns2"]).unwrap();
        let ns3 = NamespaceIdent::from_strs(vec!["ns1", "ns2", "ns3"]).unwrap();

        assert_eq!(ns1.parent(), None);
        assert_eq!(ns2.parent(), Some(ns1.clone()));
        assert_eq!(ns3.parent(), Some(ns2.clone()));
    }

    // A TableIdent built from explicit parts equals one parsed from strings.
    #[test]
    fn test_create_table_id() {
        let table_id = TableIdent {
            namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
            name: "t1".to_string(),
        };

        assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap());
    }

    // The TableCreation builder accepts any iterator of key/value pairs
    // for `properties`, not only a HashMap.
    #[test]
    fn test_table_creation_iterator_properties() {
        let builder = TableCreation::builder()
            .name("table".to_string())
            .schema(Schema::builder().build().unwrap());

        fn s(k: &str, v: &str) -> (String, String) {
            (k.to_string(), v.to_string())
        }

        let table_creation = builder
            .properties([s("key", "value"), s("foo", "bar")])
            .build();

        assert_eq!(
            HashMap::from([s("key", "value"), s("foo", "bar")]),
            table_creation.properties
        );
    }

    // Shared helper: parse `json` into T and compare against `expected`,
    // then serialize the parsed value and re-parse it to verify the
    // round trip is lossless.
    fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
        json: impl ToString,
        expected: T,
    ) {
        let json_str = json.to_string();
        let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json");
        assert_eq!(actual, expected, "Parsed value is not equal to expected");

        let restored: T = serde_json::from_str(
            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
        )
        .expect("Failed to parse from serialized json");

        assert_eq!(
            restored, expected,
            "Parsed restored value is not equal to expected"
        );
    }
1102
    // Minimal table-metadata fixture with a nil table UUID, shared by the
    // requirement-check tests below.
    fn metadata() -> TableMetadata {
        let tbl_creation = TableCreation::builder()
            .name("table".to_string())
            .location("/path/to/table".to_string())
            .schema(Schema::builder().build().unwrap())
            .build();

        TableMetadataBuilder::from_table_creation(tbl_creation)
            .unwrap()
            .assign_uuid(uuid::Uuid::nil())
            .build()
            .unwrap()
            .metadata
    }

    // `NotExist` must fail when metadata is present and pass when absent.
    #[test]
    fn test_check_requirement_not_exist() {
        let metadata = metadata();
        let requirement = TableRequirement::NotExist;

        assert!(requirement.check(Some(&metadata)).is_err());
        assert!(requirement.check(None).is_ok());
    }

    // `UuidMatch` passes only when the requirement UUID equals the
    // metadata UUID (nil, per the fixture above).
    #[test]
    fn test_check_table_uuid() {
        let metadata = metadata();

        let requirement = TableRequirement::UuidMatch {
            uuid: uuid::Uuid::now_v7(),
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::UuidMatch {
            uuid: uuid::Uuid::nil(),
        };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }
1141
    // `RefSnapshotIdMatch` across all four cases: ref missing while
    // expected / not expected, and ref present with matching / mismatching
    // snapshot id.
    #[test]
    fn test_check_ref_snapshot_id() {
        let metadata = metadata();

        // Ref does not exist but should
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "my_branch".to_string(),
            snapshot_id: Some(1),
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        // Ref does not exist and should not
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "my_branch".to_string(),
            snapshot_id: None,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());

        // Add snapshot
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3051729675574597004)
            .with_sequence_number(10)
            .with_timestamp_ms(9992191116217)
            .with_manifest_list("s3://b/wh/.../s1.avro".to_string())
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        // Apply AddSnapshot + SetSnapshotRef so the fixture gains a "main"
        // branch pointing at the snapshot above.
        let builder = metadata.into_builder(None);
        let builder = TableUpdate::AddSnapshot {
            snapshot: snapshot.clone(),
        }
        .apply(builder)
        .unwrap();
        let metadata = TableUpdate::SetSnapshotRef {
            ref_name: MAIN_BRANCH.to_string(),
            reference: SnapshotReference {
                snapshot_id: snapshot.snapshot_id(),
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            },
        }
        .apply(builder)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Ref exists and should match
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "main".to_string(),
            snapshot_id: Some(3051729675574597004),
        };
        assert!(requirement.check(Some(&metadata)).is_ok());

        // Ref exists but does not match
        let requirement = TableRequirement::RefSnapshotIdMatch {
            r#ref: "main".to_string(),
            snapshot_id: Some(1),
        };
        assert!(requirement.check(Some(&metadata)).is_err());
    }
1210
    // The fresh fixture's last assigned field id is 0: 1 must fail, 0 pass.
    #[test]
    fn test_check_last_assigned_field_id() {
        let metadata = metadata();

        let requirement = TableRequirement::LastAssignedFieldIdMatch {
            last_assigned_field_id: 1,
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::LastAssignedFieldIdMatch {
            last_assigned_field_id: 0,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }

    // The fixture's current schema id is 0: 1 must fail, 0 pass.
    #[test]
    fn test_check_current_schema_id() {
        let metadata = metadata();

        let requirement = TableRequirement::CurrentSchemaIdMatch {
            current_schema_id: 1,
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::CurrentSchemaIdMatch {
            current_schema_id: 0,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }

    // NOTE: the unpartitioned fixture evidently reports 999 as its last
    // assigned partition id (0 fails, 999 passes) — the reserved id for
    // the empty default spec, presumably; confirm against the spec module.
    #[test]
    fn test_check_last_assigned_partition_id() {
        let metadata = metadata();
        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
            last_assigned_partition_id: 0,
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::LastAssignedPartitionIdMatch {
            last_assigned_partition_id: 999,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }

    // The fixture's default partition spec id is 0: 1 must fail, 0 pass.
    #[test]
    fn test_check_default_spec_id() {
        let metadata = metadata();

        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }

    // The fixture's default sort order id is 0: 1 must fail, 0 pass.
    #[test]
    fn test_check_default_sort_order_id() {
        let metadata = metadata();

        let requirement = TableRequirement::DefaultSortOrderIdMatch {
            default_sort_order_id: 1,
        };
        assert!(requirement.check(Some(&metadata)).is_err());

        let requirement = TableRequirement::DefaultSortOrderIdMatch {
            default_sort_order_id: 0,
        };
        assert!(requirement.check(Some(&metadata)).is_ok());
    }
1280
    // JSON round-trip for the `assert-table-uuid` requirement.
    #[test]
    fn test_table_uuid() {
        test_serde_json(
            r#"
{
    "type": "assert-table-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#,
            TableRequirement::UuidMatch {
                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
            },
        );
    }

    // JSON round-trip for `assert-create` (table must not exist).
    #[test]
    fn test_assert_table_not_exists() {
        test_serde_json(
            r#"
{
    "type": "assert-create"
}
        "#,
            TableRequirement::NotExist,
        );
    }

    // `assert-ref-snapshot-id` with an explicit null snapshot id (ref must
    // not exist) and with a concrete id (ref must point at it).
    #[test]
    fn test_assert_ref_snapshot_id() {
        test_serde_json(
            r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": null
}
        "#,
            TableRequirement::RefSnapshotIdMatch {
                r#ref: "snapshot-name".to_string(),
                snapshot_id: None,
            },
        );

        test_serde_json(
            r#"
{
    "type": "assert-ref-snapshot-id",
    "ref": "snapshot-name",
    "snapshot-id": 1
}
        "#,
            TableRequirement::RefSnapshotIdMatch {
                r#ref: "snapshot-name".to_string(),
                snapshot_id: Some(1),
            },
        );
    }

    // JSON round-trip for `assert-last-assigned-field-id`.
    #[test]
    fn test_assert_last_assigned_field_id() {
        test_serde_json(
            r#"
{
    "type": "assert-last-assigned-field-id",
    "last-assigned-field-id": 12
}
        "#,
            TableRequirement::LastAssignedFieldIdMatch {
                last_assigned_field_id: 12,
            },
        );
    }

    // JSON round-trip for `assert-current-schema-id`.
    #[test]
    fn test_assert_current_schema_id() {
        test_serde_json(
            r#"
{
    "type": "assert-current-schema-id",
    "current-schema-id": 4
}
        "#,
            TableRequirement::CurrentSchemaIdMatch {
                current_schema_id: 4,
            },
        );
    }

    // JSON round-trip for `assert-last-assigned-partition-id`.
    #[test]
    fn test_assert_last_assigned_partition_id() {
        test_serde_json(
            r#"
{
    "type": "assert-last-assigned-partition-id",
    "last-assigned-partition-id": 1004
}
        "#,
            TableRequirement::LastAssignedPartitionIdMatch {
                last_assigned_partition_id: 1004,
            },
        );
    }

    // JSON round-trip for `assert-default-spec-id`.
    #[test]
    fn test_assert_default_spec_id() {
        test_serde_json(
            r#"
{
    "type": "assert-default-spec-id",
    "default-spec-id": 5
}
        "#,
            TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 },
        );
    }

    // JSON round-trip for `assert-default-sort-order-id`.
    #[test]
    fn test_assert_default_sort_order() {
        let json = r#"
{
    "type": "assert-default-sort-order-id",
    "default-sort-order-id": 10
}
        "#;

        let update = TableRequirement::DefaultSortOrderIdMatch {
            default_sort_order_id: 10,
        };

        test_serde_json(json, update);
    }

    // A requirement payload without the `type` tag must fail to parse.
    #[test]
    fn test_parse_assert_invalid() {
        assert!(
            serde_json::from_str::<TableRequirement>(
                r#"
{
    "default-sort-order-id": 10
}
"#
            )
            .is_err(),
            "Table requirements should not be parsed without type."
        );
    }
1427
    // JSON round-trip for the `assign-uuid` table update.
    #[test]
    fn test_assign_uuid() {
        test_serde_json(
            r#"
{
    "action": "assign-uuid",
    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
}
        "#,
            TableUpdate::AssignUuid {
                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
            },
        );
    }

    // JSON round-trip for `upgrade-format-version` (numeric version tag).
    #[test]
    fn test_upgrade_format_version() {
        test_serde_json(
            r#"
{
    "action": "upgrade-format-version",
    "format-version": 2
}
        "#,
            TableUpdate::UpgradeFormatVersion {
                format_version: FormatVersion::V2,
            },
        );
    }

    // `add-schema` parses with and without the optional `last-column-id`
    // key; the key is accepted but not retained on the update.
    #[test]
    fn test_add_schema() {
        let test_schema = Schema::builder()
            .with_schema_id(1)
            .with_identifier_field_ids(vec![2])
            .with_fields(vec![
                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
            ])
            .build()
            .unwrap();
        test_serde_json(
            r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    },
    "last-column-id": 3
}
        "#,
            TableUpdate::AddSchema {
                schema: test_schema.clone(),
            },
        );

        test_serde_json(
            r#"
{
    "action": "add-schema",
    "schema": {
        "type": "struct",
        "schema-id": 1,
        "fields": [
            {
                "id": 1,
                "name": "foo",
                "required": false,
                "type": "string"
            },
            {
                "id": 2,
                "name": "bar",
                "required": true,
                "type": "int"
            },
            {
                "id": 3,
                "name": "baz",
                "required": false,
                "type": "boolean"
            }
        ],
        "identifier-field-ids": [
            2
        ]
    }
}
        "#,
            TableUpdate::AddSchema {
                schema: test_schema.clone(),
            },
        );
    }
1547
    // JSON round-trip for `set-current-schema`.
    #[test]
    fn test_set_current_schema() {
        test_serde_json(
            r#"
{
   "action": "set-current-schema",
   "schema-id": 23
}
        "#,
            TableUpdate::SetCurrentSchema { schema_id: 23 },
        );
    }

    // `add-spec` round-trips an unbound partition spec with day, bucket
    // and truncate transforms.
    #[test]
    fn test_add_spec() {
        test_serde_json(
            r#"
{
    "action": "add-spec",
    "spec": {
        "fields": [
            {
                "source-id": 4,
                "name": "ts_day",
                "transform": "day"
            },
            {
                "source-id": 1,
                "name": "id_bucket",
                "transform": "bucket[16]"
            },
            {
                "source-id": 2,
                "name": "id_truncate",
                "transform": "truncate[4]"
            }
        ]
    }
}
        "#,
            TableUpdate::AddSpec {
                spec: UnboundPartitionSpec::builder()
                    .add_partition_field(4, "ts_day".to_string(), Transform::Day)
                    .unwrap()
                    .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
                    .unwrap()
                    .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
                    .unwrap()
                    .build(),
            },
        );
    }

    // JSON round-trip for `set-default-spec`.
    #[test]
    fn test_set_default_spec() {
        test_serde_json(
            r#"
{
    "action": "set-default-spec",
    "spec-id": 1
}
        "#,
            TableUpdate::SetDefaultSpec { spec_id: 1 },
        )
    }

    // `add-sort-order` round-trips an unbound sort order with two fields
    // covering both directions and both null orders.
    #[test]
    fn test_add_sort_order() {
        let json = r#"
{
    "action": "add-sort-order",
    "sort-order": {
        "order-id": 1,
        "fields": [
            {
                "transform": "identity",
                "source-id": 2,
                "direction": "asc",
                "null-order": "nulls-first"
            },
            {
                "transform": "bucket[4]",
                "source-id": 3,
                "direction": "desc",
                "null-order": "nulls-last"
            }
        ]
    }
}
        "#;

        let update = TableUpdate::AddSortOrder {
            sort_order: SortOrder::builder()
                .with_order_id(1)
                .with_sort_field(
                    SortField::builder()
                        .source_id(2)
                        .direction(SortDirection::Ascending)
                        .null_order(NullOrder::First)
                        .transform(Transform::Identity)
                        .build(),
                )
                .with_sort_field(
                    SortField::builder()
                        .source_id(3)
                        .direction(SortDirection::Descending)
                        .null_order(NullOrder::Last)
                        .transform(Transform::Bucket(4))
                        .build(),
                )
                .build_unbound()
                .unwrap(),
        };

        test_serde_json(json, update);
    }

    // JSON round-trip for `set-default-sort-order`.
    #[test]
    fn test_set_default_order() {
        let json = r#"
{
    "action": "set-default-sort-order",
    "sort-order-id": 2
}
        "#;
        let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 };

        test_serde_json(json, update);
    }
1677
    // Round-trip of a v2 `add-snapshot` payload with an explicit
    // sequence-number and schema-id.
    #[test]
    fn test_add_snapshot() {
        let json = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "sequence-number": 1,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro",
        "schema-id": 1
    }
}
        "#;

        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                .with_sequence_number(1)
                .with_manifest_list("s3://a/b/2.avro")
                .with_schema_id(1)
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        test_serde_json(json, update);
    }

    // A v1 payload with no `sequence-number` parses with sequence number 0
    // (via `#[serde(default)]` on CatalogSnapshot). Deserialize-only check:
    // serialization would add the defaulted field back.
    #[test]
    fn test_add_snapshot_v1() {
        let json = r#"
{
    "action": "add-snapshot",
    "snapshot": {
        "snapshot-id": 3055729675574597000,
        "parent-snapshot-id": 3051729675574597000,
        "timestamp-ms": 1555100955770,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://a/b/2.avro"
    }
}
    "#;

        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                .with_sequence_number(0)
                .with_manifest_list("s3://a/b/2.avro")
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
        assert_eq!(actual, update, "Parsed value is not equal to expected");
    }

    // A payload carrying row-lineage fields (`first-row-id`/`added-rows`)
    // and an encryption `key-id`; checked in both directions.
    #[test]
    fn test_add_snapshot_v3() {
        let json = serde_json::json!(
        {
            "action": "add-snapshot",
            "snapshot": {
                "snapshot-id": 3055729675574597000i64,
                "parent-snapshot-id": 3051729675574597000i64,
                "timestamp-ms": 1555100955770i64,
                "first-row-id":0,
                "added-rows":2,
                "key-id":"key123",
                "summary": {
                    "operation": "append"
                },
                "manifest-list": "s3://a/b/2.avro"
            }
        });

        let update = TableUpdate::AddSnapshot {
            snapshot: Snapshot::builder()
                .with_snapshot_id(3055729675574597000)
                .with_parent_snapshot_id(Some(3051729675574597000))
                .with_timestamp_ms(1555100955770)
                .with_sequence_number(0)
                .with_manifest_list("s3://a/b/2.avro")
                .with_row_range(0, 2)
                .with_encryption_key_id(Some("key123".to_string()))
                .with_summary(Summary {
                    operation: Operation::Append,
                    additional_properties: HashMap::default(),
                })
                .build(),
        };

        let actual: TableUpdate = serde_json::from_value(json).expect("Failed to parse from json");
        assert_eq!(actual, update, "Parsed value is not equal to expected");
        let restored: TableUpdate = serde_json::from_str(
            &serde_json::to_string(&actual).expect("Failed to serialize to json"),
        )
        .expect("Failed to parse from serialized json");
        assert_eq!(restored, update);
    }
1793
1794    #[test]
1795    fn test_remove_snapshots() {
1796        let json = r#"
1797{
1798    "action": "remove-snapshots",
1799    "snapshot-ids": [
1800        1,
1801        2
1802    ]
1803}
1804        "#;
1805
1806        let update = TableUpdate::RemoveSnapshots {
1807            snapshot_ids: vec![1, 2],
1808        };
1809        test_serde_json(json, update);
1810    }
1811
1812    #[test]
1813    fn test_remove_snapshot_ref() {
1814        let json = r#"
1815{
1816    "action": "remove-snapshot-ref",
1817    "ref-name": "snapshot-ref"
1818}
1819        "#;
1820
1821        let update = TableUpdate::RemoveSnapshotRef {
1822            ref_name: "snapshot-ref".to_string(),
1823        };
1824        test_serde_json(json, update);
1825    }
1826
1827    #[test]
1828    fn test_set_snapshot_ref_tag() {
1829        let json = r#"
1830{
1831    "action": "set-snapshot-ref",
1832    "type": "tag",
1833    "ref-name": "hank",
1834    "snapshot-id": 1,
1835    "max-ref-age-ms": 1
1836}
1837        "#;
1838
1839        let update = TableUpdate::SetSnapshotRef {
1840            ref_name: "hank".to_string(),
1841            reference: SnapshotReference {
1842                snapshot_id: 1,
1843                retention: SnapshotRetention::Tag {
1844                    max_ref_age_ms: Some(1),
1845                },
1846            },
1847        };
1848
1849        test_serde_json(json, update);
1850    }
1851
1852    #[test]
1853    fn test_set_snapshot_ref_branch() {
1854        let json = r#"
1855{
1856    "action": "set-snapshot-ref",
1857    "type": "branch",
1858    "ref-name": "hank",
1859    "snapshot-id": 1,
1860    "min-snapshots-to-keep": 2,
1861    "max-snapshot-age-ms": 3,
1862    "max-ref-age-ms": 4
1863}
1864        "#;
1865
1866        let update = TableUpdate::SetSnapshotRef {
1867            ref_name: "hank".to_string(),
1868            reference: SnapshotReference {
1869                snapshot_id: 1,
1870                retention: SnapshotRetention::Branch {
1871                    min_snapshots_to_keep: Some(2),
1872                    max_snapshot_age_ms: Some(3),
1873                    max_ref_age_ms: Some(4),
1874                },
1875            },
1876        };
1877
1878        test_serde_json(json, update);
1879    }
1880
1881    #[test]
1882    fn test_set_properties() {
1883        let json = r#"
1884{
1885    "action": "set-properties",
1886    "updates": {
1887        "prop1": "v1",
1888        "prop2": "v2"
1889    }
1890}
1891        "#;
1892
1893        let update = TableUpdate::SetProperties {
1894            updates: vec![
1895                ("prop1".to_string(), "v1".to_string()),
1896                ("prop2".to_string(), "v2".to_string()),
1897            ]
1898            .into_iter()
1899            .collect(),
1900        };
1901
1902        test_serde_json(json, update);
1903    }
1904
1905    #[test]
1906    fn test_remove_properties() {
1907        let json = r#"
1908{
1909    "action": "remove-properties",
1910    "removals": [
1911        "prop1",
1912        "prop2"
1913    ]
1914}
1915        "#;
1916
1917        let update = TableUpdate::RemoveProperties {
1918            removals: vec!["prop1".to_string(), "prop2".to_string()],
1919        };
1920
1921        test_serde_json(json, update);
1922    }
1923
1924    #[test]
1925    fn test_set_location() {
1926        let json = r#"
1927{
1928    "action": "set-location",
1929    "location": "s3://bucket/warehouse/tbl_location"
1930}
1931    "#;
1932
1933        let update = TableUpdate::SetLocation {
1934            location: "s3://bucket/warehouse/tbl_location".to_string(),
1935        };
1936
1937        test_serde_json(json, update);
1938    }
1939
1940    #[test]
1941    fn test_table_update_apply() {
1942        let table_creation = TableCreation::builder()
1943            .location("s3://db/table".to_string())
1944            .name("table".to_string())
1945            .properties(HashMap::new())
1946            .schema(Schema::builder().build().unwrap())
1947            .build();
1948        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
1949            .unwrap()
1950            .build()
1951            .unwrap()
1952            .metadata;
1953        let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
1954            table_metadata,
1955            Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
1956        );
1957
1958        let uuid = uuid::Uuid::new_v4();
1959        let update = TableUpdate::AssignUuid { uuid };
1960        let updated_metadata = update
1961            .apply(table_metadata_builder)
1962            .unwrap()
1963            .build()
1964            .unwrap()
1965            .metadata;
1966        assert_eq!(updated_metadata.uuid(), uuid);
1967    }
1968
1969    #[test]
1970    fn test_view_assign_uuid() {
1971        test_serde_json(
1972            r#"
1973{
1974    "action": "assign-uuid",
1975    "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151"
1976}
1977        "#,
1978            ViewUpdate::AssignUuid {
1979                uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"),
1980            },
1981        );
1982    }
1983
1984    #[test]
1985    fn test_view_upgrade_format_version() {
1986        test_serde_json(
1987            r#"
1988{
1989    "action": "upgrade-format-version",
1990    "format-version": 1
1991}
1992        "#,
1993            ViewUpdate::UpgradeFormatVersion {
1994                format_version: ViewFormatVersion::V1,
1995            },
1996        );
1997    }
1998
1999    #[test]
2000    fn test_view_add_schema() {
2001        let test_schema = Schema::builder()
2002            .with_schema_id(1)
2003            .with_identifier_field_ids(vec![2])
2004            .with_fields(vec![
2005                NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
2006                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
2007                NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
2008            ])
2009            .build()
2010            .unwrap();
2011        test_serde_json(
2012            r#"
2013{
2014    "action": "add-schema",
2015    "schema": {
2016        "type": "struct",
2017        "schema-id": 1,
2018        "fields": [
2019            {
2020                "id": 1,
2021                "name": "foo",
2022                "required": false,
2023                "type": "string"
2024            },
2025            {
2026                "id": 2,
2027                "name": "bar",
2028                "required": true,
2029                "type": "int"
2030            },
2031            {
2032                "id": 3,
2033                "name": "baz",
2034                "required": false,
2035                "type": "boolean"
2036            }
2037        ],
2038        "identifier-field-ids": [
2039            2
2040        ]
2041    },
2042    "last-column-id": 3
2043}
2044        "#,
2045            ViewUpdate::AddSchema {
2046                schema: test_schema.clone(),
2047                last_column_id: Some(3),
2048            },
2049        );
2050    }
2051
2052    #[test]
2053    fn test_view_set_location() {
2054        test_serde_json(
2055            r#"
2056{
2057    "action": "set-location",
2058    "location": "s3://db/view"
2059}
2060        "#,
2061            ViewUpdate::SetLocation {
2062                location: "s3://db/view".to_string(),
2063            },
2064        );
2065    }
2066
2067    #[test]
2068    fn test_view_set_properties() {
2069        test_serde_json(
2070            r#"
2071{
2072    "action": "set-properties",
2073    "updates": {
2074        "prop1": "v1",
2075        "prop2": "v2"
2076    }
2077}
2078        "#,
2079            ViewUpdate::SetProperties {
2080                updates: vec![
2081                    ("prop1".to_string(), "v1".to_string()),
2082                    ("prop2".to_string(), "v2".to_string()),
2083                ]
2084                .into_iter()
2085                .collect(),
2086            },
2087        );
2088    }
2089
2090    #[test]
2091    fn test_view_remove_properties() {
2092        test_serde_json(
2093            r#"
2094{
2095    "action": "remove-properties",
2096    "removals": [
2097        "prop1",
2098        "prop2"
2099    ]
2100}
2101        "#,
2102            ViewUpdate::RemoveProperties {
2103                removals: vec!["prop1".to_string(), "prop2".to_string()],
2104            },
2105        );
2106    }
2107
2108    #[test]
2109    fn test_view_add_view_version() {
2110        test_serde_json(
2111            r#"
2112{
2113    "action": "add-view-version",
2114    "view-version": {
2115            "version-id" : 1,
2116            "timestamp-ms" : 1573518431292,
2117            "schema-id" : 1,
2118            "default-catalog" : "prod",
2119            "default-namespace" : [ "default" ],
2120            "summary" : {
2121              "engine-name" : "Spark"
2122            },
2123            "representations" : [ {
2124              "type" : "sql",
2125              "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
2126              "dialect" : "spark"
2127            } ]
2128    }
2129}
2130        "#,
2131            ViewUpdate::AddViewVersion {
2132                view_version: ViewVersion::builder()
2133                    .with_version_id(1)
2134                    .with_timestamp_ms(1573518431292)
2135                    .with_schema_id(1)
2136                    .with_default_catalog(Some("prod".to_string()))
2137                    .with_default_namespace(NamespaceIdent::from_strs(vec!["default"]).unwrap())
2138                    .with_summary(
2139                        vec![("engine-name".to_string(), "Spark".to_string())]
2140                            .into_iter()
2141                            .collect(),
2142                    )
2143                    .with_representations(ViewRepresentations(vec![ViewRepresentation::Sql(SqlViewRepresentation {
2144                        sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2".to_string(),
2145                        dialect: "spark".to_string(),
2146                    })]))
2147                    .build(),
2148            },
2149        );
2150    }
2151
2152    #[test]
2153    fn test_view_set_current_view_version() {
2154        test_serde_json(
2155            r#"
2156{
2157    "action": "set-current-view-version",
2158    "view-version-id": 1
2159}
2160        "#,
2161            ViewUpdate::SetCurrentViewVersion { view_version_id: 1 },
2162        );
2163    }
2164
2165    #[test]
2166    fn test_remove_partition_specs_update() {
2167        test_serde_json(
2168            r#"
2169{
2170    "action": "remove-partition-specs",
2171    "spec-ids": [1, 2]
2172}
2173        "#,
2174            TableUpdate::RemovePartitionSpecs {
2175                spec_ids: vec![1, 2],
2176            },
2177        );
2178    }
2179
2180    #[test]
2181    fn test_set_statistics_file() {
2182        test_serde_json(
2183            r#"
2184        {
2185                "action": "set-statistics",
2186                "snapshot-id": 1940541653261589030,
2187                "statistics": {
2188                        "snapshot-id": 1940541653261589030,
2189                        "statistics-path": "s3://bucket/warehouse/stats.puffin",
2190                        "file-size-in-bytes": 124,
2191                        "file-footer-size-in-bytes": 27,
2192                        "blob-metadata": [
2193                                {
2194                                        "type": "boring-type",
2195                                        "snapshot-id": 1940541653261589030,
2196                                        "sequence-number": 2,
2197                                        "fields": [
2198                                                1
2199                                        ],
2200                                        "properties": {
2201                                                "prop-key": "prop-value"
2202                                        }
2203                                }
2204                        ]
2205                }
2206        }
2207        "#,
2208            TableUpdate::SetStatistics {
2209                statistics: StatisticsFile {
2210                    snapshot_id: 1940541653261589030,
2211                    statistics_path: "s3://bucket/warehouse/stats.puffin".to_string(),
2212                    file_size_in_bytes: 124,
2213                    file_footer_size_in_bytes: 27,
2214                    key_metadata: None,
2215                    blob_metadata: vec![BlobMetadata {
2216                        r#type: "boring-type".to_string(),
2217                        snapshot_id: 1940541653261589030,
2218                        sequence_number: 2,
2219                        fields: vec![1],
2220                        properties: vec![("prop-key".to_string(), "prop-value".to_string())]
2221                            .into_iter()
2222                            .collect(),
2223                    }],
2224                },
2225            },
2226        );
2227    }
2228
2229    #[test]
2230    fn test_remove_statistics_file() {
2231        test_serde_json(
2232            r#"
2233        {
2234                "action": "remove-statistics",
2235                "snapshot-id": 1940541653261589030
2236        }
2237        "#,
2238            TableUpdate::RemoveStatistics {
2239                snapshot_id: 1940541653261589030,
2240            },
2241        );
2242    }
2243
2244    #[test]
2245    fn test_set_partition_statistics_file() {
2246        test_serde_json(
2247            r#"
2248            {
2249                "action": "set-partition-statistics",
2250                "partition-statistics": {
2251                    "snapshot-id": 1940541653261589030,
2252                    "statistics-path": "s3://bucket/warehouse/stats1.parquet",
2253                    "file-size-in-bytes": 43
2254                }
2255            }
2256            "#,
2257            TableUpdate::SetPartitionStatistics {
2258                partition_statistics: PartitionStatisticsFile {
2259                    snapshot_id: 1940541653261589030,
2260                    statistics_path: "s3://bucket/warehouse/stats1.parquet".to_string(),
2261                    file_size_in_bytes: 43,
2262                },
2263            },
2264        )
2265    }
2266
2267    #[test]
2268    fn test_remove_partition_statistics_file() {
2269        test_serde_json(
2270            r#"
2271            {
2272                "action": "remove-partition-statistics",
2273                "snapshot-id": 1940541653261589030
2274            }
2275            "#,
2276            TableUpdate::RemovePartitionStatistics {
2277                snapshot_id: 1940541653261589030,
2278            },
2279        )
2280    }
2281
2282    #[test]
2283    fn test_remove_schema_update() {
2284        test_serde_json(
2285            r#"
2286                {
2287                    "action": "remove-schemas",
2288                    "schema-ids": [1, 2]
2289                }        
2290            "#,
2291            TableUpdate::RemoveSchemas {
2292                schema_ids: vec![1, 2],
2293            },
2294        );
2295    }
2296
2297    #[test]
2298    fn test_add_encryption_key() {
2299        let key_bytes = "key".as_bytes();
2300        let encoded_key = base64::engine::general_purpose::STANDARD.encode(key_bytes);
2301        test_serde_json(
2302            format!(
2303                r#"
2304                {{
2305                    "action": "add-encryption-key",
2306                    "encryption-key": {{
2307                        "key-id": "a",
2308                        "encrypted-key-metadata": "{encoded_key}",
2309                        "encrypted-by-id": "b"
2310                    }}
2311                }}        
2312            "#
2313            ),
2314            TableUpdate::AddEncryptionKey {
2315                encryption_key: EncryptedKey::builder()
2316                    .key_id("a")
2317                    .encrypted_key_metadata(key_bytes.to_vec())
2318                    .encrypted_by_id("b")
2319                    .build(),
2320            },
2321        );
2322    }
2323
2324    #[test]
2325    fn test_remove_encryption_key() {
2326        test_serde_json(
2327            r#"
2328                {
2329                    "action": "remove-encryption-key",
2330                    "key-id": "a"
2331                }        
2332            "#,
2333            TableUpdate::RemoveEncryptionKey {
2334                key_id: "a".to_string(),
2335            },
2336        );
2337    }
2338
2339    #[test]
2340    fn test_table_commit() {
2341        let table = {
2342            let file = File::open(format!(
2343                "{}/testdata/table_metadata/{}",
2344                env!("CARGO_MANIFEST_DIR"),
2345                "TableMetadataV2Valid.json"
2346            ))
2347            .unwrap();
2348            let reader = BufReader::new(file);
2349            let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2350
2351            Table::builder()
2352                .metadata(resp)
2353                .metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string())
2354                .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2355                .file_io(FileIOBuilder::new("memory").build().unwrap())
2356                .build()
2357                .unwrap()
2358        };
2359
2360        let updates = vec![
2361            TableUpdate::SetLocation {
2362                location: "s3://bucket/test/new_location/data".to_string(),
2363            },
2364            TableUpdate::SetProperties {
2365                updates: vec![
2366                    ("prop1".to_string(), "v1".to_string()),
2367                    ("prop2".to_string(), "v2".to_string()),
2368                ]
2369                .into_iter()
2370                .collect(),
2371            },
2372        ];
2373
2374        let requirements = vec![TableRequirement::UuidMatch {
2375            uuid: table.metadata().table_uuid,
2376        }];
2377
2378        let table_commit = TableCommit::builder()
2379            .ident(table.identifier().to_owned())
2380            .updates(updates)
2381            .requirements(requirements)
2382            .build();
2383
2384        let updated_table = table_commit.apply(table).unwrap();
2385
2386        assert_eq!(
2387            updated_table.metadata().properties.get("prop1").unwrap(),
2388            "v1"
2389        );
2390        assert_eq!(
2391            updated_table.metadata().properties.get("prop2").unwrap(),
2392            "v2"
2393        );
2394
2395        // metadata version should be bumped
2396        assert!(
2397            updated_table
2398                .metadata_location()
2399                .unwrap()
2400                .starts_with("s3://bucket/test/location/metadata/00001-")
2401        );
2402
2403        assert_eq!(
2404            updated_table.metadata().location,
2405            "s3://bucket/test/new_location/data",
2406        );
2407    }
2408}