iceberg/spec/
table_metadata_builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use uuid::Uuid;
22
23use super::{
24    DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
25    ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, Schema, SchemaRef,
26    Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef,
27    StatisticsFile, StructType, TableMetadata, TableProperties, UNPARTITIONED_LAST_ASSIGNED_ID,
28    UnboundPartitionSpec,
29};
30use crate::error::{Error, ErrorKind, Result};
31use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
32use crate::{TableCreation, TableUpdate};
33
34const FIRST_FIELD_ID: u32 = 1;
35
/// Manipulating table metadata.
///
/// For this builder the order of called functions matters. Functions are applied in-order.
/// All operations applied to the `TableMetadata` are tracked in `changes` as a chronologically
/// ordered vec of `TableUpdate`.
/// If an operation does not lead to a change of the `TableMetadata`, the corresponding update
/// is omitted from `changes`.
///
/// Unlike a typical builder pattern, the order of function calls matters.
/// Some basic rules:
/// - `add_schema` must be called before `set_current_schema`.
/// - If a new partition spec and schema are added, the schema should be added first.
#[derive(Debug, Clone)]
pub struct TableMetadataBuilder {
    // The metadata being built/modified; mutated in place by the builder methods.
    metadata: TableMetadata,
    // Chronologically ordered updates applied so far. No-op calls add nothing here.
    changes: Vec<TableUpdate>,
    // Id of the schema most recently added via `add_schema` in this round of changes.
    last_added_schema_id: Option<i32>,
    // Id of the partition spec most recently added in this round of changes.
    last_added_spec_id: Option<i32>,
    // Id of the sort order most recently added in this round of changes.
    last_added_order_id: Option<i64>,
    // None if this is a new table (the `new_from_metadata` method was not used).
    previous_history_entry: Option<MetadataLog>,
    // Pending last-updated timestamp (ms); set e.g. when a snapshot is added.
    last_updated_ms: Option<i64>,
}
59
/// Result of modifying or creating a `TableMetadata`.
#[derive(Debug, Clone, PartialEq)]
pub struct TableMetadataBuildResult {
    /// The new `TableMetadata`.
    pub metadata: TableMetadata,
    /// The changes that were applied to the metadata.
    pub changes: Vec<TableUpdate>,
    /// Metadata log entries that expired as part of this build.
    pub expired_metadata_logs: Vec<MetadataLog>,
}
70
71impl TableMetadataBuilder {
    /// Proxy id for "last added" items, including schema, partition spec, sort order.
    pub const LAST_ADDED: i32 = -1;

    /// Create a `TableMetadata` object from scratch.
    ///
    /// This method re-assign ids of fields in the schema, schema.id, sort_order.id and
    /// spec.id. It should only be used to create new table metadata from scratch.
    ///
    /// # Errors
    /// - Re-assigning ids fails, or the provided spec / sort order cannot be
    ///   applied to the re-assigned schema.
    pub fn new(
        schema: Schema,
        spec: impl Into<UnboundPartitionSpec>,
        sort_order: SortOrder,
        location: String,
        format_version: FormatVersion,
        properties: HashMap<String, String>,
    ) -> Result<Self> {
        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table.
        let (fresh_schema, fresh_spec, fresh_sort_order) =
            Self::reassign_ids(schema, spec.into(), sort_order)?;
        let schema_id = fresh_schema.schema_id();

        // Start from placeholder metadata; every placeholder value below is
        // overwritten by the builder-method chain at the end of this function.
        let builder = Self {
            metadata: TableMetadata {
                format_version,
                table_uuid: Uuid::now_v7(),
                location: "".to_string(), // Overwritten immediately by set_location
                last_sequence_number: 0,
                last_updated_ms: 0,    // Overwritten by build() if not set before
                last_column_id: -1,    // Overwritten immediately by add_current_schema
                current_schema_id: -1, // Overwritten immediately by add_current_schema
                schemas: HashMap::new(),
                partition_specs: HashMap::new(),
                default_spec: Arc::new(
                    // The spec id (-1) is just a proxy value and can be any negative number.
                    // 0 would lead to wrong changes in the builder if the provided spec by the user is
                    // also unpartitioned.
                    // The `default_spec` value is always replaced at the end of this method by the
                    // `add_default_partition_spec` method.
                    PartitionSpec::unpartition_spec().with_spec_id(-1),
                ), // Overwritten immediately by add_default_partition_spec
                default_partition_type: StructType::new(vec![]),
                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
                properties: HashMap::new(),
                current_snapshot_id: None,
                snapshots: HashMap::new(),
                snapshot_log: vec![],
                sort_orders: HashMap::new(),
                metadata_log: vec![],
                default_sort_order_id: -1, // Overwritten immediately by add_default_sort_order
                refs: HashMap::default(),
                statistics: HashMap::new(),
                partition_statistics: HashMap::new(),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            },
            last_updated_ms: None,
            changes: vec![],
            // The freshly assigned schema counts as "last added" so the
            // `add_current_schema` call below resolves LAST_ADDED to it.
            last_added_schema_id: Some(schema_id),
            last_added_spec_id: None,
            last_added_order_id: None,
            previous_history_entry: None,
        };

        // Route the user-provided pieces through the regular builder methods so
        // that each one is also tracked in `changes`.
        builder
            .set_location(location)
            .add_current_schema(fresh_schema)?
            .add_default_partition_spec(fresh_spec.into_unbound())?
            .add_default_sort_order(fresh_sort_order)?
            .set_properties(properties)
    }
141
142    /// Creates a new table metadata builder from the given metadata to modify it.
143    /// `current_file_location` is the location where the current version
144    /// of the metadata file is stored. This is used to update the metadata log.
145    /// If `current_file_location` is `None`, the metadata log will not be updated.
146    /// This should only be used to stage-create tables.
147    #[must_use]
148    pub fn new_from_metadata(
149        previous: TableMetadata,
150        current_file_location: Option<String>,
151    ) -> Self {
152        Self {
153            previous_history_entry: current_file_location.map(|l| MetadataLog {
154                metadata_file: l,
155                timestamp_ms: previous.last_updated_ms,
156            }),
157            metadata: previous,
158            changes: Vec::default(),
159            last_added_schema_id: None,
160            last_added_spec_id: None,
161            last_added_order_id: None,
162            last_updated_ms: None,
163        }
164    }
165
166    /// Creates a new table metadata builder from the given table creation.
167    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
168        let TableCreation {
169            name: _,
170            location,
171            schema,
172            partition_spec,
173            sort_order,
174            properties,
175            format_version,
176        } = table_creation;
177
178        let location = location.ok_or_else(|| {
179            Error::new(
180                ErrorKind::DataInvalid,
181                "Can't create table without location",
182            )
183        })?;
184        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
185            spec_id: None,
186            fields: vec![],
187        });
188
189        Self::new(
190            schema,
191            partition_spec,
192            sort_order.unwrap_or(SortOrder::unsorted_order()),
193            location,
194            format_version,
195            properties,
196        )
197    }
198
199    /// Changes uuid of table metadata.
200    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
201        if self.metadata.table_uuid != uuid {
202            self.metadata.table_uuid = uuid;
203            self.changes.push(TableUpdate::AssignUuid { uuid });
204        }
205
206        self
207    }
208
209    /// Upgrade `FormatVersion`. Downgrades are not allowed.
210    ///
211    /// # Errors
212    /// - Cannot downgrade to older format versions.
213    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
214        if format_version < self.metadata.format_version {
215            return Err(Error::new(
216                ErrorKind::DataInvalid,
217                format!(
218                    "Cannot downgrade FormatVersion from {} to {}",
219                    self.metadata.format_version, format_version
220                ),
221            ));
222        }
223
224        if format_version != self.metadata.format_version {
225            match format_version {
226                FormatVersion::V1 => {
227                    // No changes needed for V1
228                }
229                FormatVersion::V2 => {
230                    self.metadata.format_version = format_version;
231                    self.changes
232                        .push(TableUpdate::UpgradeFormatVersion { format_version });
233                }
234                FormatVersion::V3 => {
235                    self.metadata.format_version = format_version;
236                    self.changes
237                        .push(TableUpdate::UpgradeFormatVersion { format_version });
238                }
239            }
240        }
241
242        Ok(self)
243    }
244
245    /// Set properties. If a property already exists, it will be overwritten.
246    ///
247    /// If a reserved property is set, the corresponding action is performed and the property is not persisted.
248    /// Currently the following reserved properties are supported:
249    /// * format-version: Set the format version of the table.
250    ///
251    /// # Errors
252    /// - If properties contains a reserved property
253    pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
254        // List of specified properties that are RESERVED and should not be persisted.
255        let reserved_properties = properties
256            .keys()
257            .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
258            .map(ToString::to_string)
259            .collect::<Vec<_>>();
260
261        if !reserved_properties.is_empty() {
262            return Err(Error::new(
263                ErrorKind::DataInvalid,
264                format!(
265                    "Table properties should not contain reserved properties, but got: [{}]",
266                    reserved_properties.join(", ")
267                ),
268            ));
269        }
270
271        if properties.is_empty() {
272            return Ok(self);
273        }
274
275        self.metadata.properties.extend(properties.clone());
276        self.changes.push(TableUpdate::SetProperties {
277            updates: properties,
278        });
279
280        Ok(self)
281    }
282
283    /// Remove properties from the table metadata.
284    /// Does nothing if the key is not present.
285    ///
286    /// # Errors
287    /// - If properties to remove contains a reserved property
288    pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
289        // remove duplicates
290        let properties = properties.iter().cloned().collect::<HashSet<_>>();
291
292        // disallow removal of reserved properties
293        let reserved_properties = properties
294            .iter()
295            .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
296            .map(ToString::to_string)
297            .collect::<Vec<_>>();
298
299        if !reserved_properties.is_empty() {
300            return Err(Error::new(
301                ErrorKind::DataInvalid,
302                format!(
303                    "Table properties to remove contain reserved properties: [{}]",
304                    reserved_properties.join(", ")
305                ),
306            ));
307        }
308
309        for property in &properties {
310            self.metadata.properties.remove(property);
311        }
312
313        if !properties.is_empty() {
314            self.changes.push(TableUpdate::RemoveProperties {
315                removals: properties.into_iter().collect(),
316            });
317        }
318
319        Ok(self)
320    }
321
322    /// Set the location of the table, stripping any trailing slashes.
323    pub fn set_location(mut self, location: String) -> Self {
324        let location = location.trim_end_matches('/').to_string();
325        if self.metadata.location != location {
326            self.changes.push(TableUpdate::SetLocation {
327                location: location.clone(),
328            });
329            self.metadata.location = location;
330        }
331
332        self
333    }
334
    /// Add a snapshot to the table metadata.
    ///
    /// # Errors
    /// - Snapshot id already exists.
    /// - For format version > 1: the sequence number of the snapshot is lower than the highest sequence number specified so far.
    /// - For format version >= 3: the first-row-id of the snapshot is lower than the next-row-id of the table.
    /// - For format version >= 3: added-rows is null or first-row-id is null.
    /// - For format version >= 3: next-row-id would overflow when adding added-rows.
    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
        // Snapshot ids must be unique within a table.
        if self
            .metadata
            .snapshots
            .contains_key(&snapshot.snapshot_id())
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
            ));
        }

        // V2+: sequence numbers must increase. Snapshots without a parent
        // (e.g. an initial snapshot) are exempt from this check.
        if self.metadata.format_version != FormatVersion::V1
            && snapshot.sequence_number() <= self.metadata.last_sequence_number
            && snapshot.parent_snapshot_id().is_some()
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
                    snapshot.sequence_number(),
                    self.metadata.last_sequence_number
                ),
            ));
        }

        if let Some(last) = self.metadata.snapshot_log.last() {
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
                        snapshot.timestamp_ms(),
                        last.timestamp_ms
                    ),
                ));
            }
        }

        // Same clock-skew tolerance against the most recent "last updated"
        // timestamp, whichever is later: the pending builder value or the one
        // already persisted in the metadata.
        let max_last_updated = self
            .last_updated_ms
            .unwrap_or_default()
            .max(self.metadata.last_updated_ms);
        if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot timestamp {}: before last updated timestamp {}",
                    snapshot.timestamp_ms(),
                    max_last_updated
                ),
            ));
        }

        // Row lineage (format version >= 3): the snapshot must carry a row
        // range, and its first-row-id may not lag the table's next-row-id.
        let mut added_rows = None;
        if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE {
            if let Some((first_row_id, added_rows_count)) = snapshot.row_range() {
                if first_row_id < self.metadata.next_row_id {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot add a snapshot, first-row-id is behind table next-row-id: {first_row_id} < {}",
                            self.metadata.next_row_id
                        ),
                    ));
                }

                added_rows = Some(added_rows_count);
            } else {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot add a snapshot: first-row-id is null. first-row-id must be set for format version >= {MIN_FORMAT_VERSION_ROW_LINEAGE}",
                    ),
                ));
            }
        }

        // Advance next-row-id, guarding against i64 overflow.
        if let Some(added_rows) = added_rows {
            self.metadata.next_row_id = self
                .metadata
                .next_row_id
                .checked_add(added_rows)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Cannot add snapshot: next-row-id overflowed when adding added-rows",
                    )
                })?;
        }

        // Mutation happens in next line - must be infallible from here
        self.changes.push(TableUpdate::AddSnapshot {
            snapshot: snapshot.clone(),
        });

        self.last_updated_ms = Some(snapshot.timestamp_ms());
        self.metadata.last_sequence_number = snapshot.sequence_number();
        self.metadata
            .snapshots
            .insert(snapshot.snapshot_id(), snapshot.into());

        Ok(self)
    }
449
450    /// Append a snapshot to the specified branch.
451    /// Retention settings from the `branch` are re-used.
452    ///
453    /// # Errors
454    /// - Any of the preconditions of `self.add_snapshot` are not met.
455    pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) -> Result<Self> {
456        let reference = self.metadata.refs.get(branch).cloned();
457
458        let reference = if let Some(mut reference) = reference {
459            if !reference.is_branch() {
460                return Err(Error::new(
461                    ErrorKind::DataInvalid,
462                    format!("Cannot append snapshot to non-branch reference '{branch}'",),
463                ));
464            }
465
466            reference.snapshot_id = snapshot.snapshot_id();
467            reference
468        } else {
469            SnapshotReference {
470                snapshot_id: snapshot.snapshot_id(),
471                retention: SnapshotRetention::Branch {
472                    min_snapshots_to_keep: None,
473                    max_snapshot_age_ms: None,
474                    max_ref_age_ms: None,
475                },
476            }
477        };
478
479        self.add_snapshot(snapshot)?.set_ref(branch, reference)
480    }
481
482    /// Remove snapshots by its ids from the table metadata.
483    /// Does nothing if a snapshot id is not present.
484    /// Keeps as changes only the snapshots that were actually removed.
485    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
486        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
487
488        self.metadata.snapshots.retain(|k, _| {
489            if snapshot_ids.contains(k) {
490                removed_snapshots.push(*k);
491                false
492            } else {
493                true
494            }
495        });
496
497        if !removed_snapshots.is_empty() {
498            self.changes.push(TableUpdate::RemoveSnapshots {
499                snapshot_ids: removed_snapshots,
500            });
501        }
502
503        // Remove refs that are no longer valid
504        self.metadata
505            .refs
506            .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
507
508        self
509    }
510
    /// Set a reference to a snapshot.
    ///
    /// # Errors
    /// - The snapshot id is unknown.
    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
        // Setting a ref to its current value is a no-op and is not tracked.
        if self
            .metadata
            .refs
            .get(ref_name)
            .is_some_and(|snap_ref| snap_ref.eq(&reference))
        {
            return Ok(self);
        }

        // The referenced snapshot must exist in this metadata.
        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
                    reference.snapshot_id
                ),
            ));
        };

        // Update last_updated_ms to the exact timestamp of the snapshot if it was added in this commit
        let is_added_snapshot = self.changes.iter().any(|update| {
            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
        });
        if is_added_snapshot {
            self.last_updated_ms = Some(snapshot.timestamp_ms());
        }

        // Current snapshot id is set only for the main branch
        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
            // Use the pending builder timestamp if one exists; otherwise stamp
            // "now" and remember it for the rest of this commit.
            let timestamp_ms = if let Some(last_updated_ms) = self.last_updated_ms {
                last_updated_ms
            } else {
                let last_updated_ms = chrono::Utc::now().timestamp_millis();
                self.last_updated_ms = Some(last_updated_ms);
                last_updated_ms
            };

            self.metadata.snapshot_log.push(SnapshotLog {
                snapshot_id: snapshot.snapshot_id(),
                timestamp_ms,
            });
        }

        // Record the change and install the reference.
        self.changes.push(TableUpdate::SetSnapshotRef {
            ref_name: ref_name.to_string(),
            reference: reference.clone(),
        });
        self.metadata.refs.insert(ref_name.to_string(), reference);

        Ok(self)
    }
568
569    /// Remove a reference
570    ///
571    /// If `ref_name='main'` the current snapshot id is set to -1.
572    pub fn remove_ref(mut self, ref_name: &str) -> Self {
573        if ref_name == MAIN_BRANCH {
574            self.metadata.current_snapshot_id = None;
575            self.metadata.snapshot_log.clear();
576        }
577
578        if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
579            self.changes.push(TableUpdate::RemoveSnapshotRef {
580                ref_name: ref_name.to_string(),
581            });
582        }
583
584        self
585    }
586
587    /// Set statistics for a snapshot
588    pub fn set_statistics(mut self, statistics: StatisticsFile) -> Self {
589        self.metadata
590            .statistics
591            .insert(statistics.snapshot_id, statistics.clone());
592        self.changes.push(TableUpdate::SetStatistics {
593            statistics: statistics.clone(),
594        });
595        self
596    }
597
598    /// Remove statistics for a snapshot
599    pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
600        let previous = self.metadata.statistics.remove(&snapshot_id);
601        if previous.is_some() {
602            self.changes
603                .push(TableUpdate::RemoveStatistics { snapshot_id });
604        }
605        self
606    }
607
608    /// Set partition statistics
609    pub fn set_partition_statistics(
610        mut self,
611        partition_statistics_file: PartitionStatisticsFile,
612    ) -> Self {
613        self.metadata.partition_statistics.insert(
614            partition_statistics_file.snapshot_id,
615            partition_statistics_file.clone(),
616        );
617        self.changes.push(TableUpdate::SetPartitionStatistics {
618            partition_statistics: partition_statistics_file,
619        });
620        self
621    }
622
623    /// Remove partition statistics
624    pub fn remove_partition_statistics(mut self, snapshot_id: i64) -> Self {
625        let previous = self.metadata.partition_statistics.remove(&snapshot_id);
626        if previous.is_some() {
627            self.changes
628                .push(TableUpdate::RemovePartitionStatistics { snapshot_id });
629        }
630        self
631    }
632
    /// Add a schema to the table metadata.
    ///
    /// The provided `schema.schema_id` may not be used.
    ///
    /// Important: Use this method with caution. The builder does not check
    /// if the added schema is compatible with the current schema.
    ///
    /// # Errors
    /// - A new schema field name conflicts with an existing partition field name.
    pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
        // Validate that new schema fields don't conflict with existing partition field names
        self.validate_schema_field_names(&schema)?;

        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);

        if schema_found {
            // The schema already exists in the metadata. Still record an
            // AddSchema change unless this exact schema was the one added most
            // recently (avoids duplicate change entries).
            if self.last_added_schema_id != Some(new_schema_id) {
                self.changes.push(TableUpdate::AddSchema {
                    schema: schema.clone(),
                });
                self.last_added_schema_id = Some(new_schema_id);
            }

            return Ok(self);
        }

        // New schemas might contain only old columns. In this case last_column_id should not be
        // reduced.
        self.metadata.last_column_id =
            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());

        // Set schema-id
        let schema = match new_schema_id == schema.schema_id() {
            true => schema,
            false => schema.with_schema_id(new_schema_id),
        };

        self.metadata
            .schemas
            .insert(new_schema_id, schema.clone().into());

        self.changes.push(TableUpdate::AddSchema { schema });

        self.last_added_schema_id = Some(new_schema_id);

        Ok(self)
    }
678
    /// Set the current schema id.
    ///
    /// If `schema_id` is -1, the last added schema is set as the current schema.
    ///
    /// Errors:
    /// - provided `schema_id` is -1 but no schema has been added via `add_schema`.
    /// - No schema with the provided `schema_id` exists.
    pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
        // Resolve the LAST_ADDED (-1) sentinel to the most recently added schema.
        if schema_id == Self::LAST_ADDED {
            schema_id = self.last_added_schema_id.ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set current schema to last added schema: no schema has been added.",
                )
            })?;
        };
        let schema_id = schema_id; // Make immutable

        // Already current: no-op, no change recorded.
        if schema_id == self.metadata.current_schema_id {
            return Ok(self);
        }

        // The target schema must exist; the value itself is unused here.
        let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                format!("Cannot set current schema to unknown schema with id: '{schema_id}'"),
            )
        })?;

        // Old partition specs and sort-orders should be preserved even if they are not compatible with the new schema,
        // so that older metadata can still be interpreted.
        // Default partition spec and sort order are checked in the build() method
        // which allows other default partition specs and sort orders to be set before the build.

        self.metadata.current_schema_id = schema_id;

        // When pointing at the schema added in this same round of changes,
        // record the LAST_ADDED sentinel instead of the concrete id so the
        // change replays correctly against other metadata.
        if self.last_added_schema_id == Some(schema_id) {
            self.changes.push(TableUpdate::SetCurrentSchema {
                schema_id: Self::LAST_ADDED,
            });
        } else {
            self.changes
                .push(TableUpdate::SetCurrentSchema { schema_id });
        }

        Ok(self)
    }
726
727    /// Add a schema and set it as the current schema.
728    pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
729        self.add_schema(schema)?
730            .set_current_schema(Self::LAST_ADDED)
731    }
732
733    /// Validate schema field names against partition field names across all historical schemas.
734    ///
735    /// Due to Iceberg's multi-version property, this check ignores existing schema fields
736    /// that match partition names (schema evolution allows re-adding previously removed fields).
737    /// Only NEW field names that conflict with partition names are rejected.
738    ///
739    /// # Errors
740    /// - Schema field name conflicts with partition field name but doesn't exist in any historical schema.
741    fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> {
742        if self.metadata.schemas.is_empty() {
743            return Ok(());
744        }
745
746        for field_name in schema.field_id_to_name_map().values() {
747            let has_partition_conflict = self.metadata.partition_name_exists(field_name);
748            let is_new_field = !self.metadata.name_exists_in_any_schema(field_name);
749
750            if has_partition_conflict && is_new_field {
751                return Err(Error::new(
752                    ErrorKind::DataInvalid,
753                    format!(
754                        "Cannot add schema field '{field_name}' because it conflicts with existing partition field name. \
755                         Schema evolution cannot introduce field names that match existing partition field names."
756                    ),
757                ));
758            }
759        }
760
761        Ok(())
762    }
763
    /// Validate partition field names against schema field names across all historical schemas.
    ///
    /// Due to Iceberg's multi-version property, partition fields can share names with schema fields
    /// if they meet specific requirements (identity transform + matching source field ID).
    /// This validation enforces those rules across all historical schema versions.
    ///
    /// # Errors
    /// - Partition field name conflicts with schema field name but doesn't use identity transform.
    /// - Partition field uses identity transform but references wrong source field ID.
    fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
        // Nothing to validate for a brand-new table without any schemas yet.
        if self.metadata.schemas.is_empty() {
            return Ok(());
        }

        let current_schema = self.get_current_schema()?;
        for partition_field in unbound_spec.fields() {
            let exists_in_any_schema = self
                .metadata
                .name_exists_in_any_schema(&partition_field.name);

            // Skip if partition field name doesn't conflict with any schema field
            if !exists_in_any_schema {
                continue;
            }

            // If name exists in schemas, validate against current schema rules
            if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
                let is_identity_transform =
                    partition_field.transform == crate::spec::Transform::Identity;
                let has_matching_source_id = schema_field.id == partition_field.source_id;

                // A shared name is only allowed for an identity-transformed
                // partition field...
                if !is_identity_transform {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
                            partition_field.name
                        ),
                    ));
                }

                // ...and only when it is sourced from the schema field of the
                // same name.
                if !has_matching_source_id {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot create identity partition sourced from different field in schema. \
                             Field name '{}' has id `{}` in schema but partition source id is `{}`",
                            partition_field.name, schema_field.id, partition_field.source_id
                        ),
                    ));
                }
            }
        }

        Ok(())
    }
820
    /// Add a partition spec to the table metadata.
    ///
    /// The spec is bound eagerly to the current schema.
    /// If a schema is added in the same set of changes, the schema should be added first.
    ///
    /// Even if `unbound_spec.spec_id` is provided as `Some`, it may not be used.
    ///
    /// # Errors
    /// - The partition spec cannot be bound to the current schema.
    /// - The partition spec has non-sequential field ids and the table format version is 1.
    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        let schema = self.get_current_schema()?.clone();

        // Check if partition field names conflict with schema field names across all schemas
        self.validate_partition_field_names(&unbound_spec)?;

        // Bind eagerly against the current schema; new partition field ids continue
        // from the highest field id previously assigned in this table.
        let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
            .with_last_assigned_field_id(self.metadata.last_partition_id)
            .build()?;

        // Reuse the id of a compatible existing spec when possible.
        let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
        let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
        let spec = spec.with_spec_id(new_spec_id);
        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);

        if spec_found {
            // Spec already present: record the change only if it was not the most
            // recently added spec, so `changes` stays free of redundant entries.
            if self.last_added_spec_id != Some(new_spec_id) {
                self.changes
                    .push(TableUpdate::AddSpec { spec: unbound_spec });
                self.last_added_spec_id = Some(new_spec_id);
            }

            return Ok(self);
        }

        // Format version 1 requires sequential partition field ids.
        if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Cannot add partition spec with non-sequential field ids to format version 1 table",
            ));
        }

        let highest_field_id = spec
            .highest_field_id()
            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
        self.metadata
            .partition_specs
            .insert(new_spec_id, Arc::new(spec));
        self.changes
            .push(TableUpdate::AddSpec { spec: unbound_spec });

        self.last_added_spec_id = Some(new_spec_id);
        // `last_partition_id` only ever grows, even if this spec reuses lower field ids.
        self.metadata.last_partition_id =
            std::cmp::max(self.metadata.last_partition_id, highest_field_id);

        Ok(self)
    }
878
879    /// Set the default partition spec.
880    ///
881    /// # Errors
882    /// - spec_id is -1 but no spec has been added via `add_partition_spec`.
883    /// - No partition spec with the provided `spec_id` exists.
884    pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> {
885        if spec_id == Self::LAST_ADDED {
886            spec_id = self.last_added_spec_id.ok_or_else(|| {
887                Error::new(
888                    ErrorKind::DataInvalid,
889                    "Cannot set default partition spec to last added spec: no spec has been added.",
890                )
891            })?;
892        }
893
894        if self.metadata.default_spec.spec_id() == spec_id {
895            return Ok(self);
896        }
897
898        if !self.metadata.partition_specs.contains_key(&spec_id) {
899            return Err(Error::new(
900                ErrorKind::DataInvalid,
901                format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",),
902            ));
903        }
904
905        let schemaless_spec = self
906            .metadata
907            .partition_specs
908            .get(&spec_id)
909            .ok_or_else(|| {
910                Error::new(
911                    ErrorKind::DataInvalid,
912                    format!(
913                        "Cannot set default partition spec to unknown spec with id: '{spec_id}'",
914                    ),
915                )
916            })?
917            .clone();
918        let spec = Arc::unwrap_or_clone(schemaless_spec);
919        let spec_type = spec.partition_type(self.get_current_schema()?)?;
920        self.metadata.default_spec = Arc::new(spec);
921        self.metadata.default_partition_type = spec_type;
922
923        if self.last_added_spec_id == Some(spec_id) {
924            self.changes.push(TableUpdate::SetDefaultSpec {
925                spec_id: Self::LAST_ADDED,
926            });
927        } else {
928            self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
929        }
930
931        Ok(self)
932    }
933
934    /// Add a partition spec and set it as the default
935    pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
936        self.add_partition_spec(unbound_spec)?
937            .set_default_partition_spec(Self::LAST_ADDED)
938    }
939
940    /// Remove partition specs by their ids from the table metadata.
941    /// Does nothing if a spec id is not present. Active partition specs
942    /// should not be removed.
943    ///
944    /// # Errors
945    /// - Cannot remove the default partition spec.
946    pub fn remove_partition_specs(mut self, spec_ids: &[i32]) -> Result<Self> {
947        if spec_ids.contains(&self.metadata.default_spec.spec_id()) {
948            return Err(Error::new(
949                ErrorKind::DataInvalid,
950                "Cannot remove default partition spec",
951            ));
952        }
953
954        let mut removed_specs = Vec::with_capacity(spec_ids.len());
955        spec_ids.iter().for_each(|id| {
956            if self.metadata.partition_specs.remove(id).is_some() {
957                removed_specs.push(*id);
958            }
959        });
960
961        if !removed_specs.is_empty() {
962            self.changes.push(TableUpdate::RemovePartitionSpecs {
963                spec_ids: removed_specs,
964            });
965        }
966
967        Ok(self)
968    }
969
970    /// Add a sort order to the table metadata.
971    ///
972    /// The spec is bound eagerly to the current schema and must be valid for it.
973    /// If a schema is added in the same set of changes, the schema should be added first.
974    ///
975    /// Even if `sort_order.order_id` is provided, it may not be used.
976    ///
977    /// # Errors
978    /// - Sort order id to add already exists.
979    /// - Sort order is incompatible with the current schema.
980    pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
981        let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
982        let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);
983
984        if sort_order_found {
985            if self.last_added_order_id != Some(new_order_id) {
986                self.changes.push(TableUpdate::AddSortOrder {
987                    sort_order: sort_order.clone().with_order_id(new_order_id),
988                });
989                self.last_added_order_id = Some(new_order_id);
990            }
991
992            return Ok(self);
993        }
994
995        let schema = self.get_current_schema()?.clone().as_ref().clone();
996        let sort_order = SortOrder::builder()
997            .with_order_id(new_order_id)
998            .with_fields(sort_order.fields)
999            .build(&schema)
1000            .map_err(|e| {
1001                Error::new(
1002                    ErrorKind::DataInvalid,
1003                    format!("Sort order to add is incompatible with current schema: {e}"),
1004                )
1005                .with_source(e)
1006            })?;
1007
1008        self.last_added_order_id = Some(new_order_id);
1009        self.metadata
1010            .sort_orders
1011            .insert(new_order_id, sort_order.clone().into());
1012        self.changes.push(TableUpdate::AddSortOrder { sort_order });
1013
1014        Ok(self)
1015    }
1016
1017    /// Set the default sort order. If `sort_order_id` is -1, the last added sort order is set as default.
1018    ///
1019    /// # Errors
1020    /// - sort_order_id is -1 but no sort order has been added via `add_sort_order`.
1021    /// - No sort order with the provided `sort_order_id` exists.
1022    pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
1023        if sort_order_id == Self::LAST_ADDED as i64 {
1024            sort_order_id = self.last_added_order_id.ok_or_else(|| {
1025                Error::new(
1026                    ErrorKind::DataInvalid,
1027                    "Cannot set default sort order to last added order: no order has been added.",
1028                )
1029            })?;
1030        }
1031
1032        if self.metadata.default_sort_order_id == sort_order_id {
1033            return Ok(self);
1034        }
1035
1036        if !self.metadata.sort_orders.contains_key(&sort_order_id) {
1037            return Err(Error::new(
1038                ErrorKind::DataInvalid,
1039                format!(
1040                    "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
1041                ),
1042            ));
1043        }
1044
1045        self.metadata.default_sort_order_id = sort_order_id;
1046
1047        if self.last_added_order_id == Some(sort_order_id) {
1048            self.changes.push(TableUpdate::SetDefaultSortOrder {
1049                sort_order_id: Self::LAST_ADDED as i64,
1050            });
1051        } else {
1052            self.changes
1053                .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
1054        }
1055
1056        Ok(self)
1057    }
1058
1059    /// Add a sort order and set it as the default
1060    fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
1061        self.add_sort_order(sort_order)?
1062            .set_default_sort_order(Self::LAST_ADDED as i64)
1063    }
1064
1065    /// Add an encryption key to the table metadata.
1066    pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self {
1067        let key_id = key.key_id().to_string();
1068        if self.metadata.encryption_keys.contains_key(&key_id) {
1069            // already exists
1070            return self;
1071        }
1072
1073        self.metadata.encryption_keys.insert(key_id, key.clone());
1074        self.changes.push(TableUpdate::AddEncryptionKey {
1075            encryption_key: key,
1076        });
1077        self
1078    }
1079
1080    /// Remove an encryption key from the table metadata.
1081    pub fn remove_encryption_key(mut self, key_id: &str) -> Self {
1082        if self.metadata.encryption_keys.remove(key_id).is_some() {
1083            self.changes.push(TableUpdate::RemoveEncryptionKey {
1084                key_id: key_id.to_string(),
1085            });
1086        }
1087        self
1088    }
1089
    /// Build the table metadata.
    pub fn build(mut self) -> Result<TableMetadataBuildResult> {
        // Use the explicitly provided timestamp if set, otherwise "now".
        self.metadata.last_updated_ms = self
            .last_updated_ms
            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());

        // Check compatibility of the current schema to the default partition spec and sort order.
        // We use the `get_xxx` methods from the builder to avoid using the panicking
        // `TableMetadata.default_partition_spec` etc. methods.
        let schema = self.get_current_schema()?.clone();
        let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);

        // Re-bind the default spec against the (possibly newly set) current schema.
        self.metadata.default_spec = Arc::new(
            Arc::unwrap_or_clone(self.metadata.default_spec)
                .into_unbound()
                .bind(schema.clone())?,
        );
        self.metadata.default_partition_type =
            self.metadata.default_spec.partition_type(&schema)?;
        // Building the sort order against the schema validates compatibility;
        // the built value itself is discarded.
        SortOrder::builder()
            .with_fields(sort_order.fields)
            .build(&schema)?;

        self.update_snapshot_log()?;
        self.metadata.try_normalize()?;

        // The previous metadata file location becomes a metadata-log entry,
        // appended before expiry so it participates in retention.
        if let Some(hist_entry) = self.previous_history_entry.take() {
            self.metadata.metadata_log.push(hist_entry);
        }
        let expired_metadata_logs = self.expire_metadata_log();

        Ok(TableMetadataBuildResult {
            metadata: self.metadata,
            changes: self.changes,
            expired_metadata_logs,
        })
    }
1127
1128    fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
1129        let max_size = self
1130            .metadata
1131            .properties
1132            .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
1133            .and_then(|v| v.parse::<usize>().ok())
1134            .unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
1135            .max(1);
1136
1137        if self.metadata.metadata_log.len() > max_size {
1138            self.metadata
1139                .metadata_log
1140                .drain(0..self.metadata.metadata_log.len() - max_size)
1141                .collect()
1142        } else {
1143            Vec::new()
1144        }
1145    }
1146
    /// Rewrite the snapshot log after snapshots were superseded within this change
    /// set or removed, keeping the log a gap-free history ending at the current
    /// snapshot.
    ///
    /// # Errors
    /// - The rewritten log's latest entry would not be the current snapshot.
    fn update_snapshot_log(&mut self) -> Result<()> {
        let intermediate_snapshots = self.get_intermediate_snapshots();
        let has_removed_snapshots = self
            .changes
            .iter()
            .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));

        // Nothing to rewrite if no snapshot was superseded or removed.
        if intermediate_snapshots.is_empty() && !has_removed_snapshots {
            return Ok(());
        }

        let mut new_snapshot_log = Vec::new();
        for log_entry in &self.metadata.snapshot_log {
            let snapshot_id = log_entry.snapshot_id;
            if self.metadata.snapshots.contains_key(&snapshot_id) {
                // Keep entries for surviving snapshots, except intermediates.
                if !intermediate_snapshots.contains(&snapshot_id) {
                    new_snapshot_log.push(log_entry.clone());
                }
            } else if has_removed_snapshots {
                // any invalid entry causes the history before it to be removed. otherwise, there could be
                // history gaps that cause time-travel queries to produce incorrect results. for example,
                // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
                // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
                // and t3 when in fact s2 was the current snapshot.
                new_snapshot_log.clear();
            }
        }

        // The rewritten log must still end at the current snapshot, if there is one.
        if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
            let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
            if last_id != Some(current_snapshot_id) {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set invalid snapshot log: latest entry is not the current snapshot",
                ));
            }
        };

        self.metadata.snapshot_log = new_snapshot_log;
        Ok(())
    }
1188
    /// Finds intermediate snapshots that have not been committed as the current snapshot.
    ///
    /// Transactions can create snapshots that are never the current snapshot because several
    /// changes are combined by the transaction into one table metadata update. when each
    /// intermediate snapshot is added to table metadata, it is added to the snapshot log, assuming
    /// that it will be the current snapshot. when there are multiple snapshot updates, the log must
    /// be corrected by suppressing the intermediate snapshot entries.
    ///
    /// A snapshot is an intermediate snapshot if it was added but is not the current snapshot.
    fn get_intermediate_snapshots(&self) -> HashSet<i64> {
        // Snapshots added within this change set.
        let added_snapshot_ids = self
            .changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
                _ => None,
            })
            .collect::<HashSet<_>>();

        // Of those, keep the ones that `main` pointed at during this change set
        // but that did not end up as the current snapshot.
        self.changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::SetSnapshotRef {
                    ref_name,
                    reference,
                } => {
                    // LAST_ADDED (-1) serves as a sentinel that never equals a real
                    // snapshot id when `current_snapshot_id` is unset.
                    if added_snapshot_ids.contains(&reference.snapshot_id)
                        && ref_name == MAIN_BRANCH
                        && reference.snapshot_id
                            != self
                                .metadata
                                .current_snapshot_id
                                .unwrap_or(i64::from(Self::LAST_ADDED))
                    {
                        Some(reference.snapshot_id)
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .collect()
    }
1232
    /// Re-assign fresh, contiguous ids to a schema, partition spec, and sort order
    /// for a brand-new table, remapping the spec's and order's source-field
    /// references through field *names* so they stay consistent.
    ///
    /// # Errors
    /// - A partition or sort field references a source column id absent from `schema`.
    /// - Rebuilding the schema, spec, or sort order fails validation.
    fn reassign_ids(
        schema: Schema,
        spec: UnboundPartitionSpec,
        sort_order: SortOrder,
    ) -> Result<(Schema, PartitionSpec, SortOrder)> {
        // Re-assign field ids and schema ids for a new table.
        // Captured before the rebuild so old ids can be translated to names.
        let previous_id_to_name = schema.field_id_to_name_map().clone();
        let fresh_schema = schema
            .into_builder()
            .with_schema_id(DEFAULT_SCHEMA_ID)
            .with_reassigned_field_ids(FIRST_FIELD_ID)
            .build()?;

        // Re-build partition spec with new ids
        let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
        for field in spec.fields() {
            // Translate old source id -> name; the builder resolves the name
            // against the re-assigned schema.
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for partition column {} in schema.",
                        field.source_id, field.name
                    ),
                )
            })?;
            fresh_spec =
                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
        }
        let fresh_spec = fresh_spec.build()?;

        // Re-build sort order with new ids
        for mut field in sort_order.fields {
            // Same old-id -> name -> new-id translation for each sort field.
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for sort column in schema.",
                        field.source_id
                    ),
                )
            })?;
            let new_field_id = fresh_schema
                       .field_by_name(source_field_name)
                       .ok_or_else(|| {
                           Error::new(
                               ErrorKind::Unexpected,
                               format!(
                                   "Cannot find source column with name {source_field_name} for sort column in re-assigned schema."
                               ),
                           )
                       })?.id;
            field.source_id = new_field_id;
            fresh_order.with_sort_field(field);
        }
        let fresh_sort_order = fresh_order.build(&fresh_schema)?;

        Ok((fresh_schema, fresh_spec, fresh_sort_order))
    }
1292
1293    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
1294        self.metadata
1295            .schemas
1296            .iter()
1297            .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
1298            .unwrap_or_else(|| self.get_highest_schema_id() + 1)
1299    }
1300
1301    fn get_highest_schema_id(&self) -> i32 {
1302        *self
1303            .metadata
1304            .schemas
1305            .keys()
1306            .max()
1307            .unwrap_or(&self.metadata.current_schema_id)
1308    }
1309
1310    fn get_current_schema(&self) -> Result<&SchemaRef> {
1311        self.metadata
1312            .schemas
1313            .get(&self.metadata.current_schema_id)
1314            .ok_or_else(|| {
1315                Error::new(
1316                    ErrorKind::DataInvalid,
1317                    format!(
1318                        "Current schema with id '{}' not found in table metadata.",
1319                        self.metadata.current_schema_id
1320                    ),
1321                )
1322            })
1323    }
1324
1325    fn get_default_sort_order(&self) -> Result<SortOrderRef> {
1326        self.metadata
1327            .sort_orders
1328            .get(&self.metadata.default_sort_order_id)
1329            .cloned()
1330            .ok_or_else(|| {
1331                Error::new(
1332                    ErrorKind::DataInvalid,
1333                    format!(
1334                        "Default sort order with id '{}' not found in table metadata.",
1335                        self.metadata.default_sort_order_id
1336                    ),
1337                )
1338            })
1339    }
1340
1341    /// If a compatible spec already exists, use the same ID. Otherwise, use 1 more than the highest ID.
1342    fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 {
1343        self.metadata
1344            .partition_specs
1345            .iter()
1346            .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id))
1347            .unwrap_or_else(|| {
1348                self.get_highest_spec_id()
1349                    .map(|id| id + 1)
1350                    .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
1351            })
1352    }
1353
1354    fn get_highest_spec_id(&self) -> Option<i32> {
1355        self.metadata.partition_specs.keys().max().copied()
1356    }
1357
1358    /// If a compatible sort-order already exists, use the same ID. Otherwise, use 1 more than the highest ID.
1359    fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
1360        if new_sort_order.is_unsorted() {
1361            return SortOrder::unsorted_order().order_id;
1362        }
1363
1364        self.metadata
1365            .sort_orders
1366            .iter()
1367            .find_map(|(id, sort_order)| {
1368                sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
1369            })
1370            .unwrap_or_else(|| {
1371                self.highest_sort_order_id()
1372                    .unwrap_or(SortOrder::unsorted_order().order_id)
1373                    + 1
1374            })
1375    }
1376
1377    fn highest_sort_order_id(&self) -> Option<i64> {
1378        self.metadata.sort_orders.keys().max().copied()
1379    }
1380
1381    /// Remove schemas by their ids from the table metadata.
1382    /// Does nothing if a schema id is not present. Active schemas should not be removed.
1383    pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1384        if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1385            return Err(Error::new(
1386                ErrorKind::DataInvalid,
1387                "Cannot remove current schema",
1388            ));
1389        }
1390
1391        if schema_id_to_remove.is_empty() {
1392            return Ok(self);
1393        }
1394
1395        let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1396        self.metadata.schemas.retain(|id, _schema| {
1397            if schema_id_to_remove.contains(id) {
1398                removed_schemas.push(*id);
1399                false
1400            } else {
1401                true
1402            }
1403        });
1404
1405        self.changes.push(TableUpdate::RemoveSchemas {
1406            schema_ids: removed_schemas,
1407        });
1408
1409        Ok(self)
1410    }
1411}
1412
1413impl From<TableMetadataBuildResult> for TableMetadata {
1414    fn from(result: TableMetadataBuildResult) -> Self {
1415        result.metadata
1416    }
1417}
1418
1419#[cfg(test)]
1420mod tests {
1421    use std::fs::File;
1422    use std::io::BufReader;
1423    use std::thread::sleep;
1424
1425    use super::*;
1426    use crate::TableIdent;
1427    use crate::io::FileIOBuilder;
1428    use crate::spec::{
1429        BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema,
1430        SnapshotRetention, SortDirection, SortField, StructType, Summary, TableProperties,
1431        Transform, Type, UnboundPartitionField,
1432    };
1433    use crate::table::Table;
1434
1435    const TEST_LOCATION: &str = "s3://bucket/test/location";
1436    const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
1437
1438    fn schema() -> Schema {
1439        Schema::builder()
1440            .with_fields(vec![
1441                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1442                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1443                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1444            ])
1445            .build()
1446            .unwrap()
1447    }
1448
1449    fn sort_order() -> SortOrder {
1450        let schema = schema();
1451        SortOrder::builder()
1452            .with_order_id(1)
1453            .with_sort_field(SortField {
1454                source_id: 3,
1455                transform: Transform::Bucket(4),
1456                direction: SortDirection::Descending,
1457                null_order: NullOrder::First,
1458            })
1459            .build(&schema)
1460            .unwrap()
1461    }
1462
1463    fn partition_spec() -> UnboundPartitionSpec {
1464        UnboundPartitionSpec::builder()
1465            .with_spec_id(0)
1466            .add_partition_field(2, "y", Transform::Identity)
1467            .unwrap()
1468            .build()
1469    }
1470
    /// Build table metadata from the standard fixtures, then reopen it as a
    /// builder (with a previous-metadata-file location) so that subsequent
    /// operations start from an empty `changes` list.
    fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
        TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            format_version,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata
        .into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ))
    }
1488
    #[test]
    fn test_minimal_build() {
        // A minimal V1 table built from the standard fixtures.
        let metadata = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V1);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        assert_eq!(metadata.default_sort_order_id, 1);
        // 1000 is the first partition field id assigned for a new table.
        assert_eq!(metadata.last_partition_id, 1000);
        assert_eq!(metadata.last_column_id, 3);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.current_snapshot_id, None);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
        assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);

        // Test can serialize v1
        let _ = serde_json::to_string(&metadata).unwrap();

        // Test can serialize v2
        let metadata = metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .upgrade_format_version(FormatVersion::V2)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        let _ = serde_json::to_string(&metadata).unwrap();
    }
1536
    #[test]
    fn test_build_unpartitioned_unsorted() {
        // A V2 table with an empty schema, no partitioning, and no sort order.
        let schema = Schema::builder().build().unwrap();
        let metadata = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        // Unsorted order id is 0; no columns or partition fields were assigned.
        assert_eq!(metadata.default_sort_order_id, 0);
        assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
        assert_eq!(metadata.last_column_id, 0);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.current_snapshot_id, None);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
    }
1567
    #[test]
    fn test_reassigns_ids() {
        // Build a schema whose ids deliberately do not start at 1, plus a spec
        // and sort order referencing those ids, then verify `reassign_ids`
        // renumbers all three consistently from scratch.
        let schema = Schema::builder()
            .with_schema_id(10)
            .with_fields(vec![
                NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    13,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(14, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();
        let spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(20)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 11,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(10)
            .build(&schema)
            .unwrap();

        let (fresh_schema, fresh_spec, fresh_sort_order) =
            TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();

        // Top-level fields are numbered first (1..=4), then nested fields —
        // which is why "nested" receives id 5 while "c" receives id 4.
        let expected_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    3,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(5, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        // The spec id resets to 0; source ids are resolved by field name
        // against the renumbered schema.
        let expected_spec = PartitionSpec::builder(expected_schema.clone())
            .with_spec_id(0)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        // The sort order id resets to 1 and its source id follows the
        // renumbered schema ("a" is now field 1).
        let expected_sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(1)
            .build(&expected_schema)
            .unwrap();

        assert_eq!(fresh_schema, expected_schema);
        assert_eq!(fresh_spec, expected_spec);
        assert_eq!(fresh_sort_order, expected_sort_order);
    }
1652
1653    #[test]
1654    fn test_ids_are_reassigned_for_new_metadata() {
1655        let schema = schema().into_builder().with_schema_id(10).build().unwrap();
1656
1657        let metadata = TableMetadataBuilder::new(
1658            schema,
1659            partition_spec(),
1660            sort_order(),
1661            TEST_LOCATION.to_string(),
1662            FormatVersion::V1,
1663            HashMap::new(),
1664        )
1665        .unwrap()
1666        .build()
1667        .unwrap()
1668        .metadata;
1669
1670        assert_eq!(metadata.current_schema_id, 0);
1671        assert_eq!(metadata.current_schema().schema_id(), 0);
1672    }
1673
    #[test]
    fn test_new_metadata_changes() {
        // Creating brand-new table metadata must record one `TableUpdate` per
        // applied operation, in chronological order.
        let changes = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema { schema: schema() },
            // -1 refers to the schema added within this same change set
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // Because this is a new table, field-ids are assigned;
                // partition_spec() has None set for field-id
                spec: PartitionSpec::builder(schema())
                    .with_spec_id(0)
                    .add_unbound_field(UnboundPartitionField {
                        name: "y".to_string(),
                        transform: Transform::Identity,
                        source_id: 2,
                        field_id: Some(1000)
                    })
                    .unwrap()
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: sort_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
            TableUpdate::SetProperties {
                updates: HashMap::from_iter(vec![(
                    "property 1".to_string(),
                    "value 1".to_string()
                )]),
            }
        ]);
    }
1724
    #[test]
    fn test_new_metadata_changes_unpartitioned_unsorted() {
        // Same as test_new_metadata_changes, but for an unpartitioned,
        // unsorted table with no properties — notably, no `SetProperties`
        // change is emitted for the empty property map.
        let schema = Schema::builder().build().unwrap();
        let changes = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema {
                schema: Schema::builder().build().unwrap(),
            },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // Because this is a new table, field-ids are assigned;
                // partition_spec() has None set for field-id
                spec: PartitionSpec::builder(schema)
                    .with_spec_id(0)
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: SortOrder::unsorted_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
        ]);
    }
1765
    #[test]
    fn test_add_partition_spec() {
        // Adding a spec re-assigns its spec id to the next free id and fills
        // in field ids for fields that do not carry one yet; the spec can
        // afterwards be removed again.
        let builder = builder_without_changes(FormatVersion::V2);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    // The previous field - has field_id set
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    // A new field without field id - should still be without field id in changes
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    field_id: None,
                },
            ])
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .build()
            .unwrap();

        // Spec id should be re-assigned
        let expected_change = added_spec.with_spec_id(1);
        // In the bound spec, "z" receives the next free partition field id (1001).
        let expected_spec = PartitionSpec::builder(schema())
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y".to_string(),
                transform: Transform::Identity,
                source_id: 2,
                field_id: Some(1000),
            })
            .unwrap()
            .add_unbound_field(UnboundPartitionField {
                name: "z".to_string(),
                transform: Transform::Identity,
                source_id: 3,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(
            build_result.metadata.partition_spec_by_id(1),
            Some(&Arc::new(expected_spec))
        );
        // Merely adding a spec does not change the default spec.
        assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
        assert_eq!(build_result.metadata.last_partition_id, 1001);
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: expected_change
        });

        // Remove the spec
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .remove_partition_specs(&[1])
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.partition_specs.len(), 1);
        assert!(build_result.metadata.partition_spec_by_id(1).is_none());
    }
1844
    #[test]
    fn test_set_default_partition_spec() {
        // `set_default_partition_spec(-1)` must resolve to the spec added
        // within the same change set.
        let builder = builder_without_changes(FormatVersion::V2);
        let schema = builder.get_current_schema().unwrap().clone();
        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        // The spec id is re-assigned to 1 and the field receives the next
        // free partition field id (1001).
        let expected_spec = PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y_bucket[2]".to_string(),
                transform: Transform::Bucket(2),
                source_id: 1,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSpec {
                // Should contain the actual ID that was used
                spec: added_spec.with_spec_id(1)
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 }
        ]);
    }
1885
    #[test]
    fn test_set_existing_default_partition_spec() {
        // Setting a spec that already existed before this change set as the
        // default must record its actual id (not -1) in the change.
        let builder = builder_without_changes(FormatVersion::V2);
        // Add and set an unbound spec as current
        let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();
        let build_result = builder
            .add_partition_spec(unbound_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: unbound_spec.clone()
        });
        // Spec added in the same change set: recorded as -1.
        assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec {
            spec_id: -1
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                unbound_spec
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );

        // Set old spec again
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .set_default_partition_spec(0)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        // Pre-existing spec: the change carries its real id (0), not -1.
        assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec {
            spec_id: 0
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                partition_spec()
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );
    }
1939
    #[test]
    fn test_add_sort_order() {
        // Adding a sort order re-assigns its order id to the next free id
        // (2 here) and records a single `AddSortOrder` change.
        let builder = builder_without_changes(FormatVersion::V2);

        let added_sort_order = SortOrder::builder()
            .with_order_id(10)
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .build(&schema())
            .unwrap();

        let build_result = builder
            .add_sort_order(added_sort_order.clone())
            .unwrap()
            .build()
            .unwrap();

        // The requested id 10 is ignored; the builder assigns id 2.
        let expected_sort_order = added_sort_order.with_order_id(2);

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2));
        pretty_assertions::assert_eq!(
            build_result.metadata.sort_order_by_id(2),
            Some(&Arc::new(expected_sort_order.clone()))
        );
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder {
            sort_order: expected_sort_order
        });
    }
1973
    #[test]
    fn test_add_compatible_schema() {
        // A schema that extends the current one (adds column "a") can be
        // added and made current in a single step; the `SetCurrentSchema`
        // change uses -1 to point at the schema added in this change set.
        let builder = builder_without_changes(FormatVersion::V2);

        let added_schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let build_result = builder
            .add_current_schema(added_schema.clone())
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1));
        pretty_assertions::assert_eq!(
            build_result.metadata.schema_by_id(1),
            Some(&Arc::new(added_schema.clone()))
        );
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema {
            schema: added_schema
        });
        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
            schema_id: -1
        });
    }
2008
    #[test]
    fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
        // Even when `set_current_schema` is called with the concrete id (1),
        // the recorded change uses -1 because that schema was added within
        // the same change set.
        let builder = builder_without_changes(FormatVersion::V2);

        let added_schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let build_result = builder
            .add_schema(added_schema.clone())
            .unwrap()
            .set_current_schema(1)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
            schema_id: -1
        });
    }
2037
2038    #[test]
2039    fn test_no_metadata_log_for_create_table() {
2040        let build_result = TableMetadataBuilder::new(
2041            schema(),
2042            partition_spec(),
2043            sort_order(),
2044            TEST_LOCATION.to_string(),
2045            FormatVersion::V2,
2046            HashMap::new(),
2047        )
2048        .unwrap()
2049        .build()
2050        .unwrap();
2051
2052        assert_eq!(build_result.metadata.metadata_log.len(), 0);
2053    }
2054
2055    #[test]
2056    fn test_no_metadata_log_entry_for_no_previous_location() {
2057        // Used for first commit after stage-creation of tables
2058        let metadata = builder_without_changes(FormatVersion::V2)
2059            .build()
2060            .unwrap()
2061            .metadata;
2062        assert_eq!(metadata.metadata_log.len(), 1);
2063
2064        let build_result = metadata
2065            .into_builder(None)
2066            .set_properties(HashMap::from_iter(vec![(
2067                "foo".to_string(),
2068                "bar".to_string(),
2069            )]))
2070            .unwrap()
2071            .build()
2072            .unwrap();
2073
2074        assert_eq!(build_result.metadata.metadata_log.len(), 1);
2075    }
2076
2077    #[test]
2078    fn test_from_metadata_generates_metadata_log() {
2079        let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
2080        let builder = TableMetadataBuilder::new(
2081            schema(),
2082            partition_spec(),
2083            sort_order(),
2084            TEST_LOCATION.to_string(),
2085            FormatVersion::V2,
2086            HashMap::new(),
2087        )
2088        .unwrap()
2089        .build()
2090        .unwrap()
2091        .metadata
2092        .into_builder(Some(metadata_path.to_string()));
2093
2094        let builder = builder
2095            .add_default_sort_order(SortOrder::unsorted_order())
2096            .unwrap();
2097
2098        let build_result = builder.build().unwrap();
2099
2100        assert_eq!(build_result.metadata.metadata_log.len(), 1);
2101        assert_eq!(
2102            build_result.metadata.metadata_log[0].metadata_file,
2103            metadata_path
2104        );
2105    }
2106
    #[test]
    fn test_set_ref() {
        // `set_ref` must reject unknown snapshot ids; on success, the
        // referenced snapshot is retained and appended to the snapshot log.
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let builder = builder.add_snapshot(snapshot.clone()).unwrap();

        // Snapshot id 10 was never added, so pointing 'main' at it must fail.
        assert!(
            builder
                .clone()
                .set_ref(MAIN_BRANCH, SnapshotReference {
                    snapshot_id: 10,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: Some(10),
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                })
                .unwrap_err()
                .to_string()
                .contains("Cannot set 'main' to unknown snapshot: '10'")
        );

        // Pointing 'main' at the snapshot that was just added succeeds.
        let build_result = builder
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.snapshots.len(), 1);
        assert_eq!(
            build_result.metadata.snapshot_by_id(1),
            Some(&Arc::new(snapshot.clone()))
        );
        assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 1,
            timestamp_ms: snapshot.timestamp_ms()
        }])
    }
2171
    #[test]
    fn test_snapshot_log_skips_intermediates() {
        // When a branch advances through multiple snapshots within a single
        // change set, only the final snapshot is recorded in the snapshot log.
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot_1 = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let snapshot_2 = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        // Point 'main' at snapshot 1, then immediately advance it to
        // snapshot 2 before building.
        let result = builder
            .add_snapshot(snapshot_1)
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
            .unwrap()
            .build()
            .unwrap();

        // Only the last snapshot (id 2) appears in the log and is current.
        assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 2,
            timestamp_ms: snapshot_2.timestamp_ms()
        }]);
        assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
    }
2239
    #[test]
    fn test_set_branch_snapshot_creates_branch_if_not_exists() {
        // `set_branch_snapshot` targeting a branch with no existing ref must
        // create that branch with default (all-None) branch retention.
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let build_result = builder
            .set_branch_snapshot(snapshot.clone(), "new_branch")
            .unwrap()
            .build()
            .unwrap();

        let reference = SnapshotReference {
            snapshot_id: 2,
            retention: SnapshotRetention::Branch {
                min_snapshots_to_keep: None,
                max_snapshot_age_ms: None,
                max_ref_age_ms: None,
            },
        };

        assert_eq!(build_result.metadata.refs.len(), 1);
        assert_eq!(
            build_result.metadata.refs.get("new_branch"),
            Some(&reference)
        );
        // Both the snapshot addition and the new ref are recorded as changes.
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSnapshot { snapshot },
            TableUpdate::SetSnapshotRef {
                ref_name: "new_branch".to_string(),
                reference
            }
        ]);
    }
2284
2285    #[test]
2286    fn test_cannot_add_duplicate_snapshot_id() {
2287        let builder = builder_without_changes(FormatVersion::V2);
2288
2289        let snapshot = Snapshot::builder()
2290            .with_snapshot_id(2)
2291            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2292            .with_sequence_number(0)
2293            .with_schema_id(0)
2294            .with_manifest_list("/snap-1.avro")
2295            .with_summary(Summary {
2296                operation: Operation::Append,
2297                additional_properties: HashMap::from_iter(vec![
2298                    (
2299                        "spark.app.id".to_string(),
2300                        "local-1662532784305".to_string(),
2301                    ),
2302                    ("added-data-files".to_string(), "4".to_string()),
2303                    ("added-records".to_string(), "4".to_string()),
2304                    ("added-files-size".to_string(), "6001".to_string()),
2305                ]),
2306            })
2307            .build();
2308
2309        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2310        builder.add_snapshot(snapshot).unwrap_err();
2311    }
2312
2313    #[test]
2314    fn test_add_incompatible_current_schema_fails() {
2315        let builder = builder_without_changes(FormatVersion::V2);
2316
2317        let added_schema = Schema::builder()
2318            .with_schema_id(1)
2319            .with_fields(vec![])
2320            .build()
2321            .unwrap();
2322
2323        let err = builder
2324            .add_current_schema(added_schema)
2325            .unwrap()
2326            .build()
2327            .unwrap_err();
2328
2329        assert!(
2330            err.to_string()
2331                .contains("Cannot find partition source field")
2332        );
2333    }
2334
    #[test]
    fn test_add_partition_spec_for_v1_requires_sequential_ids() {
        // Format version 1 requires partition field ids to be sequential;
        // the gap between 1000 and 1002 below must be rejected.
        let builder = builder_without_changes(FormatVersion::V1);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    field_id: Some(1002),
                },
            ])
            .unwrap()
            .build();

        let err = builder.add_partition_spec(added_spec).unwrap_err();
        assert!(err.to_string().contains(
            "Cannot add partition spec with non-sequential field ids to format version 1 table"
        ));
    }
2363
    #[test]
    fn test_expire_metadata_log() {
        // With the previous-versions-max property set to "2", the third
        // commit must push the oldest entry out of the metadata log and
        // surface it via `expired_metadata_logs`.
        let builder = builder_without_changes(FormatVersion::V2);
        let metadata = builder
            .set_properties(HashMap::from_iter(vec![(
                TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
                "2".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(metadata.metadata.metadata_log.len(), 1);
        assert_eq!(metadata.expired_metadata_logs.len(), 0);

        let metadata = metadata
            .metadata
            .into_builder(Some("path2".to_string()))
            .set_properties(HashMap::from_iter(vec![(
                "change_nr".to_string(),
                "1".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();

        // Two entries now: the log is at its configured maximum.
        assert_eq!(metadata.metadata.metadata_log.len(), 2);
        assert_eq!(metadata.expired_metadata_logs.len(), 0);

        // NOTE(review): "path2" is reused here for a different commit —
        // possibly intended to be "path3"; the length assertions below are
        // unaffected either way. TODO confirm.
        let metadata = metadata
            .metadata
            .into_builder(Some("path2".to_string()))
            .set_properties(HashMap::from_iter(vec![(
                "change_nr".to_string(),
                "2".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(metadata.metadata.metadata_log.len(), 2);
        assert_eq!(metadata.expired_metadata_logs.len(), 1);
    }
2405
    #[test]
    fn test_v2_sequence_number_cannot_decrease() {
        // For a V2 table, a snapshot committed to a branch must not carry a
        // sequence number lower than its parent's.
        let builder = builder_without_changes(FormatVersion::V2);

        // First snapshot: sequence number 1, installed as the head of `main`.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("/snap-1")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let builder = builder
            .add_snapshot(snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap();

        // Second snapshot claims sequence number 0 - lower than its parent's 1.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-0")
            .with_parent_snapshot_id(Some(1))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        // Committing the out-of-order snapshot to `main` must be rejected.
        let err = builder
            .set_branch_snapshot(snapshot, MAIN_BRANCH)
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("Cannot add snapshot with sequence number")
        );
    }
2456
2457    #[test]
2458    fn test_default_spec_cannot_be_removed() {
2459        let builder = builder_without_changes(FormatVersion::V2);
2460
2461        builder.remove_partition_specs(&[0]).unwrap_err();
2462    }
2463
    #[test]
    fn test_statistics() {
        // Round-trips `set_statistics` / `remove_statistics`: the file is stored
        // keyed by snapshot id, each mutation is recorded as a change, and a
        // second remove of the same id is a no-op.
        let builder = builder_without_changes(FormatVersion::V2);

        let statistics = StatisticsFile {
            snapshot_id: 3055729675574597004,
            statistics_path: "s3://a/b/stats.puffin".to_string(),
            file_size_in_bytes: 413,
            file_footer_size_in_bytes: 42,
            key_metadata: None,
            blob_metadata: vec![BlobMetadata {
                snapshot_id: 3055729675574597004,
                sequence_number: 1,
                fields: vec![1],
                r#type: "ndv".to_string(),
                properties: HashMap::new(),
            }],
        };
        let build_result = builder.set_statistics(statistics.clone()).build().unwrap();

        // The file is registered under its snapshot id and a SetStatistics
        // change is emitted.
        assert_eq!(
            build_result.metadata.statistics,
            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
        );
        assert_eq!(build_result.changes, vec![TableUpdate::SetStatistics {
            statistics: statistics.clone()
        }]);

        // Remove
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_statistics(statistics.snapshot_id)
            .build()
            .unwrap();

        assert_eq!(build_result.metadata.statistics.len(), 0);
        assert_eq!(build_result.changes, vec![TableUpdate::RemoveStatistics {
            snapshot_id: statistics.snapshot_id
        }]);

        // Remove again yields no changes
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.statistics.len(), 0);
        assert_eq!(build_result.changes.len(), 0);
    }
2513
    #[test]
    fn test_add_partition_statistics() {
        // Same round-trip as `test_statistics`, but for partition statistics:
        // set, remove, then verify removing again is a no-op.
        let builder = builder_without_changes(FormatVersion::V2);

        let statistics = PartitionStatisticsFile {
            snapshot_id: 3055729675574597004,
            statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
            file_size_in_bytes: 43,
        };

        let build_result = builder
            .set_partition_statistics(statistics.clone())
            .build()
            .unwrap();
        // The file is keyed by snapshot id and a SetPartitionStatistics change
        // is recorded.
        assert_eq!(
            build_result.metadata.partition_statistics,
            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
        );
        assert_eq!(build_result.changes, vec![
            TableUpdate::SetPartitionStatistics {
                partition_statistics: statistics.clone()
            }
        ]);

        // Remove
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_partition_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
        assert_eq!(build_result.changes, vec![
            TableUpdate::RemovePartitionStatistics {
                snapshot_id: statistics.snapshot_id
            }
        ]);

        // Remove again yields no changes
        let builder = build_result.metadata.into_builder(None);
        let build_result = builder
            .remove_partition_statistics(statistics.snapshot_id)
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
        assert_eq!(build_result.changes.len(), 0);
    }
2560
2561    #[test]
2562    fn last_update_increased_for_property_only_update() {
2563        let builder = builder_without_changes(FormatVersion::V2);
2564
2565        let metadata = builder.build().unwrap().metadata;
2566        let last_updated_ms = metadata.last_updated_ms;
2567        sleep(std::time::Duration::from_millis(2));
2568
2569        let build_result = metadata
2570            .into_builder(Some(
2571                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2572            ))
2573            .set_properties(HashMap::from_iter(vec![(
2574                "foo".to_string(),
2575                "bar".to_string(),
2576            )]))
2577            .unwrap()
2578            .build()
2579            .unwrap();
2580
2581        assert!(
2582            build_result.metadata.last_updated_ms > last_updated_ms,
2583            "{} > {}",
2584            build_result.metadata.last_updated_ms,
2585            last_updated_ms
2586        );
2587    }
2588
2589    #[test]
2590    fn test_construct_default_main_branch() {
2591        // Load the table without ref
2592        let file = File::open(format!(
2593            "{}/testdata/table_metadata/{}",
2594            env!("CARGO_MANIFEST_DIR"),
2595            "TableMetadataV2Valid.json"
2596        ))
2597        .unwrap();
2598        let reader = BufReader::new(file);
2599        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2600
2601        let table = Table::builder()
2602            .metadata(resp)
2603            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2604            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2605            .file_io(FileIOBuilder::new("memory").build().unwrap())
2606            .build()
2607            .unwrap();
2608
2609        assert_eq!(
2610            table.metadata().refs.get(MAIN_BRANCH).unwrap().snapshot_id,
2611            table.metadata().current_snapshot_id().unwrap()
2612        );
2613    }
2614
2615    #[test]
2616    fn test_active_schema_cannot_be_removed() {
2617        let builder = builder_without_changes(FormatVersion::V2);
2618        builder.remove_schemas(&[0]).unwrap_err();
2619    }
2620
2621    #[test]
2622    fn test_remove_schemas() {
2623        let file = File::open(format!(
2624            "{}/testdata/table_metadata/{}",
2625            env!("CARGO_MANIFEST_DIR"),
2626            "TableMetadataV2Valid.json"
2627        ))
2628        .unwrap();
2629        let reader = BufReader::new(file);
2630        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2631
2632        let table = Table::builder()
2633            .metadata(resp)
2634            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2635            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2636            .file_io(FileIOBuilder::new("memory").build().unwrap())
2637            .build()
2638            .unwrap();
2639
2640        assert_eq!(2, table.metadata().schemas.len());
2641
2642        {
2643            // can not remove active schema
2644            let meta_data_builder = table.metadata().clone().into_builder(None);
2645            meta_data_builder.remove_schemas(&[1]).unwrap_err();
2646        }
2647
2648        let mut meta_data_builder = table.metadata().clone().into_builder(None);
2649        meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
2650        let build_result = meta_data_builder.build().unwrap();
2651        assert_eq!(1, build_result.metadata.schemas.len());
2652        assert_eq!(1, build_result.metadata.current_schema_id);
2653        assert_eq!(1, build_result.metadata.current_schema().schema_id());
2654        assert_eq!(1, build_result.changes.len());
2655
2656        let remove_schema_ids =
2657            if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
2658                schema_ids
2659            } else {
2660                unreachable!("Expected RemoveSchema change")
2661            };
2662        assert_eq!(remove_schema_ids, &[0]);
2663    }
2664
    #[test]
    fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() {
        // A new schema field may not reuse the name of an existing partition
        // field ("bucket_data"); the conflict must surface immediately on
        // `add_current_schema`, not later at build time.
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec_with_bucket = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec_with_bucket,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Sanity check: the bucket field landed in the default partition spec.
        let partition_field_names: Vec<String> = metadata
            .default_partition_spec()
            .fields()
            .iter()
            .map(|f| f.name.clone())
            .collect();
        assert!(partition_field_names.contains(&"bucket_data".to_string()));

        let evolved_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                // Adding a schema field with the same name as an existing partition field
                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();

        let builder = metadata.into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ));

        // Try to add the evolved schema - this should now fail immediately with a clear error
        let result = builder.add_current_schema(evolved_schema);

        assert!(result.is_err());
        let error = result.unwrap_err();
        let error_message = error.message();
        assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
        assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names"));
    }
2723
2724    #[test]
2725    fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() {
2726        let initial_schema = Schema::builder()
2727            .with_fields(vec![
2728                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2729            ])
2730            .build()
2731            .unwrap();
2732
2733        let partition_spec = UnboundPartitionSpec::builder()
2734            .with_spec_id(0)
2735            .add_partition_field(1, "partition_col", Transform::Bucket(16))
2736            .unwrap()
2737            .build();
2738
2739        let metadata = TableMetadataBuilder::new(
2740            initial_schema,
2741            partition_spec,
2742            SortOrder::unsorted_order(),
2743            TEST_LOCATION.to_string(),
2744            FormatVersion::V2,
2745            HashMap::new(),
2746        )
2747        .unwrap()
2748        .build()
2749        .unwrap()
2750        .metadata;
2751
2752        let non_conflicting_schema = Schema::builder()
2753            .with_fields(vec![
2754                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2755                NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(),
2756            ])
2757            .build()
2758            .unwrap();
2759
2760        // This should succeed since there's no name conflict
2761        let result = metadata
2762            .clone()
2763            .into_builder(Some("test_location".to_string()))
2764            .add_current_schema(non_conflicting_schema)
2765            .unwrap()
2766            .build();
2767
2768        assert!(result.is_ok());
2769    }
2770
    #[test]
    fn test_partition_spec_evolution_validates_schema_field_name_conflicts() {
        // Mirror image of the schema-evolution check: a new partition spec must
        // not name a non-identity partition field after an existing schema field.
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        let builder = metadata.into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ));

        // "existing_field" is a schema field, and Bucket is not an identity
        // transform, so this spec must be rejected.
        let conflicting_partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(1)
            .add_partition_field(1, "existing_field", Transform::Bucket(8))
            .unwrap()
            .build();

        let result = builder.add_partition_spec(conflicting_partition_spec);

        assert!(result.is_err());
        let error = result.unwrap_err();
        let error_message = error.message();
        // The error comes from our multi-version validation
        assert!(error_message.contains(
            "Cannot create partition with name 'existing_field' that conflicts with schema field"
        ));
        assert!(error_message.contains("and is not an identity transform"));
    }
2822
    #[test]
    fn test_schema_evolution_validates_against_all_historical_schemas() {
        // Partition-field-name conflict checks consider ALL historical schemas:
        // reintroducing a field name that once existed is allowed, while a
        // brand-new name that collides with a partition field is rejected.
        // Create a table with an initial schema that has a field "existing_field"
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Add a second schema that removes the existing_field but keeps the data field
        let second_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
            ])
            .build()
            .unwrap();

        let metadata = metadata
            .into_builder(Some("test_location".to_string()))
            .add_current_schema(second_schema)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Now try to add a third schema that reintroduces "existing_field"
        // This should succeed because "existing_field" exists in a historical schema,
        // even though there's a partition field named "bucket_data"
        let third_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let builder = metadata
            .clone()
            .into_builder(Some("test_location".to_string()));

        // This should succeed because "existing_field" exists in a historical schema
        let result = builder.add_current_schema(third_schema);
        assert!(result.is_ok());

        // However, trying to add a schema field that conflicts with the partition field
        // and doesn't exist in any historical schema should fail
        let conflicting_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
                NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String))
                    .into(), // conflicts with partition field
            ])
            .build()
            .unwrap();

        let builder2 = metadata.into_builder(Some("test_location".to_string()));
        let result2 = builder2.add_current_schema(conflicting_schema);

        // This should fail because "bucket_data" conflicts with partition field name
        // and doesn't exist in any historical schema
        assert!(result2.is_err());
        let error = result2.unwrap_err();
        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
    }
2918
2919    #[test]
2920    fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() {
2921        // Create initial schema with a field
2922        let initial_schema = Schema::builder()
2923            .with_fields(vec![
2924                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2925                NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
2926                    .into(),
2927            ])
2928            .build()
2929            .unwrap();
2930
2931        let partition_spec = UnboundPartitionSpec::builder()
2932            .with_spec_id(0)
2933            .add_partition_field(2, "partition_data", Transform::Identity)
2934            .unwrap()
2935            .build();
2936
2937        let metadata = TableMetadataBuilder::new(
2938            initial_schema,
2939            partition_spec,
2940            SortOrder::unsorted_order(),
2941            TEST_LOCATION.to_string(),
2942            FormatVersion::V2,
2943            HashMap::new(),
2944        )
2945        .unwrap()
2946        .build()
2947        .unwrap()
2948        .metadata;
2949
2950        // Add a new schema that still contains the partition_data field
2951        let evolved_schema = Schema::builder()
2952            .with_fields(vec![
2953                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2954                NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
2955                    .into(),
2956                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2957                    .into(),
2958            ])
2959            .build()
2960            .unwrap();
2961
2962        // This should succeed because partition_data exists in historical schemas
2963        let result = metadata
2964            .into_builder(Some("test_location".to_string()))
2965            .add_current_schema(evolved_schema);
2966
2967        assert!(result.is_ok());
2968    }
2969
    #[test]
    fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() {
        // A brand-new schema field (not present in any historical schema) whose
        // name matches a partition field must be rejected on add.
        // Create initial schema WITHOUT the conflicting field
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Try to add a schema with a field that conflicts with partition field name
        let conflicting_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                // This field name conflicts with the partition field "bucket_data"
                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();

        let builder = metadata.into_builder(Some("test_location".to_string()));
        let result = builder.add_current_schema(conflicting_schema);

        // This should fail because "bucket_data" conflicts with partition field name
        // and doesn't exist in any historical schema
        assert!(result.is_err());
        let error = result.unwrap_err();
        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
    }
3018
3019    #[test]
3020    fn test_partition_spec_evolution_allows_non_conflicting_names() {
3021        let initial_schema = Schema::builder()
3022            .with_fields(vec![
3023                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3024                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
3025                    .into(),
3026            ])
3027            .build()
3028            .unwrap();
3029
3030        let partition_spec = UnboundPartitionSpec::builder()
3031            .with_spec_id(0)
3032            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
3033            .unwrap()
3034            .build();
3035
3036        let metadata = TableMetadataBuilder::new(
3037            initial_schema,
3038            partition_spec,
3039            SortOrder::unsorted_order(),
3040            TEST_LOCATION.to_string(),
3041            FormatVersion::V2,
3042            HashMap::new(),
3043        )
3044        .unwrap()
3045        .build()
3046        .unwrap()
3047        .metadata;
3048
3049        let builder = metadata.into_builder(Some(
3050            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3051        ));
3052
3053        // Try to add a partition spec with a field name that does NOT conflict with existing schema fields
3054        let non_conflicting_partition_spec = UnboundPartitionSpec::builder()
3055            .with_spec_id(1)
3056            .add_partition_field(2, "new_partition_field", Transform::Bucket(8))
3057            .unwrap()
3058            .build();
3059
3060        let result = builder.add_partition_spec(non_conflicting_partition_spec);
3061
3062        assert!(result.is_ok());
3063    }
3064
    #[test]
    fn test_row_lineage_addition() {
        // V3 row lineage: each appended snapshot claims a row range starting at
        // the table's current `next_row_id`; the table must advance it by the
        // number of added rows after every commit.
        let new_rows = 30;
        let base = builder_without_changes(FormatVersion::V3)
            .build()
            .unwrap()
            .metadata;
        let add_rows = Snapshot::builder()
            .with_snapshot_id(0)
            .with_timestamp_ms(base.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(base.next_row_id(), new_rows)
            .build();

        let first_addition = base
            .into_builder(None)
            .add_snapshot(add_rows.clone())
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // First append: next_row_id advances by 30.
        assert_eq!(first_addition.next_row_id(), new_rows);

        // Second snapshot starts its row range at the updated next_row_id.
        let add_more_rows = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(first_addition.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(Some(0))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(first_addition.next_row_id(), new_rows)
            .build();

        let second_addition = first_addition
            .into_builder(None)
            .add_snapshot(add_more_rows)
            .unwrap()
            .build()
            .unwrap()
            .metadata;
        // Second append: next_row_id advances again, to 60.
        assert_eq!(second_addition.next_row_id(), new_rows * 2);
    }
3119
    #[test]
    fn test_row_lineage_invalid_snapshot() {
        // A snapshot whose first-row-id lies BEHIND the table's next-row-id
        // would overlap already-assigned row ids and must be rejected.
        let new_rows = 30;
        let base = builder_without_changes(FormatVersion::V3)
            .build()
            .unwrap()
            .metadata;

        // add rows to check TableMetadata validation; Snapshot rejects negative next-row-id
        let add_rows = Snapshot::builder()
            .with_snapshot_id(0)
            .with_timestamp_ms(base.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(None)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_row_range(base.next_row_id(), new_rows)
            .build();

        // Valid first append: table's next-row-id becomes 30.
        let added = base
            .into_builder(None)
            .add_snapshot(add_rows)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        let invalid_new_rows = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(added.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("foo")
            .with_parent_snapshot_id(Some(0))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            // first_row_id is behind table next_row_id
            .with_row_range(added.next_row_id() - 1, 10)
            .build();

        let err = added
            .into_builder(None)
            .add_snapshot(invalid_new_rows)
            .unwrap_err();
        assert!(
            err.to_string().contains(
                "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30"
            )
        );
    }
3176
3177    #[test]
3178    fn test_row_lineage_append_branch() {
3179        // Appends to a branch should still change last-row-id even if not on main, these changes
3180        // should also affect commits to main
3181
3182        let branch = "some_branch";
3183
3184        // Start with V3 metadata to support row lineage
3185        let base = builder_without_changes(FormatVersion::V3)
3186            .build()
3187            .unwrap()
3188            .metadata;
3189
3190        // Initial next_row_id should be 0
3191        assert_eq!(base.next_row_id(), 0);
3192
3193        // Write to Branch - append 30 rows
3194        let branch_snapshot_1 = Snapshot::builder()
3195            .with_snapshot_id(1)
3196            .with_timestamp_ms(base.last_updated_ms + 1)
3197            .with_sequence_number(0)
3198            .with_schema_id(0)
3199            .with_manifest_list("foo")
3200            .with_parent_snapshot_id(None)
3201            .with_summary(Summary {
3202                operation: Operation::Append,
3203                additional_properties: HashMap::new(),
3204            })
3205            .with_row_range(base.next_row_id(), 30)
3206            .build();
3207
3208        let table_after_branch_1 = base
3209            .into_builder(None)
3210            .set_branch_snapshot(branch_snapshot_1.clone(), branch)
3211            .unwrap()
3212            .build()
3213            .unwrap()
3214            .metadata;
3215
3216        // Current snapshot should be null (no main branch snapshot yet)
3217        assert!(table_after_branch_1.current_snapshot().is_none());
3218
3219        // Branch snapshot should have first_row_id = 0
3220        let branch_ref = table_after_branch_1.refs.get(branch).unwrap();
3221        let branch_snap_1 = table_after_branch_1
3222            .snapshots
3223            .get(&branch_ref.snapshot_id)
3224            .unwrap();
3225        assert_eq!(branch_snap_1.first_row_id(), Some(0));
3226
3227        // Next row id should be 30
3228        assert_eq!(table_after_branch_1.next_row_id(), 30);
3229
3230        // Write to Main - append 28 rows
3231        let main_snapshot = Snapshot::builder()
3232            .with_snapshot_id(2)
3233            .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1)
3234            .with_sequence_number(1)
3235            .with_schema_id(0)
3236            .with_manifest_list("bar")
3237            .with_parent_snapshot_id(None)
3238            .with_summary(Summary {
3239                operation: Operation::Append,
3240                additional_properties: HashMap::new(),
3241            })
3242            .with_row_range(table_after_branch_1.next_row_id(), 28)
3243            .build();
3244
3245        let table_after_main = table_after_branch_1
3246            .into_builder(None)
3247            .add_snapshot(main_snapshot.clone())
3248            .unwrap()
3249            .set_ref(MAIN_BRANCH, SnapshotReference {
3250                snapshot_id: main_snapshot.snapshot_id(),
3251                retention: SnapshotRetention::Branch {
3252                    min_snapshots_to_keep: None,
3253                    max_snapshot_age_ms: None,
3254                    max_ref_age_ms: None,
3255                },
3256            })
3257            .unwrap()
3258            .build()
3259            .unwrap()
3260            .metadata;
3261
3262        // Main snapshot should have first_row_id = 30
3263        let current_snapshot = table_after_main.current_snapshot().unwrap();
3264        assert_eq!(current_snapshot.first_row_id(), Some(30));
3265
3266        // Next row id should be 58 (30 + 28)
3267        assert_eq!(table_after_main.next_row_id(), 58);
3268
3269        // Write again to branch - append 21 rows
3270        let branch_snapshot_2 = Snapshot::builder()
3271            .with_snapshot_id(3)
3272            .with_timestamp_ms(table_after_main.last_updated_ms + 1)
3273            .with_sequence_number(2)
3274            .with_schema_id(0)
3275            .with_manifest_list("baz")
3276            .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id()))
3277            .with_summary(Summary {
3278                operation: Operation::Append,
3279                additional_properties: HashMap::new(),
3280            })
3281            .with_row_range(table_after_main.next_row_id(), 21)
3282            .build();
3283
3284        let table_after_branch_2 = table_after_main
3285            .into_builder(None)
3286            .set_branch_snapshot(branch_snapshot_2.clone(), branch)
3287            .unwrap()
3288            .build()
3289            .unwrap()
3290            .metadata;
3291
3292        // Branch snapshot should have first_row_id = 58 (30 + 28)
3293        let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap();
3294        let branch_snap_2 = table_after_branch_2
3295            .snapshots
3296            .get(&branch_ref_2.snapshot_id)
3297            .unwrap();
3298        assert_eq!(branch_snap_2.first_row_id(), Some(58));
3299
3300        // Next row id should be 79 (30 + 28 + 21)
3301        assert_eq!(table_after_branch_2.next_row_id(), 79);
3302    }
3303
3304    #[test]
3305    fn test_encryption_keys() {
3306        let builder = builder_without_changes(FormatVersion::V2);
3307
3308        // Create test encryption keys
3309        let encryption_key_1 = EncryptedKey::builder()
3310            .key_id("key-1")
3311            .encrypted_key_metadata(vec![1, 2, 3, 4])
3312            .encrypted_by_id("encryption-service-1")
3313            .properties(HashMap::from_iter(vec![(
3314                "algorithm".to_string(),
3315                "AES-256".to_string(),
3316            )]))
3317            .build();
3318
3319        let encryption_key_2 = EncryptedKey::builder()
3320            .key_id("key-2")
3321            .encrypted_key_metadata(vec![5, 6, 7, 8])
3322            .encrypted_by_id("encryption-service-2")
3323            .properties(HashMap::new())
3324            .build();
3325
3326        // Add first encryption key
3327        let build_result = builder
3328            .add_encryption_key(encryption_key_1.clone())
3329            .build()
3330            .unwrap();
3331
3332        assert_eq!(build_result.changes.len(), 1);
3333        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3334        assert_eq!(
3335            build_result.metadata.encryption_key("key-1"),
3336            Some(&encryption_key_1)
3337        );
3338        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
3339            encryption_key: encryption_key_1.clone()
3340        });
3341
3342        // Add second encryption key
3343        let build_result = build_result
3344            .metadata
3345            .into_builder(Some(
3346                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3347            ))
3348            .add_encryption_key(encryption_key_2.clone())
3349            .build()
3350            .unwrap();
3351
3352        assert_eq!(build_result.changes.len(), 1);
3353        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
3354        assert_eq!(
3355            build_result.metadata.encryption_key("key-1"),
3356            Some(&encryption_key_1)
3357        );
3358        assert_eq!(
3359            build_result.metadata.encryption_key("key-2"),
3360            Some(&encryption_key_2)
3361        );
3362        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
3363            encryption_key: encryption_key_2.clone()
3364        });
3365
3366        // Try to add duplicate key - should not create a change
3367        let build_result = build_result
3368            .metadata
3369            .into_builder(Some(
3370                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
3371            ))
3372            .add_encryption_key(encryption_key_1.clone())
3373            .build()
3374            .unwrap();
3375
3376        assert_eq!(build_result.changes.len(), 0);
3377        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
3378
3379        // Remove first encryption key
3380        let build_result = build_result
3381            .metadata
3382            .into_builder(Some(
3383                "s3://bucket/test/location/metadata/metadata3.json".to_string(),
3384            ))
3385            .remove_encryption_key("key-1")
3386            .build()
3387            .unwrap();
3388
3389        assert_eq!(build_result.changes.len(), 1);
3390        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3391        assert_eq!(build_result.metadata.encryption_key("key-1"), None);
3392        assert_eq!(
3393            build_result.metadata.encryption_key("key-2"),
3394            Some(&encryption_key_2)
3395        );
3396        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
3397            key_id: "key-1".to_string()
3398        });
3399
3400        // Try to remove non-existent key - should not create a change
3401        let build_result = build_result
3402            .metadata
3403            .into_builder(Some(
3404                "s3://bucket/test/location/metadata/metadata4.json".to_string(),
3405            ))
3406            .remove_encryption_key("non-existent-key")
3407            .build()
3408            .unwrap();
3409
3410        assert_eq!(build_result.changes.len(), 0);
3411        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3412
3413        // Test encryption_keys_iter()
3414        let keys = build_result
3415            .metadata
3416            .encryption_keys_iter()
3417            .collect::<Vec<_>>();
3418        assert_eq!(keys.len(), 1);
3419        assert_eq!(keys[0], &encryption_key_2);
3420
3421        // Remove last encryption key
3422        let build_result = build_result
3423            .metadata
3424            .into_builder(Some(
3425                "s3://bucket/test/location/metadata/metadata5.json".to_string(),
3426            ))
3427            .remove_encryption_key("key-2")
3428            .build()
3429            .unwrap();
3430
3431        assert_eq!(build_result.changes.len(), 1);
3432        assert_eq!(build_result.metadata.encryption_keys.len(), 0);
3433        assert_eq!(build_result.metadata.encryption_key("key-2"), None);
3434        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
3435            key_id: "key-2".to_string()
3436        });
3437
3438        // Verify empty encryption_keys_iter()
3439        let keys = build_result.metadata.encryption_keys_iter();
3440        assert_eq!(keys.len(), 0);
3441    }
3442}