iceberg/spec/
table_metadata_builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use uuid::Uuid;
22
23use super::{
24    DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
25    ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, Schema, SchemaRef,
26    Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef,
27    StatisticsFile, StructType, TableMetadata, TableProperties, UNPARTITIONED_LAST_ASSIGNED_ID,
28    UnboundPartitionSpec,
29};
30use crate::error::{Error, ErrorKind, Result};
31use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
32use crate::{TableCreation, TableUpdate};
33
34pub(crate) const FIRST_FIELD_ID: i32 = 1;
35
/// Manipulating table metadata.
///
/// Unlike a typical builder pattern, the order of called functions matters — functions
/// are applied in-order. All operations applied to the `TableMetadata` are tracked in
/// `changes` as a chronologically ordered vec of `TableUpdate`.
/// If an operation does not lead to a change of the `TableMetadata`, the corresponding
/// update is omitted from `changes`.
///
/// Some basic ordering rules:
/// - `add_schema` must be called before `set_current_schema`.
/// - If a new partition spec and schema are added, the schema should be added first.
#[derive(Debug, Clone)]
pub struct TableMetadataBuilder {
    // Metadata under construction; mutated in place by the builder methods.
    metadata: TableMetadata,
    // Chronologically ordered updates applied so far (no-ops omitted).
    changes: Vec<TableUpdate>,
    // Id of the schema most recently added via `add_schema`, if any.
    last_added_schema_id: Option<i32>,
    // Id of the partition spec most recently added, if any.
    last_added_spec_id: Option<i32>,
    // Id of the sort order most recently added, if any.
    last_added_order_id: Option<i64>,
    // Metadata-log entry for the previous metadata file.
    // `None` if this is a new table (i.e. `new_from_metadata` was not used).
    previous_history_entry: Option<MetadataLog>,
    // Timestamp (ms) for `last_updated_ms`; set by snapshot operations and
    // defaulted by `build()` when still `None`.
    last_updated_ms: Option<i64>,
}
59
#[derive(Debug, Clone, PartialEq)]
/// Result of modifying or creating a `TableMetadata`.
pub struct TableMetadataBuildResult {
    /// The new `TableMetadata`.
    pub metadata: TableMetadata,
    /// The changes that were applied to the metadata.
    /// Operations that did not modify the metadata are omitted.
    pub changes: Vec<TableUpdate>,
    /// Expired metadata logs, i.e. entries no longer present in `metadata.metadata_log`.
    pub expired_metadata_logs: Vec<MetadataLog>,
}
70
71impl TableMetadataBuilder {
    /// Proxy id for "last added" items, including schema, partition spec, sort order.
    ///
    /// Passing this value (-1) to e.g. `set_current_schema` resolves to the item most
    /// recently added through the corresponding `add_*` method.
    pub const LAST_ADDED: i32 = -1;
74
    /// Create a `TableMetadata` object from scratch.
    ///
    /// This method re-assigns ids of fields in the schema, schema.id, sort_order.id and
    /// spec.id. It should only be used to create new table metadata from scratch.
    ///
    /// # Errors
    /// - Re-assigning ids fails, or any of the chained calls at the end of this method
    ///   (`add_current_schema`, `add_default_partition_spec`, `add_default_sort_order`,
    ///   `set_properties`) rejects its input.
    pub fn new(
        schema: Schema,
        spec: impl Into<UnboundPartitionSpec>,
        sort_order: SortOrder,
        location: String,
        format_version: FormatVersion,
        properties: HashMap<String, String>,
    ) -> Result<Self> {
        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table.
        let (fresh_schema, fresh_spec, fresh_sort_order) =
            Self::reassign_ids(schema, spec.into(), sort_order)?;
        let schema_id = fresh_schema.schema_id();

        // Start from placeholder metadata; every placeholder below is replaced by
        // the builder calls chained at the end of this method.
        let builder = Self {
            metadata: TableMetadata {
                format_version,
                table_uuid: Uuid::now_v7(),
                location: "".to_string(), // Overwritten immediately by set_location
                last_sequence_number: 0,
                last_updated_ms: 0,    // Overwritten by build() if not set before
                last_column_id: -1,    // Overwritten immediately by add_current_schema
                current_schema_id: -1, // Overwritten immediately by add_current_schema
                schemas: HashMap::new(),
                partition_specs: HashMap::new(),
                default_spec: Arc::new(
                    // The spec id (-1) is just a proxy value and can be any negative number.
                    // 0 would lead to wrong changes in the builder if the provided spec by the user is
                    // also unpartitioned.
                    // The `default_spec` value is always replaced at the end of this method by the `add_default_partition_spec`
                    // method.
                    PartitionSpec::unpartition_spec().with_spec_id(-1),
                ), // Overwritten immediately by add_default_partition_spec
                default_partition_type: StructType::new(vec![]),
                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
                properties: HashMap::new(),
                current_snapshot_id: None,
                snapshots: HashMap::new(),
                snapshot_log: vec![],
                sort_orders: HashMap::new(),
                metadata_log: vec![],
                default_sort_order_id: -1, // Overwritten immediately by add_default_sort_order
                refs: HashMap::default(),
                statistics: HashMap::new(),
                partition_statistics: HashMap::new(),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            },
            last_updated_ms: None,
            changes: vec![],
            // The fresh schema counts as "last added" so LAST_ADDED resolves to it.
            last_added_schema_id: Some(schema_id),
            last_added_spec_id: None,
            last_added_order_id: None,
            previous_history_entry: None,
        };

        // Apply the real values; each call records the corresponding change.
        builder
            .set_location(location)
            .add_current_schema(fresh_schema)?
            .add_default_partition_spec(fresh_spec.into_unbound())?
            .add_default_sort_order(fresh_sort_order)?
            .set_properties(properties)
    }
141
142    /// Creates a new table metadata builder from the given metadata to modify it.
143    /// `current_file_location` is the location where the current version
144    /// of the metadata file is stored. This is used to update the metadata log.
145    /// If `current_file_location` is `None`, the metadata log will not be updated.
146    /// This should only be used to stage-create tables.
147    #[must_use]
148    pub fn new_from_metadata(
149        previous: TableMetadata,
150        current_file_location: Option<String>,
151    ) -> Self {
152        Self {
153            previous_history_entry: current_file_location.map(|l| MetadataLog {
154                metadata_file: l,
155                timestamp_ms: previous.last_updated_ms,
156            }),
157            metadata: previous,
158            changes: Vec::default(),
159            last_added_schema_id: None,
160            last_added_spec_id: None,
161            last_added_order_id: None,
162            last_updated_ms: None,
163        }
164    }
165
166    /// Creates a new table metadata builder from the given table creation.
167    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
168        let TableCreation {
169            name: _,
170            location,
171            schema,
172            partition_spec,
173            sort_order,
174            properties,
175            format_version,
176        } = table_creation;
177
178        let location = location.ok_or_else(|| {
179            Error::new(
180                ErrorKind::DataInvalid,
181                "Can't create table without location",
182            )
183        })?;
184        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
185            spec_id: None,
186            fields: vec![],
187        });
188
189        Self::new(
190            schema,
191            partition_spec,
192            sort_order.unwrap_or(SortOrder::unsorted_order()),
193            location,
194            format_version,
195            properties,
196        )
197    }
198
199    /// Changes uuid of table metadata.
200    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
201        if self.metadata.table_uuid != uuid {
202            self.metadata.table_uuid = uuid;
203            self.changes.push(TableUpdate::AssignUuid { uuid });
204        }
205
206        self
207    }
208
209    /// Upgrade `FormatVersion`. Downgrades are not allowed.
210    ///
211    /// # Errors
212    /// - Cannot downgrade to older format versions.
213    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
214        if format_version < self.metadata.format_version {
215            return Err(Error::new(
216                ErrorKind::DataInvalid,
217                format!(
218                    "Cannot downgrade FormatVersion from {} to {}",
219                    self.metadata.format_version, format_version
220                ),
221            ));
222        }
223
224        if format_version != self.metadata.format_version {
225            match format_version {
226                FormatVersion::V1 => {
227                    // No changes needed for V1
228                }
229                FormatVersion::V2 => {
230                    self.metadata.format_version = format_version;
231                    self.changes
232                        .push(TableUpdate::UpgradeFormatVersion { format_version });
233                }
234                FormatVersion::V3 => {
235                    self.metadata.format_version = format_version;
236                    // Set next-row-id to 0 when upgrading to v3 as per Iceberg spec
237                    self.metadata.next_row_id = INITIAL_ROW_ID;
238                    self.changes
239                        .push(TableUpdate::UpgradeFormatVersion { format_version });
240                }
241            }
242        }
243
244        Ok(self)
245    }
246
247    /// Set properties. If a property already exists, it will be overwritten.
248    ///
249    /// If a reserved property is set, the corresponding action is performed and the property is not persisted.
250    /// Currently the following reserved properties are supported:
251    /// * format-version: Set the format version of the table.
252    ///
253    /// # Errors
254    /// - If properties contains a reserved property
255    pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
256        // List of specified properties that are RESERVED and should not be persisted.
257        let reserved_properties = properties
258            .keys()
259            .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
260            .map(ToString::to_string)
261            .collect::<Vec<_>>();
262
263        if !reserved_properties.is_empty() {
264            return Err(Error::new(
265                ErrorKind::DataInvalid,
266                format!(
267                    "Table properties should not contain reserved properties, but got: [{}]",
268                    reserved_properties.join(", ")
269                ),
270            ));
271        }
272
273        if properties.is_empty() {
274            return Ok(self);
275        }
276
277        self.metadata.properties.extend(properties.clone());
278        self.changes.push(TableUpdate::SetProperties {
279            updates: properties,
280        });
281
282        Ok(self)
283    }
284
285    /// Remove properties from the table metadata.
286    /// Does nothing if the key is not present.
287    ///
288    /// # Errors
289    /// - If properties to remove contains a reserved property
290    pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
291        // remove duplicates
292        let properties = properties.iter().cloned().collect::<HashSet<_>>();
293
294        // disallow removal of reserved properties
295        let reserved_properties = properties
296            .iter()
297            .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
298            .map(ToString::to_string)
299            .collect::<Vec<_>>();
300
301        if !reserved_properties.is_empty() {
302            return Err(Error::new(
303                ErrorKind::DataInvalid,
304                format!(
305                    "Table properties to remove contain reserved properties: [{}]",
306                    reserved_properties.join(", ")
307                ),
308            ));
309        }
310
311        for property in &properties {
312            self.metadata.properties.remove(property);
313        }
314
315        if !properties.is_empty() {
316            self.changes.push(TableUpdate::RemoveProperties {
317                removals: properties.into_iter().collect(),
318            });
319        }
320
321        Ok(self)
322    }
323
324    /// Set the location of the table, stripping any trailing slashes.
325    pub fn set_location(mut self, location: String) -> Self {
326        let location = location.trim_end_matches('/').to_string();
327        if self.metadata.location != location {
328            self.changes.push(TableUpdate::SetLocation {
329                location: location.clone(),
330            });
331            self.metadata.location = location;
332        }
333
334        self
335    }
336
    /// Add a snapshot to the table metadata.
    ///
    /// # Errors
    /// - Snapshot id already exists.
    /// - For format version > 1: the sequence number of the snapshot is lower than the highest sequence number specified so far.
    /// - For format version >= 3: the first-row-id of the snapshot is lower than the next-row-id of the table.
    /// - For format version >= 3: added-rows is null or first-row-id is null.
    /// - For format version >= 3: next-row-id would overflow when adding added-rows.
    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
        // Snapshot ids must be unique within a table.
        if self
            .metadata
            .snapshots
            .contains_key(&snapshot.snapshot_id())
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
            ));
        }

        // V2+: sequence numbers must strictly increase. Snapshots without a parent
        // are exempt from this check.
        if self.metadata.format_version != FormatVersion::V1
            && snapshot.sequence_number() <= self.metadata.last_sequence_number
            && snapshot.parent_snapshot_id().is_some()
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
                    snapshot.sequence_number(),
                    self.metadata.last_sequence_number
                ),
            ));
        }

        if let Some(last) = self.metadata.snapshot_log.last() {
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
                        snapshot.timestamp_ms(),
                        last.timestamp_ms
                    ),
                ));
            }
        }

        // Likewise, the snapshot may not be more than one minute older than the most
        // recent update - whether recorded in this commit or in the stored metadata.
        let max_last_updated = self
            .last_updated_ms
            .unwrap_or_default()
            .max(self.metadata.last_updated_ms);
        if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot timestamp {}: before last updated timestamp {}",
                    snapshot.timestamp_ms(),
                    max_last_updated
                ),
            ));
        }

        // Row lineage (format version >= 3): the snapshot must carry a row range
        // starting at or after the table's next-row-id.
        let mut added_rows = None;
        if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE {
            if let Some((first_row_id, added_rows_count)) = snapshot.row_range() {
                if first_row_id < self.metadata.next_row_id {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Cannot add a snapshot, first-row-id is behind table next-row-id: {first_row_id} < {}",
                            self.metadata.next_row_id
                        ),
                    ));
                }

                added_rows = Some(added_rows_count);
            } else {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot add a snapshot: first-row-id is null. first-row-id must be set for format version >= {MIN_FORMAT_VERSION_ROW_LINEAGE}",
                    ),
                ));
            }
        }

        // Advance next-row-id, guarding against integer overflow.
        if let Some(added_rows) = added_rows {
            self.metadata.next_row_id = self
                .metadata
                .next_row_id
                .checked_add(added_rows)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Cannot add snapshot: next-row-id overflowed when adding added-rows",
                    )
                })?;
        }

        // Mutation happens in next line - must be infallible from here
        self.changes.push(TableUpdate::AddSnapshot {
            snapshot: snapshot.clone(),
        });

        self.last_updated_ms = Some(snapshot.timestamp_ms());
        self.metadata.last_sequence_number = snapshot.sequence_number();
        self.metadata
            .snapshots
            .insert(snapshot.snapshot_id(), snapshot.into());

        Ok(self)
    }
451
452    /// Append a snapshot to the specified branch.
453    /// Retention settings from the `branch` are re-used.
454    ///
455    /// # Errors
456    /// - Any of the preconditions of `self.add_snapshot` are not met.
457    pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) -> Result<Self> {
458        let reference = self.metadata.refs.get(branch).cloned();
459
460        let reference = if let Some(mut reference) = reference {
461            if !reference.is_branch() {
462                return Err(Error::new(
463                    ErrorKind::DataInvalid,
464                    format!("Cannot append snapshot to non-branch reference '{branch}'",),
465                ));
466            }
467
468            reference.snapshot_id = snapshot.snapshot_id();
469            reference
470        } else {
471            SnapshotReference {
472                snapshot_id: snapshot.snapshot_id(),
473                retention: SnapshotRetention::Branch {
474                    min_snapshots_to_keep: None,
475                    max_snapshot_age_ms: None,
476                    max_ref_age_ms: None,
477                },
478            }
479        };
480
481        self.add_snapshot(snapshot)?.set_ref(branch, reference)
482    }
483
484    /// Remove snapshots by its ids from the table metadata.
485    /// Does nothing if a snapshot id is not present.
486    /// Keeps as changes only the snapshots that were actually removed.
487    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
488        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
489
490        self.metadata.snapshots.retain(|k, _| {
491            if snapshot_ids.contains(k) {
492                removed_snapshots.push(*k);
493                false
494            } else {
495                true
496            }
497        });
498
499        if !removed_snapshots.is_empty() {
500            self.changes.push(TableUpdate::RemoveSnapshots {
501                snapshot_ids: removed_snapshots,
502            });
503        }
504
505        // Remove refs that are no longer valid
506        self.metadata
507            .refs
508            .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
509
510        self
511    }
512
    /// Set a reference to a snapshot.
    ///
    /// For the `main` branch this also updates `current_snapshot_id` and appends
    /// an entry to the snapshot log.
    ///
    /// # Errors
    /// - The snapshot id is unknown.
    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
        // No-op (and no recorded change) if the ref already equals this reference.
        if self
            .metadata
            .refs
            .get(ref_name)
            .is_some_and(|snap_ref| snap_ref.eq(&reference))
        {
            return Ok(self);
        }

        // The referenced snapshot must exist in this metadata.
        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
                    reference.snapshot_id
                ),
            ));
        };

        // Update last_updated_ms to the exact timestamp of the snapshot if it was added in this commit
        let is_added_snapshot = self.changes.iter().any(|update| {
            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
        });
        if is_added_snapshot {
            self.last_updated_ms = Some(snapshot.timestamp_ms());
        }

        // Current snapshot id is set only for the main branch
        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
            // Use the commit's timestamp if one was recorded; otherwise fall back
            // to "now" and remember it for later operations in this commit.
            let timestamp_ms = if let Some(last_updated_ms) = self.last_updated_ms {
                last_updated_ms
            } else {
                let last_updated_ms = chrono::Utc::now().timestamp_millis();
                self.last_updated_ms = Some(last_updated_ms);
                last_updated_ms
            };

            self.metadata.snapshot_log.push(SnapshotLog {
                snapshot_id: snapshot.snapshot_id(),
                timestamp_ms,
            });
        }

        self.changes.push(TableUpdate::SetSnapshotRef {
            ref_name: ref_name.to_string(),
            reference: reference.clone(),
        });
        self.metadata.refs.insert(ref_name.to_string(), reference);

        Ok(self)
    }
570
571    /// Remove a reference
572    ///
573    /// If `ref_name='main'` the current snapshot id is set to -1.
574    pub fn remove_ref(mut self, ref_name: &str) -> Self {
575        if ref_name == MAIN_BRANCH {
576            self.metadata.current_snapshot_id = None;
577        }
578
579        if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
580            self.changes.push(TableUpdate::RemoveSnapshotRef {
581                ref_name: ref_name.to_string(),
582            });
583        }
584
585        self
586    }
587
588    /// Set statistics for a snapshot
589    pub fn set_statistics(mut self, statistics: StatisticsFile) -> Self {
590        self.metadata
591            .statistics
592            .insert(statistics.snapshot_id, statistics.clone());
593        self.changes.push(TableUpdate::SetStatistics {
594            statistics: statistics.clone(),
595        });
596        self
597    }
598
599    /// Remove statistics for a snapshot
600    pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
601        let previous = self.metadata.statistics.remove(&snapshot_id);
602        if previous.is_some() {
603            self.changes
604                .push(TableUpdate::RemoveStatistics { snapshot_id });
605        }
606        self
607    }
608
609    /// Set partition statistics
610    pub fn set_partition_statistics(
611        mut self,
612        partition_statistics_file: PartitionStatisticsFile,
613    ) -> Self {
614        self.metadata.partition_statistics.insert(
615            partition_statistics_file.snapshot_id,
616            partition_statistics_file.clone(),
617        );
618        self.changes.push(TableUpdate::SetPartitionStatistics {
619            partition_statistics: partition_statistics_file,
620        });
621        self
622    }
623
624    /// Remove partition statistics
625    pub fn remove_partition_statistics(mut self, snapshot_id: i64) -> Self {
626        let previous = self.metadata.partition_statistics.remove(&snapshot_id);
627        if previous.is_some() {
628            self.changes
629                .push(TableUpdate::RemovePartitionStatistics { snapshot_id });
630        }
631        self
632    }
633
634    /// Add a schema to the table metadata.
635    ///
636    /// The provided `schema.schema_id` may not be used.
637    ///
638    /// Important: Use this method with caution. The builder does not check
639    /// if the added schema is compatible with the current schema.
640    pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
641        // Validate that new schema fields don't conflict with existing partition field names
642        self.validate_schema_field_names(&schema)?;
643
644        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
645        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);
646
647        if schema_found {
648            if self.last_added_schema_id != Some(new_schema_id) {
649                self.changes.push(TableUpdate::AddSchema {
650                    schema: schema.clone(),
651                });
652                self.last_added_schema_id = Some(new_schema_id);
653            }
654
655            return Ok(self);
656        }
657
658        // New schemas might contain only old columns. In this case last_column_id should not be
659        // reduced.
660        self.metadata.last_column_id =
661            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());
662
663        // Set schema-id
664        let schema = match new_schema_id == schema.schema_id() {
665            true => schema,
666            false => schema.with_schema_id(new_schema_id),
667        };
668
669        self.metadata
670            .schemas
671            .insert(new_schema_id, schema.clone().into());
672
673        self.changes.push(TableUpdate::AddSchema { schema });
674
675        self.last_added_schema_id = Some(new_schema_id);
676
677        Ok(self)
678    }
679
680    /// Set the current schema id.
681    ///
682    /// If `schema_id` is -1, the last added schema is set as the current schema.
683    ///
684    /// Errors:
685    /// - provided `schema_id` is -1 but no schema has been added via `add_schema`.
686    /// - No schema with the provided `schema_id` exists.
687    pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
688        if schema_id == Self::LAST_ADDED {
689            schema_id = self.last_added_schema_id.ok_or_else(|| {
690                Error::new(
691                    ErrorKind::DataInvalid,
692                    "Cannot set current schema to last added schema: no schema has been added.",
693                )
694            })?;
695        };
696        let schema_id = schema_id; // Make immutable
697
698        if schema_id == self.metadata.current_schema_id {
699            return Ok(self);
700        }
701
702        let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
703            Error::new(
704                ErrorKind::DataInvalid,
705                format!("Cannot set current schema to unknown schema with id: '{schema_id}'"),
706            )
707        })?;
708
709        // Old partition specs and sort-orders should be preserved even if they are not compatible with the new schema,
710        // so that older metadata can still be interpreted.
711        // Default partition spec and sort order are checked in the build() method
712        // which allows other default partition specs and sort orders to be set before the build.
713
714        self.metadata.current_schema_id = schema_id;
715
716        if self.last_added_schema_id == Some(schema_id) {
717            self.changes.push(TableUpdate::SetCurrentSchema {
718                schema_id: Self::LAST_ADDED,
719            });
720        } else {
721            self.changes
722                .push(TableUpdate::SetCurrentSchema { schema_id });
723        }
724
725        Ok(self)
726    }
727
728    /// Add a schema and set it as the current schema.
729    pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
730        self.add_schema(schema)?
731            .set_current_schema(Self::LAST_ADDED)
732    }
733
734    /// Validate schema field names against partition field names across all historical schemas.
735    ///
736    /// Due to Iceberg's multi-version property, this check ignores existing schema fields
737    /// that match partition names (schema evolution allows re-adding previously removed fields).
738    /// Only NEW field names that conflict with partition names are rejected.
739    ///
740    /// # Errors
741    /// - Schema field name conflicts with partition field name but doesn't exist in any historical schema.
742    fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> {
743        if self.metadata.schemas.is_empty() {
744            return Ok(());
745        }
746
747        for field_name in schema.field_id_to_name_map().values() {
748            let has_partition_conflict = self.metadata.partition_name_exists(field_name);
749            let is_new_field = !self.metadata.name_exists_in_any_schema(field_name);
750
751            if has_partition_conflict && is_new_field {
752                return Err(Error::new(
753                    ErrorKind::DataInvalid,
754                    format!(
755                        "Cannot add schema field '{field_name}' because it conflicts with existing partition field name. \
756                         Schema evolution cannot introduce field names that match existing partition field names."
757                    ),
758                ));
759            }
760        }
761
762        Ok(())
763    }
764
765    /// Validate partition field names against schema field names across all historical schemas.
766    ///
767    /// Due to Iceberg's multi-version property, partition fields can share names with schema fields
768    /// if they meet specific requirements (identity transform + matching source field ID).
769    /// This validation enforces those rules across all historical schema versions.
770    ///
771    /// # Errors
772    /// - Partition field name conflicts with schema field name but doesn't use identity transform.
773    /// - Partition field uses identity transform but references wrong source field ID.
774    fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
775        if self.metadata.schemas.is_empty() {
776            return Ok(());
777        }
778
779        let current_schema = self.get_current_schema()?;
780        for partition_field in unbound_spec.fields() {
781            let exists_in_any_schema = self
782                .metadata
783                .name_exists_in_any_schema(&partition_field.name);
784
785            // Skip if partition field name doesn't conflict with any schema field
786            if !exists_in_any_schema {
787                continue;
788            }
789
790            // If name exists in schemas, validate against current schema rules
791            if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
792                let is_identity_transform =
793                    partition_field.transform == crate::spec::Transform::Identity;
794                let has_matching_source_id = schema_field.id == partition_field.source_id;
795
796                if !is_identity_transform {
797                    return Err(Error::new(
798                        ErrorKind::DataInvalid,
799                        format!(
800                            "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
801                            partition_field.name
802                        ),
803                    ));
804                }
805
806                if !has_matching_source_id {
807                    return Err(Error::new(
808                        ErrorKind::DataInvalid,
809                        format!(
810                            "Cannot create identity partition sourced from different field in schema. \
811                             Field name '{}' has id `{}` in schema but partition source id is `{}`",
812                            partition_field.name, schema_field.id, partition_field.source_id
813                        ),
814                    ));
815                }
816            }
817        }
818
819        Ok(())
820    }
821
    /// Add a partition spec to the table metadata.
    ///
    /// The spec is bound eagerly to the current schema.
    /// If a schema is added in the same set of changes, the schema should be added first.
    ///
    /// Even if `unbound_spec.spec_id` is provided as `Some`, it may not be used.
    ///
    /// # Errors
    /// - The partition spec cannot be bound to the current schema.
    /// - The partition spec has non-sequential field ids and the table format version is 1.
    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        let schema = self.get_current_schema()?.clone();

        // Check if partition field names conflict with schema field names across all schemas
        self.validate_partition_field_names(&unbound_spec)?;

        // Reuse field IDs for equivalent fields from existing partition specs
        let unbound_spec = self.reuse_partition_field_ids(unbound_spec)?;

        // Bind eagerly; fresh field ids are assigned starting after `last_partition_id`.
        let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
            .with_last_assigned_field_id(self.metadata.last_partition_id)
            .build()?;

        // Reuse the id of a compatible existing spec when one exists.
        let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
        let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
        let spec = spec.with_spec_id(new_spec_id);
        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);

        if spec_found {
            // Spec already present: record the AddSpec change at most once per spec id.
            if self.last_added_spec_id != Some(new_spec_id) {
                self.changes
                    .push(TableUpdate::AddSpec { spec: unbound_spec });
                self.last_added_spec_id = Some(new_spec_id);
            }

            return Ok(self);
        }

        // V1 tables require sequential partition field ids (see error message).
        if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Cannot add partition spec with non-sequential field ids to format version 1 table",
            ));
        }

        // Falls back to the unpartitioned sentinel when the spec has no fields.
        let highest_field_id = spec
            .highest_field_id()
            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
        self.metadata
            .partition_specs
            .insert(new_spec_id, Arc::new(spec));
        self.changes
            .push(TableUpdate::AddSpec { spec: unbound_spec });

        self.last_added_spec_id = Some(new_spec_id);
        // `last_partition_id` is monotonic: never decrease it.
        self.metadata.last_partition_id =
            std::cmp::max(self.metadata.last_partition_id, highest_field_id);

        Ok(self)
    }
882
883    /// Reuse partition field IDs for equivalent fields from existing partition specs.
884    ///
885    /// According to the Iceberg spec, partition field IDs must be reused if an existing
886    /// partition spec contains an equivalent field (same source_id and transform).
887    fn reuse_partition_field_ids(
888        &self,
889        unbound_spec: UnboundPartitionSpec,
890    ) -> Result<UnboundPartitionSpec> {
891        // Build a map of (source_id, transform) -> field_id from existing specs
892        let equivalent_field_ids: HashMap<_, _> = self
893            .metadata
894            .partition_specs
895            .values()
896            .flat_map(|spec| spec.fields())
897            .map(|field| ((field.source_id, &field.transform), field.field_id))
898            .collect();
899
900        // Create new fields with reused field IDs where possible
901        let fields = unbound_spec
902            .fields
903            .into_iter()
904            .map(|mut field| {
905                if field.field_id.is_none()
906                    && let Some(&existing_field_id) =
907                        equivalent_field_ids.get(&(field.source_id, &field.transform))
908                {
909                    field.field_id = Some(existing_field_id);
910                }
911                field
912            })
913            .collect();
914
915        Ok(UnboundPartitionSpec {
916            spec_id: unbound_spec.spec_id,
917            fields,
918        })
919    }
920
921    /// Set the default partition spec.
922    ///
923    /// # Errors
924    /// - spec_id is -1 but no spec has been added via `add_partition_spec`.
925    /// - No partition spec with the provided `spec_id` exists.
926    pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> {
927        if spec_id == Self::LAST_ADDED {
928            spec_id = self.last_added_spec_id.ok_or_else(|| {
929                Error::new(
930                    ErrorKind::DataInvalid,
931                    "Cannot set default partition spec to last added spec: no spec has been added.",
932                )
933            })?;
934        }
935
936        if self.metadata.default_spec.spec_id() == spec_id {
937            return Ok(self);
938        }
939
940        if !self.metadata.partition_specs.contains_key(&spec_id) {
941            return Err(Error::new(
942                ErrorKind::DataInvalid,
943                format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",),
944            ));
945        }
946
947        let schemaless_spec = self
948            .metadata
949            .partition_specs
950            .get(&spec_id)
951            .ok_or_else(|| {
952                Error::new(
953                    ErrorKind::DataInvalid,
954                    format!(
955                        "Cannot set default partition spec to unknown spec with id: '{spec_id}'",
956                    ),
957                )
958            })?
959            .clone();
960        let spec = Arc::unwrap_or_clone(schemaless_spec);
961        let spec_type = spec.partition_type(self.get_current_schema()?)?;
962        self.metadata.default_spec = Arc::new(spec);
963        self.metadata.default_partition_type = spec_type;
964
965        if self.last_added_spec_id == Some(spec_id) {
966            self.changes.push(TableUpdate::SetDefaultSpec {
967                spec_id: Self::LAST_ADDED,
968            });
969        } else {
970            self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
971        }
972
973        Ok(self)
974    }
975
976    /// Add a partition spec and set it as the default
977    pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
978        self.add_partition_spec(unbound_spec)?
979            .set_default_partition_spec(Self::LAST_ADDED)
980    }
981
982    /// Remove partition specs by their ids from the table metadata.
983    /// Does nothing if a spec id is not present. Active partition specs
984    /// should not be removed.
985    ///
986    /// # Errors
987    /// - Cannot remove the default partition spec.
988    pub fn remove_partition_specs(mut self, spec_ids: &[i32]) -> Result<Self> {
989        if spec_ids.contains(&self.metadata.default_spec.spec_id()) {
990            return Err(Error::new(
991                ErrorKind::DataInvalid,
992                "Cannot remove default partition spec",
993            ));
994        }
995
996        let mut removed_specs = Vec::with_capacity(spec_ids.len());
997        spec_ids.iter().for_each(|id| {
998            if self.metadata.partition_specs.remove(id).is_some() {
999                removed_specs.push(*id);
1000            }
1001        });
1002
1003        if !removed_specs.is_empty() {
1004            self.changes.push(TableUpdate::RemovePartitionSpecs {
1005                spec_ids: removed_specs,
1006            });
1007        }
1008
1009        Ok(self)
1010    }
1011
1012    /// Add a sort order to the table metadata.
1013    ///
1014    /// The spec is bound eagerly to the current schema and must be valid for it.
1015    /// If a schema is added in the same set of changes, the schema should be added first.
1016    ///
1017    /// Even if `sort_order.order_id` is provided, it may not be used.
1018    ///
1019    /// # Errors
1020    /// - Sort order id to add already exists.
1021    /// - Sort order is incompatible with the current schema.
1022    pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
1023        let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
1024        let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);
1025
1026        if sort_order_found {
1027            if self.last_added_order_id != Some(new_order_id) {
1028                self.changes.push(TableUpdate::AddSortOrder {
1029                    sort_order: sort_order.clone().with_order_id(new_order_id),
1030                });
1031                self.last_added_order_id = Some(new_order_id);
1032            }
1033
1034            return Ok(self);
1035        }
1036
1037        let schema = self.get_current_schema()?.clone().as_ref().clone();
1038        let sort_order = SortOrder::builder()
1039            .with_order_id(new_order_id)
1040            .with_fields(sort_order.fields)
1041            .build(&schema)
1042            .map_err(|e| {
1043                Error::new(
1044                    ErrorKind::DataInvalid,
1045                    format!("Sort order to add is incompatible with current schema: {e}"),
1046                )
1047                .with_source(e)
1048            })?;
1049
1050        self.last_added_order_id = Some(new_order_id);
1051        self.metadata
1052            .sort_orders
1053            .insert(new_order_id, sort_order.clone().into());
1054        self.changes.push(TableUpdate::AddSortOrder { sort_order });
1055
1056        Ok(self)
1057    }
1058
1059    /// Set the default sort order. If `sort_order_id` is -1, the last added sort order is set as default.
1060    ///
1061    /// # Errors
1062    /// - sort_order_id is -1 but no sort order has been added via `add_sort_order`.
1063    /// - No sort order with the provided `sort_order_id` exists.
1064    pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
1065        if sort_order_id == Self::LAST_ADDED as i64 {
1066            sort_order_id = self.last_added_order_id.ok_or_else(|| {
1067                Error::new(
1068                    ErrorKind::DataInvalid,
1069                    "Cannot set default sort order to last added order: no order has been added.",
1070                )
1071            })?;
1072        }
1073
1074        if self.metadata.default_sort_order_id == sort_order_id {
1075            return Ok(self);
1076        }
1077
1078        if !self.metadata.sort_orders.contains_key(&sort_order_id) {
1079            return Err(Error::new(
1080                ErrorKind::DataInvalid,
1081                format!(
1082                    "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
1083                ),
1084            ));
1085        }
1086
1087        self.metadata.default_sort_order_id = sort_order_id;
1088
1089        if self.last_added_order_id == Some(sort_order_id) {
1090            self.changes.push(TableUpdate::SetDefaultSortOrder {
1091                sort_order_id: Self::LAST_ADDED as i64,
1092            });
1093        } else {
1094            self.changes
1095                .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
1096        }
1097
1098        Ok(self)
1099    }
1100
1101    /// Add a sort order and set it as the default
1102    fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
1103        self.add_sort_order(sort_order)?
1104            .set_default_sort_order(Self::LAST_ADDED as i64)
1105    }
1106
1107    /// Add an encryption key to the table metadata.
1108    pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self {
1109        let key_id = key.key_id().to_string();
1110        if self.metadata.encryption_keys.contains_key(&key_id) {
1111            // already exists
1112            return self;
1113        }
1114
1115        self.metadata.encryption_keys.insert(key_id, key.clone());
1116        self.changes.push(TableUpdate::AddEncryptionKey {
1117            encryption_key: key,
1118        });
1119        self
1120    }
1121
1122    /// Remove an encryption key from the table metadata.
1123    pub fn remove_encryption_key(mut self, key_id: &str) -> Self {
1124        if self.metadata.encryption_keys.remove(key_id).is_some() {
1125            self.changes.push(TableUpdate::RemoveEncryptionKey {
1126                key_id: key_id.to_string(),
1127            });
1128        }
1129        self
1130    }
1131
    /// Build the table metadata.
    pub fn build(mut self) -> Result<TableMetadataBuildResult> {
        // Stamp the update time unless the caller pinned one explicitly.
        self.metadata.last_updated_ms = self
            .last_updated_ms
            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());

        // Check compatibility of the current schema to the default partition spec and sort order.
        // We use the `get_xxx` methods from the builder to avoid using the panicking
        // `TableMetadata.default_partition_spec` etc. methods.
        let schema = self.get_current_schema()?.clone();
        let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);

        // Re-bind the default spec against the (possibly changed) current schema.
        self.metadata.default_spec = Arc::new(
            Arc::unwrap_or_clone(self.metadata.default_spec)
                .into_unbound()
                .bind(schema.clone())?,
        );
        self.metadata.default_partition_type =
            self.metadata.default_spec.partition_type(&schema)?;
        // Built purely for validation against the schema; the result is discarded.
        SortOrder::builder()
            .with_fields(sort_order.fields)
            .build(&schema)?;

        self.update_snapshot_log()?;
        self.metadata.try_normalize()?;

        // Append the previous metadata file to the log, then expire old entries.
        if let Some(hist_entry) = self.previous_history_entry.take() {
            self.metadata.metadata_log.push(hist_entry);
        }
        let expired_metadata_logs = self.expire_metadata_log();

        Ok(TableMetadataBuildResult {
            metadata: self.metadata,
            changes: self.changes,
            expired_metadata_logs,
        })
    }
1169
1170    fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
1171        let max_size = self
1172            .metadata
1173            .properties
1174            .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
1175            .and_then(|v| v.parse::<usize>().ok())
1176            .unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
1177            .max(1);
1178
1179        if self.metadata.metadata_log.len() > max_size {
1180            self.metadata
1181                .metadata_log
1182                .drain(0..self.metadata.metadata_log.len() - max_size)
1183                .collect()
1184        } else {
1185            Vec::new()
1186        }
1187    }
1188
    /// Rewrite the snapshot log, dropping entries for intermediate snapshots and
    /// repairing history after snapshot removals.
    ///
    /// Leaves the log untouched unless this set of changes produced intermediate
    /// snapshots or removed snapshots.
    fn update_snapshot_log(&mut self) -> Result<()> {
        let intermediate_snapshots = self.get_intermediate_snapshots();
        let has_removed_snapshots = self
            .changes
            .iter()
            .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));

        if intermediate_snapshots.is_empty() && !has_removed_snapshots {
            return Ok(());
        }

        let mut new_snapshot_log = Vec::new();
        for log_entry in &self.metadata.snapshot_log {
            let snapshot_id = log_entry.snapshot_id;
            if self.metadata.snapshots.contains_key(&snapshot_id) {
                // Keep entries whose snapshot still exists and was not intermediate.
                if !intermediate_snapshots.contains(&snapshot_id) {
                    new_snapshot_log.push(log_entry.clone());
                }
            } else if has_removed_snapshots {
                // any invalid entry causes the history before it to be removed. otherwise, there could be
                // history gaps that cause time-travel queries to produce incorrect results. for example,
                // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
                // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
                // and t3 when in fact s2 was the current snapshot.
                new_snapshot_log.clear();
            }
        }

        // The rewritten log must still end at the current snapshot, if one is set.
        if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
            let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
            if last_id != Some(current_snapshot_id) {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set invalid snapshot log: latest entry is not the current snapshot",
                ));
            }
        };

        self.metadata.snapshot_log = new_snapshot_log;
        Ok(())
    }
1230
1231    /// Finds intermediate snapshots that have not been committed as the current snapshot.
1232    ///
1233    /// Transactions can create snapshots that are never the current snapshot because several
1234    /// changes are combined by the transaction into one table metadata update. when each
1235    /// intermediate snapshot is added to table metadata, it is added to the snapshot log, assuming
1236    /// that it will be the current snapshot. when there are multiple snapshot updates, the log must
1237    /// be corrected by suppressing the intermediate snapshot entries.
1238    ///     
1239    /// A snapshot is an intermediate snapshot if it was added but is not the current snapshot.
1240    fn get_intermediate_snapshots(&self) -> HashSet<i64> {
1241        let added_snapshot_ids = self
1242            .changes
1243            .iter()
1244            .filter_map(|update| match update {
1245                TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
1246                _ => None,
1247            })
1248            .collect::<HashSet<_>>();
1249
1250        self.changes
1251            .iter()
1252            .filter_map(|update| match update {
1253                TableUpdate::SetSnapshotRef {
1254                    ref_name,
1255                    reference,
1256                } => {
1257                    if added_snapshot_ids.contains(&reference.snapshot_id)
1258                        && ref_name == MAIN_BRANCH
1259                        && reference.snapshot_id
1260                            != self
1261                                .metadata
1262                                .current_snapshot_id
1263                                .unwrap_or(i64::from(Self::LAST_ADDED))
1264                    {
1265                        Some(reference.snapshot_id)
1266                    } else {
1267                        None
1268                    }
1269                }
1270                _ => None,
1271            })
1272            .collect()
1273    }
1274
    /// Re-assign schema, field, and order ids for a brand-new table.
    ///
    /// The schema gets `DEFAULT_SCHEMA_ID` and field ids starting at `FIRST_FIELD_ID`;
    /// the partition spec and sort order are then rebuilt so their `source_id`s point
    /// at the re-assigned schema fields (resolved via the old id -> name mapping).
    ///
    /// # Errors
    /// - A partition or sort field references a source column id absent from `schema`.
    fn reassign_ids(
        schema: Schema,
        spec: UnboundPartitionSpec,
        sort_order: SortOrder,
    ) -> Result<(Schema, PartitionSpec, SortOrder)> {
        // Re-assign field ids and schema ids for a new table.
        // Capture old-id -> name before the ids are rewritten below.
        let previous_id_to_name = schema.field_id_to_name_map().clone();
        let fresh_schema = schema
            .into_builder()
            .with_schema_id(DEFAULT_SCHEMA_ID)
            .with_reassigned_field_ids(FIRST_FIELD_ID)
            .build()?;

        // Re-build partition spec with new ids
        let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
        for field in spec.fields() {
            // Resolve the old source id to a name, then re-add by name so the
            // builder binds it against the re-assigned schema.
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for partition column {} in schema.",
                        field.source_id, field.name
                    ),
                )
            })?;
            fresh_spec =
                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
        }
        let fresh_spec = fresh_spec.build()?;

        // Re-build sort order with new ids
        let mut fresh_order = SortOrder::builder();
        for mut field in sort_order.fields {
            // Same name-based resolution as above, but for sort fields.
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for sort column in schema.",
                        field.source_id
                    ),
                )
            })?;
            let new_field_id = fresh_schema
                       .field_by_name(source_field_name)
                       .ok_or_else(|| {
                           Error::new(
                               ErrorKind::Unexpected,
                               format!(
                                   "Cannot find source column with name {source_field_name} for sort column in re-assigned schema."
                               ),
                           )
                       })?.id;
            field.source_id = new_field_id;
            fresh_order.with_sort_field(field);
        }
        let fresh_sort_order = fresh_order.build(&fresh_schema)?;

        Ok((fresh_schema, fresh_spec, fresh_sort_order))
    }
1334
1335    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
1336        self.metadata
1337            .schemas
1338            .iter()
1339            .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
1340            .unwrap_or_else(|| self.get_highest_schema_id() + 1)
1341    }
1342
1343    fn get_highest_schema_id(&self) -> i32 {
1344        *self
1345            .metadata
1346            .schemas
1347            .keys()
1348            .max()
1349            .unwrap_or(&self.metadata.current_schema_id)
1350    }
1351
1352    fn get_current_schema(&self) -> Result<&SchemaRef> {
1353        self.metadata
1354            .schemas
1355            .get(&self.metadata.current_schema_id)
1356            .ok_or_else(|| {
1357                Error::new(
1358                    ErrorKind::DataInvalid,
1359                    format!(
1360                        "Current schema with id '{}' not found in table metadata.",
1361                        self.metadata.current_schema_id
1362                    ),
1363                )
1364            })
1365    }
1366
1367    fn get_default_sort_order(&self) -> Result<SortOrderRef> {
1368        self.metadata
1369            .sort_orders
1370            .get(&self.metadata.default_sort_order_id)
1371            .cloned()
1372            .ok_or_else(|| {
1373                Error::new(
1374                    ErrorKind::DataInvalid,
1375                    format!(
1376                        "Default sort order with id '{}' not found in table metadata.",
1377                        self.metadata.default_sort_order_id
1378                    ),
1379                )
1380            })
1381    }
1382
1383    /// If a compatible spec already exists, use the same ID. Otherwise, use 1 more than the highest ID.
1384    fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 {
1385        self.metadata
1386            .partition_specs
1387            .iter()
1388            .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id))
1389            .unwrap_or_else(|| {
1390                self.get_highest_spec_id()
1391                    .map(|id| id + 1)
1392                    .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
1393            })
1394    }
1395
1396    fn get_highest_spec_id(&self) -> Option<i32> {
1397        self.metadata.partition_specs.keys().max().copied()
1398    }
1399
1400    /// If a compatible sort-order already exists, use the same ID. Otherwise, use 1 more than the highest ID.
1401    fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
1402        if new_sort_order.is_unsorted() {
1403            return SortOrder::unsorted_order().order_id;
1404        }
1405
1406        self.metadata
1407            .sort_orders
1408            .iter()
1409            .find_map(|(id, sort_order)| {
1410                sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
1411            })
1412            .unwrap_or_else(|| {
1413                self.highest_sort_order_id()
1414                    .unwrap_or(SortOrder::unsorted_order().order_id)
1415                    + 1
1416            })
1417    }
1418
1419    fn highest_sort_order_id(&self) -> Option<i64> {
1420        self.metadata.sort_orders.keys().max().copied()
1421    }
1422
1423    /// Remove schemas by their ids from the table metadata.
1424    /// Does nothing if a schema id is not present. Active schemas should not be removed.
1425    pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1426        if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1427            return Err(Error::new(
1428                ErrorKind::DataInvalid,
1429                "Cannot remove current schema",
1430            ));
1431        }
1432
1433        if schema_id_to_remove.is_empty() {
1434            return Ok(self);
1435        }
1436
1437        let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1438        self.metadata.schemas.retain(|id, _schema| {
1439            if schema_id_to_remove.contains(id) {
1440                removed_schemas.push(*id);
1441                false
1442            } else {
1443                true
1444            }
1445        });
1446
1447        self.changes.push(TableUpdate::RemoveSchemas {
1448            schema_ids: removed_schemas,
1449        });
1450
1451        Ok(self)
1452    }
1453}
1454
1455impl From<TableMetadataBuildResult> for TableMetadata {
1456    fn from(result: TableMetadataBuildResult) -> Self {
1457        result.metadata
1458    }
1459}
1460
1461#[cfg(test)]
1462mod tests {
1463    use std::fs::File;
1464    use std::io::BufReader;
1465    use std::thread::sleep;
1466
1467    use super::*;
1468    use crate::TableIdent;
1469    use crate::io::FileIOBuilder;
1470    use crate::spec::{
1471        BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema,
1472        SnapshotRetention, SortDirection, SortField, StructType, Summary, TableProperties,
1473        Transform, Type, UnboundPartitionField,
1474    };
1475    use crate::table::Table;
1476
1477    const TEST_LOCATION: &str = "s3://bucket/test/location";
1478    const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
1479
1480    fn schema() -> Schema {
1481        Schema::builder()
1482            .with_fields(vec![
1483                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1484                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1485                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1486            ])
1487            .build()
1488            .unwrap()
1489    }
1490
1491    fn sort_order() -> SortOrder {
1492        let schema = schema();
1493        SortOrder::builder()
1494            .with_order_id(1)
1495            .with_sort_field(SortField {
1496                source_id: 3,
1497                transform: Transform::Bucket(4),
1498                direction: SortDirection::Descending,
1499                null_order: NullOrder::First,
1500            })
1501            .build(&schema)
1502            .unwrap()
1503    }
1504
1505    fn partition_spec() -> UnboundPartitionSpec {
1506        UnboundPartitionSpec::builder()
1507            .with_spec_id(0)
1508            .add_partition_field(2, "y", Transform::Identity)
1509            .unwrap()
1510            .build()
1511    }
1512
1513    fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
1514        TableMetadataBuilder::new(
1515            schema(),
1516            partition_spec(),
1517            sort_order(),
1518            TEST_LOCATION.to_string(),
1519            format_version,
1520            HashMap::new(),
1521        )
1522        .unwrap()
1523        .build()
1524        .unwrap()
1525        .metadata
1526        .into_builder(Some(
1527            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1528        ))
1529    }
1530
    #[test]
    fn test_minimal_build() {
        // Build a fresh V1 table from the standard fixtures and check defaults.
        let metadata = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V1);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        assert_eq!(metadata.default_sort_order_id, 1);
        // The single partition field was assigned id 1000.
        assert_eq!(metadata.last_partition_id, 1000);
        assert_eq!(metadata.last_column_id, 3);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.current_snapshot_id, None);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
        // NOTE(review): duplicates the `last_column_id` assertion above
        // (LAST_ASSIGNED_COLUMN_ID == 3).
        assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);

        // Test can serialize v1
        let _ = serde_json::to_string(&metadata).unwrap();

        // Test can serialize v2
        let metadata = metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .upgrade_format_version(FormatVersion::V2)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        let _ = serde_json::to_string(&metadata).unwrap();
    }
1578
1579    #[test]
1580    fn test_build_unpartitioned_unsorted() {
1581        let schema = Schema::builder().build().unwrap();
1582        let metadata = TableMetadataBuilder::new(
1583            schema.clone(),
1584            PartitionSpec::unpartition_spec(),
1585            SortOrder::unsorted_order(),
1586            TEST_LOCATION.to_string(),
1587            FormatVersion::V2,
1588            HashMap::new(),
1589        )
1590        .unwrap()
1591        .build()
1592        .unwrap()
1593        .metadata;
1594
1595        assert_eq!(metadata.format_version, FormatVersion::V2);
1596        assert_eq!(metadata.location, TEST_LOCATION);
1597        assert_eq!(metadata.current_schema_id, 0);
1598        assert_eq!(metadata.default_spec.spec_id(), 0);
1599        assert_eq!(metadata.default_sort_order_id, 0);
1600        assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
1601        assert_eq!(metadata.last_column_id, 0);
1602        assert_eq!(metadata.snapshots.len(), 0);
1603        assert_eq!(metadata.current_snapshot_id, None);
1604        assert_eq!(metadata.refs.len(), 0);
1605        assert_eq!(metadata.properties.len(), 0);
1606        assert_eq!(metadata.metadata_log.len(), 0);
1607        assert_eq!(metadata.last_sequence_number, 0);
1608    }
1609
    #[test]
    fn test_reassigns_ids() {
        // Inputs deliberately carry non-canonical ids: schema id 10, field
        // ids 11..15, spec id 20, sort-order id 10. `reassign_ids` must
        // renumber all of them starting from the canonical initial values.
        let schema = Schema::builder()
            .with_schema_id(10)
            .with_fields(vec![
                NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    13,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(14, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();
        let spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(20)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 11,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(10)
            .build(&schema)
            .unwrap();

        let (fresh_schema, fresh_spec, fresh_sort_order) =
            TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();

        // Top-level fields are renumbered first (1..=4); the nested field
        // receives the next id (5) afterwards.
        let expected_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    3,
                    "struct",
                    Type::Struct(StructType::new(vec![
                        NestedField::required(5, "nested", Type::Primitive(PrimitiveType::Long))
                            .into(),
                    ])),
                )
                .into(),
                NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        // The spec id is reset to the default 0; the source ids track the
        // renumbered schema fields.
        let expected_spec = PartitionSpec::builder(expected_schema.clone())
            .with_spec_id(0)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        // The sort-order id is reset to 1 and its source id follows field "a"
        // (now id 1).
        let expected_sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(1)
            .build(&expected_schema)
            .unwrap();

        assert_eq!(fresh_schema, expected_schema);
        assert_eq!(fresh_spec, expected_spec);
        assert_eq!(fresh_sort_order, expected_sort_order);
    }
1694
1695    #[test]
1696    fn test_ids_are_reassigned_for_new_metadata() {
1697        let schema = schema().into_builder().with_schema_id(10).build().unwrap();
1698
1699        let metadata = TableMetadataBuilder::new(
1700            schema,
1701            partition_spec(),
1702            sort_order(),
1703            TEST_LOCATION.to_string(),
1704            FormatVersion::V1,
1705            HashMap::new(),
1706        )
1707        .unwrap()
1708        .build()
1709        .unwrap()
1710        .metadata;
1711
1712        assert_eq!(metadata.current_schema_id, 0);
1713        assert_eq!(metadata.current_schema().schema_id(), 0);
1714    }
1715
    #[test]
    fn test_new_metadata_changes() {
        // Creating a new table must record the full chronological list of
        // updates; -1 is the "last added" sentinel for schema, spec and
        // sort-order ids.
        let changes = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema { schema: schema() },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // Because this is a new table, field-ids are assigned;
                // partition_spec() has None set for field-id
                spec: PartitionSpec::builder(schema())
                    .with_spec_id(0)
                    .add_unbound_field(UnboundPartitionField {
                        name: "y".to_string(),
                        transform: Transform::Identity,
                        source_id: 2,
                        field_id: Some(1000)
                    })
                    .unwrap()
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: sort_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
            TableUpdate::SetProperties {
                updates: HashMap::from_iter(vec![(
                    "property 1".to_string(),
                    "value 1".to_string()
                )]),
            }
        ]);
    }
1766
    #[test]
    fn test_new_metadata_changes_unpartitioned_unsorted() {
        // Same as `test_new_metadata_changes`, but for an unpartitioned,
        // unsorted table: no `SetProperties` is recorded and the spec/sort
        // order entries are the empty defaults.
        let schema = Schema::builder().build().unwrap();
        let changes = TableMetadataBuilder::new(
            schema.clone(),
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema {
                schema: Schema::builder().build().unwrap(),
            },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // Because this is a new table, field-ids are assigned;
                // partition_spec() has None set for field-id
                spec: PartitionSpec::builder(schema)
                    .with_spec_id(0)
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: SortOrder::unsorted_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
        ]);
    }
1807
    #[test]
    fn test_add_partition_spec() {
        // Adding a spec with a stale id (10) must re-assign it to the next
        // free spec id (1); a new field without a field-id receives the next
        // free partition field id (1001).
        let builder = builder_without_changes(FormatVersion::V2);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    // The previous field - has field_id set
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    // A new field without field id - should still be without field id in changes
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    field_id: None,
                },
            ])
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .build()
            .unwrap();

        // Spec id should be re-assigned
        let expected_change = added_spec.with_spec_id(1);
        let expected_spec = PartitionSpec::builder(schema())
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y".to_string(),
                transform: Transform::Identity,
                source_id: 2,
                field_id: Some(1000),
            })
            .unwrap()
            .add_unbound_field(UnboundPartitionField {
                name: "z".to_string(),
                transform: Transform::Identity,
                source_id: 3,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        // Adding the spec does not change the default spec (still 0).
        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(
            build_result.metadata.partition_spec_by_id(1),
            Some(&Arc::new(expected_spec))
        );
        assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
        assert_eq!(build_result.metadata.last_partition_id, 1001);
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: expected_change
        });

        // Remove the spec
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .remove_partition_specs(&[1])
            .unwrap()
            .build()
            .unwrap();

        // Only the original spec (id 0) remains after removal.
        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.partition_specs.len(), 1);
        assert!(build_result.metadata.partition_spec_by_id(1).is_none());
    }
1886
    #[test]
    fn test_set_default_partition_spec() {
        // `set_default_partition_spec(-1)` must resolve to the spec added
        // within the same set of changes (re-assigned to id 1; its field gets
        // the next free partition field id 1001).
        let builder = builder_without_changes(FormatVersion::V2);
        let schema = builder.get_current_schema().unwrap().clone();
        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        let expected_spec = PartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y_bucket[2]".to_string(),
                transform: Transform::Bucket(2),
                source_id: 1,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSpec {
                // Should contain the actual ID that was used
                spec: added_spec.with_spec_id(1)
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 }
        ]);
    }
1927
    #[test]
    fn test_set_existing_default_partition_spec() {
        // Setting a spec that already exists in the metadata (here the
        // original spec id 0) records its concrete id in the change, not the
        // -1 "last added" sentinel.
        let builder = builder_without_changes(FormatVersion::V2);
        // Add and set an unbound spec as current
        let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();
        let build_result = builder
            .add_partition_spec(unbound_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: unbound_spec.clone()
        });
        assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec {
            spec_id: -1
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                unbound_spec
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );

        // Set old spec again
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .set_default_partition_spec(0)
            .unwrap()
            .build()
            .unwrap();

        // The pre-existing spec id (0) is recorded verbatim.
        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec {
            spec_id: 0
        });
        assert_eq!(
            build_result.metadata.default_spec,
            Arc::new(
                partition_spec()
                    .bind(build_result.metadata.current_schema().clone())
                    .unwrap()
            )
        );
    }
1981
1982    #[test]
1983    fn test_add_sort_order() {
1984        let builder = builder_without_changes(FormatVersion::V2);
1985
1986        let added_sort_order = SortOrder::builder()
1987            .with_order_id(10)
1988            .with_fields(vec![SortField {
1989                source_id: 1,
1990                transform: Transform::Identity,
1991                direction: SortDirection::Ascending,
1992                null_order: NullOrder::First,
1993            }])
1994            .build(&schema())
1995            .unwrap();
1996
1997        let build_result = builder
1998            .add_sort_order(added_sort_order.clone())
1999            .unwrap()
2000            .build()
2001            .unwrap();
2002
2003        let expected_sort_order = added_sort_order.with_order_id(2);
2004
2005        assert_eq!(build_result.changes.len(), 1);
2006        assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2));
2007        pretty_assertions::assert_eq!(
2008            build_result.metadata.sort_order_by_id(2),
2009            Some(&Arc::new(expected_sort_order.clone()))
2010        );
2011        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder {
2012            sort_order: expected_sort_order
2013        });
2014    }
2015
    #[test]
    fn test_add_compatible_schema() {
        // Adding a compatible superset schema (adds field 4 "a") via
        // `add_current_schema` registers it under id 1 and makes it current,
        // recording the -1 "last added" sentinel in the change.
        let builder = builder_without_changes(FormatVersion::V2);

        let added_schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let build_result = builder
            .add_current_schema(added_schema.clone())
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1));
        pretty_assertions::assert_eq!(
            build_result.metadata.schema_by_id(1),
            Some(&Arc::new(added_schema.clone()))
        );
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema {
            schema: added_schema
        });
        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
            schema_id: -1
        });
    }
2050
2051    #[test]
2052    fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
2053        let builder = builder_without_changes(FormatVersion::V2);
2054
2055        let added_schema = Schema::builder()
2056            .with_schema_id(1)
2057            .with_fields(vec![
2058                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
2059                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
2060                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
2061                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
2062            ])
2063            .build()
2064            .unwrap();
2065
2066        let build_result = builder
2067            .add_schema(added_schema.clone())
2068            .unwrap()
2069            .set_current_schema(1)
2070            .unwrap()
2071            .build()
2072            .unwrap();
2073
2074        assert_eq!(build_result.changes.len(), 2);
2075        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
2076            schema_id: -1
2077        });
2078    }
2079
2080    #[test]
2081    fn test_no_metadata_log_for_create_table() {
2082        let build_result = TableMetadataBuilder::new(
2083            schema(),
2084            partition_spec(),
2085            sort_order(),
2086            TEST_LOCATION.to_string(),
2087            FormatVersion::V2,
2088            HashMap::new(),
2089        )
2090        .unwrap()
2091        .build()
2092        .unwrap();
2093
2094        assert_eq!(build_result.metadata.metadata_log.len(), 0);
2095    }
2096
    #[test]
    fn test_no_metadata_log_entry_for_no_previous_location() {
        // Used for first commit after stage-creation of tables
        // `builder_without_changes` passes a previous file, so one log entry
        // exists already.
        let metadata = builder_without_changes(FormatVersion::V2)
            .build()
            .unwrap()
            .metadata;
        assert_eq!(metadata.metadata_log.len(), 1);

        // Re-opening with `None` as previous location must not append a new
        // log entry on the next build.
        let build_result = metadata
            .into_builder(None)
            .set_properties(HashMap::from_iter(vec![(
                "foo".to_string(),
                "bar".to_string(),
            )]))
            .unwrap()
            .build()
            .unwrap();

        // Still exactly one entry — the pre-existing one.
        assert_eq!(build_result.metadata.metadata_log.len(), 1);
    }
2118
2119    #[test]
2120    fn test_from_metadata_generates_metadata_log() {
2121        let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
2122        let builder = TableMetadataBuilder::new(
2123            schema(),
2124            partition_spec(),
2125            sort_order(),
2126            TEST_LOCATION.to_string(),
2127            FormatVersion::V2,
2128            HashMap::new(),
2129        )
2130        .unwrap()
2131        .build()
2132        .unwrap()
2133        .metadata
2134        .into_builder(Some(metadata_path.to_string()));
2135
2136        let builder = builder
2137            .add_default_sort_order(SortOrder::unsorted_order())
2138            .unwrap();
2139
2140        let build_result = builder.build().unwrap();
2141
2142        assert_eq!(build_result.metadata.metadata_log.len(), 1);
2143        assert_eq!(
2144            build_result.metadata.metadata_log[0].metadata_file,
2145            metadata_path
2146        );
2147    }
2148
    #[test]
    fn test_set_ref() {
        // Setting a ref to an unknown snapshot id must fail; setting it to an
        // added snapshot succeeds and appends to the snapshot log.
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let builder = builder.add_snapshot(snapshot.clone()).unwrap();

        // Snapshot id 10 was never added — setting 'main' to it must error.
        assert!(
            builder
                .clone()
                .set_ref(MAIN_BRANCH, SnapshotReference {
                    snapshot_id: 10,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: Some(10),
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                })
                .unwrap_err()
                .to_string()
                .contains("Cannot set 'main' to unknown snapshot: '10'")
        );

        // Setting 'main' to the added snapshot (id 1) succeeds.
        let build_result = builder
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(build_result.metadata.snapshots.len(), 1);
        assert_eq!(
            build_result.metadata.snapshot_by_id(1),
            Some(&Arc::new(snapshot.clone()))
        );
        assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 1,
            timestamp_ms: snapshot.timestamp_ms()
        }])
    }
2213
    #[test]
    fn test_snapshot_log_skips_intermediates() {
        // When the main branch advances twice within a single commit, only
        // the final snapshot lands in the snapshot log — the intermediate
        // state is skipped.
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot_1 = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        // NOTE(review): snapshot 2 reuses the "/snap-1.avro" manifest-list
        // path — presumably a copy-paste leftover; harmless for the
        // snapshot-log assertions below, but confirm it is intentional.
        let snapshot_2 = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        // Advance main to snapshot 1, then to snapshot 2, all in one commit.
        let result = builder
            .add_snapshot(snapshot_1)
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
            .unwrap()
            .build()
            .unwrap();

        // Only the final snapshot (id 2) is logged and current.
        assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
            snapshot_id: 2,
            timestamp_ms: snapshot_2.timestamp_ms()
        }]);
        assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
    }
2281
    #[test]
    fn test_remove_main_ref_keeps_snapshot_log() {
        // Removing the 'main' ref clears the current snapshot id but must not
        // erase the snapshot log history.
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build();

        let result = builder
            .add_snapshot(snapshot.clone())
            .unwrap()
            .set_ref(MAIN_BRANCH, SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: Some(10),
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap();

        // Verify snapshot log was created
        assert_eq!(result.metadata.snapshot_log.len(), 1);
        assert_eq!(result.metadata.snapshot_log[0].snapshot_id, 1);
        assert_eq!(result.metadata.current_snapshot_id, Some(1));

        // Remove the main ref
        let result_after_remove = result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
            ))
            .remove_ref(MAIN_BRANCH)
            .build()
            .unwrap();

        // Verify snapshot log is kept even after removing main ref
        assert_eq!(result_after_remove.metadata.snapshot_log.len(), 1);
        assert_eq!(result_after_remove.metadata.snapshot_log[0].snapshot_id, 1);
        assert_eq!(result_after_remove.metadata.current_snapshot_id, None);
        assert_eq!(result_after_remove.changes.len(), 1);
        assert_eq!(
            result_after_remove.changes[0],
            TableUpdate::RemoveSnapshotRef {
                ref_name: MAIN_BRANCH.to_string()
            }
        );
    }
2348
    #[test]
    fn test_set_branch_snapshot_creates_branch_if_not_exists() {
        // `set_branch_snapshot` on a nonexistent branch must create it with
        // default branch retention and record both the snapshot addition and
        // the ref update.
        let builder = builder_without_changes(FormatVersion::V2);

        let snapshot = Snapshot::builder()
            .with_snapshot_id(2)
            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let build_result = builder
            .set_branch_snapshot(snapshot.clone(), "new_branch")
            .unwrap()
            .build()
            .unwrap();

        // The implicitly created branch carries no retention limits.
        let reference = SnapshotReference {
            snapshot_id: 2,
            retention: SnapshotRetention::Branch {
                min_snapshots_to_keep: None,
                max_snapshot_age_ms: None,
                max_ref_age_ms: None,
            },
        };

        assert_eq!(build_result.metadata.refs.len(), 1);
        assert_eq!(
            build_result.metadata.refs.get("new_branch"),
            Some(&reference)
        );
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSnapshot { snapshot },
            TableUpdate::SetSnapshotRef {
                ref_name: "new_branch".to_string(),
                reference
            }
        ]);
    }
2393
2394    #[test]
2395    fn test_cannot_add_duplicate_snapshot_id() {
2396        let builder = builder_without_changes(FormatVersion::V2);
2397
2398        let snapshot = Snapshot::builder()
2399            .with_snapshot_id(2)
2400            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2401            .with_sequence_number(0)
2402            .with_schema_id(0)
2403            .with_manifest_list("/snap-1.avro")
2404            .with_summary(Summary {
2405                operation: Operation::Append,
2406                additional_properties: HashMap::from_iter(vec![
2407                    (
2408                        "spark.app.id".to_string(),
2409                        "local-1662532784305".to_string(),
2410                    ),
2411                    ("added-data-files".to_string(), "4".to_string()),
2412                    ("added-records".to_string(), "4".to_string()),
2413                    ("added-files-size".to_string(), "6001".to_string()),
2414                ]),
2415            })
2416            .build();
2417
2418        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2419        builder.add_snapshot(snapshot).unwrap_err();
2420    }
2421
2422    #[test]
2423    fn test_add_incompatible_current_schema_fails() {
2424        let builder = builder_without_changes(FormatVersion::V2);
2425
2426        let added_schema = Schema::builder()
2427            .with_schema_id(1)
2428            .with_fields(vec![])
2429            .build()
2430            .unwrap();
2431
2432        let err = builder
2433            .add_current_schema(added_schema)
2434            .unwrap()
2435            .build()
2436            .unwrap_err();
2437
2438        assert!(
2439            err.to_string()
2440                .contains("Cannot find partition source field")
2441        );
2442    }
2443
2444    #[test]
2445    fn test_add_partition_spec_for_v1_requires_sequential_ids() {
2446        let builder = builder_without_changes(FormatVersion::V1);
2447
2448        let added_spec = UnboundPartitionSpec::builder()
2449            .with_spec_id(10)
2450            .add_partition_fields(vec![
2451                UnboundPartitionField {
2452                    name: "y".to_string(),
2453                    transform: Transform::Identity,
2454                    source_id: 2,
2455                    field_id: Some(1000),
2456                },
2457                UnboundPartitionField {
2458                    name: "z".to_string(),
2459                    transform: Transform::Identity,
2460                    source_id: 3,
2461                    field_id: Some(1002),
2462                },
2463            ])
2464            .unwrap()
2465            .build();
2466
2467        let err = builder.add_partition_spec(added_spec).unwrap_err();
2468        assert!(err.to_string().contains(
2469            "Cannot add partition spec with non-sequential field ids to format version 1 table"
2470        ));
2471    }
2472
2473    #[test]
2474    fn test_expire_metadata_log() {
2475        let builder = builder_without_changes(FormatVersion::V2);
2476        let metadata = builder
2477            .set_properties(HashMap::from_iter(vec![(
2478                TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
2479                "2".to_string(),
2480            )]))
2481            .unwrap()
2482            .build()
2483            .unwrap();
2484        assert_eq!(metadata.metadata.metadata_log.len(), 1);
2485        assert_eq!(metadata.expired_metadata_logs.len(), 0);
2486
2487        let metadata = metadata
2488            .metadata
2489            .into_builder(Some("path2".to_string()))
2490            .set_properties(HashMap::from_iter(vec![(
2491                "change_nr".to_string(),
2492                "1".to_string(),
2493            )]))
2494            .unwrap()
2495            .build()
2496            .unwrap();
2497
2498        assert_eq!(metadata.metadata.metadata_log.len(), 2);
2499        assert_eq!(metadata.expired_metadata_logs.len(), 0);
2500
2501        let metadata = metadata
2502            .metadata
2503            .into_builder(Some("path2".to_string()))
2504            .set_properties(HashMap::from_iter(vec![(
2505                "change_nr".to_string(),
2506                "2".to_string(),
2507            )]))
2508            .unwrap()
2509            .build()
2510            .unwrap();
2511        assert_eq!(metadata.metadata.metadata_log.len(), 2);
2512        assert_eq!(metadata.expired_metadata_logs.len(), 1);
2513    }
2514
2515    #[test]
2516    fn test_v2_sequence_number_cannot_decrease() {
2517        let builder = builder_without_changes(FormatVersion::V2);
2518
2519        let snapshot = Snapshot::builder()
2520            .with_snapshot_id(1)
2521            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2522            .with_sequence_number(1)
2523            .with_schema_id(0)
2524            .with_manifest_list("/snap-1")
2525            .with_summary(Summary {
2526                operation: Operation::Append,
2527                additional_properties: HashMap::new(),
2528            })
2529            .build();
2530
2531        let builder = builder
2532            .add_snapshot(snapshot.clone())
2533            .unwrap()
2534            .set_ref(MAIN_BRANCH, SnapshotReference {
2535                snapshot_id: 1,
2536                retention: SnapshotRetention::Branch {
2537                    min_snapshots_to_keep: Some(10),
2538                    max_snapshot_age_ms: None,
2539                    max_ref_age_ms: None,
2540                },
2541            })
2542            .unwrap();
2543
2544        let snapshot = Snapshot::builder()
2545            .with_snapshot_id(2)
2546            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2547            .with_sequence_number(0)
2548            .with_schema_id(0)
2549            .with_manifest_list("/snap-0")
2550            .with_parent_snapshot_id(Some(1))
2551            .with_summary(Summary {
2552                operation: Operation::Append,
2553                additional_properties: HashMap::new(),
2554            })
2555            .build();
2556
2557        let err = builder
2558            .set_branch_snapshot(snapshot, MAIN_BRANCH)
2559            .unwrap_err();
2560        assert!(
2561            err.to_string()
2562                .contains("Cannot add snapshot with sequence number")
2563        );
2564    }
2565
2566    #[test]
2567    fn test_default_spec_cannot_be_removed() {
2568        let builder = builder_without_changes(FormatVersion::V2);
2569
2570        builder.remove_partition_specs(&[0]).unwrap_err();
2571    }
2572
2573    #[test]
2574    fn test_statistics() {
2575        let builder = builder_without_changes(FormatVersion::V2);
2576
2577        let statistics = StatisticsFile {
2578            snapshot_id: 3055729675574597004,
2579            statistics_path: "s3://a/b/stats.puffin".to_string(),
2580            file_size_in_bytes: 413,
2581            file_footer_size_in_bytes: 42,
2582            key_metadata: None,
2583            blob_metadata: vec![BlobMetadata {
2584                snapshot_id: 3055729675574597004,
2585                sequence_number: 1,
2586                fields: vec![1],
2587                r#type: "ndv".to_string(),
2588                properties: HashMap::new(),
2589            }],
2590        };
2591        let build_result = builder.set_statistics(statistics.clone()).build().unwrap();
2592
2593        assert_eq!(
2594            build_result.metadata.statistics,
2595            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2596        );
2597        assert_eq!(build_result.changes, vec![TableUpdate::SetStatistics {
2598            statistics: statistics.clone()
2599        }]);
2600
2601        // Remove
2602        let builder = build_result.metadata.into_builder(None);
2603        let build_result = builder
2604            .remove_statistics(statistics.snapshot_id)
2605            .build()
2606            .unwrap();
2607
2608        assert_eq!(build_result.metadata.statistics.len(), 0);
2609        assert_eq!(build_result.changes, vec![TableUpdate::RemoveStatistics {
2610            snapshot_id: statistics.snapshot_id
2611        }]);
2612
2613        // Remove again yields no changes
2614        let builder = build_result.metadata.into_builder(None);
2615        let build_result = builder
2616            .remove_statistics(statistics.snapshot_id)
2617            .build()
2618            .unwrap();
2619        assert_eq!(build_result.metadata.statistics.len(), 0);
2620        assert_eq!(build_result.changes.len(), 0);
2621    }
2622
2623    #[test]
2624    fn test_add_partition_statistics() {
2625        let builder = builder_without_changes(FormatVersion::V2);
2626
2627        let statistics = PartitionStatisticsFile {
2628            snapshot_id: 3055729675574597004,
2629            statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
2630            file_size_in_bytes: 43,
2631        };
2632
2633        let build_result = builder
2634            .set_partition_statistics(statistics.clone())
2635            .build()
2636            .unwrap();
2637        assert_eq!(
2638            build_result.metadata.partition_statistics,
2639            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2640        );
2641        assert_eq!(build_result.changes, vec![
2642            TableUpdate::SetPartitionStatistics {
2643                partition_statistics: statistics.clone()
2644            }
2645        ]);
2646
2647        // Remove
2648        let builder = build_result.metadata.into_builder(None);
2649        let build_result = builder
2650            .remove_partition_statistics(statistics.snapshot_id)
2651            .build()
2652            .unwrap();
2653        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2654        assert_eq!(build_result.changes, vec![
2655            TableUpdate::RemovePartitionStatistics {
2656                snapshot_id: statistics.snapshot_id
2657            }
2658        ]);
2659
2660        // Remove again yields no changes
2661        let builder = build_result.metadata.into_builder(None);
2662        let build_result = builder
2663            .remove_partition_statistics(statistics.snapshot_id)
2664            .build()
2665            .unwrap();
2666        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2667        assert_eq!(build_result.changes.len(), 0);
2668    }
2669
2670    #[test]
2671    fn last_update_increased_for_property_only_update() {
2672        let builder = builder_without_changes(FormatVersion::V2);
2673
2674        let metadata = builder.build().unwrap().metadata;
2675        let last_updated_ms = metadata.last_updated_ms;
2676        sleep(std::time::Duration::from_millis(2));
2677
2678        let build_result = metadata
2679            .into_builder(Some(
2680                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2681            ))
2682            .set_properties(HashMap::from_iter(vec![(
2683                "foo".to_string(),
2684                "bar".to_string(),
2685            )]))
2686            .unwrap()
2687            .build()
2688            .unwrap();
2689
2690        assert!(
2691            build_result.metadata.last_updated_ms > last_updated_ms,
2692            "{} > {}",
2693            build_result.metadata.last_updated_ms,
2694            last_updated_ms
2695        );
2696    }
2697
2698    #[test]
2699    fn test_construct_default_main_branch() {
2700        // Load the table without ref
2701        let file = File::open(format!(
2702            "{}/testdata/table_metadata/{}",
2703            env!("CARGO_MANIFEST_DIR"),
2704            "TableMetadataV2Valid.json"
2705        ))
2706        .unwrap();
2707        let reader = BufReader::new(file);
2708        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2709
2710        let table = Table::builder()
2711            .metadata(resp)
2712            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2713            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2714            .file_io(FileIOBuilder::new("memory").build().unwrap())
2715            .build()
2716            .unwrap();
2717
2718        assert_eq!(
2719            table.metadata().refs.get(MAIN_BRANCH).unwrap().snapshot_id,
2720            table.metadata().current_snapshot_id().unwrap()
2721        );
2722    }
2723
2724    #[test]
2725    fn test_active_schema_cannot_be_removed() {
2726        let builder = builder_without_changes(FormatVersion::V2);
2727        builder.remove_schemas(&[0]).unwrap_err();
2728    }
2729
2730    #[test]
2731    fn test_remove_schemas() {
2732        let file = File::open(format!(
2733            "{}/testdata/table_metadata/{}",
2734            env!("CARGO_MANIFEST_DIR"),
2735            "TableMetadataV2Valid.json"
2736        ))
2737        .unwrap();
2738        let reader = BufReader::new(file);
2739        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2740
2741        let table = Table::builder()
2742            .metadata(resp)
2743            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
2744            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2745            .file_io(FileIOBuilder::new("memory").build().unwrap())
2746            .build()
2747            .unwrap();
2748
2749        assert_eq!(2, table.metadata().schemas.len());
2750
2751        {
2752            // can not remove active schema
2753            let meta_data_builder = table.metadata().clone().into_builder(None);
2754            meta_data_builder.remove_schemas(&[1]).unwrap_err();
2755        }
2756
2757        let mut meta_data_builder = table.metadata().clone().into_builder(None);
2758        meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
2759        let build_result = meta_data_builder.build().unwrap();
2760        assert_eq!(1, build_result.metadata.schemas.len());
2761        assert_eq!(1, build_result.metadata.current_schema_id);
2762        assert_eq!(1, build_result.metadata.current_schema().schema_id());
2763        assert_eq!(1, build_result.changes.len());
2764
2765        let remove_schema_ids =
2766            if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
2767                schema_ids
2768            } else {
2769                unreachable!("Expected RemoveSchema change")
2770            };
2771        assert_eq!(remove_schema_ids, &[0]);
2772    }
2773
2774    #[test]
2775    fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() {
2776        let initial_schema = Schema::builder()
2777            .with_fields(vec![
2778                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2779            ])
2780            .build()
2781            .unwrap();
2782
2783        let partition_spec_with_bucket = UnboundPartitionSpec::builder()
2784            .with_spec_id(0)
2785            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2786            .unwrap()
2787            .build();
2788
2789        let metadata = TableMetadataBuilder::new(
2790            initial_schema,
2791            partition_spec_with_bucket,
2792            SortOrder::unsorted_order(),
2793            TEST_LOCATION.to_string(),
2794            FormatVersion::V2,
2795            HashMap::new(),
2796        )
2797        .unwrap()
2798        .build()
2799        .unwrap()
2800        .metadata;
2801
2802        let partition_field_names: Vec<String> = metadata
2803            .default_partition_spec()
2804            .fields()
2805            .iter()
2806            .map(|f| f.name.clone())
2807            .collect();
2808        assert!(partition_field_names.contains(&"bucket_data".to_string()));
2809
2810        let evolved_schema = Schema::builder()
2811            .with_fields(vec![
2812                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2813                // Adding a schema field with the same name as an existing partition field
2814                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
2815            ])
2816            .build()
2817            .unwrap();
2818
2819        let builder = metadata.into_builder(Some(
2820            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2821        ));
2822
2823        // Try to add the evolved schema - this should now fail immediately with a clear error
2824        let result = builder.add_current_schema(evolved_schema);
2825
2826        assert!(result.is_err());
2827        let error = result.unwrap_err();
2828        let error_message = error.message();
2829        assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2830        assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names"));
2831    }
2832
2833    #[test]
2834    fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() {
2835        let initial_schema = Schema::builder()
2836            .with_fields(vec![
2837                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2838            ])
2839            .build()
2840            .unwrap();
2841
2842        let partition_spec = UnboundPartitionSpec::builder()
2843            .with_spec_id(0)
2844            .add_partition_field(1, "partition_col", Transform::Bucket(16))
2845            .unwrap()
2846            .build();
2847
2848        let metadata = TableMetadataBuilder::new(
2849            initial_schema,
2850            partition_spec,
2851            SortOrder::unsorted_order(),
2852            TEST_LOCATION.to_string(),
2853            FormatVersion::V2,
2854            HashMap::new(),
2855        )
2856        .unwrap()
2857        .build()
2858        .unwrap()
2859        .metadata;
2860
2861        let non_conflicting_schema = Schema::builder()
2862            .with_fields(vec![
2863                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2864                NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(),
2865            ])
2866            .build()
2867            .unwrap();
2868
2869        // This should succeed since there's no name conflict
2870        let result = metadata
2871            .clone()
2872            .into_builder(Some("test_location".to_string()))
2873            .add_current_schema(non_conflicting_schema)
2874            .unwrap()
2875            .build();
2876
2877        assert!(result.is_ok());
2878    }
2879
2880    #[test]
2881    fn test_partition_spec_evolution_validates_schema_field_name_conflicts() {
2882        let initial_schema = Schema::builder()
2883            .with_fields(vec![
2884                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2885                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2886                    .into(),
2887            ])
2888            .build()
2889            .unwrap();
2890
2891        let partition_spec = UnboundPartitionSpec::builder()
2892            .with_spec_id(0)
2893            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
2894            .unwrap()
2895            .build();
2896
2897        let metadata = TableMetadataBuilder::new(
2898            initial_schema,
2899            partition_spec,
2900            SortOrder::unsorted_order(),
2901            TEST_LOCATION.to_string(),
2902            FormatVersion::V2,
2903            HashMap::new(),
2904        )
2905        .unwrap()
2906        .build()
2907        .unwrap()
2908        .metadata;
2909
2910        let builder = metadata.into_builder(Some(
2911            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2912        ));
2913
2914        let conflicting_partition_spec = UnboundPartitionSpec::builder()
2915            .with_spec_id(1)
2916            .add_partition_field(1, "existing_field", Transform::Bucket(8))
2917            .unwrap()
2918            .build();
2919
2920        let result = builder.add_partition_spec(conflicting_partition_spec);
2921
2922        assert!(result.is_err());
2923        let error = result.unwrap_err();
2924        let error_message = error.message();
2925        // The error comes from our multi-version validation
2926        assert!(error_message.contains(
2927            "Cannot create partition with name 'existing_field' that conflicts with schema field"
2928        ));
2929        assert!(error_message.contains("and is not an identity transform"));
2930    }
2931
    #[test]
    fn test_schema_evolution_validates_against_all_historical_schemas() {
        // Create a table with an initial schema that has a field "existing_field"
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        // Partition by bucket(data) under the name "bucket_data"; this name
        // never appears as a schema column in any schema of this test.
        let partition_spec = UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
            .unwrap()
            .build();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            partition_spec,
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Add a second schema that removes the existing_field but keeps the data field
        let second_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
            ])
            .build()
            .unwrap();

        // Commit the second schema so the original becomes a historical schema.
        let metadata = metadata
            .into_builder(Some("test_location".to_string()))
            .add_current_schema(second_schema)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Now try to add a third schema that reintroduces "existing_field"
        // This should succeed because "existing_field" exists in a historical schema,
        // even though there's a partition field named "bucket_data"
        let third_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        let builder = metadata
            .clone()
            .into_builder(Some("test_location".to_string()));

        // This should succeed because "existing_field" exists in a historical schema
        let result = builder.add_current_schema(third_schema);
        assert!(result.is_ok());

        // However, trying to add a schema field that conflicts with the partition field
        // and doesn't exist in any historical schema should fail
        let conflicting_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
                    .into(),
                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
                    .into(),
                NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String))
                    .into(), // conflicts with partition field
            ])
            .build()
            .unwrap();

        // Start from the metadata snapshot taken before the third schema was
        // added (the clone above), so only schemas 1-3 are historical here.
        let builder2 = metadata.into_builder(Some("test_location".to_string()));
        let result2 = builder2.add_current_schema(conflicting_schema);

        // This should fail because "bucket_data" conflicts with partition field name
        // and doesn't exist in any historical schema
        assert!(result2.is_err());
        let error = result2.unwrap_err();
        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
    }
3027
3028    #[test]
3029    fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() {
3030        // Create initial schema with a field
3031        let initial_schema = Schema::builder()
3032            .with_fields(vec![
3033                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3034                NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
3035                    .into(),
3036            ])
3037            .build()
3038            .unwrap();
3039
3040        let partition_spec = UnboundPartitionSpec::builder()
3041            .with_spec_id(0)
3042            .add_partition_field(2, "partition_data", Transform::Identity)
3043            .unwrap()
3044            .build();
3045
3046        let metadata = TableMetadataBuilder::new(
3047            initial_schema,
3048            partition_spec,
3049            SortOrder::unsorted_order(),
3050            TEST_LOCATION.to_string(),
3051            FormatVersion::V2,
3052            HashMap::new(),
3053        )
3054        .unwrap()
3055        .build()
3056        .unwrap()
3057        .metadata;
3058
3059        // Add a new schema that still contains the partition_data field
3060        let evolved_schema = Schema::builder()
3061            .with_fields(vec![
3062                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3063                NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
3064                    .into(),
3065                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
3066                    .into(),
3067            ])
3068            .build()
3069            .unwrap();
3070
3071        // This should succeed because partition_data exists in historical schemas
3072        let result = metadata
3073            .into_builder(Some("test_location".to_string()))
3074            .add_current_schema(evolved_schema);
3075
3076        assert!(result.is_ok());
3077    }
3078
3079    #[test]
3080    fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() {
3081        // Create initial schema WITHOUT the conflicting field
3082        let initial_schema = Schema::builder()
3083            .with_fields(vec![
3084                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3085            ])
3086            .build()
3087            .unwrap();
3088
3089        let partition_spec = UnboundPartitionSpec::builder()
3090            .with_spec_id(0)
3091            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
3092            .unwrap()
3093            .build();
3094
3095        let metadata = TableMetadataBuilder::new(
3096            initial_schema,
3097            partition_spec,
3098            SortOrder::unsorted_order(),
3099            TEST_LOCATION.to_string(),
3100            FormatVersion::V2,
3101            HashMap::new(),
3102        )
3103        .unwrap()
3104        .build()
3105        .unwrap()
3106        .metadata;
3107
3108        // Try to add a schema with a field that conflicts with partition field name
3109        let conflicting_schema = Schema::builder()
3110            .with_fields(vec![
3111                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3112                // This field name conflicts with the partition field "bucket_data"
3113                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
3114            ])
3115            .build()
3116            .unwrap();
3117
3118        let builder = metadata.into_builder(Some("test_location".to_string()));
3119        let result = builder.add_current_schema(conflicting_schema);
3120
3121        // This should fail because "bucket_data" conflicts with partition field name
3122        // and doesn't exist in any historical schema
3123        assert!(result.is_err());
3124        let error = result.unwrap_err();
3125        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
3126    }
3127
3128    #[test]
3129    fn test_partition_spec_evolution_allows_non_conflicting_names() {
3130        let initial_schema = Schema::builder()
3131            .with_fields(vec![
3132                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3133                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
3134                    .into(),
3135            ])
3136            .build()
3137            .unwrap();
3138
3139        let partition_spec = UnboundPartitionSpec::builder()
3140            .with_spec_id(0)
3141            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
3142            .unwrap()
3143            .build();
3144
3145        let metadata = TableMetadataBuilder::new(
3146            initial_schema,
3147            partition_spec,
3148            SortOrder::unsorted_order(),
3149            TEST_LOCATION.to_string(),
3150            FormatVersion::V2,
3151            HashMap::new(),
3152        )
3153        .unwrap()
3154        .build()
3155        .unwrap()
3156        .metadata;
3157
3158        let builder = metadata.into_builder(Some(
3159            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3160        ));
3161
3162        // Try to add a partition spec with a field name that does NOT conflict with existing schema fields
3163        let non_conflicting_partition_spec = UnboundPartitionSpec::builder()
3164            .with_spec_id(1)
3165            .add_partition_field(2, "new_partition_field", Transform::Bucket(8))
3166            .unwrap()
3167            .build();
3168
3169        let result = builder.add_partition_spec(non_conflicting_partition_spec);
3170
3171        assert!(result.is_ok());
3172    }
3173
3174    #[test]
3175    fn test_row_lineage_addition() {
3176        let new_rows = 30;
3177        let base = builder_without_changes(FormatVersion::V3)
3178            .build()
3179            .unwrap()
3180            .metadata;
3181        let add_rows = Snapshot::builder()
3182            .with_snapshot_id(0)
3183            .with_timestamp_ms(base.last_updated_ms + 1)
3184            .with_sequence_number(0)
3185            .with_schema_id(0)
3186            .with_manifest_list("foo")
3187            .with_parent_snapshot_id(None)
3188            .with_summary(Summary {
3189                operation: Operation::Append,
3190                additional_properties: HashMap::new(),
3191            })
3192            .with_row_range(base.next_row_id(), new_rows)
3193            .build();
3194
3195        let first_addition = base
3196            .into_builder(None)
3197            .add_snapshot(add_rows.clone())
3198            .unwrap()
3199            .build()
3200            .unwrap()
3201            .metadata;
3202
3203        assert_eq!(first_addition.next_row_id(), new_rows);
3204
3205        let add_more_rows = Snapshot::builder()
3206            .with_snapshot_id(1)
3207            .with_timestamp_ms(first_addition.last_updated_ms + 1)
3208            .with_sequence_number(1)
3209            .with_schema_id(0)
3210            .with_manifest_list("foo")
3211            .with_parent_snapshot_id(Some(0))
3212            .with_summary(Summary {
3213                operation: Operation::Append,
3214                additional_properties: HashMap::new(),
3215            })
3216            .with_row_range(first_addition.next_row_id(), new_rows)
3217            .build();
3218
3219        let second_addition = first_addition
3220            .into_builder(None)
3221            .add_snapshot(add_more_rows)
3222            .unwrap()
3223            .build()
3224            .unwrap()
3225            .metadata;
3226        assert_eq!(second_addition.next_row_id(), new_rows * 2);
3227    }
3228
3229    #[test]
3230    fn test_row_lineage_invalid_snapshot() {
3231        let new_rows = 30;
3232        let base = builder_without_changes(FormatVersion::V3)
3233            .build()
3234            .unwrap()
3235            .metadata;
3236
3237        // add rows to check TableMetadata validation; Snapshot rejects negative next-row-id
3238        let add_rows = Snapshot::builder()
3239            .with_snapshot_id(0)
3240            .with_timestamp_ms(base.last_updated_ms + 1)
3241            .with_sequence_number(0)
3242            .with_schema_id(0)
3243            .with_manifest_list("foo")
3244            .with_parent_snapshot_id(None)
3245            .with_summary(Summary {
3246                operation: Operation::Append,
3247                additional_properties: HashMap::new(),
3248            })
3249            .with_row_range(base.next_row_id(), new_rows)
3250            .build();
3251
3252        let added = base
3253            .into_builder(None)
3254            .add_snapshot(add_rows)
3255            .unwrap()
3256            .build()
3257            .unwrap()
3258            .metadata;
3259
3260        let invalid_new_rows = Snapshot::builder()
3261            .with_snapshot_id(1)
3262            .with_timestamp_ms(added.last_updated_ms + 1)
3263            .with_sequence_number(1)
3264            .with_schema_id(0)
3265            .with_manifest_list("foo")
3266            .with_parent_snapshot_id(Some(0))
3267            .with_summary(Summary {
3268                operation: Operation::Append,
3269                additional_properties: HashMap::new(),
3270            })
3271            // first_row_id is behind table next_row_id
3272            .with_row_range(added.next_row_id() - 1, 10)
3273            .build();
3274
3275        let err = added
3276            .into_builder(None)
3277            .add_snapshot(invalid_new_rows)
3278            .unwrap_err();
3279        assert!(
3280            err.to_string().contains(
3281                "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30"
3282            )
3283        );
3284    }
3285
3286    #[test]
3287    fn test_row_lineage_append_branch() {
3288        // Appends to a branch should still change last-row-id even if not on main, these changes
3289        // should also affect commits to main
3290
3291        let branch = "some_branch";
3292
3293        // Start with V3 metadata to support row lineage
3294        let base = builder_without_changes(FormatVersion::V3)
3295            .build()
3296            .unwrap()
3297            .metadata;
3298
3299        // Initial next_row_id should be 0
3300        assert_eq!(base.next_row_id(), 0);
3301
3302        // Write to Branch - append 30 rows
3303        let branch_snapshot_1 = Snapshot::builder()
3304            .with_snapshot_id(1)
3305            .with_timestamp_ms(base.last_updated_ms + 1)
3306            .with_sequence_number(0)
3307            .with_schema_id(0)
3308            .with_manifest_list("foo")
3309            .with_parent_snapshot_id(None)
3310            .with_summary(Summary {
3311                operation: Operation::Append,
3312                additional_properties: HashMap::new(),
3313            })
3314            .with_row_range(base.next_row_id(), 30)
3315            .build();
3316
3317        let table_after_branch_1 = base
3318            .into_builder(None)
3319            .set_branch_snapshot(branch_snapshot_1.clone(), branch)
3320            .unwrap()
3321            .build()
3322            .unwrap()
3323            .metadata;
3324
3325        // Current snapshot should be null (no main branch snapshot yet)
3326        assert!(table_after_branch_1.current_snapshot().is_none());
3327
3328        // Branch snapshot should have first_row_id = 0
3329        let branch_ref = table_after_branch_1.refs.get(branch).unwrap();
3330        let branch_snap_1 = table_after_branch_1
3331            .snapshots
3332            .get(&branch_ref.snapshot_id)
3333            .unwrap();
3334        assert_eq!(branch_snap_1.first_row_id(), Some(0));
3335
3336        // Next row id should be 30
3337        assert_eq!(table_after_branch_1.next_row_id(), 30);
3338
3339        // Write to Main - append 28 rows
3340        let main_snapshot = Snapshot::builder()
3341            .with_snapshot_id(2)
3342            .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1)
3343            .with_sequence_number(1)
3344            .with_schema_id(0)
3345            .with_manifest_list("bar")
3346            .with_parent_snapshot_id(None)
3347            .with_summary(Summary {
3348                operation: Operation::Append,
3349                additional_properties: HashMap::new(),
3350            })
3351            .with_row_range(table_after_branch_1.next_row_id(), 28)
3352            .build();
3353
3354        let table_after_main = table_after_branch_1
3355            .into_builder(None)
3356            .add_snapshot(main_snapshot.clone())
3357            .unwrap()
3358            .set_ref(MAIN_BRANCH, SnapshotReference {
3359                snapshot_id: main_snapshot.snapshot_id(),
3360                retention: SnapshotRetention::Branch {
3361                    min_snapshots_to_keep: None,
3362                    max_snapshot_age_ms: None,
3363                    max_ref_age_ms: None,
3364                },
3365            })
3366            .unwrap()
3367            .build()
3368            .unwrap()
3369            .metadata;
3370
3371        // Main snapshot should have first_row_id = 30
3372        let current_snapshot = table_after_main.current_snapshot().unwrap();
3373        assert_eq!(current_snapshot.first_row_id(), Some(30));
3374
3375        // Next row id should be 58 (30 + 28)
3376        assert_eq!(table_after_main.next_row_id(), 58);
3377
3378        // Write again to branch - append 21 rows
3379        let branch_snapshot_2 = Snapshot::builder()
3380            .with_snapshot_id(3)
3381            .with_timestamp_ms(table_after_main.last_updated_ms + 1)
3382            .with_sequence_number(2)
3383            .with_schema_id(0)
3384            .with_manifest_list("baz")
3385            .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id()))
3386            .with_summary(Summary {
3387                operation: Operation::Append,
3388                additional_properties: HashMap::new(),
3389            })
3390            .with_row_range(table_after_main.next_row_id(), 21)
3391            .build();
3392
3393        let table_after_branch_2 = table_after_main
3394            .into_builder(None)
3395            .set_branch_snapshot(branch_snapshot_2.clone(), branch)
3396            .unwrap()
3397            .build()
3398            .unwrap()
3399            .metadata;
3400
3401        // Branch snapshot should have first_row_id = 58 (30 + 28)
3402        let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap();
3403        let branch_snap_2 = table_after_branch_2
3404            .snapshots
3405            .get(&branch_ref_2.snapshot_id)
3406            .unwrap();
3407        assert_eq!(branch_snap_2.first_row_id(), Some(58));
3408
3409        // Next row id should be 79 (30 + 28 + 21)
3410        assert_eq!(table_after_branch_2.next_row_id(), 79);
3411    }
3412
    #[test]
    fn test_encryption_keys() {
        // Exercises the full add/remove lifecycle of table encryption keys:
        // additions and removals are reflected both in the metadata map and
        // as TableUpdate changes, while no-op operations (duplicate add,
        // removing a missing key) produce no change entries.
        let builder = builder_without_changes(FormatVersion::V2);

        // Create two distinct test encryption keys; key-1 carries properties,
        // key-2 has none, covering both shapes.
        let encryption_key_1 = EncryptedKey::builder()
            .key_id("key-1")
            .encrypted_key_metadata(vec![1, 2, 3, 4])
            .encrypted_by_id("encryption-service-1")
            .properties(HashMap::from_iter(vec![(
                "algorithm".to_string(),
                "AES-256".to_string(),
            )]))
            .build();

        let encryption_key_2 = EncryptedKey::builder()
            .key_id("key-2")
            .encrypted_key_metadata(vec![5, 6, 7, 8])
            .encrypted_by_id("encryption-service-2")
            .properties(HashMap::new())
            .build();

        // Add first encryption key: one change, one stored key.
        let build_result = builder
            .add_encryption_key(encryption_key_1.clone())
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
        assert_eq!(
            build_result.metadata.encryption_key("key-1"),
            Some(&encryption_key_1)
        );
        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
            encryption_key: encryption_key_1.clone()
        });

        // Add second encryption key: the change list is per-round, so it
        // again holds exactly one entry while the map holds both keys.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .add_encryption_key(encryption_key_2.clone())
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
        assert_eq!(
            build_result.metadata.encryption_key("key-1"),
            Some(&encryption_key_1)
        );
        assert_eq!(
            build_result.metadata.encryption_key("key-2"),
            Some(&encryption_key_2)
        );
        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
            encryption_key: encryption_key_2.clone()
        });

        // Adding a key that is already present is a no-op: no change entry,
        // key count unchanged.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
            ))
            .add_encryption_key(encryption_key_1.clone())
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 0);
        assert_eq!(build_result.metadata.encryption_keys.len(), 2);

        // Remove first encryption key: one RemoveEncryptionKey change, and
        // only key-2 remains retrievable.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata3.json".to_string(),
            ))
            .remove_encryption_key("key-1")
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
        assert_eq!(build_result.metadata.encryption_key("key-1"), None);
        assert_eq!(
            build_result.metadata.encryption_key("key-2"),
            Some(&encryption_key_2)
        );
        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
            key_id: "key-1".to_string()
        });

        // Removing a non-existent key is a no-op: no change entry.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata4.json".to_string(),
            ))
            .remove_encryption_key("non-existent-key")
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 0);
        assert_eq!(build_result.metadata.encryption_keys.len(), 1);

        // encryption_keys_iter() yields exactly the remaining key.
        let keys = build_result
            .metadata
            .encryption_keys_iter()
            .collect::<Vec<_>>();
        assert_eq!(keys.len(), 1);
        assert_eq!(keys[0], &encryption_key_2);

        // Remove the last encryption key, emptying the map.
        let build_result = build_result
            .metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata5.json".to_string(),
            ))
            .remove_encryption_key("key-2")
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(build_result.metadata.encryption_keys.len(), 0);
        assert_eq!(build_result.metadata.encryption_key("key-2"), None);
        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
            key_id: "key-2".to_string()
        });

        // With all keys removed, the iterator is empty.
        let keys = build_result.metadata.encryption_keys_iter();
        assert_eq!(keys.len(), 0);
    }
3551
3552    #[test]
3553    fn test_partition_field_id_reuse_across_specs() {
3554        let schema = Schema::builder()
3555            .with_fields(vec![
3556                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3557                NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
3558                NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
3559                    .into(),
3560            ])
3561            .build()
3562            .unwrap();
3563
3564        // Create initial table with spec 0: identity(id) -> field_id = 1000
3565        let initial_spec = UnboundPartitionSpec::builder()
3566            .add_partition_field(1, "id", Transform::Identity)
3567            .unwrap()
3568            .build();
3569
3570        let mut metadata = TableMetadataBuilder::new(
3571            schema,
3572            initial_spec,
3573            SortOrder::unsorted_order(),
3574            "s3://bucket/table".to_string(),
3575            FormatVersion::V2,
3576            HashMap::new(),
3577        )
3578        .unwrap()
3579        .build()
3580        .unwrap()
3581        .metadata;
3582
3583        // Add spec 1: bucket(data) -> field_id = 1001
3584        let spec1 = UnboundPartitionSpec::builder()
3585            .add_partition_field(2, "data_bucket", Transform::Bucket(10))
3586            .unwrap()
3587            .build();
3588        let builder = metadata.into_builder(Some("s3://bucket/table/metadata/v1.json".to_string()));
3589        let result = builder.add_partition_spec(spec1).unwrap().build().unwrap();
3590        metadata = result.metadata;
3591
3592        // Add spec 2: identity(id) + bucket(data) + year(timestamp)
3593        // Should reuse field_id 1000 for identity(id) and 1001 for bucket(data)
3594        let spec2 = UnboundPartitionSpec::builder()
3595            .add_partition_field(1, "id", Transform::Identity) // Should reuse 1000
3596            .unwrap()
3597            .add_partition_field(2, "data_bucket", Transform::Bucket(10)) // Should reuse 1001
3598            .unwrap()
3599            .add_partition_field(3, "year", Transform::Year) // Should get new 1002
3600            .unwrap()
3601            .build();
3602        let builder = metadata.into_builder(Some("s3://bucket/table/metadata/v2.json".to_string()));
3603        let result = builder.add_partition_spec(spec2).unwrap().build().unwrap();
3604
3605        // Verify field ID reuse: spec 2 should reuse IDs from specs 0 and 1, assign new ID for new field
3606        let spec2 = result.metadata.partition_spec_by_id(2).unwrap();
3607        let field_ids: Vec<i32> = spec2.fields().iter().map(|f| f.field_id).collect();
3608        assert_eq!(field_ids, vec![1000, 1001, 1002]); // Reused 1000, 1001; new 1002
3609    }
3610}