iceberg/spec/
table_metadata_builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use uuid::Uuid;
22
23use super::{
24    DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
25    ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, Schema, SchemaRef,
26    Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef,
27    StatisticsFile, StructType, TableMetadata, TableProperties, UNPARTITIONED_LAST_ASSIGNED_ID,
28    UnboundPartitionSpec,
29};
30use crate::error::{Error, ErrorKind, Result};
31use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
32use crate::{TableCreation, TableUpdate};
33
/// The first field id, `1`.
///
/// NOTE(review): presumably the starting point whenever field ids are (re)assigned for new
/// table metadata; the use site is not visible here — confirm.
pub(crate) const FIRST_FIELD_ID: i32 = 1;
35
36/// Manipulating table metadata.
37///
38/// For this builder the order of called functions matters. Functions are applied in-order.
39/// All operations applied to the `TableMetadata` are tracked in `changes` as  a chronologically
40/// ordered vec of `TableUpdate`.
41/// If an operation does not lead to a change of the `TableMetadata`, the corresponding update
42/// is omitted from `changes`.
43///
44/// Unlike a typical builder pattern, the order of function calls matters.
45/// Some basic rules:
46/// - `add_schema` must be called before `set_current_schema`.
47/// - If a new partition spec and schema are added, the schema should be added first.
48#[derive(Debug, Clone)]
49pub struct TableMetadataBuilder {
50    metadata: TableMetadata,
51    changes: Vec<TableUpdate>,
52    last_added_schema_id: Option<i32>,
53    last_added_spec_id: Option<i32>,
54    last_added_order_id: Option<i64>,
55    // None if this is a new table (from_metadata) method not used
56    previous_history_entry: Option<MetadataLog>,
57    last_updated_ms: Option<i64>,
58}
59
60#[derive(Debug, Clone, PartialEq)]
61/// Result of modifying or creating a `TableMetadata`.
62pub struct TableMetadataBuildResult {
63    /// The new `TableMetadata`.
64    pub metadata: TableMetadata,
65    /// The changes that were applied to the metadata.
66    pub changes: Vec<TableUpdate>,
67    /// Expired metadata logs
68    pub expired_metadata_logs: Vec<MetadataLog>,
69}
70
71impl TableMetadataBuilder {
/// Sentinel id meaning "the item most recently added in this builder session".
///
/// It can stand in for a real id of a schema, a partition spec or a sort order.
pub const LAST_ADDED: i32 = -1;
74
75    /// Create a `TableMetadata` object from scratch.
76    ///
77    /// This method re-assign ids of fields in the schema, schema.id, sort_order.id and
78    /// spec.id. It should only be used to create new table metadata from scratch.
79    pub fn new(
80        schema: Schema,
81        spec: impl Into<UnboundPartitionSpec>,
82        sort_order: SortOrder,
83        location: String,
84        format_version: FormatVersion,
85        properties: HashMap<String, String>,
86    ) -> Result<Self> {
87        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table.
88        let (fresh_schema, fresh_spec, fresh_sort_order) =
89            Self::reassign_ids(schema, spec.into(), sort_order)?;
90        let schema_id = fresh_schema.schema_id();
91
92        let builder = Self {
93            metadata: TableMetadata {
94                format_version,
95                table_uuid: Uuid::now_v7(),
96                location: "".to_string(), // Overwritten immediately by set_location
97                last_sequence_number: 0,
98                last_updated_ms: 0,    // Overwritten by build() if not set before
99                last_column_id: -1,    // Overwritten immediately by add_current_schema
100                current_schema_id: -1, // Overwritten immediately by add_current_schema
101                schemas: HashMap::new(),
102                partition_specs: HashMap::new(),
103                default_spec: Arc::new(
104                    // The spec id (-1) is just a proxy value and can be any negative number.
105                    // 0 would lead to wrong changes in the builder if the provided spec by the user is
106                    // also unpartitioned.
107                    // The `default_spec` value is always replaced at the end of this method by he `add_default_partition_spec`
108                    // method.
109                    PartitionSpec::unpartition_spec().with_spec_id(-1),
110                ), // Overwritten immediately by add_default_partition_spec
111                default_partition_type: StructType::new(vec![]),
112                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
113                properties: HashMap::new(),
114                current_snapshot_id: None,
115                snapshots: HashMap::new(),
116                snapshot_log: vec![],
117                sort_orders: HashMap::new(),
118                metadata_log: vec![],
119                default_sort_order_id: -1, // Overwritten immediately by add_default_sort_order
120                refs: HashMap::default(),
121                statistics: HashMap::new(),
122                partition_statistics: HashMap::new(),
123                encryption_keys: HashMap::new(),
124                next_row_id: INITIAL_ROW_ID,
125            },
126            last_updated_ms: None,
127            changes: vec![],
128            last_added_schema_id: Some(schema_id),
129            last_added_spec_id: None,
130            last_added_order_id: None,
131            previous_history_entry: None,
132        };
133
134        builder
135            .set_location(location)
136            .add_current_schema(fresh_schema)?
137            .add_default_partition_spec(fresh_spec.into_unbound())?
138            .add_default_sort_order(fresh_sort_order)?
139            .set_properties(properties)
140    }
141
142    /// Creates a new table metadata builder from the given metadata to modify it.
143    /// `current_file_location` is the location where the current version
144    /// of the metadata file is stored. This is used to update the metadata log.
145    /// If `current_file_location` is `None`, the metadata log will not be updated.
146    /// This should only be used to stage-create tables.
147    #[must_use]
148    pub fn new_from_metadata(
149        previous: TableMetadata,
150        current_file_location: Option<String>,
151    ) -> Self {
152        Self {
153            previous_history_entry: current_file_location.map(|l| MetadataLog {
154                metadata_file: l,
155                timestamp_ms: previous.last_updated_ms,
156            }),
157            metadata: previous,
158            changes: Vec::default(),
159            last_added_schema_id: None,
160            last_added_spec_id: None,
161            last_added_order_id: None,
162            last_updated_ms: None,
163        }
164    }
165
166    /// Creates a new table metadata builder from the given table creation.
167    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
168        let TableCreation {
169            name: _,
170            location,
171            schema,
172            partition_spec,
173            sort_order,
174            properties,
175            format_version,
176        } = table_creation;
177
178        let location = location.ok_or_else(|| {
179            Error::new(
180                ErrorKind::DataInvalid,
181                "Can't create table without location",
182            )
183        })?;
184        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
185            spec_id: None,
186            fields: vec![],
187        });
188
189        Self::new(
190            schema,
191            partition_spec,
192            sort_order.unwrap_or(SortOrder::unsorted_order()),
193            location,
194            format_version,
195            properties,
196        )
197    }
198
199    /// Changes uuid of table metadata.
200    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
201        if self.metadata.table_uuid != uuid {
202            self.metadata.table_uuid = uuid;
203            self.changes.push(TableUpdate::AssignUuid { uuid });
204        }
205
206        self
207    }
208
209    /// Upgrade `FormatVersion`. Downgrades are not allowed.
210    ///
211    /// # Errors
212    /// - Cannot downgrade to older format versions.
213    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
214        if format_version < self.metadata.format_version {
215            return Err(Error::new(
216                ErrorKind::DataInvalid,
217                format!(
218                    "Cannot downgrade FormatVersion from {} to {}",
219                    self.metadata.format_version, format_version
220                ),
221            ));
222        }
223
224        if format_version != self.metadata.format_version {
225            match format_version {
226                FormatVersion::V1 => {
227                    // No changes needed for V1
228                }
229                FormatVersion::V2 => {
230                    self.metadata.format_version = format_version;
231                    self.changes
232                        .push(TableUpdate::UpgradeFormatVersion { format_version });
233                }
234                FormatVersion::V3 => {
235                    self.metadata.format_version = format_version;
236                    // Set next-row-id to 0 when upgrading to v3 as per Iceberg spec
237                    self.metadata.next_row_id = INITIAL_ROW_ID;
238                    self.changes
239                        .push(TableUpdate::UpgradeFormatVersion { format_version });
240                }
241            }
242        }
243
244        Ok(self)
245    }
246
247    /// Set properties. If a property already exists, it will be overwritten.
248    ///
249    /// If a reserved property is set, the corresponding action is performed and the property is not persisted.
250    /// Currently the following reserved properties are supported:
251    /// * format-version: Set the format version of the table.
252    ///
253    /// # Errors
254    /// - If properties contains a reserved property
255    pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
256        // List of specified properties that are RESERVED and should not be persisted.
257        let reserved_properties = properties
258            .keys()
259            .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
260            .map(ToString::to_string)
261            .collect::<Vec<_>>();
262
263        if !reserved_properties.is_empty() {
264            return Err(Error::new(
265                ErrorKind::DataInvalid,
266                format!(
267                    "Table properties should not contain reserved properties, but got: [{}]",
268                    reserved_properties.join(", ")
269                ),
270            ));
271        }
272
273        if properties.is_empty() {
274            return Ok(self);
275        }
276
277        self.metadata.properties.extend(properties.clone());
278        self.changes.push(TableUpdate::SetProperties {
279            updates: properties,
280        });
281
282        Ok(self)
283    }
284
285    /// Remove properties from the table metadata.
286    /// Does nothing if the key is not present.
287    ///
288    /// # Errors
289    /// - If properties to remove contains a reserved property
290    pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
291        // remove duplicates
292        let properties = properties.iter().cloned().collect::<HashSet<_>>();
293
294        // disallow removal of reserved properties
295        let reserved_properties = properties
296            .iter()
297            .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
298            .map(ToString::to_string)
299            .collect::<Vec<_>>();
300
301        if !reserved_properties.is_empty() {
302            return Err(Error::new(
303                ErrorKind::DataInvalid,
304                format!(
305                    "Table properties to remove contain reserved properties: [{}]",
306                    reserved_properties.join(", ")
307                ),
308            ));
309        }
310
311        for property in &properties {
312            self.metadata.properties.remove(property);
313        }
314
315        if !properties.is_empty() {
316            self.changes.push(TableUpdate::RemoveProperties {
317                removals: properties.into_iter().collect(),
318            });
319        }
320
321        Ok(self)
322    }
323
324    /// Set the location of the table, stripping any trailing slashes.
325    pub fn set_location(mut self, location: String) -> Self {
326        let location = location.trim_end_matches('/').to_string();
327        if self.metadata.location != location {
328            self.changes.push(TableUpdate::SetLocation {
329                location: location.clone(),
330            });
331            self.metadata.location = location;
332        }
333
334        self
335    }
336
337    /// Add a snapshot to the table metadata.
338    ///
339    /// # Errors
340    /// - Snapshot id already exists.
341    /// - For format version > 1: the sequence number of the snapshot is lower than the highest sequence number specified so far.
342    /// - For format version >= 3: the first-row-id of the snapshot is lower than the next-row-id of the table.
343    /// - For format version >= 3: added-rows is null or first-row-id is null.
344    /// - For format version >= 3: next-row-id would overflow when adding added-rows.
345    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
346        if self
347            .metadata
348            .snapshots
349            .contains_key(&snapshot.snapshot_id())
350        {
351            return Err(Error::new(
352                ErrorKind::DataInvalid,
353                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
354            ));
355        }
356
357        if self.metadata.format_version != FormatVersion::V1
358            && snapshot.sequence_number() <= self.metadata.last_sequence_number
359            && snapshot.parent_snapshot_id().is_some()
360        {
361            return Err(Error::new(
362                ErrorKind::DataInvalid,
363                format!(
364                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
365                    snapshot.sequence_number(),
366                    self.metadata.last_sequence_number
367                ),
368            ));
369        }
370
371        if let Some(last) = self.metadata.snapshot_log.last() {
372            // commits can happen concurrently from different machines.
373            // A tolerance helps us avoid failure for small clock skew
374            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
375                return Err(Error::new(
376                    ErrorKind::DataInvalid,
377                    format!(
378                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
379                        snapshot.timestamp_ms(),
380                        last.timestamp_ms
381                    ),
382                ));
383            }
384        }
385
386        let max_last_updated = self
387            .last_updated_ms
388            .unwrap_or_default()
389            .max(self.metadata.last_updated_ms);
390        if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
391            return Err(Error::new(
392                ErrorKind::DataInvalid,
393                format!(
394                    "Invalid snapshot timestamp {}: before last updated timestamp {}",
395                    snapshot.timestamp_ms(),
396                    max_last_updated
397                ),
398            ));
399        }
400
401        let mut added_rows = None;
402        if self.metadata.format_version >= MIN_FORMAT_VERSION_ROW_LINEAGE {
403            if let Some((first_row_id, added_rows_count)) = snapshot.row_range() {
404                if first_row_id < self.metadata.next_row_id {
405                    return Err(Error::new(
406                        ErrorKind::DataInvalid,
407                        format!(
408                            "Cannot add a snapshot, first-row-id is behind table next-row-id: {first_row_id} < {}",
409                            self.metadata.next_row_id
410                        ),
411                    ));
412                }
413
414                added_rows = Some(added_rows_count);
415            } else {
416                return Err(Error::new(
417                    ErrorKind::DataInvalid,
418                    format!(
419                        "Cannot add a snapshot: first-row-id is null. first-row-id must be set for format version >= {MIN_FORMAT_VERSION_ROW_LINEAGE}",
420                    ),
421                ));
422            }
423        }
424
425        if let Some(added_rows) = added_rows {
426            self.metadata.next_row_id = self
427                .metadata
428                .next_row_id
429                .checked_add(added_rows)
430                .ok_or_else(|| {
431                    Error::new(
432                        ErrorKind::DataInvalid,
433                        "Cannot add snapshot: next-row-id overflowed when adding added-rows",
434                    )
435                })?;
436        }
437
438        // Mutation happens in next line - must be infallible from here
439        self.changes.push(TableUpdate::AddSnapshot {
440            snapshot: snapshot.clone(),
441        });
442
443        self.last_updated_ms = Some(snapshot.timestamp_ms());
444        self.metadata.last_sequence_number = snapshot.sequence_number();
445        self.metadata
446            .snapshots
447            .insert(snapshot.snapshot_id(), snapshot.into());
448
449        Ok(self)
450    }
451
452    /// Append a snapshot to the specified branch.
453    /// Retention settings from the `branch` are re-used.
454    ///
455    /// # Errors
456    /// - Any of the preconditions of `self.add_snapshot` are not met.
457    pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) -> Result<Self> {
458        let reference = self.metadata.refs.get(branch).cloned();
459
460        let reference = if let Some(mut reference) = reference {
461            if !reference.is_branch() {
462                return Err(Error::new(
463                    ErrorKind::DataInvalid,
464                    format!("Cannot append snapshot to non-branch reference '{branch}'",),
465                ));
466            }
467
468            reference.snapshot_id = snapshot.snapshot_id();
469            reference
470        } else {
471            SnapshotReference {
472                snapshot_id: snapshot.snapshot_id(),
473                retention: SnapshotRetention::Branch {
474                    min_snapshots_to_keep: None,
475                    max_snapshot_age_ms: None,
476                    max_ref_age_ms: None,
477                },
478            }
479        };
480
481        self.add_snapshot(snapshot)?.set_ref(branch, reference)
482    }
483
484    /// Remove snapshots by its ids from the table metadata.
485    /// Does nothing if a snapshot id is not present.
486    /// Keeps as changes only the snapshots that were actually removed.
487    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
488        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
489
490        self.metadata.snapshots.retain(|k, _| {
491            if snapshot_ids.contains(k) {
492                removed_snapshots.push(*k);
493                false
494            } else {
495                true
496            }
497        });
498
499        if !removed_snapshots.is_empty() {
500            self.changes.push(TableUpdate::RemoveSnapshots {
501                snapshot_ids: removed_snapshots,
502            });
503        }
504
505        // Remove refs that are no longer valid
506        self.metadata
507            .refs
508            .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
509
510        self
511    }
512
513    /// Set a reference to a snapshot.
514    ///
515    /// # Errors
516    /// - The snapshot id is unknown.
517    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
518        if self
519            .metadata
520            .refs
521            .get(ref_name)
522            .is_some_and(|snap_ref| snap_ref.eq(&reference))
523        {
524            return Ok(self);
525        }
526
527        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
528            return Err(Error::new(
529                ErrorKind::DataInvalid,
530                format!(
531                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
532                    reference.snapshot_id
533                ),
534            ));
535        };
536
537        // Update last_updated_ms to the exact timestamp of the snapshot if it was added in this commit
538        let is_added_snapshot = self.changes.iter().any(|update| {
539            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
540        });
541        if is_added_snapshot {
542            self.last_updated_ms = Some(snapshot.timestamp_ms());
543        }
544
545        // Current snapshot id is set only for the main branch
546        if ref_name == MAIN_BRANCH {
547            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
548            let timestamp_ms = if let Some(last_updated_ms) = self.last_updated_ms {
549                last_updated_ms
550            } else {
551                let last_updated_ms = chrono::Utc::now().timestamp_millis();
552                self.last_updated_ms = Some(last_updated_ms);
553                last_updated_ms
554            };
555
556            self.metadata.snapshot_log.push(SnapshotLog {
557                snapshot_id: snapshot.snapshot_id(),
558                timestamp_ms,
559            });
560        }
561
562        self.changes.push(TableUpdate::SetSnapshotRef {
563            ref_name: ref_name.to_string(),
564            reference: reference.clone(),
565        });
566        self.metadata.refs.insert(ref_name.to_string(), reference);
567
568        Ok(self)
569    }
570
571    /// Remove a reference
572    ///
573    /// If `ref_name='main'` the current snapshot id is set to `None`.
574    pub fn remove_ref(mut self, ref_name: &str) -> Self {
575        if ref_name == MAIN_BRANCH {
576            self.metadata.current_snapshot_id = None;
577        }
578
579        if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
580            self.changes.push(TableUpdate::RemoveSnapshotRef {
581                ref_name: ref_name.to_string(),
582            });
583        }
584
585        self
586    }
587
588    /// Set statistics for a snapshot
589    pub fn set_statistics(mut self, statistics: StatisticsFile) -> Self {
590        self.metadata
591            .statistics
592            .insert(statistics.snapshot_id, statistics.clone());
593        self.changes.push(TableUpdate::SetStatistics {
594            statistics: statistics.clone(),
595        });
596        self
597    }
598
599    /// Remove statistics for a snapshot
600    pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
601        let previous = self.metadata.statistics.remove(&snapshot_id);
602        if previous.is_some() {
603            self.changes
604                .push(TableUpdate::RemoveStatistics { snapshot_id });
605        }
606        self
607    }
608
609    /// Set partition statistics
610    pub fn set_partition_statistics(
611        mut self,
612        partition_statistics_file: PartitionStatisticsFile,
613    ) -> Self {
614        self.metadata.partition_statistics.insert(
615            partition_statistics_file.snapshot_id,
616            partition_statistics_file.clone(),
617        );
618        self.changes.push(TableUpdate::SetPartitionStatistics {
619            partition_statistics: partition_statistics_file,
620        });
621        self
622    }
623
624    /// Remove partition statistics
625    pub fn remove_partition_statistics(mut self, snapshot_id: i64) -> Self {
626        let previous = self.metadata.partition_statistics.remove(&snapshot_id);
627        if previous.is_some() {
628            self.changes
629                .push(TableUpdate::RemovePartitionStatistics { snapshot_id });
630        }
631        self
632    }
633
634    /// Add a schema to the table metadata.
635    ///
636    /// The provided `schema.schema_id` may not be used.
637    ///
638    /// Important: Use this method with caution. The builder does not check
639    /// if the added schema is compatible with the current schema.
640    pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
641        // Validate that new schema fields don't conflict with existing partition field names
642        self.validate_schema_field_names(&schema)?;
643
644        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
645        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);
646
647        if schema_found {
648            if self.last_added_schema_id != Some(new_schema_id) {
649                self.changes.push(TableUpdate::AddSchema {
650                    schema: schema.clone(),
651                });
652                self.last_added_schema_id = Some(new_schema_id);
653            }
654
655            return Ok(self);
656        }
657
658        // New schemas might contain only old columns. In this case last_column_id should not be
659        // reduced.
660        self.metadata.last_column_id =
661            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());
662
663        // Set schema-id
664        let schema = match new_schema_id == schema.schema_id() {
665            true => schema,
666            false => schema.with_schema_id(new_schema_id),
667        };
668
669        self.metadata
670            .schemas
671            .insert(new_schema_id, schema.clone().into());
672
673        self.changes.push(TableUpdate::AddSchema { schema });
674
675        self.last_added_schema_id = Some(new_schema_id);
676
677        Ok(self)
678    }
679
680    /// Set the current schema id.
681    ///
682    /// If `schema_id` is -1, the last added schema is set as the current schema.
683    ///
684    /// Errors:
685    /// - provided `schema_id` is -1 but no schema has been added via `add_schema`.
686    /// - No schema with the provided `schema_id` exists.
687    pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
688        if schema_id == Self::LAST_ADDED {
689            schema_id = self.last_added_schema_id.ok_or_else(|| {
690                Error::new(
691                    ErrorKind::DataInvalid,
692                    "Cannot set current schema to last added schema: no schema has been added.",
693                )
694            })?;
695        };
696        let schema_id = schema_id; // Make immutable
697
698        if schema_id == self.metadata.current_schema_id {
699            return Ok(self);
700        }
701
702        let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
703            Error::new(
704                ErrorKind::DataInvalid,
705                format!("Cannot set current schema to unknown schema with id: '{schema_id}'"),
706            )
707        })?;
708
709        // Old partition specs and sort-orders should be preserved even if they are not compatible with the new schema,
710        // so that older metadata can still be interpreted.
711        // Default partition spec and sort order are checked in the build() method
712        // which allows other default partition specs and sort orders to be set before the build.
713
714        self.metadata.current_schema_id = schema_id;
715
716        if self.last_added_schema_id == Some(schema_id) {
717            self.changes.push(TableUpdate::SetCurrentSchema {
718                schema_id: Self::LAST_ADDED,
719            });
720        } else {
721            self.changes
722                .push(TableUpdate::SetCurrentSchema { schema_id });
723        }
724
725        Ok(self)
726    }
727
728    /// Add a schema and set it as the current schema.
729    pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
730        self.add_schema(schema)?
731            .set_current_schema(Self::LAST_ADDED)
732    }
733
734    /// Validate schema field names against partition field names across all historical schemas.
735    ///
736    /// Due to Iceberg's multi-version property, this check ignores existing schema fields
737    /// that match partition names (schema evolution allows re-adding previously removed fields).
738    /// Only NEW field names that conflict with partition names are rejected.
739    ///
740    /// # Errors
741    /// - Schema field name conflicts with partition field name but doesn't exist in any historical schema.
742    fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> {
743        if self.metadata.schemas.is_empty() {
744            return Ok(());
745        }
746
747        for field_name in schema.field_id_to_name_map().values() {
748            let has_partition_conflict = self.metadata.partition_name_exists(field_name);
749            let is_new_field = !self.metadata.name_exists_in_any_schema(field_name);
750
751            if has_partition_conflict && is_new_field {
752                return Err(Error::new(
753                    ErrorKind::DataInvalid,
754                    format!(
755                        "Cannot add schema field '{field_name}' because it conflicts with existing partition field name. \
756                         Schema evolution cannot introduce field names that match existing partition field names."
757                    ),
758                ));
759            }
760        }
761
762        Ok(())
763    }
764
765    /// Validate partition field names against schema field names across all historical schemas.
766    ///
767    /// Due to Iceberg's multi-version property, partition fields can share names with schema fields
768    /// if they meet specific requirements (identity transform + matching source field ID).
769    /// This validation enforces those rules across all historical schema versions.
770    ///
771    /// # Errors
772    /// - Partition field name conflicts with schema field name but doesn't use identity transform.
773    /// - Partition field uses identity transform but references wrong source field ID.
774    fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> {
775        if self.metadata.schemas.is_empty() {
776            return Ok(());
777        }
778
779        let current_schema = self.get_current_schema()?;
780        for partition_field in unbound_spec.fields() {
781            let exists_in_any_schema = self
782                .metadata
783                .name_exists_in_any_schema(&partition_field.name);
784
785            // Skip if partition field name doesn't conflict with any schema field
786            if !exists_in_any_schema {
787                continue;
788            }
789
790            // If name exists in schemas, validate against current schema rules
791            if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) {
792                let is_identity_transform =
793                    partition_field.transform == crate::spec::Transform::Identity;
794                let has_matching_source_id = schema_field.id == partition_field.source_id;
795
796                if !is_identity_transform {
797                    return Err(Error::new(
798                        ErrorKind::DataInvalid,
799                        format!(
800                            "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.",
801                            partition_field.name
802                        ),
803                    ));
804                }
805
806                if !has_matching_source_id {
807                    return Err(Error::new(
808                        ErrorKind::DataInvalid,
809                        format!(
810                            "Cannot create identity partition sourced from different field in schema. \
811                             Field name '{}' has id `{}` in schema but partition source id is `{}`",
812                            partition_field.name, schema_field.id, partition_field.source_id
813                        ),
814                    ));
815                }
816            }
817        }
818
819        Ok(())
820    }
821
    /// Add a partition spec to the table metadata.
    ///
    /// The spec is bound eagerly to the current schema.
    /// If a schema is added in the same set of changes, the schema should be added first.
    ///
    /// Even if `unbound_spec.spec_id` is provided as `Some`, it may not be used: the id of an
    /// existing compatible spec is reused, otherwise a new id is derived from the existing spec
    /// ids. Fields without an explicit field id reuse the id of an equivalent field (same source
    /// id and transform) from an existing spec.
    ///
    /// If a compatible spec already exists, it is not stored again; an `AddSpec` change is only
    /// recorded when that spec is not already the last added spec.
    ///
    /// # Errors
    /// - The current schema is missing from the metadata.
    /// - A partition field name conflicts with a current schema field and is not an identity
    ///   transform of that same field.
    /// - The partition spec cannot be bound to the current schema.
    /// - The partition spec has non-sequential field ids and the table format version is 1.
    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        let schema = self.get_current_schema()?.clone();

        // Check if partition field names conflict with schema field names across all schemas
        self.validate_partition_field_names(&unbound_spec)?;

        // Reuse field IDs for equivalent fields from existing partition specs
        let unbound_spec = self.reuse_partition_field_ids(unbound_spec)?;

        // Bind to the current schema, seeding id assignment with the table's last partition id.
        let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
            .with_last_assigned_field_id(self.metadata.last_partition_id)
            .build()?;

        let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
        let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
        let spec = spec.with_spec_id(new_spec_id);
        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);

        // Existing spec: nothing to store. Record it as "added" once, so a following
        // `set_default_partition_spec(LAST_ADDED)` resolves to it, without duplicating changes.
        if spec_found {
            if self.last_added_spec_id != Some(new_spec_id) {
                self.changes
                    .push(TableUpdate::AddSpec { spec: unbound_spec });
                self.last_added_spec_id = Some(new_spec_id);
            }

            return Ok(self);
        }

        // Only genuinely new specs are subject to the v1 sequential-id requirement.
        if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Cannot add partition spec with non-sequential field ids to format version 1 table",
            ));
        }

        let highest_field_id = spec
            .highest_field_id()
            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
        self.metadata
            .partition_specs
            .insert(new_spec_id, Arc::new(spec));
        self.changes
            .push(TableUpdate::AddSpec { spec: unbound_spec });

        self.last_added_spec_id = Some(new_spec_id);
        // `last_partition_id` never decreases, even if the new spec only reuses older ids.
        self.metadata.last_partition_id =
            std::cmp::max(self.metadata.last_partition_id, highest_field_id);

        Ok(self)
    }
882
883    /// Reuse partition field IDs for equivalent fields from existing partition specs.
884    ///
885    /// According to the Iceberg spec, partition field IDs must be reused if an existing
886    /// partition spec contains an equivalent field (same source_id and transform).
887    fn reuse_partition_field_ids(
888        &self,
889        unbound_spec: UnboundPartitionSpec,
890    ) -> Result<UnboundPartitionSpec> {
891        // Build a map of (source_id, transform) -> field_id from existing specs
892        let equivalent_field_ids: HashMap<_, _> = self
893            .metadata
894            .partition_specs
895            .values()
896            .flat_map(|spec| spec.fields())
897            .map(|field| ((field.source_id, &field.transform), field.field_id))
898            .collect();
899
900        // Create new fields with reused field IDs where possible
901        let fields = unbound_spec
902            .fields
903            .into_iter()
904            .map(|mut field| {
905                if field.field_id.is_none()
906                    && let Some(&existing_field_id) =
907                        equivalent_field_ids.get(&(field.source_id, &field.transform))
908                {
909                    field.field_id = Some(existing_field_id);
910                }
911                field
912            })
913            .collect();
914
915        Ok(UnboundPartitionSpec {
916            spec_id: unbound_spec.spec_id,
917            fields,
918        })
919    }
920
921    /// Set the default partition spec.
922    ///
923    /// # Errors
924    /// - spec_id is -1 but no spec has been added via `add_partition_spec`.
925    /// - No partition spec with the provided `spec_id` exists.
926    pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> {
927        if spec_id == Self::LAST_ADDED {
928            spec_id = self.last_added_spec_id.ok_or_else(|| {
929                Error::new(
930                    ErrorKind::DataInvalid,
931                    "Cannot set default partition spec to last added spec: no spec has been added.",
932                )
933            })?;
934        }
935
936        if self.metadata.default_spec.spec_id() == spec_id {
937            return Ok(self);
938        }
939
940        if !self.metadata.partition_specs.contains_key(&spec_id) {
941            return Err(Error::new(
942                ErrorKind::DataInvalid,
943                format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",),
944            ));
945        }
946
947        let schemaless_spec = self
948            .metadata
949            .partition_specs
950            .get(&spec_id)
951            .ok_or_else(|| {
952                Error::new(
953                    ErrorKind::DataInvalid,
954                    format!(
955                        "Cannot set default partition spec to unknown spec with id: '{spec_id}'",
956                    ),
957                )
958            })?
959            .clone();
960        let spec = Arc::unwrap_or_clone(schemaless_spec);
961        let spec_type = spec.partition_type(self.get_current_schema()?)?;
962        self.metadata.default_spec = Arc::new(spec);
963        self.metadata.default_partition_type = spec_type;
964
965        if self.last_added_spec_id == Some(spec_id) {
966            self.changes.push(TableUpdate::SetDefaultSpec {
967                spec_id: Self::LAST_ADDED,
968            });
969        } else {
970            self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
971        }
972
973        Ok(self)
974    }
975
976    /// Add a partition spec and set it as the default
977    pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
978        self.add_partition_spec(unbound_spec)?
979            .set_default_partition_spec(Self::LAST_ADDED)
980    }
981
982    /// Remove partition specs by their ids from the table metadata.
983    /// Does nothing if a spec id is not present. Active partition specs
984    /// should not be removed.
985    ///
986    /// # Errors
987    /// - Cannot remove the default partition spec.
988    pub fn remove_partition_specs(mut self, spec_ids: &[i32]) -> Result<Self> {
989        if spec_ids.contains(&self.metadata.default_spec.spec_id()) {
990            return Err(Error::new(
991                ErrorKind::DataInvalid,
992                "Cannot remove default partition spec",
993            ));
994        }
995
996        let mut removed_specs = Vec::with_capacity(spec_ids.len());
997        spec_ids.iter().for_each(|id| {
998            if self.metadata.partition_specs.remove(id).is_some() {
999                removed_specs.push(*id);
1000            }
1001        });
1002
1003        if !removed_specs.is_empty() {
1004            self.changes.push(TableUpdate::RemovePartitionSpecs {
1005                spec_ids: removed_specs,
1006            });
1007        }
1008
1009        Ok(self)
1010    }
1011
1012    /// Add a sort order to the table metadata.
1013    ///
1014    /// The spec is bound eagerly to the current schema and must be valid for it.
1015    /// If a schema is added in the same set of changes, the schema should be added first.
1016    ///
1017    /// Even if `sort_order.order_id` is provided, it may not be used.
1018    ///
1019    /// # Errors
1020    /// - Sort order id to add already exists.
1021    /// - Sort order is incompatible with the current schema.
1022    pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
1023        let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
1024        let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);
1025
1026        if sort_order_found {
1027            if self.last_added_order_id != Some(new_order_id) {
1028                self.changes.push(TableUpdate::AddSortOrder {
1029                    sort_order: sort_order.clone().with_order_id(new_order_id),
1030                });
1031                self.last_added_order_id = Some(new_order_id);
1032            }
1033
1034            return Ok(self);
1035        }
1036
1037        let schema = self.get_current_schema()?.clone().as_ref().clone();
1038        let sort_order = SortOrder::builder()
1039            .with_order_id(new_order_id)
1040            .with_fields(sort_order.fields)
1041            .build(&schema)
1042            .map_err(|e| {
1043                Error::new(
1044                    ErrorKind::DataInvalid,
1045                    format!("Sort order to add is incompatible with current schema: {e}"),
1046                )
1047                .with_source(e)
1048            })?;
1049
1050        self.last_added_order_id = Some(new_order_id);
1051        self.metadata
1052            .sort_orders
1053            .insert(new_order_id, sort_order.clone().into());
1054        self.changes.push(TableUpdate::AddSortOrder { sort_order });
1055
1056        Ok(self)
1057    }
1058
1059    /// Set the default sort order. If `sort_order_id` is -1, the last added sort order is set as default.
1060    ///
1061    /// # Errors
1062    /// - sort_order_id is -1 but no sort order has been added via `add_sort_order`.
1063    /// - No sort order with the provided `sort_order_id` exists.
1064    pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
1065        if sort_order_id == Self::LAST_ADDED as i64 {
1066            sort_order_id = self.last_added_order_id.ok_or_else(|| {
1067                Error::new(
1068                    ErrorKind::DataInvalid,
1069                    "Cannot set default sort order to last added order: no order has been added.",
1070                )
1071            })?;
1072        }
1073
1074        if self.metadata.default_sort_order_id == sort_order_id {
1075            return Ok(self);
1076        }
1077
1078        if !self.metadata.sort_orders.contains_key(&sort_order_id) {
1079            return Err(Error::new(
1080                ErrorKind::DataInvalid,
1081                format!(
1082                    "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
1083                ),
1084            ));
1085        }
1086
1087        self.metadata.default_sort_order_id = sort_order_id;
1088
1089        if self.last_added_order_id == Some(sort_order_id) {
1090            self.changes.push(TableUpdate::SetDefaultSortOrder {
1091                sort_order_id: Self::LAST_ADDED as i64,
1092            });
1093        } else {
1094            self.changes
1095                .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
1096        }
1097
1098        Ok(self)
1099    }
1100
1101    /// Add a sort order and set it as the default
1102    fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
1103        self.add_sort_order(sort_order)?
1104            .set_default_sort_order(Self::LAST_ADDED as i64)
1105    }
1106
1107    /// Add an encryption key to the table metadata.
1108    pub fn add_encryption_key(mut self, key: EncryptedKey) -> Self {
1109        let key_id = key.key_id().to_string();
1110        if self.metadata.encryption_keys.contains_key(&key_id) {
1111            // already exists
1112            return self;
1113        }
1114
1115        self.metadata.encryption_keys.insert(key_id, key.clone());
1116        self.changes.push(TableUpdate::AddEncryptionKey {
1117            encryption_key: key,
1118        });
1119        self
1120    }
1121
1122    /// Remove an encryption key from the table metadata.
1123    pub fn remove_encryption_key(mut self, key_id: &str) -> Self {
1124        if self.metadata.encryption_keys.remove(key_id).is_some() {
1125            self.changes.push(TableUpdate::RemoveEncryptionKey {
1126                key_id: key_id.to_string(),
1127            });
1128        }
1129        self
1130    }
1131
    /// Build the table metadata.
    ///
    /// Finalizes the builder in this order:
    /// 1. stamps `last_updated_ms` with the explicitly set value, or the current time;
    /// 2. re-binds the default partition spec and re-validates the default sort order against
    ///    the current schema;
    /// 3. repairs the snapshot log and normalizes the metadata;
    /// 4. appends the previous history entry (if any) to the metadata log and expires old
    ///    metadata log entries.
    ///
    /// The result carries the metadata, the recorded changes and the expired log entries.
    ///
    /// # Errors
    /// - The current schema or the default sort order is missing.
    /// - The default partition spec or sort order is incompatible with the current schema.
    /// - The snapshot log does not end with the current snapshot, or normalization fails.
    pub fn build(mut self) -> Result<TableMetadataBuildResult> {
        self.metadata.last_updated_ms = self
            .last_updated_ms
            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());

        // Check compatibility of the current schema to the default partition spec and sort order.
        // We use the `get_xxx` methods from the builder to avoid using the panicking
        // `TableMetadata.default_partition_spec` etc. methods.
        let schema = self.get_current_schema()?.clone();
        let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);

        self.metadata.default_spec = Arc::new(
            Arc::unwrap_or_clone(self.metadata.default_spec)
                .into_unbound()
                .bind(schema.clone())?,
        );
        self.metadata.default_partition_type =
            self.metadata.default_spec.partition_type(&schema)?;
        // Validation only: the rebuilt sort order is discarded.
        SortOrder::builder()
            .with_fields(sort_order.fields)
            .build(&schema)?;

        self.update_snapshot_log()?;
        self.metadata.try_normalize()?;

        if let Some(hist_entry) = self.previous_history_entry.take() {
            self.metadata.metadata_log.push(hist_entry);
        }
        // Expire after pushing so the new entry counts toward the configured maximum.
        let expired_metadata_logs = self.expire_metadata_log();

        Ok(TableMetadataBuildResult {
            metadata: self.metadata,
            changes: self.changes,
            expired_metadata_logs,
        })
    }
1169
1170    fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
1171        let max_size = self
1172            .metadata
1173            .properties
1174            .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
1175            .and_then(|v| v.parse::<usize>().ok())
1176            .unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
1177            .max(1);
1178
1179        if self.metadata.metadata_log.len() > max_size {
1180            self.metadata
1181                .metadata_log
1182                .drain(0..self.metadata.metadata_log.len() - max_size)
1183                .collect()
1184        } else {
1185            Vec::new()
1186        }
1187    }
1188
    /// Rewrite the snapshot log after snapshots were added or removed through this builder.
    ///
    /// Does nothing unless there are intermediate snapshots (see `get_intermediate_snapshots`)
    /// or a `RemoveSnapshots` change was recorded. Entries for intermediate snapshots are
    /// dropped. When snapshots were removed, an entry pointing at an unknown snapshot discards
    /// all history before it.
    ///
    /// # Errors
    /// - A current snapshot is set but the rewritten log does not end with it.
    fn update_snapshot_log(&mut self) -> Result<()> {
        let intermediate_snapshots = self.get_intermediate_snapshots();
        let has_removed_snapshots = self
            .changes
            .iter()
            .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));

        if intermediate_snapshots.is_empty() && !has_removed_snapshots {
            return Ok(());
        }

        let mut new_snapshot_log = Vec::new();
        for log_entry in &self.metadata.snapshot_log {
            let snapshot_id = log_entry.snapshot_id;
            if self.metadata.snapshots.contains_key(&snapshot_id) {
                // Keep entries of known snapshots, except those that never became current.
                if !intermediate_snapshots.contains(&snapshot_id) {
                    new_snapshot_log.push(log_entry.clone());
                }
            } else if has_removed_snapshots {
                // any invalid entry causes the history before it to be removed. otherwise, there could be
                // history gaps that cause time-travel queries to produce incorrect results. for example,
                // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
                // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
                // and t3 when in fact s2 was the current snapshot.
                new_snapshot_log.clear();
            }
        }

        // Sanity check: the most recent log entry must be the current snapshot.
        if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
            let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
            if last_id != Some(current_snapshot_id) {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set invalid snapshot log: latest entry is not the current snapshot",
                ));
            }
        };

        self.metadata.snapshot_log = new_snapshot_log;
        Ok(())
    }
1230
1231    /// Finds intermediate snapshots that have not been committed as the current snapshot.
1232    ///
1233    /// Transactions can create snapshots that are never the current snapshot because several
1234    /// changes are combined by the transaction into one table metadata update. when each
1235    /// intermediate snapshot is added to table metadata, it is added to the snapshot log, assuming
1236    /// that it will be the current snapshot. when there are multiple snapshot updates, the log must
1237    /// be corrected by suppressing the intermediate snapshot entries.
1238    ///     
1239    /// A snapshot is an intermediate snapshot if it was added but is not the current snapshot.
1240    fn get_intermediate_snapshots(&self) -> HashSet<i64> {
1241        let added_snapshot_ids = self
1242            .changes
1243            .iter()
1244            .filter_map(|update| match update {
1245                TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
1246                _ => None,
1247            })
1248            .collect::<HashSet<_>>();
1249
1250        self.changes
1251            .iter()
1252            .filter_map(|update| match update {
1253                TableUpdate::SetSnapshotRef {
1254                    ref_name,
1255                    reference,
1256                } => {
1257                    if added_snapshot_ids.contains(&reference.snapshot_id)
1258                        && ref_name == MAIN_BRANCH
1259                        && reference.snapshot_id
1260                            != self
1261                                .metadata
1262                                .current_snapshot_id
1263                                .unwrap_or(i64::from(Self::LAST_ADDED))
1264                    {
1265                        Some(reference.snapshot_id)
1266                    } else {
1267                        None
1268                    }
1269                }
1270                _ => None,
1271            })
1272            .collect()
1273    }
1274
1275    fn reassign_ids(
1276        schema: Schema,
1277        spec: UnboundPartitionSpec,
1278        sort_order: SortOrder,
1279    ) -> Result<(Schema, PartitionSpec, SortOrder)> {
1280        // Re-assign field ids and schema ids for a new table.
1281        let previous_id_to_name = schema.field_id_to_name_map().clone();
1282        let fresh_schema = schema
1283            .into_builder()
1284            .with_schema_id(DEFAULT_SCHEMA_ID)
1285            .with_reassigned_field_ids(FIRST_FIELD_ID)
1286            .build()?;
1287
1288        // Re-build partition spec with new ids
1289        let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
1290        for field in spec.fields() {
1291            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
1292                Error::new(
1293                    ErrorKind::DataInvalid,
1294                    format!(
1295                        "Cannot find source column with id {} for partition column {} in schema.",
1296                        field.source_id, field.name
1297                    ),
1298                )
1299            })?;
1300            fresh_spec =
1301                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
1302        }
1303        let fresh_spec = fresh_spec.build()?;
1304
1305        // Re-build sort order with new ids
1306        let mut fresh_order = SortOrder::builder();
1307        for mut field in sort_order.fields {
1308            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
1309                Error::new(
1310                    ErrorKind::DataInvalid,
1311                    format!(
1312                        "Cannot find source column with id {} for sort column in schema.",
1313                        field.source_id
1314                    ),
1315                )
1316            })?;
1317            let new_field_id = fresh_schema
1318                       .field_by_name(source_field_name)
1319                       .ok_or_else(|| {
1320                           Error::new(
1321                               ErrorKind::Unexpected,
1322                               format!(
1323                                   "Cannot find source column with name {source_field_name} for sort column in re-assigned schema."
1324                               ),
1325                           )
1326                       })?.id;
1327            field.source_id = new_field_id;
1328            fresh_order.with_sort_field(field);
1329        }
1330        let fresh_sort_order = fresh_order.build(&fresh_schema)?;
1331
1332        Ok((fresh_schema, fresh_spec, fresh_sort_order))
1333    }
1334
1335    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
1336        self.metadata
1337            .schemas
1338            .iter()
1339            .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
1340            .unwrap_or_else(|| self.get_highest_schema_id() + 1)
1341    }
1342
1343    fn get_highest_schema_id(&self) -> i32 {
1344        *self
1345            .metadata
1346            .schemas
1347            .keys()
1348            .max()
1349            .unwrap_or(&self.metadata.current_schema_id)
1350    }
1351
1352    fn get_current_schema(&self) -> Result<&SchemaRef> {
1353        self.metadata
1354            .schemas
1355            .get(&self.metadata.current_schema_id)
1356            .ok_or_else(|| {
1357                Error::new(
1358                    ErrorKind::DataInvalid,
1359                    format!(
1360                        "Current schema with id '{}' not found in table metadata.",
1361                        self.metadata.current_schema_id
1362                    ),
1363                )
1364            })
1365    }
1366
1367    fn get_default_sort_order(&self) -> Result<SortOrderRef> {
1368        self.metadata
1369            .sort_orders
1370            .get(&self.metadata.default_sort_order_id)
1371            .cloned()
1372            .ok_or_else(|| {
1373                Error::new(
1374                    ErrorKind::DataInvalid,
1375                    format!(
1376                        "Default sort order with id '{}' not found in table metadata.",
1377                        self.metadata.default_sort_order_id
1378                    ),
1379                )
1380            })
1381    }
1382
1383    /// If a compatible spec already exists, use the same ID. Otherwise, use 1 more than the highest ID.
1384    fn reuse_or_create_new_spec_id(&self, new_spec: &PartitionSpec) -> i32 {
1385        self.metadata
1386            .partition_specs
1387            .iter()
1388            .find_map(|(id, old_spec)| new_spec.is_compatible_with(old_spec).then_some(*id))
1389            .unwrap_or_else(|| {
1390                self.get_highest_spec_id()
1391                    .map(|id| id + 1)
1392                    .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
1393            })
1394    }
1395
1396    fn get_highest_spec_id(&self) -> Option<i32> {
1397        self.metadata.partition_specs.keys().max().copied()
1398    }
1399
1400    /// If a compatible sort-order already exists, use the same ID. Otherwise, use 1 more than the highest ID.
1401    fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
1402        if new_sort_order.is_unsorted() {
1403            return SortOrder::unsorted_order().order_id;
1404        }
1405
1406        self.metadata
1407            .sort_orders
1408            .iter()
1409            .find_map(|(id, sort_order)| {
1410                sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
1411            })
1412            .unwrap_or_else(|| {
1413                self.highest_sort_order_id()
1414                    .unwrap_or(SortOrder::unsorted_order().order_id)
1415                    + 1
1416            })
1417    }
1418
1419    fn highest_sort_order_id(&self) -> Option<i64> {
1420        self.metadata.sort_orders.keys().max().copied()
1421    }
1422
1423    /// Remove schemas by their ids from the table metadata.
1424    /// Does nothing if a schema id is not present. Active schemas should not be removed.
1425    pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
1426        if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
1427            return Err(Error::new(
1428                ErrorKind::DataInvalid,
1429                "Cannot remove current schema",
1430            ));
1431        }
1432
1433        if schema_id_to_remove.is_empty() {
1434            return Ok(self);
1435        }
1436
1437        let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
1438        self.metadata.schemas.retain(|id, _schema| {
1439            if schema_id_to_remove.contains(id) {
1440                removed_schemas.push(*id);
1441                false
1442            } else {
1443                true
1444            }
1445        });
1446
1447        self.changes.push(TableUpdate::RemoveSchemas {
1448            schema_ids: removed_schemas,
1449        });
1450
1451        Ok(self)
1452    }
1453}
1454
1455impl From<TableMetadataBuildResult> for TableMetadata {
1456    fn from(result: TableMetadataBuildResult) -> Self {
1457        result.metadata
1458    }
1459}
1460
1461#[cfg(test)]
1462mod tests {
1463    use std::fs::File;
1464    use std::io::BufReader;
1465    use std::thread::sleep;
1466
1467    use super::*;
1468    use crate::TableIdent;
1469    use crate::io::FileIO;
1470    use crate::spec::{
1471        BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema,
1472        SnapshotRetention, SortDirection, SortField, StructType, Summary, TableProperties,
1473        Transform, Type, UnboundPartitionField,
1474    };
1475    use crate::table::Table;
1476    use crate::test_utils::test_runtime;
1477
1478    const TEST_LOCATION: &str = "s3://bucket/test/location";
1479    const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
1480
1481    fn schema() -> Schema {
1482        Schema::builder()
1483            .with_fields(vec![
1484                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1485                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
1486                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
1487            ])
1488            .build()
1489            .unwrap()
1490    }
1491
1492    fn sort_order() -> SortOrder {
1493        let schema = schema();
1494        SortOrder::builder()
1495            .with_order_id(1)
1496            .with_sort_field(SortField {
1497                source_id: 3,
1498                transform: Transform::Bucket(4),
1499                direction: SortDirection::Descending,
1500                null_order: NullOrder::First,
1501            })
1502            .build(&schema)
1503            .unwrap()
1504    }
1505
1506    fn partition_spec() -> UnboundPartitionSpec {
1507        UnboundPartitionSpec::builder()
1508            .with_spec_id(0)
1509            .add_partition_field(2, "y", Transform::Identity)
1510            .unwrap()
1511            .build()
1512    }
1513
1514    fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
1515        TableMetadataBuilder::new(
1516            schema(),
1517            partition_spec(),
1518            sort_order(),
1519            TEST_LOCATION.to_string(),
1520            format_version,
1521            HashMap::new(),
1522        )
1523        .unwrap()
1524        .build()
1525        .unwrap()
1526        .metadata
1527        .into_builder(Some(
1528            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1529        ))
1530    }
1531
1532    #[test]
1533    fn test_minimal_build() {
1534        let metadata = TableMetadataBuilder::new(
1535            schema(),
1536            partition_spec(),
1537            sort_order(),
1538            TEST_LOCATION.to_string(),
1539            FormatVersion::V1,
1540            HashMap::new(),
1541        )
1542        .unwrap()
1543        .build()
1544        .unwrap()
1545        .metadata;
1546
1547        assert_eq!(metadata.format_version, FormatVersion::V1);
1548        assert_eq!(metadata.location, TEST_LOCATION);
1549        assert_eq!(metadata.current_schema_id, 0);
1550        assert_eq!(metadata.default_spec.spec_id(), 0);
1551        assert_eq!(metadata.default_sort_order_id, 1);
1552        assert_eq!(metadata.last_partition_id, 1000);
1553        assert_eq!(metadata.last_column_id, 3);
1554        assert_eq!(metadata.snapshots.len(), 0);
1555        assert_eq!(metadata.current_snapshot_id, None);
1556        assert_eq!(metadata.refs.len(), 0);
1557        assert_eq!(metadata.properties.len(), 0);
1558        assert_eq!(metadata.metadata_log.len(), 0);
1559        assert_eq!(metadata.last_sequence_number, 0);
1560        assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);
1561
1562        // Test can serialize v1
1563        let _ = serde_json::to_string(&metadata).unwrap();
1564
1565        // Test can serialize v2
1566        let metadata = metadata
1567            .into_builder(Some(
1568                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1569            ))
1570            .upgrade_format_version(FormatVersion::V2)
1571            .unwrap()
1572            .build()
1573            .unwrap()
1574            .metadata;
1575
1576        assert_eq!(metadata.format_version, FormatVersion::V2);
1577        let _ = serde_json::to_string(&metadata).unwrap();
1578    }
1579
1580    #[test]
1581    fn test_build_unpartitioned_unsorted() {
1582        let schema = Schema::builder().build().unwrap();
1583        let metadata = TableMetadataBuilder::new(
1584            schema.clone(),
1585            PartitionSpec::unpartition_spec(),
1586            SortOrder::unsorted_order(),
1587            TEST_LOCATION.to_string(),
1588            FormatVersion::V2,
1589            HashMap::new(),
1590        )
1591        .unwrap()
1592        .build()
1593        .unwrap()
1594        .metadata;
1595
1596        assert_eq!(metadata.format_version, FormatVersion::V2);
1597        assert_eq!(metadata.location, TEST_LOCATION);
1598        assert_eq!(metadata.current_schema_id, 0);
1599        assert_eq!(metadata.default_spec.spec_id(), 0);
1600        assert_eq!(metadata.default_sort_order_id, 0);
1601        assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
1602        assert_eq!(metadata.last_column_id, 0);
1603        assert_eq!(metadata.snapshots.len(), 0);
1604        assert_eq!(metadata.current_snapshot_id, None);
1605        assert_eq!(metadata.refs.len(), 0);
1606        assert_eq!(metadata.properties.len(), 0);
1607        assert_eq!(metadata.metadata_log.len(), 0);
1608        assert_eq!(metadata.last_sequence_number, 0);
1609    }
1610
1611    #[test]
1612    fn test_reassigns_ids() {
1613        let schema = Schema::builder()
1614            .with_schema_id(10)
1615            .with_fields(vec![
1616                NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
1617                NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
1618                NestedField::required(
1619                    13,
1620                    "struct",
1621                    Type::Struct(StructType::new(vec![
1622                        NestedField::required(14, "nested", Type::Primitive(PrimitiveType::Long))
1623                            .into(),
1624                    ])),
1625                )
1626                .into(),
1627                NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
1628            ])
1629            .build()
1630            .unwrap();
1631        let spec = PartitionSpec::builder(schema.clone())
1632            .with_spec_id(20)
1633            .add_partition_field("a", "a", Transform::Identity)
1634            .unwrap()
1635            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
1636            .unwrap()
1637            .build()
1638            .unwrap();
1639        let sort_order = SortOrder::builder()
1640            .with_fields(vec![SortField {
1641                source_id: 11,
1642                transform: Transform::Identity,
1643                direction: SortDirection::Ascending,
1644                null_order: NullOrder::First,
1645            }])
1646            .with_order_id(10)
1647            .build(&schema)
1648            .unwrap();
1649
1650        let (fresh_schema, fresh_spec, fresh_sort_order) =
1651            TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();
1652
1653        let expected_schema = Schema::builder()
1654            .with_fields(vec![
1655                NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
1656                NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
1657                NestedField::required(
1658                    3,
1659                    "struct",
1660                    Type::Struct(StructType::new(vec![
1661                        NestedField::required(5, "nested", Type::Primitive(PrimitiveType::Long))
1662                            .into(),
1663                    ])),
1664                )
1665                .into(),
1666                NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
1667            ])
1668            .build()
1669            .unwrap();
1670
1671        let expected_spec = PartitionSpec::builder(expected_schema.clone())
1672            .with_spec_id(0)
1673            .add_partition_field("a", "a", Transform::Identity)
1674            .unwrap()
1675            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
1676            .unwrap()
1677            .build()
1678            .unwrap();
1679
1680        let expected_sort_order = SortOrder::builder()
1681            .with_fields(vec![SortField {
1682                source_id: 1,
1683                transform: Transform::Identity,
1684                direction: SortDirection::Ascending,
1685                null_order: NullOrder::First,
1686            }])
1687            .with_order_id(1)
1688            .build(&expected_schema)
1689            .unwrap();
1690
1691        assert_eq!(fresh_schema, expected_schema);
1692        assert_eq!(fresh_spec, expected_spec);
1693        assert_eq!(fresh_sort_order, expected_sort_order);
1694    }
1695
1696    #[test]
1697    fn test_ids_are_reassigned_for_new_metadata() {
1698        let schema = schema().into_builder().with_schema_id(10).build().unwrap();
1699
1700        let metadata = TableMetadataBuilder::new(
1701            schema,
1702            partition_spec(),
1703            sort_order(),
1704            TEST_LOCATION.to_string(),
1705            FormatVersion::V1,
1706            HashMap::new(),
1707        )
1708        .unwrap()
1709        .build()
1710        .unwrap()
1711        .metadata;
1712
1713        assert_eq!(metadata.current_schema_id, 0);
1714        assert_eq!(metadata.current_schema().schema_id(), 0);
1715    }
1716
1717    #[test]
1718    fn test_new_metadata_changes() {
1719        let changes = TableMetadataBuilder::new(
1720            schema(),
1721            partition_spec(),
1722            sort_order(),
1723            TEST_LOCATION.to_string(),
1724            FormatVersion::V1,
1725            HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
1726        )
1727        .unwrap()
1728        .build()
1729        .unwrap()
1730        .changes;
1731
1732        pretty_assertions::assert_eq!(changes, vec![
1733            TableUpdate::SetLocation {
1734                location: TEST_LOCATION.to_string()
1735            },
1736            TableUpdate::AddSchema { schema: schema() },
1737            TableUpdate::SetCurrentSchema { schema_id: -1 },
1738            TableUpdate::AddSpec {
1739                // Because this is a new tables, field-ids are assigned
1740                // partition_spec() has None set for field-id
1741                spec: PartitionSpec::builder(schema())
1742                    .with_spec_id(0)
1743                    .add_unbound_field(UnboundPartitionField {
1744                        name: "y".to_string(),
1745                        transform: Transform::Identity,
1746                        source_id: 2,
1747                        field_id: Some(1000)
1748                    })
1749                    .unwrap()
1750                    .build()
1751                    .unwrap()
1752                    .into_unbound(),
1753            },
1754            TableUpdate::SetDefaultSpec { spec_id: -1 },
1755            TableUpdate::AddSortOrder {
1756                sort_order: sort_order(),
1757            },
1758            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
1759            TableUpdate::SetProperties {
1760                updates: HashMap::from_iter(vec![(
1761                    "property 1".to_string(),
1762                    "value 1".to_string()
1763                )]),
1764            }
1765        ]);
1766    }
1767
1768    #[test]
1769    fn test_new_metadata_changes_unpartitioned_unsorted() {
1770        let schema = Schema::builder().build().unwrap();
1771        let changes = TableMetadataBuilder::new(
1772            schema.clone(),
1773            PartitionSpec::unpartition_spec().into_unbound(),
1774            SortOrder::unsorted_order(),
1775            TEST_LOCATION.to_string(),
1776            FormatVersion::V1,
1777            HashMap::new(),
1778        )
1779        .unwrap()
1780        .build()
1781        .unwrap()
1782        .changes;
1783
1784        pretty_assertions::assert_eq!(changes, vec![
1785            TableUpdate::SetLocation {
1786                location: TEST_LOCATION.to_string()
1787            },
1788            TableUpdate::AddSchema {
1789                schema: Schema::builder().build().unwrap(),
1790            },
1791            TableUpdate::SetCurrentSchema { schema_id: -1 },
1792            TableUpdate::AddSpec {
1793                // Because this is a new tables, field-ids are assigned
1794                // partition_spec() has None set for field-id
1795                spec: PartitionSpec::builder(schema)
1796                    .with_spec_id(0)
1797                    .build()
1798                    .unwrap()
1799                    .into_unbound(),
1800            },
1801            TableUpdate::SetDefaultSpec { spec_id: -1 },
1802            TableUpdate::AddSortOrder {
1803                sort_order: SortOrder::unsorted_order(),
1804            },
1805            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
1806        ]);
1807    }
1808
1809    #[test]
1810    fn test_add_partition_spec() {
1811        let builder = builder_without_changes(FormatVersion::V2);
1812
1813        let added_spec = UnboundPartitionSpec::builder()
1814            .with_spec_id(10)
1815            .add_partition_fields(vec![
1816                UnboundPartitionField {
1817                    // The previous field - has field_id set
1818                    name: "y".to_string(),
1819                    transform: Transform::Identity,
1820                    source_id: 2,
1821                    field_id: Some(1000),
1822                },
1823                UnboundPartitionField {
1824                    // A new field without field id - should still be without field id in changes
1825                    name: "z".to_string(),
1826                    transform: Transform::Identity,
1827                    source_id: 3,
1828                    field_id: None,
1829                },
1830            ])
1831            .unwrap()
1832            .build();
1833
1834        let build_result = builder
1835            .add_partition_spec(added_spec.clone())
1836            .unwrap()
1837            .build()
1838            .unwrap();
1839
1840        // Spec id should be re-assigned
1841        let expected_change = added_spec.with_spec_id(1);
1842        let expected_spec = PartitionSpec::builder(schema())
1843            .with_spec_id(1)
1844            .add_unbound_field(UnboundPartitionField {
1845                name: "y".to_string(),
1846                transform: Transform::Identity,
1847                source_id: 2,
1848                field_id: Some(1000),
1849            })
1850            .unwrap()
1851            .add_unbound_field(UnboundPartitionField {
1852                name: "z".to_string(),
1853                transform: Transform::Identity,
1854                source_id: 3,
1855                field_id: Some(1001),
1856            })
1857            .unwrap()
1858            .build()
1859            .unwrap();
1860
1861        assert_eq!(build_result.changes.len(), 1);
1862        assert_eq!(
1863            build_result.metadata.partition_spec_by_id(1),
1864            Some(&Arc::new(expected_spec))
1865        );
1866        assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
1867        assert_eq!(build_result.metadata.last_partition_id, 1001);
1868        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
1869            spec: expected_change
1870        });
1871
1872        // Remove the spec
1873        let build_result = build_result
1874            .metadata
1875            .into_builder(Some(
1876                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1877            ))
1878            .remove_partition_specs(&[1])
1879            .unwrap()
1880            .build()
1881            .unwrap();
1882
1883        assert_eq!(build_result.changes.len(), 1);
1884        assert_eq!(build_result.metadata.partition_specs.len(), 1);
1885        assert!(build_result.metadata.partition_spec_by_id(1).is_none());
1886    }
1887
1888    #[test]
1889    fn test_set_default_partition_spec() {
1890        let builder = builder_without_changes(FormatVersion::V2);
1891        let schema = builder.get_current_schema().unwrap().clone();
1892        let added_spec = UnboundPartitionSpec::builder()
1893            .with_spec_id(10)
1894            .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
1895            .unwrap()
1896            .build();
1897
1898        let build_result = builder
1899            .add_partition_spec(added_spec.clone())
1900            .unwrap()
1901            .set_default_partition_spec(-1)
1902            .unwrap()
1903            .build()
1904            .unwrap();
1905
1906        let expected_spec = PartitionSpec::builder(schema)
1907            .with_spec_id(1)
1908            .add_unbound_field(UnboundPartitionField {
1909                name: "y_bucket[2]".to_string(),
1910                transform: Transform::Bucket(2),
1911                source_id: 1,
1912                field_id: Some(1001),
1913            })
1914            .unwrap()
1915            .build()
1916            .unwrap();
1917
1918        assert_eq!(build_result.changes.len(), 2);
1919        assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
1920        assert_eq!(build_result.changes, vec![
1921            TableUpdate::AddSpec {
1922                // Should contain the actual ID that was used
1923                spec: added_spec.with_spec_id(1)
1924            },
1925            TableUpdate::SetDefaultSpec { spec_id: -1 }
1926        ]);
1927    }
1928
1929    #[test]
1930    fn test_set_existing_default_partition_spec() {
1931        let builder = builder_without_changes(FormatVersion::V2);
1932        // Add and set an unbound spec as current
1933        let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();
1934        let build_result = builder
1935            .add_partition_spec(unbound_spec.clone())
1936            .unwrap()
1937            .set_default_partition_spec(-1)
1938            .unwrap()
1939            .build()
1940            .unwrap();
1941
1942        assert_eq!(build_result.changes.len(), 2);
1943        assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
1944            spec: unbound_spec.clone()
1945        });
1946        assert_eq!(build_result.changes[1], TableUpdate::SetDefaultSpec {
1947            spec_id: -1
1948        });
1949        assert_eq!(
1950            build_result.metadata.default_spec,
1951            Arc::new(
1952                unbound_spec
1953                    .bind(build_result.metadata.current_schema().clone())
1954                    .unwrap()
1955            )
1956        );
1957
1958        // Set old spec again
1959        let build_result = build_result
1960            .metadata
1961            .into_builder(Some(
1962                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
1963            ))
1964            .set_default_partition_spec(0)
1965            .unwrap()
1966            .build()
1967            .unwrap();
1968
1969        assert_eq!(build_result.changes.len(), 1);
1970        assert_eq!(build_result.changes[0], TableUpdate::SetDefaultSpec {
1971            spec_id: 0
1972        });
1973        assert_eq!(
1974            build_result.metadata.default_spec,
1975            Arc::new(
1976                partition_spec()
1977                    .bind(build_result.metadata.current_schema().clone())
1978                    .unwrap()
1979            )
1980        );
1981    }
1982
1983    #[test]
1984    fn test_add_sort_order() {
1985        let builder = builder_without_changes(FormatVersion::V2);
1986
1987        let added_sort_order = SortOrder::builder()
1988            .with_order_id(10)
1989            .with_fields(vec![SortField {
1990                source_id: 1,
1991                transform: Transform::Identity,
1992                direction: SortDirection::Ascending,
1993                null_order: NullOrder::First,
1994            }])
1995            .build(&schema())
1996            .unwrap();
1997
1998        let build_result = builder
1999            .add_sort_order(added_sort_order.clone())
2000            .unwrap()
2001            .build()
2002            .unwrap();
2003
2004        let expected_sort_order = added_sort_order.with_order_id(2);
2005
2006        assert_eq!(build_result.changes.len(), 1);
2007        assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2));
2008        pretty_assertions::assert_eq!(
2009            build_result.metadata.sort_order_by_id(2),
2010            Some(&Arc::new(expected_sort_order.clone()))
2011        );
2012        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder {
2013            sort_order: expected_sort_order
2014        });
2015    }
2016
2017    #[test]
2018    fn test_add_compatible_schema() {
2019        let builder = builder_without_changes(FormatVersion::V2);
2020
2021        let added_schema = Schema::builder()
2022            .with_schema_id(1)
2023            .with_fields(vec![
2024                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
2025                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
2026                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
2027                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
2028            ])
2029            .build()
2030            .unwrap();
2031
2032        let build_result = builder
2033            .add_current_schema(added_schema.clone())
2034            .unwrap()
2035            .build()
2036            .unwrap();
2037
2038        assert_eq!(build_result.changes.len(), 2);
2039        assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1));
2040        pretty_assertions::assert_eq!(
2041            build_result.metadata.schema_by_id(1),
2042            Some(&Arc::new(added_schema.clone()))
2043        );
2044        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema {
2045            schema: added_schema
2046        });
2047        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
2048            schema_id: -1
2049        });
2050    }
2051
2052    #[test]
2053    fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
2054        let builder = builder_without_changes(FormatVersion::V2);
2055
2056        let added_schema = Schema::builder()
2057            .with_schema_id(1)
2058            .with_fields(vec![
2059                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
2060                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
2061                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
2062                NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
2063            ])
2064            .build()
2065            .unwrap();
2066
2067        let build_result = builder
2068            .add_schema(added_schema.clone())
2069            .unwrap()
2070            .set_current_schema(1)
2071            .unwrap()
2072            .build()
2073            .unwrap();
2074
2075        assert_eq!(build_result.changes.len(), 2);
2076        assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema {
2077            schema_id: -1
2078        });
2079    }
2080
2081    #[test]
2082    fn test_no_metadata_log_for_create_table() {
2083        let build_result = TableMetadataBuilder::new(
2084            schema(),
2085            partition_spec(),
2086            sort_order(),
2087            TEST_LOCATION.to_string(),
2088            FormatVersion::V2,
2089            HashMap::new(),
2090        )
2091        .unwrap()
2092        .build()
2093        .unwrap();
2094
2095        assert_eq!(build_result.metadata.metadata_log.len(), 0);
2096    }
2097
2098    #[test]
2099    fn test_no_metadata_log_entry_for_no_previous_location() {
2100        // Used for first commit after stage-creation of tables
2101        let metadata = builder_without_changes(FormatVersion::V2)
2102            .build()
2103            .unwrap()
2104            .metadata;
2105        assert_eq!(metadata.metadata_log.len(), 1);
2106
2107        let build_result = metadata
2108            .into_builder(None)
2109            .set_properties(HashMap::from_iter(vec![(
2110                "foo".to_string(),
2111                "bar".to_string(),
2112            )]))
2113            .unwrap()
2114            .build()
2115            .unwrap();
2116
2117        assert_eq!(build_result.metadata.metadata_log.len(), 1);
2118    }
2119
2120    #[test]
2121    fn test_from_metadata_generates_metadata_log() {
2122        let metadata_path = "s3://bucket/test/location/metadata/metadata1.json";
2123        let builder = TableMetadataBuilder::new(
2124            schema(),
2125            partition_spec(),
2126            sort_order(),
2127            TEST_LOCATION.to_string(),
2128            FormatVersion::V2,
2129            HashMap::new(),
2130        )
2131        .unwrap()
2132        .build()
2133        .unwrap()
2134        .metadata
2135        .into_builder(Some(metadata_path.to_string()));
2136
2137        let builder = builder
2138            .add_default_sort_order(SortOrder::unsorted_order())
2139            .unwrap();
2140
2141        let build_result = builder.build().unwrap();
2142
2143        assert_eq!(build_result.metadata.metadata_log.len(), 1);
2144        assert_eq!(
2145            build_result.metadata.metadata_log[0].metadata_file,
2146            metadata_path
2147        );
2148    }
2149
2150    #[test]
2151    fn test_set_ref() {
2152        let builder = builder_without_changes(FormatVersion::V2);
2153
2154        let snapshot = Snapshot::builder()
2155            .with_snapshot_id(1)
2156            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2157            .with_sequence_number(0)
2158            .with_schema_id(0)
2159            .with_manifest_list("/snap-1.avro")
2160            .with_summary(Summary {
2161                operation: Operation::Append,
2162                additional_properties: HashMap::from_iter(vec![
2163                    (
2164                        "spark.app.id".to_string(),
2165                        "local-1662532784305".to_string(),
2166                    ),
2167                    ("added-data-files".to_string(), "4".to_string()),
2168                    ("added-records".to_string(), "4".to_string()),
2169                    ("added-files-size".to_string(), "6001".to_string()),
2170                ]),
2171            })
2172            .build();
2173
2174        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2175
2176        assert!(
2177            builder
2178                .clone()
2179                .set_ref(MAIN_BRANCH, SnapshotReference {
2180                    snapshot_id: 10,
2181                    retention: SnapshotRetention::Branch {
2182                        min_snapshots_to_keep: Some(10),
2183                        max_snapshot_age_ms: None,
2184                        max_ref_age_ms: None,
2185                    },
2186                })
2187                .unwrap_err()
2188                .to_string()
2189                .contains("Cannot set 'main' to unknown snapshot: '10'")
2190        );
2191
2192        let build_result = builder
2193            .set_ref(MAIN_BRANCH, SnapshotReference {
2194                snapshot_id: 1,
2195                retention: SnapshotRetention::Branch {
2196                    min_snapshots_to_keep: Some(10),
2197                    max_snapshot_age_ms: None,
2198                    max_ref_age_ms: None,
2199                },
2200            })
2201            .unwrap()
2202            .build()
2203            .unwrap();
2204        assert_eq!(build_result.metadata.snapshots.len(), 1);
2205        assert_eq!(
2206            build_result.metadata.snapshot_by_id(1),
2207            Some(&Arc::new(snapshot.clone()))
2208        );
2209        assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog {
2210            snapshot_id: 1,
2211            timestamp_ms: snapshot.timestamp_ms()
2212        }])
2213    }
2214
2215    #[test]
2216    fn test_snapshot_log_skips_intermediates() {
2217        let builder = builder_without_changes(FormatVersion::V2);
2218
2219        let snapshot_1 = Snapshot::builder()
2220            .with_snapshot_id(1)
2221            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2222            .with_sequence_number(0)
2223            .with_schema_id(0)
2224            .with_manifest_list("/snap-1.avro")
2225            .with_summary(Summary {
2226                operation: Operation::Append,
2227                additional_properties: HashMap::from_iter(vec![
2228                    (
2229                        "spark.app.id".to_string(),
2230                        "local-1662532784305".to_string(),
2231                    ),
2232                    ("added-data-files".to_string(), "4".to_string()),
2233                    ("added-records".to_string(), "4".to_string()),
2234                    ("added-files-size".to_string(), "6001".to_string()),
2235                ]),
2236            })
2237            .build();
2238
2239        let snapshot_2 = Snapshot::builder()
2240            .with_snapshot_id(2)
2241            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2242            .with_sequence_number(0)
2243            .with_schema_id(0)
2244            .with_manifest_list("/snap-1.avro")
2245            .with_summary(Summary {
2246                operation: Operation::Append,
2247                additional_properties: HashMap::from_iter(vec![
2248                    (
2249                        "spark.app.id".to_string(),
2250                        "local-1662532784305".to_string(),
2251                    ),
2252                    ("added-data-files".to_string(), "4".to_string()),
2253                    ("added-records".to_string(), "4".to_string()),
2254                    ("added-files-size".to_string(), "6001".to_string()),
2255                ]),
2256            })
2257            .build();
2258
2259        let result = builder
2260            .add_snapshot(snapshot_1)
2261            .unwrap()
2262            .set_ref(MAIN_BRANCH, SnapshotReference {
2263                snapshot_id: 1,
2264                retention: SnapshotRetention::Branch {
2265                    min_snapshots_to_keep: Some(10),
2266                    max_snapshot_age_ms: None,
2267                    max_ref_age_ms: None,
2268                },
2269            })
2270            .unwrap()
2271            .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
2272            .unwrap()
2273            .build()
2274            .unwrap();
2275
2276        assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
2277            snapshot_id: 2,
2278            timestamp_ms: snapshot_2.timestamp_ms()
2279        }]);
2280        assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
2281    }
2282
2283    #[test]
2284    fn test_remove_main_ref_keeps_snapshot_log() {
2285        let builder = builder_without_changes(FormatVersion::V2);
2286
2287        let snapshot = Snapshot::builder()
2288            .with_snapshot_id(1)
2289            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2290            .with_sequence_number(0)
2291            .with_schema_id(0)
2292            .with_manifest_list("/snap-1.avro")
2293            .with_summary(Summary {
2294                operation: Operation::Append,
2295                additional_properties: HashMap::from_iter(vec![
2296                    (
2297                        "spark.app.id".to_string(),
2298                        "local-1662532784305".to_string(),
2299                    ),
2300                    ("added-data-files".to_string(), "4".to_string()),
2301                    ("added-records".to_string(), "4".to_string()),
2302                    ("added-files-size".to_string(), "6001".to_string()),
2303                ]),
2304            })
2305            .build();
2306
2307        let result = builder
2308            .add_snapshot(snapshot.clone())
2309            .unwrap()
2310            .set_ref(MAIN_BRANCH, SnapshotReference {
2311                snapshot_id: 1,
2312                retention: SnapshotRetention::Branch {
2313                    min_snapshots_to_keep: Some(10),
2314                    max_snapshot_age_ms: None,
2315                    max_ref_age_ms: None,
2316                },
2317            })
2318            .unwrap()
2319            .build()
2320            .unwrap();
2321
2322        // Verify snapshot log was created
2323        assert_eq!(result.metadata.snapshot_log.len(), 1);
2324        assert_eq!(result.metadata.snapshot_log[0].snapshot_id, 1);
2325        assert_eq!(result.metadata.current_snapshot_id, Some(1));
2326
2327        // Remove the main ref
2328        let result_after_remove = result
2329            .metadata
2330            .into_builder(Some(
2331                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
2332            ))
2333            .remove_ref(MAIN_BRANCH)
2334            .build()
2335            .unwrap();
2336
2337        // Verify snapshot log is kept even after removing main ref
2338        assert_eq!(result_after_remove.metadata.snapshot_log.len(), 1);
2339        assert_eq!(result_after_remove.metadata.snapshot_log[0].snapshot_id, 1);
2340        assert_eq!(result_after_remove.metadata.current_snapshot_id, None);
2341        assert_eq!(result_after_remove.changes.len(), 1);
2342        assert_eq!(
2343            result_after_remove.changes[0],
2344            TableUpdate::RemoveSnapshotRef {
2345                ref_name: MAIN_BRANCH.to_string()
2346            }
2347        );
2348    }
2349
2350    #[test]
2351    fn test_set_branch_snapshot_creates_branch_if_not_exists() {
2352        let builder = builder_without_changes(FormatVersion::V2);
2353
2354        let snapshot = Snapshot::builder()
2355            .with_snapshot_id(2)
2356            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2357            .with_sequence_number(0)
2358            .with_schema_id(0)
2359            .with_manifest_list("/snap-1.avro")
2360            .with_summary(Summary {
2361                operation: Operation::Append,
2362                additional_properties: HashMap::new(),
2363            })
2364            .build();
2365
2366        let build_result = builder
2367            .set_branch_snapshot(snapshot.clone(), "new_branch")
2368            .unwrap()
2369            .build()
2370            .unwrap();
2371
2372        let reference = SnapshotReference {
2373            snapshot_id: 2,
2374            retention: SnapshotRetention::Branch {
2375                min_snapshots_to_keep: None,
2376                max_snapshot_age_ms: None,
2377                max_ref_age_ms: None,
2378            },
2379        };
2380
2381        assert_eq!(build_result.metadata.refs.len(), 1);
2382        assert_eq!(
2383            build_result.metadata.refs.get("new_branch"),
2384            Some(&reference)
2385        );
2386        assert_eq!(build_result.changes, vec![
2387            TableUpdate::AddSnapshot { snapshot },
2388            TableUpdate::SetSnapshotRef {
2389                ref_name: "new_branch".to_string(),
2390                reference
2391            }
2392        ]);
2393    }
2394
2395    #[test]
2396    fn test_cannot_add_duplicate_snapshot_id() {
2397        let builder = builder_without_changes(FormatVersion::V2);
2398
2399        let snapshot = Snapshot::builder()
2400            .with_snapshot_id(2)
2401            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2402            .with_sequence_number(0)
2403            .with_schema_id(0)
2404            .with_manifest_list("/snap-1.avro")
2405            .with_summary(Summary {
2406                operation: Operation::Append,
2407                additional_properties: HashMap::from_iter(vec![
2408                    (
2409                        "spark.app.id".to_string(),
2410                        "local-1662532784305".to_string(),
2411                    ),
2412                    ("added-data-files".to_string(), "4".to_string()),
2413                    ("added-records".to_string(), "4".to_string()),
2414                    ("added-files-size".to_string(), "6001".to_string()),
2415                ]),
2416            })
2417            .build();
2418
2419        let builder = builder.add_snapshot(snapshot.clone()).unwrap();
2420        builder.add_snapshot(snapshot).unwrap_err();
2421    }
2422
2423    #[test]
2424    fn test_add_incompatible_current_schema_fails() {
2425        let builder = builder_without_changes(FormatVersion::V2);
2426
2427        let added_schema = Schema::builder()
2428            .with_schema_id(1)
2429            .with_fields(vec![])
2430            .build()
2431            .unwrap();
2432
2433        let err = builder
2434            .add_current_schema(added_schema)
2435            .unwrap()
2436            .build()
2437            .unwrap_err();
2438
2439        assert!(
2440            err.to_string()
2441                .contains("Cannot find partition source field")
2442        );
2443    }
2444
2445    #[test]
2446    fn test_add_partition_spec_for_v1_requires_sequential_ids() {
2447        let builder = builder_without_changes(FormatVersion::V1);
2448
2449        let added_spec = UnboundPartitionSpec::builder()
2450            .with_spec_id(10)
2451            .add_partition_fields(vec![
2452                UnboundPartitionField {
2453                    name: "y".to_string(),
2454                    transform: Transform::Identity,
2455                    source_id: 2,
2456                    field_id: Some(1000),
2457                },
2458                UnboundPartitionField {
2459                    name: "z".to_string(),
2460                    transform: Transform::Identity,
2461                    source_id: 3,
2462                    field_id: Some(1002),
2463                },
2464            ])
2465            .unwrap()
2466            .build();
2467
2468        let err = builder.add_partition_spec(added_spec).unwrap_err();
2469        assert!(err.to_string().contains(
2470            "Cannot add partition spec with non-sequential field ids to format version 1 table"
2471        ));
2472    }
2473
2474    #[test]
2475    fn test_expire_metadata_log() {
2476        let builder = builder_without_changes(FormatVersion::V2);
2477        let metadata = builder
2478            .set_properties(HashMap::from_iter(vec![(
2479                TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
2480                "2".to_string(),
2481            )]))
2482            .unwrap()
2483            .build()
2484            .unwrap();
2485        assert_eq!(metadata.metadata.metadata_log.len(), 1);
2486        assert_eq!(metadata.expired_metadata_logs.len(), 0);
2487
2488        let metadata = metadata
2489            .metadata
2490            .into_builder(Some("path2".to_string()))
2491            .set_properties(HashMap::from_iter(vec![(
2492                "change_nr".to_string(),
2493                "1".to_string(),
2494            )]))
2495            .unwrap()
2496            .build()
2497            .unwrap();
2498
2499        assert_eq!(metadata.metadata.metadata_log.len(), 2);
2500        assert_eq!(metadata.expired_metadata_logs.len(), 0);
2501
2502        let metadata = metadata
2503            .metadata
2504            .into_builder(Some("path2".to_string()))
2505            .set_properties(HashMap::from_iter(vec![(
2506                "change_nr".to_string(),
2507                "2".to_string(),
2508            )]))
2509            .unwrap()
2510            .build()
2511            .unwrap();
2512        assert_eq!(metadata.metadata.metadata_log.len(), 2);
2513        assert_eq!(metadata.expired_metadata_logs.len(), 1);
2514    }
2515
2516    #[test]
2517    fn test_v2_sequence_number_cannot_decrease() {
2518        let builder = builder_without_changes(FormatVersion::V2);
2519
2520        let snapshot = Snapshot::builder()
2521            .with_snapshot_id(1)
2522            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2523            .with_sequence_number(1)
2524            .with_schema_id(0)
2525            .with_manifest_list("/snap-1")
2526            .with_summary(Summary {
2527                operation: Operation::Append,
2528                additional_properties: HashMap::new(),
2529            })
2530            .build();
2531
2532        let builder = builder
2533            .add_snapshot(snapshot.clone())
2534            .unwrap()
2535            .set_ref(MAIN_BRANCH, SnapshotReference {
2536                snapshot_id: 1,
2537                retention: SnapshotRetention::Branch {
2538                    min_snapshots_to_keep: Some(10),
2539                    max_snapshot_age_ms: None,
2540                    max_ref_age_ms: None,
2541                },
2542            })
2543            .unwrap();
2544
2545        let snapshot = Snapshot::builder()
2546            .with_snapshot_id(2)
2547            .with_timestamp_ms(builder.metadata.last_updated_ms + 1)
2548            .with_sequence_number(0)
2549            .with_schema_id(0)
2550            .with_manifest_list("/snap-0")
2551            .with_parent_snapshot_id(Some(1))
2552            .with_summary(Summary {
2553                operation: Operation::Append,
2554                additional_properties: HashMap::new(),
2555            })
2556            .build();
2557
2558        let err = builder
2559            .set_branch_snapshot(snapshot, MAIN_BRANCH)
2560            .unwrap_err();
2561        assert!(
2562            err.to_string()
2563                .contains("Cannot add snapshot with sequence number")
2564        );
2565    }
2566
2567    #[test]
2568    fn test_default_spec_cannot_be_removed() {
2569        let builder = builder_without_changes(FormatVersion::V2);
2570
2571        builder.remove_partition_specs(&[0]).unwrap_err();
2572    }
2573
2574    #[test]
2575    fn test_statistics() {
2576        let builder = builder_without_changes(FormatVersion::V2);
2577
2578        let statistics = StatisticsFile {
2579            snapshot_id: 3055729675574597004,
2580            statistics_path: "s3://a/b/stats.puffin".to_string(),
2581            file_size_in_bytes: 413,
2582            file_footer_size_in_bytes: 42,
2583            key_metadata: None,
2584            blob_metadata: vec![BlobMetadata {
2585                snapshot_id: 3055729675574597004,
2586                sequence_number: 1,
2587                fields: vec![1],
2588                r#type: "ndv".to_string(),
2589                properties: HashMap::new(),
2590            }],
2591        };
2592        let build_result = builder.set_statistics(statistics.clone()).build().unwrap();
2593
2594        assert_eq!(
2595            build_result.metadata.statistics,
2596            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2597        );
2598        assert_eq!(build_result.changes, vec![TableUpdate::SetStatistics {
2599            statistics: statistics.clone()
2600        }]);
2601
2602        // Remove
2603        let builder = build_result.metadata.into_builder(None);
2604        let build_result = builder
2605            .remove_statistics(statistics.snapshot_id)
2606            .build()
2607            .unwrap();
2608
2609        assert_eq!(build_result.metadata.statistics.len(), 0);
2610        assert_eq!(build_result.changes, vec![TableUpdate::RemoveStatistics {
2611            snapshot_id: statistics.snapshot_id
2612        }]);
2613
2614        // Remove again yields no changes
2615        let builder = build_result.metadata.into_builder(None);
2616        let build_result = builder
2617            .remove_statistics(statistics.snapshot_id)
2618            .build()
2619            .unwrap();
2620        assert_eq!(build_result.metadata.statistics.len(), 0);
2621        assert_eq!(build_result.changes.len(), 0);
2622    }
2623
2624    #[test]
2625    fn test_add_partition_statistics() {
2626        let builder = builder_without_changes(FormatVersion::V2);
2627
2628        let statistics = PartitionStatisticsFile {
2629            snapshot_id: 3055729675574597004,
2630            statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
2631            file_size_in_bytes: 43,
2632        };
2633
2634        let build_result = builder
2635            .set_partition_statistics(statistics.clone())
2636            .build()
2637            .unwrap();
2638        assert_eq!(
2639            build_result.metadata.partition_statistics,
2640            HashMap::from_iter(vec![(3055729675574597004, statistics.clone())])
2641        );
2642        assert_eq!(build_result.changes, vec![
2643            TableUpdate::SetPartitionStatistics {
2644                partition_statistics: statistics.clone()
2645            }
2646        ]);
2647
2648        // Remove
2649        let builder = build_result.metadata.into_builder(None);
2650        let build_result = builder
2651            .remove_partition_statistics(statistics.snapshot_id)
2652            .build()
2653            .unwrap();
2654        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2655        assert_eq!(build_result.changes, vec![
2656            TableUpdate::RemovePartitionStatistics {
2657                snapshot_id: statistics.snapshot_id
2658            }
2659        ]);
2660
2661        // Remove again yields no changes
2662        let builder = build_result.metadata.into_builder(None);
2663        let build_result = builder
2664            .remove_partition_statistics(statistics.snapshot_id)
2665            .build()
2666            .unwrap();
2667        assert_eq!(build_result.metadata.partition_statistics.len(), 0);
2668        assert_eq!(build_result.changes.len(), 0);
2669    }
2670
2671    #[test]
2672    fn last_update_increased_for_property_only_update() {
2673        let builder = builder_without_changes(FormatVersion::V2);
2674
2675        let metadata = builder.build().unwrap().metadata;
2676        let last_updated_ms = metadata.last_updated_ms;
2677        sleep(std::time::Duration::from_millis(2));
2678
2679        let build_result = metadata
2680            .into_builder(Some(
2681                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2682            ))
2683            .set_properties(HashMap::from_iter(vec![(
2684                "foo".to_string(),
2685                "bar".to_string(),
2686            )]))
2687            .unwrap()
2688            .build()
2689            .unwrap();
2690
2691        assert!(
2692            build_result.metadata.last_updated_ms > last_updated_ms,
2693            "{} > {}",
2694            build_result.metadata.last_updated_ms,
2695            last_updated_ms
2696        );
2697    }
2698
2699    #[test]
2700    fn test_construct_default_main_branch() {
2701        // Load the table without ref
2702        let file = File::open(format!(
2703            "{}/testdata/table_metadata/{}",
2704            env!("CARGO_MANIFEST_DIR"),
2705            "TableMetadataV2Valid.json"
2706        ))
2707        .unwrap();
2708        let reader = BufReader::new(file);
2709        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2710
2711        let table = Table::builder()
2712            .metadata(resp)
2713            .metadata_location("s3://bucket/test/location/metadata/v1.json")
2714            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2715            .file_io(FileIO::new_with_memory())
2716            .runtime(test_runtime())
2717            .build()
2718            .unwrap();
2719
2720        assert_eq!(
2721            table.metadata().refs.get(MAIN_BRANCH).unwrap().snapshot_id,
2722            table.metadata().current_snapshot_id().unwrap()
2723        );
2724    }
2725
2726    #[test]
2727    fn test_active_schema_cannot_be_removed() {
2728        let builder = builder_without_changes(FormatVersion::V2);
2729        builder.remove_schemas(&[0]).unwrap_err();
2730    }
2731
2732    #[test]
2733    fn test_remove_schemas() {
2734        let file = File::open(format!(
2735            "{}/testdata/table_metadata/{}",
2736            env!("CARGO_MANIFEST_DIR"),
2737            "TableMetadataV2Valid.json"
2738        ))
2739        .unwrap();
2740        let reader = BufReader::new(file);
2741        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
2742
2743        let table = Table::builder()
2744            .metadata(resp)
2745            .metadata_location("s3://bucket/test/location/metadata/v1.json")
2746            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
2747            .file_io(FileIO::new_with_memory())
2748            .runtime(test_runtime())
2749            .build()
2750            .unwrap();
2751
2752        assert_eq!(2, table.metadata().schemas.len());
2753
2754        {
2755            // can not remove active schema
2756            let meta_data_builder = table.metadata().clone().into_builder(None);
2757            meta_data_builder.remove_schemas(&[1]).unwrap_err();
2758        }
2759
2760        let mut meta_data_builder = table.metadata().clone().into_builder(None);
2761        meta_data_builder = meta_data_builder.remove_schemas(&[0]).unwrap();
2762        let build_result = meta_data_builder.build().unwrap();
2763        assert_eq!(1, build_result.metadata.schemas.len());
2764        assert_eq!(1, build_result.metadata.current_schema_id);
2765        assert_eq!(1, build_result.metadata.current_schema().schema_id());
2766        assert_eq!(1, build_result.changes.len());
2767
2768        let remove_schema_ids =
2769            if let TableUpdate::RemoveSchemas { schema_ids } = &build_result.changes[0] {
2770                schema_ids
2771            } else {
2772                unreachable!("Expected RemoveSchema change")
2773            };
2774        assert_eq!(remove_schema_ids, &[0]);
2775    }
2776
2777    #[test]
2778    fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() {
2779        let initial_schema = Schema::builder()
2780            .with_fields(vec![
2781                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2782            ])
2783            .build()
2784            .unwrap();
2785
2786        let partition_spec_with_bucket = UnboundPartitionSpec::builder()
2787            .with_spec_id(0)
2788            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2789            .unwrap()
2790            .build();
2791
2792        let metadata = TableMetadataBuilder::new(
2793            initial_schema,
2794            partition_spec_with_bucket,
2795            SortOrder::unsorted_order(),
2796            TEST_LOCATION.to_string(),
2797            FormatVersion::V2,
2798            HashMap::new(),
2799        )
2800        .unwrap()
2801        .build()
2802        .unwrap()
2803        .metadata;
2804
2805        let partition_field_names: Vec<String> = metadata
2806            .default_partition_spec()
2807            .fields()
2808            .iter()
2809            .map(|f| f.name.clone())
2810            .collect();
2811        assert!(partition_field_names.contains(&"bucket_data".to_string()));
2812
2813        let evolved_schema = Schema::builder()
2814            .with_fields(vec![
2815                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2816                // Adding a schema field with the same name as an existing partition field
2817                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
2818            ])
2819            .build()
2820            .unwrap();
2821
2822        let builder = metadata.into_builder(Some(
2823            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2824        ));
2825
2826        // Try to add the evolved schema - this should now fail immediately with a clear error
2827        let result = builder.add_current_schema(evolved_schema);
2828
2829        assert!(result.is_err());
2830        let error = result.unwrap_err();
2831        let error_message = error.message();
2832        assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
2833        assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names"));
2834    }
2835
2836    #[test]
2837    fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() {
2838        let initial_schema = Schema::builder()
2839            .with_fields(vec![
2840                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2841            ])
2842            .build()
2843            .unwrap();
2844
2845        let partition_spec = UnboundPartitionSpec::builder()
2846            .with_spec_id(0)
2847            .add_partition_field(1, "partition_col", Transform::Bucket(16))
2848            .unwrap()
2849            .build();
2850
2851        let metadata = TableMetadataBuilder::new(
2852            initial_schema,
2853            partition_spec,
2854            SortOrder::unsorted_order(),
2855            TEST_LOCATION.to_string(),
2856            FormatVersion::V2,
2857            HashMap::new(),
2858        )
2859        .unwrap()
2860        .build()
2861        .unwrap()
2862        .metadata;
2863
2864        let non_conflicting_schema = Schema::builder()
2865            .with_fields(vec![
2866                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2867                NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(),
2868            ])
2869            .build()
2870            .unwrap();
2871
2872        // This should succeed since there's no name conflict
2873        let result = metadata
2874            .clone()
2875            .into_builder(Some("test_location".to_string()))
2876            .add_current_schema(non_conflicting_schema)
2877            .unwrap()
2878            .build();
2879
2880        assert!(result.is_ok());
2881    }
2882
2883    #[test]
2884    fn test_partition_spec_evolution_validates_schema_field_name_conflicts() {
2885        let initial_schema = Schema::builder()
2886            .with_fields(vec![
2887                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2888                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2889                    .into(),
2890            ])
2891            .build()
2892            .unwrap();
2893
2894        let partition_spec = UnboundPartitionSpec::builder()
2895            .with_spec_id(0)
2896            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
2897            .unwrap()
2898            .build();
2899
2900        let metadata = TableMetadataBuilder::new(
2901            initial_schema,
2902            partition_spec,
2903            SortOrder::unsorted_order(),
2904            TEST_LOCATION.to_string(),
2905            FormatVersion::V2,
2906            HashMap::new(),
2907        )
2908        .unwrap()
2909        .build()
2910        .unwrap()
2911        .metadata;
2912
2913        let builder = metadata.into_builder(Some(
2914            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
2915        ));
2916
2917        let conflicting_partition_spec = UnboundPartitionSpec::builder()
2918            .with_spec_id(1)
2919            .add_partition_field(1, "existing_field", Transform::Bucket(8))
2920            .unwrap()
2921            .build();
2922
2923        let result = builder.add_partition_spec(conflicting_partition_spec);
2924
2925        assert!(result.is_err());
2926        let error = result.unwrap_err();
2927        let error_message = error.message();
2928        // The error comes from our multi-version validation
2929        assert!(error_message.contains(
2930            "Cannot create partition with name 'existing_field' that conflicts with schema field"
2931        ));
2932        assert!(error_message.contains("and is not an identity transform"));
2933    }
2934
2935    #[test]
2936    fn test_schema_evolution_validates_against_all_historical_schemas() {
2937        // Create a table with an initial schema that has a field "existing_field"
2938        let initial_schema = Schema::builder()
2939            .with_fields(vec![
2940                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2941                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
2942                    .into(),
2943            ])
2944            .build()
2945            .unwrap();
2946
2947        let partition_spec = UnboundPartitionSpec::builder()
2948            .with_spec_id(0)
2949            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
2950            .unwrap()
2951            .build();
2952
2953        let metadata = TableMetadataBuilder::new(
2954            initial_schema,
2955            partition_spec,
2956            SortOrder::unsorted_order(),
2957            TEST_LOCATION.to_string(),
2958            FormatVersion::V2,
2959            HashMap::new(),
2960        )
2961        .unwrap()
2962        .build()
2963        .unwrap()
2964        .metadata;
2965
2966        // Add a second schema that removes the existing_field but keeps the data field
2967        let second_schema = Schema::builder()
2968            .with_fields(vec![
2969                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2970                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2971                    .into(),
2972            ])
2973            .build()
2974            .unwrap();
2975
2976        let metadata = metadata
2977            .into_builder(Some("test_location".to_string()))
2978            .add_current_schema(second_schema)
2979            .unwrap()
2980            .build()
2981            .unwrap()
2982            .metadata;
2983
2984        // Now try to add a third schema that reintroduces "existing_field"
2985        // This should succeed because "existing_field" exists in a historical schema,
2986        // even though there's a partition field named "bucket_data"
2987        let third_schema = Schema::builder()
2988            .with_fields(vec![
2989                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
2990                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
2991                    .into(),
2992                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
2993                    .into(),
2994            ])
2995            .build()
2996            .unwrap();
2997
2998        let builder = metadata
2999            .clone()
3000            .into_builder(Some("test_location".to_string()));
3001
3002        // This should succeed because "existing_field" exists in a historical schema
3003        let result = builder.add_current_schema(third_schema);
3004        assert!(result.is_ok());
3005
3006        // However, trying to add a schema field that conflicts with the partition field
3007        // and doesn't exist in any historical schema should fail
3008        let conflicting_schema = Schema::builder()
3009            .with_fields(vec![
3010                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3011                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
3012                    .into(),
3013                NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int))
3014                    .into(),
3015                NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String))
3016                    .into(), // conflicts with partition field
3017            ])
3018            .build()
3019            .unwrap();
3020
3021        let builder2 = metadata.into_builder(Some("test_location".to_string()));
3022        let result2 = builder2.add_current_schema(conflicting_schema);
3023
3024        // This should fail because "bucket_data" conflicts with partition field name
3025        // and doesn't exist in any historical schema
3026        assert!(result2.is_err());
3027        let error = result2.unwrap_err();
3028        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
3029    }
3030
3031    #[test]
3032    fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() {
3033        // Create initial schema with a field
3034        let initial_schema = Schema::builder()
3035            .with_fields(vec![
3036                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3037                NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
3038                    .into(),
3039            ])
3040            .build()
3041            .unwrap();
3042
3043        let partition_spec = UnboundPartitionSpec::builder()
3044            .with_spec_id(0)
3045            .add_partition_field(2, "partition_data", Transform::Identity)
3046            .unwrap()
3047            .build();
3048
3049        let metadata = TableMetadataBuilder::new(
3050            initial_schema,
3051            partition_spec,
3052            SortOrder::unsorted_order(),
3053            TEST_LOCATION.to_string(),
3054            FormatVersion::V2,
3055            HashMap::new(),
3056        )
3057        .unwrap()
3058        .build()
3059        .unwrap()
3060        .metadata;
3061
3062        // Add a new schema that still contains the partition_data field
3063        let evolved_schema = Schema::builder()
3064            .with_fields(vec![
3065                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3066                NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int))
3067                    .into(),
3068                NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String))
3069                    .into(),
3070            ])
3071            .build()
3072            .unwrap();
3073
3074        // This should succeed because partition_data exists in historical schemas
3075        let result = metadata
3076            .into_builder(Some("test_location".to_string()))
3077            .add_current_schema(evolved_schema);
3078
3079        assert!(result.is_ok());
3080    }
3081
3082    #[test]
3083    fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() {
3084        // Create initial schema WITHOUT the conflicting field
3085        let initial_schema = Schema::builder()
3086            .with_fields(vec![
3087                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3088            ])
3089            .build()
3090            .unwrap();
3091
3092        let partition_spec = UnboundPartitionSpec::builder()
3093            .with_spec_id(0)
3094            .add_partition_field(1, "bucket_data", Transform::Bucket(16))
3095            .unwrap()
3096            .build();
3097
3098        let metadata = TableMetadataBuilder::new(
3099            initial_schema,
3100            partition_spec,
3101            SortOrder::unsorted_order(),
3102            TEST_LOCATION.to_string(),
3103            FormatVersion::V2,
3104            HashMap::new(),
3105        )
3106        .unwrap()
3107        .build()
3108        .unwrap()
3109        .metadata;
3110
3111        // Try to add a schema with a field that conflicts with partition field name
3112        let conflicting_schema = Schema::builder()
3113            .with_fields(vec![
3114                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3115                // This field name conflicts with the partition field "bucket_data"
3116                NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(),
3117            ])
3118            .build()
3119            .unwrap();
3120
3121        let builder = metadata.into_builder(Some("test_location".to_string()));
3122        let result = builder.add_current_schema(conflicting_schema);
3123
3124        // This should fail because "bucket_data" conflicts with partition field name
3125        // and doesn't exist in any historical schema
3126        assert!(result.is_err());
3127        let error = result.unwrap_err();
3128        assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name"));
3129    }
3130
3131    #[test]
3132    fn test_partition_spec_evolution_allows_non_conflicting_names() {
3133        let initial_schema = Schema::builder()
3134            .with_fields(vec![
3135                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3136                NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int))
3137                    .into(),
3138            ])
3139            .build()
3140            .unwrap();
3141
3142        let partition_spec = UnboundPartitionSpec::builder()
3143            .with_spec_id(0)
3144            .add_partition_field(1, "data_bucket", Transform::Bucket(16))
3145            .unwrap()
3146            .build();
3147
3148        let metadata = TableMetadataBuilder::new(
3149            initial_schema,
3150            partition_spec,
3151            SortOrder::unsorted_order(),
3152            TEST_LOCATION.to_string(),
3153            FormatVersion::V2,
3154            HashMap::new(),
3155        )
3156        .unwrap()
3157        .build()
3158        .unwrap()
3159        .metadata;
3160
3161        let builder = metadata.into_builder(Some(
3162            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3163        ));
3164
3165        // Try to add a partition spec with a field name that does NOT conflict with existing schema fields
3166        let non_conflicting_partition_spec = UnboundPartitionSpec::builder()
3167            .with_spec_id(1)
3168            .add_partition_field(2, "new_partition_field", Transform::Bucket(8))
3169            .unwrap()
3170            .build();
3171
3172        let result = builder.add_partition_spec(non_conflicting_partition_spec);
3173
3174        assert!(result.is_ok());
3175    }
3176
3177    #[test]
3178    fn test_row_lineage_addition() {
3179        let new_rows = 30;
3180        let base = builder_without_changes(FormatVersion::V3)
3181            .build()
3182            .unwrap()
3183            .metadata;
3184        let add_rows = Snapshot::builder()
3185            .with_snapshot_id(0)
3186            .with_timestamp_ms(base.last_updated_ms + 1)
3187            .with_sequence_number(0)
3188            .with_schema_id(0)
3189            .with_manifest_list("foo")
3190            .with_parent_snapshot_id(None)
3191            .with_summary(Summary {
3192                operation: Operation::Append,
3193                additional_properties: HashMap::new(),
3194            })
3195            .with_row_range(base.next_row_id(), new_rows)
3196            .build();
3197
3198        let first_addition = base
3199            .into_builder(None)
3200            .add_snapshot(add_rows.clone())
3201            .unwrap()
3202            .build()
3203            .unwrap()
3204            .metadata;
3205
3206        assert_eq!(first_addition.next_row_id(), new_rows);
3207
3208        let add_more_rows = Snapshot::builder()
3209            .with_snapshot_id(1)
3210            .with_timestamp_ms(first_addition.last_updated_ms + 1)
3211            .with_sequence_number(1)
3212            .with_schema_id(0)
3213            .with_manifest_list("foo")
3214            .with_parent_snapshot_id(Some(0))
3215            .with_summary(Summary {
3216                operation: Operation::Append,
3217                additional_properties: HashMap::new(),
3218            })
3219            .with_row_range(first_addition.next_row_id(), new_rows)
3220            .build();
3221
3222        let second_addition = first_addition
3223            .into_builder(None)
3224            .add_snapshot(add_more_rows)
3225            .unwrap()
3226            .build()
3227            .unwrap()
3228            .metadata;
3229        assert_eq!(second_addition.next_row_id(), new_rows * 2);
3230    }
3231
3232    #[test]
3233    fn test_row_lineage_invalid_snapshot() {
3234        let new_rows = 30;
3235        let base = builder_without_changes(FormatVersion::V3)
3236            .build()
3237            .unwrap()
3238            .metadata;
3239
3240        // add rows to check TableMetadata validation; Snapshot rejects negative next-row-id
3241        let add_rows = Snapshot::builder()
3242            .with_snapshot_id(0)
3243            .with_timestamp_ms(base.last_updated_ms + 1)
3244            .with_sequence_number(0)
3245            .with_schema_id(0)
3246            .with_manifest_list("foo")
3247            .with_parent_snapshot_id(None)
3248            .with_summary(Summary {
3249                operation: Operation::Append,
3250                additional_properties: HashMap::new(),
3251            })
3252            .with_row_range(base.next_row_id(), new_rows)
3253            .build();
3254
3255        let added = base
3256            .into_builder(None)
3257            .add_snapshot(add_rows)
3258            .unwrap()
3259            .build()
3260            .unwrap()
3261            .metadata;
3262
3263        let invalid_new_rows = Snapshot::builder()
3264            .with_snapshot_id(1)
3265            .with_timestamp_ms(added.last_updated_ms + 1)
3266            .with_sequence_number(1)
3267            .with_schema_id(0)
3268            .with_manifest_list("foo")
3269            .with_parent_snapshot_id(Some(0))
3270            .with_summary(Summary {
3271                operation: Operation::Append,
3272                additional_properties: HashMap::new(),
3273            })
3274            // first_row_id is behind table next_row_id
3275            .with_row_range(added.next_row_id() - 1, 10)
3276            .build();
3277
3278        let err = added
3279            .into_builder(None)
3280            .add_snapshot(invalid_new_rows)
3281            .unwrap_err();
3282        assert!(
3283            err.to_string().contains(
3284                "Cannot add a snapshot, first-row-id is behind table next-row-id: 29 < 30"
3285            )
3286        );
3287    }
3288
3289    #[test]
3290    fn test_row_lineage_append_branch() {
3291        // Appends to a branch should still change last-row-id even if not on main, these changes
3292        // should also affect commits to main
3293
3294        let branch = "some_branch";
3295
3296        // Start with V3 metadata to support row lineage
3297        let base = builder_without_changes(FormatVersion::V3)
3298            .build()
3299            .unwrap()
3300            .metadata;
3301
3302        // Initial next_row_id should be 0
3303        assert_eq!(base.next_row_id(), 0);
3304
3305        // Write to Branch - append 30 rows
3306        let branch_snapshot_1 = Snapshot::builder()
3307            .with_snapshot_id(1)
3308            .with_timestamp_ms(base.last_updated_ms + 1)
3309            .with_sequence_number(0)
3310            .with_schema_id(0)
3311            .with_manifest_list("foo")
3312            .with_parent_snapshot_id(None)
3313            .with_summary(Summary {
3314                operation: Operation::Append,
3315                additional_properties: HashMap::new(),
3316            })
3317            .with_row_range(base.next_row_id(), 30)
3318            .build();
3319
3320        let table_after_branch_1 = base
3321            .into_builder(None)
3322            .set_branch_snapshot(branch_snapshot_1.clone(), branch)
3323            .unwrap()
3324            .build()
3325            .unwrap()
3326            .metadata;
3327
3328        // Current snapshot should be null (no main branch snapshot yet)
3329        assert!(table_after_branch_1.current_snapshot().is_none());
3330
3331        // Branch snapshot should have first_row_id = 0
3332        let branch_ref = table_after_branch_1.refs.get(branch).unwrap();
3333        let branch_snap_1 = table_after_branch_1
3334            .snapshots
3335            .get(&branch_ref.snapshot_id)
3336            .unwrap();
3337        assert_eq!(branch_snap_1.first_row_id(), Some(0));
3338
3339        // Next row id should be 30
3340        assert_eq!(table_after_branch_1.next_row_id(), 30);
3341
3342        // Write to Main - append 28 rows
3343        let main_snapshot = Snapshot::builder()
3344            .with_snapshot_id(2)
3345            .with_timestamp_ms(table_after_branch_1.last_updated_ms + 1)
3346            .with_sequence_number(1)
3347            .with_schema_id(0)
3348            .with_manifest_list("bar")
3349            .with_parent_snapshot_id(None)
3350            .with_summary(Summary {
3351                operation: Operation::Append,
3352                additional_properties: HashMap::new(),
3353            })
3354            .with_row_range(table_after_branch_1.next_row_id(), 28)
3355            .build();
3356
3357        let table_after_main = table_after_branch_1
3358            .into_builder(None)
3359            .add_snapshot(main_snapshot.clone())
3360            .unwrap()
3361            .set_ref(MAIN_BRANCH, SnapshotReference {
3362                snapshot_id: main_snapshot.snapshot_id(),
3363                retention: SnapshotRetention::Branch {
3364                    min_snapshots_to_keep: None,
3365                    max_snapshot_age_ms: None,
3366                    max_ref_age_ms: None,
3367                },
3368            })
3369            .unwrap()
3370            .build()
3371            .unwrap()
3372            .metadata;
3373
3374        // Main snapshot should have first_row_id = 30
3375        let current_snapshot = table_after_main.current_snapshot().unwrap();
3376        assert_eq!(current_snapshot.first_row_id(), Some(30));
3377
3378        // Next row id should be 58 (30 + 28)
3379        assert_eq!(table_after_main.next_row_id(), 58);
3380
3381        // Write again to branch - append 21 rows
3382        let branch_snapshot_2 = Snapshot::builder()
3383            .with_snapshot_id(3)
3384            .with_timestamp_ms(table_after_main.last_updated_ms + 1)
3385            .with_sequence_number(2)
3386            .with_schema_id(0)
3387            .with_manifest_list("baz")
3388            .with_parent_snapshot_id(Some(branch_snapshot_1.snapshot_id()))
3389            .with_summary(Summary {
3390                operation: Operation::Append,
3391                additional_properties: HashMap::new(),
3392            })
3393            .with_row_range(table_after_main.next_row_id(), 21)
3394            .build();
3395
3396        let table_after_branch_2 = table_after_main
3397            .into_builder(None)
3398            .set_branch_snapshot(branch_snapshot_2.clone(), branch)
3399            .unwrap()
3400            .build()
3401            .unwrap()
3402            .metadata;
3403
3404        // Branch snapshot should have first_row_id = 58 (30 + 28)
3405        let branch_ref_2 = table_after_branch_2.refs.get(branch).unwrap();
3406        let branch_snap_2 = table_after_branch_2
3407            .snapshots
3408            .get(&branch_ref_2.snapshot_id)
3409            .unwrap();
3410        assert_eq!(branch_snap_2.first_row_id(), Some(58));
3411
3412        // Next row id should be 79 (30 + 28 + 21)
3413        assert_eq!(table_after_branch_2.next_row_id(), 79);
3414    }
3415
3416    #[test]
3417    fn test_encryption_keys() {
3418        let builder = builder_without_changes(FormatVersion::V2);
3419
3420        // Create test encryption keys
3421        let encryption_key_1 = EncryptedKey::builder()
3422            .key_id("key-1")
3423            .encrypted_key_metadata(vec![1, 2, 3, 4])
3424            .encrypted_by_id("encryption-service-1")
3425            .properties(HashMap::from_iter(vec![(
3426                "algorithm".to_string(),
3427                "AES-256".to_string(),
3428            )]))
3429            .build();
3430
3431        let encryption_key_2 = EncryptedKey::builder()
3432            .key_id("key-2")
3433            .encrypted_key_metadata(vec![5, 6, 7, 8])
3434            .encrypted_by_id("encryption-service-2")
3435            .properties(HashMap::new())
3436            .build();
3437
3438        // Add first encryption key
3439        let build_result = builder
3440            .add_encryption_key(encryption_key_1.clone())
3441            .build()
3442            .unwrap();
3443
3444        assert_eq!(build_result.changes.len(), 1);
3445        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3446        assert_eq!(
3447            build_result.metadata.encryption_key("key-1"),
3448            Some(&encryption_key_1)
3449        );
3450        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
3451            encryption_key: encryption_key_1.clone()
3452        });
3453
3454        // Add second encryption key
3455        let build_result = build_result
3456            .metadata
3457            .into_builder(Some(
3458                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
3459            ))
3460            .add_encryption_key(encryption_key_2.clone())
3461            .build()
3462            .unwrap();
3463
3464        assert_eq!(build_result.changes.len(), 1);
3465        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
3466        assert_eq!(
3467            build_result.metadata.encryption_key("key-1"),
3468            Some(&encryption_key_1)
3469        );
3470        assert_eq!(
3471            build_result.metadata.encryption_key("key-2"),
3472            Some(&encryption_key_2)
3473        );
3474        assert_eq!(build_result.changes[0], TableUpdate::AddEncryptionKey {
3475            encryption_key: encryption_key_2.clone()
3476        });
3477
3478        // Try to add duplicate key - should not create a change
3479        let build_result = build_result
3480            .metadata
3481            .into_builder(Some(
3482                "s3://bucket/test/location/metadata/metadata2.json".to_string(),
3483            ))
3484            .add_encryption_key(encryption_key_1.clone())
3485            .build()
3486            .unwrap();
3487
3488        assert_eq!(build_result.changes.len(), 0);
3489        assert_eq!(build_result.metadata.encryption_keys.len(), 2);
3490
3491        // Remove first encryption key
3492        let build_result = build_result
3493            .metadata
3494            .into_builder(Some(
3495                "s3://bucket/test/location/metadata/metadata3.json".to_string(),
3496            ))
3497            .remove_encryption_key("key-1")
3498            .build()
3499            .unwrap();
3500
3501        assert_eq!(build_result.changes.len(), 1);
3502        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3503        assert_eq!(build_result.metadata.encryption_key("key-1"), None);
3504        assert_eq!(
3505            build_result.metadata.encryption_key("key-2"),
3506            Some(&encryption_key_2)
3507        );
3508        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
3509            key_id: "key-1".to_string()
3510        });
3511
3512        // Try to remove non-existent key - should not create a change
3513        let build_result = build_result
3514            .metadata
3515            .into_builder(Some(
3516                "s3://bucket/test/location/metadata/metadata4.json".to_string(),
3517            ))
3518            .remove_encryption_key("non-existent-key")
3519            .build()
3520            .unwrap();
3521
3522        assert_eq!(build_result.changes.len(), 0);
3523        assert_eq!(build_result.metadata.encryption_keys.len(), 1);
3524
3525        // Test encryption_keys_iter()
3526        let keys = build_result
3527            .metadata
3528            .encryption_keys_iter()
3529            .collect::<Vec<_>>();
3530        assert_eq!(keys.len(), 1);
3531        assert_eq!(keys[0], &encryption_key_2);
3532
3533        // Remove last encryption key
3534        let build_result = build_result
3535            .metadata
3536            .into_builder(Some(
3537                "s3://bucket/test/location/metadata/metadata5.json".to_string(),
3538            ))
3539            .remove_encryption_key("key-2")
3540            .build()
3541            .unwrap();
3542
3543        assert_eq!(build_result.changes.len(), 1);
3544        assert_eq!(build_result.metadata.encryption_keys.len(), 0);
3545        assert_eq!(build_result.metadata.encryption_key("key-2"), None);
3546        assert_eq!(build_result.changes[0], TableUpdate::RemoveEncryptionKey {
3547            key_id: "key-2".to_string()
3548        });
3549
3550        // Verify empty encryption_keys_iter()
3551        let keys = build_result.metadata.encryption_keys_iter();
3552        assert_eq!(keys.len(), 0);
3553    }
3554
3555    #[test]
3556    fn test_partition_field_id_reuse_across_specs() {
3557        let schema = Schema::builder()
3558            .with_fields(vec![
3559                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3560                NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
3561                NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
3562                    .into(),
3563            ])
3564            .build()
3565            .unwrap();
3566
3567        // Create initial table with spec 0: identity(id) -> field_id = 1000
3568        let initial_spec = UnboundPartitionSpec::builder()
3569            .add_partition_field(1, "id", Transform::Identity)
3570            .unwrap()
3571            .build();
3572
3573        let mut metadata = TableMetadataBuilder::new(
3574            schema,
3575            initial_spec,
3576            SortOrder::unsorted_order(),
3577            "s3://bucket/table".to_string(),
3578            FormatVersion::V2,
3579            HashMap::new(),
3580        )
3581        .unwrap()
3582        .build()
3583        .unwrap()
3584        .metadata;
3585
3586        // Add spec 1: bucket(data) -> field_id = 1001
3587        let spec1 = UnboundPartitionSpec::builder()
3588            .add_partition_field(2, "data_bucket", Transform::Bucket(10))
3589            .unwrap()
3590            .build();
3591        let builder = metadata.into_builder(Some("s3://bucket/table/metadata/v1.json".to_string()));
3592        let result = builder.add_partition_spec(spec1).unwrap().build().unwrap();
3593        metadata = result.metadata;
3594
3595        // Add spec 2: identity(id) + bucket(data) + year(timestamp)
3596        // Should reuse field_id 1000 for identity(id) and 1001 for bucket(data)
3597        let spec2 = UnboundPartitionSpec::builder()
3598            .add_partition_field(1, "id", Transform::Identity) // Should reuse 1000
3599            .unwrap()
3600            .add_partition_field(2, "data_bucket", Transform::Bucket(10)) // Should reuse 1001
3601            .unwrap()
3602            .add_partition_field(3, "year", Transform::Year) // Should get new 1002
3603            .unwrap()
3604            .build();
3605        let builder = metadata.into_builder(Some("s3://bucket/table/metadata/v2.json".to_string()));
3606        let result = builder.add_partition_spec(spec2).unwrap().build().unwrap();
3607
3608        // Verify field ID reuse: spec 2 should reuse IDs from specs 0 and 1, assign new ID for new field
3609        let spec2 = result.metadata.partition_spec_by_id(2).unwrap();
3610        let field_ids: Vec<i32> = spec2.fields().iter().map(|f| f.field_id).collect();
3611        assert_eq!(field_ids, vec![1000, 1001, 1002]); // Reused 1000, 1001; new 1002
3612    }
3613}