iceberg/spec/
table_metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
19//! The main struct here is [TableMetadataV2] which defines the data for a table.
20
21use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::io::Read as _;
26use std::sync::Arc;
27
28use _serde::TableMetadataEnum;
29use chrono::{DateTime, Utc};
30use flate2::read::GzDecoder;
31use serde::{Deserialize, Serialize};
32use serde_repr::{Deserialize_repr, Serialize_repr};
33use uuid::Uuid;
34
35use super::snapshot::SnapshotReference;
36pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
37use super::{
38    DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
39    SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
40};
41use crate::error::{Result, timestamp_ms_to_utc};
42use crate::io::FileIO;
43use crate::spec::EncryptedKey;
44use crate::{Error, ErrorKind};
45
/// Name of the branch reference that must track the current table snapshot.
static MAIN_BRANCH: &str = "main";
/// Tolerance (in milliseconds) for clock skew between concurrent committers;
/// used when validating that log entries are chronological.
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

/// Legacy sentinel value meaning "no current snapshot" (serialized as -1).
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
/// The sequence number of a table before any commit; v1 tables always report this.
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;

/// Initial row id for row lineage for new v3 tables and older tables upgrading to v3.
pub const INITIAL_ROW_ID: u64 = 0;
/// Minimum format version that supports row lineage (v3).
pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;
58
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
/// In-memory representation of Iceberg table metadata, covering format
/// versions 1 through 3 (see [`FormatVersion`]).
///
/// We assume that this data structure is always valid, so we will panic when invalid error happens.
/// We check the validity of this data structure when constructing.
pub struct TableMetadata {
    /// Integer Version for the format.
    pub(crate) format_version: FormatVersion,
    /// A UUID that identifies the table
    pub(crate) table_uuid: Uuid,
    /// The table's base location (stored without a trailing slash).
    pub(crate) location: String,
    /// The table's highest assigned sequence number.
    pub(crate) last_sequence_number: i64,
    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
    pub(crate) last_updated_ms: i64,
    /// An integer; the highest assigned column ID for the table.
    pub(crate) last_column_id: i32,
    /// A list of schemas, stored as objects with schema-id.
    pub(crate) schemas: HashMap<i32, SchemaRef>,
    /// ID of the table’s current schema.
    pub(crate) current_schema_id: i32,
    /// A list of partition specs, stored as full partition spec objects.
    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
    /// ID of the “current” spec that writers should use by default.
    pub(crate) default_spec: PartitionSpecRef,
    /// Partition type of the default partition spec.
    pub(crate) default_partition_type: StructType,
    /// An integer; the highest assigned partition field ID across all partition specs for the table.
    pub(crate) last_partition_id: i32,
    /// A string to string map of table properties. This is used to control settings that
    /// affect reading and writing and is not intended to be used for arbitrary metadata.
    /// For example, commit.retry.num-retries is used to control the number of commit retries.
    pub(crate) properties: HashMap<String, String>,
    /// long ID of the current table snapshot; must be the same as the current
    /// ID of the main branch in refs.
    pub(crate) current_snapshot_id: Option<i64>,
    /// A list of valid snapshots. Valid snapshots are snapshots for which all
    /// data files exist in the file system. A data file must not be deleted
    /// from the file system until the last snapshot in which it was listed is
    /// garbage collected.
    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
    /// to the current snapshot for the table. Each time the current-snapshot-id
    /// is changed, a new entry should be added with the last-updated-ms
    /// and the new current-snapshot-id. When snapshots are expired from
    /// the list of valid snapshots, all entries before a snapshot that has
    /// expired should be removed.
    pub(crate) snapshot_log: Vec<SnapshotLog>,

    /// A list (optional) of timestamp and metadata file location pairs
    /// that encodes changes to the previous metadata files for the table.
    /// Each time a new metadata file is created, a new entry of the
    /// previous metadata file location should be added to the list.
    /// Tables can be configured to remove the oldest metadata log entries and
    /// keep a fixed-size log of the most recent entries after a commit.
    pub(crate) metadata_log: Vec<MetadataLog>,

    /// A list of sort orders, stored as full sort order objects.
    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
    /// Default sort order id of the table. Note that this could be used by
    /// writers, but is not used when reading because reads use the specs
    /// stored in manifest files.
    pub(crate) default_sort_order_id: i64,
    /// A map of snapshot references. The map keys are the unique snapshot reference
    /// names in the table, and the map values are snapshot reference objects.
    /// There is always a main branch reference pointing to the current-snapshot-id
    /// even if the refs map is null.
    pub(crate) refs: HashMap<String, SnapshotReference>,
    /// Mapping of snapshot ids to statistics files.
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    /// Mapping of snapshot ids to partition statistics files.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
    /// Encryption Keys - map of key id to the actual key
    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
    /// Next row id to be assigned for Row Lineage (v3)
    pub(crate) next_row_id: u64,
}
138
139impl TableMetadata {
    /// Convert this Table Metadata into a builder for modification.
    ///
    /// Consumes `self` and returns a [`TableMetadataBuilder`] seeded with it.
    ///
    /// `current_file_location` is the location where the current version
    /// of the metadata file is stored. This is used to update the metadata log.
    /// If `current_file_location` is `None`, the metadata log will not be updated.
    /// This should only be used to stage-create tables.
    #[must_use]
    pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
        TableMetadataBuilder::new_from_metadata(self, current_file_location)
    }
150
151    /// Check if a partition field name exists in any partition spec.
152    #[inline]
153    pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
154        self.partition_specs
155            .values()
156            .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
157    }
158
159    /// Check if a field name exists in any schema.
160    #[inline]
161    pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
162        self.schemas
163            .values()
164            .any(|schema| schema.field_by_name(name).is_some())
165    }
166
    /// Returns format version of this metadata.
    #[inline]
    pub fn format_version(&self) -> FormatVersion {
        self.format_version
    }

    /// Returns uuid of current table.
    #[inline]
    pub fn uuid(&self) -> Uuid {
        self.table_uuid
    }

    /// Returns table location.
    #[inline]
    pub fn location(&self) -> &str {
        self.location.as_str()
    }

    /// Returns last sequence number.
    #[inline]
    pub fn last_sequence_number(&self) -> i64 {
        self.last_sequence_number
    }

    /// Returns the next sequence number for the table.
    ///
    /// For format version 1, it always returns the initial sequence number.
    /// For other versions, it returns the last sequence number incremented by 1.
    #[inline]
    pub fn next_sequence_number(&self) -> i64 {
        match self.format_version {
            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
            _ => self.last_sequence_number + 1,
        }
    }

    /// Returns the last column id.
    #[inline]
    pub fn last_column_id(&self) -> i32 {
        self.last_column_id
    }

    /// Returns the last assigned partition field id.
    #[inline]
    pub fn last_partition_id(&self) -> i32 {
        self.last_partition_id
    }

    /// Returns last updated time as a UTC datetime.
    ///
    /// # Errors
    /// Returns an error if `last_updated_ms` cannot be converted to a valid
    /// UTC timestamp.
    #[inline]
    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.last_updated_ms)
    }

    /// Returns last updated time in milliseconds.
    #[inline]
    pub fn last_updated_ms(&self) -> i64 {
        self.last_updated_ms
    }

    /// Returns an iterator over all schemas (in no particular order).
    #[inline]
    pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
        self.schemas.values()
    }

    /// Lookup schema by id.
    #[inline]
    pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
        self.schemas.get(&schema_id)
    }

    /// Get current schema.
    ///
    /// # Panics
    /// Panics if `current_schema_id` does not exist in `schemas`; construction
    /// validates this invariant (see `try_normalize`).
    #[inline]
    pub fn current_schema(&self) -> &SchemaRef {
        self.schema_by_id(self.current_schema_id)
            .expect("Current schema id set, but not found in table metadata")
    }

    /// Get the id of the current schema
    #[inline]
    pub fn current_schema_id(&self) -> SchemaId {
        self.current_schema_id
    }
251
    /// Returns an iterator over all partition specs (in no particular order).
    #[inline]
    pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
        self.partition_specs.values()
    }

    /// Lookup partition spec by id.
    #[inline]
    pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
        self.partition_specs.get(&spec_id)
    }

    /// Get default partition spec
    #[inline]
    pub fn default_partition_spec(&self) -> &PartitionSpecRef {
        &self.default_spec
    }

    /// Return the partition type of the default partition spec.
    #[inline]
    pub fn default_partition_type(&self) -> &StructType {
        &self.default_partition_type
    }

    #[inline]
    /// Returns spec id of the "current" partition spec.
    pub fn default_partition_spec_id(&self) -> i32 {
        self.default_spec.spec_id()
    }

    /// Returns an iterator over all snapshots (in no particular order).
    #[inline]
    pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
        self.snapshots.values()
    }

    /// Lookup snapshot by id.
    #[inline]
    pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
        self.snapshots.get(&snapshot_id)
    }

    /// Returns snapshot history (the snapshot log, in stored order).
    #[inline]
    pub fn history(&self) -> &[SnapshotLog] {
        &self.snapshot_log
    }

    /// Returns the metadata log (in stored order).
    #[inline]
    pub fn metadata_log(&self) -> &[MetadataLog] {
        &self.metadata_log
    }
305
    /// Get current snapshot, or `None` if no current snapshot is set.
    ///
    /// # Panics
    /// Panics if `current_snapshot_id` is set but missing from `snapshots`;
    /// construction validates this invariant (see `try_normalize`).
    #[inline]
    pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
        self.current_snapshot_id.map(|s| {
            self.snapshot_by_id(s)
                .expect("Current snapshot id has been set, but doesn't exist in metadata")
        })
    }

    /// Get the current snapshot id
    #[inline]
    pub fn current_snapshot_id(&self) -> Option<i64> {
        self.current_snapshot_id
    }

    /// Get the snapshot for a reference.
    /// Returns `None` if `ref_name` is not found.
    ///
    /// # Panics
    /// Panics if the reference points at a snapshot id missing from
    /// `snapshots`; construction validates this invariant (see `validate_refs`).
    #[inline]
    pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
        self.refs.get(ref_name).map(|r| {
            self.snapshot_by_id(r.snapshot_id)
                .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
        })
    }

    /// Returns an iterator over all sort orders (in no particular order).
    #[inline]
    pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
        self.sort_orders.values()
    }

    /// Lookup sort order by id.
    #[inline]
    pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
        self.sort_orders.get(&sort_order_id)
    }

    /// Returns the default sort order.
    ///
    /// # Panics
    /// Panics if `default_sort_order_id` is missing from `sort_orders`;
    /// construction validates this invariant (see `try_normalize_sort_order`).
    #[inline]
    pub fn default_sort_order(&self) -> &SortOrderRef {
        self.sort_orders
            .get(&self.default_sort_order_id)
            .expect("Default order id has been set, but not found in table metadata!")
    }

    /// Returns default sort order id.
    #[inline]
    pub fn default_sort_order_id(&self) -> i64 {
        self.default_sort_order_id
    }

    /// Returns properties of table.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }

    /// Return an iterator over all statistics files (in no particular order).
    #[inline]
    pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
        self.statistics.values()
    }

    /// Return an iterator over all partition statistics files (in no particular order).
    #[inline]
    pub fn partition_statistics_iter(
        &self,
    ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
        self.partition_statistics.values()
    }

    /// Get a statistics file for a snapshot id.
    #[inline]
    pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
        self.statistics.get(&snapshot_id)
    }

    /// Get a partition statistics file for a snapshot id.
    #[inline]
    pub fn partition_statistics_for_snapshot(
        &self,
        snapshot_id: i64,
    ) -> Option<&PartitionStatisticsFile> {
        self.partition_statistics.get(&snapshot_id)
    }
391
392    fn construct_refs(&mut self) {
393        if let Some(current_snapshot_id) = self.current_snapshot_id {
394            if !self.refs.contains_key(MAIN_BRANCH) {
395                self.refs
396                    .insert(MAIN_BRANCH.to_string(), SnapshotReference {
397                        snapshot_id: current_snapshot_id,
398                        retention: SnapshotRetention::Branch {
399                            min_snapshots_to_keep: None,
400                            max_snapshot_age_ms: None,
401                            max_ref_age_ms: None,
402                        },
403                    });
404            }
405        }
406    }
407
    /// Iterate over all encryption keys (in no particular order).
    #[inline]
    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
        self.encryption_keys.values()
    }

    /// Get the encryption key for a given key id
    #[inline]
    pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
        self.encryption_keys.get(key_id)
    }

    /// Get the next row id to be assigned (used for row lineage, v3).
    #[inline]
    pub fn next_row_id(&self) -> u64 {
        self.next_row_id
    }
425
426    /// Read table metadata from the given location.
427    pub async fn read_from(
428        file_io: &FileIO,
429        metadata_location: impl AsRef<str>,
430    ) -> Result<TableMetadata> {
431        let metadata_location = metadata_location.as_ref();
432        let input_file = file_io.new_input(metadata_location)?;
433        let metadata_content = input_file.read().await?;
434
435        // Check if the file is compressed by looking for the gzip "magic number".
436        let metadata = if metadata_content.len() > 2
437            && metadata_content[0] == 0x1F
438            && metadata_content[1] == 0x8B
439        {
440            let mut decoder = GzDecoder::new(metadata_content.as_ref());
441            let mut decompressed_data = Vec::new();
442            decoder.read_to_end(&mut decompressed_data).map_err(|e| {
443                Error::new(
444                    ErrorKind::DataInvalid,
445                    "Trying to read compressed metadata file",
446                )
447                .with_context("file_path", metadata_location)
448                .with_source(e)
449            })?;
450            serde_json::from_slice(&decompressed_data)?
451        } else {
452            serde_json::from_slice(&metadata_content)?
453        };
454
455        Ok(metadata)
456    }
457
458    /// Write table metadata to the given location.
459    pub async fn write_to(
460        &self,
461        file_io: &FileIO,
462        metadata_location: impl AsRef<str>,
463    ) -> Result<()> {
464        file_io
465            .new_output(metadata_location)?
466            .write(serde_json::to_vec(self)?.into())
467            .await
468    }
469
    /// Normalize and validate this table metadata.
    ///
    /// This is an internal method
    /// meant to be called after constructing table metadata from untrusted sources.
    /// We run this method after json deserialization.
    /// All constructors for `TableMetadata` which are part of `iceberg-rust`
    /// should return normalized `TableMetadata`.
    pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
        // NOTE: order matters here — e.g. `construct_refs` must run before
        // `validate_refs` so a synthesized `main` branch is also validated.
        self.validate_current_schema()?;
        self.normalize_current_snapshot()?;
        self.construct_refs();
        self.validate_refs()?;
        self.validate_chronological_snapshot_logs()?;
        self.validate_chronological_metadata_logs()?;
        // Normalize location (remove trailing slash)
        self.location = self.location.trim_end_matches('/').to_string();
        self.validate_snapshot_sequence_number()?;
        self.try_normalize_partition_spec()?;
        self.try_normalize_sort_order()?;
        Ok(self)
    }
491
492    /// If the default partition spec is not present in specs, add it
493    fn try_normalize_partition_spec(&mut self) -> Result<()> {
494        if self
495            .partition_spec_by_id(self.default_spec.spec_id())
496            .is_none()
497        {
498            self.partition_specs.insert(
499                self.default_spec.spec_id(),
500                Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
501            );
502        }
503
504        Ok(())
505    }
506
507    /// If the default sort order is unsorted but the sort order is not present, add it
508    fn try_normalize_sort_order(&mut self) -> Result<()> {
509        if self.sort_order_by_id(self.default_sort_order_id).is_some() {
510            return Ok(());
511        }
512
513        if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
514            return Err(Error::new(
515                ErrorKind::DataInvalid,
516                format!(
517                    "No sort order exists with the default sort order id {}.",
518                    self.default_sort_order_id
519                ),
520            ));
521        }
522
523        let sort_order = SortOrder::unsorted_order();
524        self.sort_orders
525            .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
526        Ok(())
527    }
528
529    /// Validate the current schema is set and exists.
530    fn validate_current_schema(&self) -> Result<()> {
531        if self.schema_by_id(self.current_schema_id).is_none() {
532            return Err(Error::new(
533                ErrorKind::DataInvalid,
534                format!(
535                    "No schema exists with the current schema id {}.",
536                    self.current_schema_id
537                ),
538            ));
539        }
540        Ok(())
541    }
542
543    /// If current snapshot is Some(-1) then set it to None.
544    fn normalize_current_snapshot(&mut self) -> Result<()> {
545        if let Some(current_snapshot_id) = self.current_snapshot_id {
546            if current_snapshot_id == EMPTY_SNAPSHOT_ID {
547                self.current_snapshot_id = None;
548            } else if self.snapshot_by_id(current_snapshot_id).is_none() {
549                return Err(Error::new(
550                    ErrorKind::DataInvalid,
551                    format!(
552                        "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
553                    ),
554                ));
555            }
556        }
557        Ok(())
558    }
559
560    /// Validate that all refs are valid (snapshot exists)
561    fn validate_refs(&self) -> Result<()> {
562        for (name, snapshot_ref) in self.refs.iter() {
563            if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
564                return Err(Error::new(
565                    ErrorKind::DataInvalid,
566                    format!(
567                        "Snapshot for reference {name} does not exist in the existing snapshots list"
568                    ),
569                ));
570            }
571        }
572
573        let main_ref = self.refs.get(MAIN_BRANCH);
574        if self.current_snapshot_id.is_some() {
575            if let Some(main_ref) = main_ref {
576                if main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default() {
577                    return Err(Error::new(
578                        ErrorKind::DataInvalid,
579                        format!(
580                            "Current snapshot id does not match main branch ({:?} != {:?})",
581                            self.current_snapshot_id.unwrap_or_default(),
582                            main_ref.snapshot_id
583                        ),
584                    ));
585                }
586            }
587        } else if main_ref.is_some() {
588            return Err(Error::new(
589                ErrorKind::DataInvalid,
590                "Current snapshot is not set, but main branch exists",
591            ));
592        }
593
594        Ok(())
595    }
596
597    /// Validate that for V1 Metadata the last_sequence_number is 0
598    fn validate_snapshot_sequence_number(&self) -> Result<()> {
599        if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
600            return Err(Error::new(
601                ErrorKind::DataInvalid,
602                format!(
603                    "Last sequence number must be 0 in v1. Found {}",
604                    self.last_sequence_number
605                ),
606            ));
607        }
608
609        if self.format_version >= FormatVersion::V2 {
610            if let Some(snapshot) = self
611                .snapshots
612                .values()
613                .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
614            {
615                return Err(Error::new(
616                    ErrorKind::DataInvalid,
617                    format!(
618                        "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
619                        snapshot.snapshot_id(),
620                        snapshot.sequence_number(),
621                        self.last_sequence_number
622                    ),
623                ));
624            }
625        }
626
627        Ok(())
628    }
629
630    /// Validate snapshots logs are chronological and last updated is after the last snapshot log.
631    fn validate_chronological_snapshot_logs(&self) -> Result<()> {
632        for window in self.snapshot_log.windows(2) {
633            let (prev, curr) = (&window[0], &window[1]);
634            // commits can happen concurrently from different machines.
635            // A tolerance helps us avoid failure for small clock skew
636            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
637                return Err(Error::new(
638                    ErrorKind::DataInvalid,
639                    "Expected sorted snapshot log entries",
640                ));
641            }
642        }
643
644        if let Some(last) = self.snapshot_log.last() {
645            // commits can happen concurrently from different machines.
646            // A tolerance helps us avoid failure for small clock skew
647            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
648                return Err(Error::new(
649                    ErrorKind::DataInvalid,
650                    format!(
651                        "Invalid update timestamp {}: before last snapshot log entry at {}",
652                        self.last_updated_ms, last.timestamp_ms
653                    ),
654                ));
655            }
656        }
657        Ok(())
658    }
659
660    fn validate_chronological_metadata_logs(&self) -> Result<()> {
661        for window in self.metadata_log.windows(2) {
662            let (prev, curr) = (&window[0], &window[1]);
663            // commits can happen concurrently from different machines.
664            // A tolerance helps us avoid failure for small clock skew
665            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
666                return Err(Error::new(
667                    ErrorKind::DataInvalid,
668                    "Expected sorted metadata log entries",
669                ));
670            }
671        }
672
673        if let Some(last) = self.metadata_log.last() {
674            // commits can happen concurrently from different machines.
675            // A tolerance helps us avoid failure for small clock skew
676            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
677                return Err(Error::new(
678                    ErrorKind::DataInvalid,
679                    format!(
680                        "Invalid update timestamp {}: before last metadata log entry at {}",
681                        self.last_updated_ms, last.timestamp_ms
682                    ),
683                ));
684            }
685        }
686
687        Ok(())
688    }
689}
690
691pub(super) mod _serde {
692    use std::borrow::BorrowMut;
693    /// This is a helper module that defines types to help with serialization/deserialization.
694    /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct
695    /// and then converted into the [TableMetadata] struct. Serialization works the other way around.
696    /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization.
697    use std::collections::HashMap;
    /// (See the module-level description above.)
702    use std::sync::Arc;
703
704    use serde::{Deserialize, Serialize};
705    use uuid::Uuid;
706
707    use super::{
708        DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
709        TableMetadata,
710    };
711    use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
712    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
713    use crate::spec::{
714        EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
715        PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
716        SortOrder, StatisticsFile,
717    };
718    use crate::{Error, ErrorKind};
719
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    /// Discriminates between the supported metadata format versions during
    /// (de)serialization. With `#[serde(untagged)]` serde attempts the
    /// variants in declaration order, so the newest format (v3) is tried first.
    pub(super) enum TableMetadataEnum {
        V3(TableMetadataV3),
        V2(TableMetadataV2),
        V1(TableMetadataV1),
    }
727
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v3 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV3 {
        pub format_version: VersionNumber<3>,
        // Fields common to v2 and v3 are inlined into this JSON object.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        // Next row id for row lineage (new in v3).
        pub next_row_id: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub encryption_keys: Option<Vec<EncryptedKey>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV3>>,
    }
741
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Fields shared between the v2 and v3 table metadata formats; flattened
    /// into [`TableMetadataV2`] and [`TableMetadataV3`] for serialization/deserialization.
    pub(super) struct TableMetadataV2V3Shared {
        pub table_uuid: Uuid,
        pub location: String,
        pub last_sequence_number: i64,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        pub schemas: Vec<SchemaV2>,
        pub current_schema_id: i32,
        pub partition_specs: Vec<PartitionSpec>,
        pub default_spec_id: i32,
        pub last_partition_id: i32,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Vec<SortOrder>,
        pub default_sort_order_id: i64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub refs: Option<HashMap<String, SnapshotReference>>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
773
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v2 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV2 {
        pub format_version: VersionNumber<2>,
        // Fields common to v2 and v3 are inlined into this JSON object.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV2>>,
    }
784
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v1 table metadata for serialization/deserialization.
    ///
    /// Many fields are optional because v1 metadata in the wild predates the
    /// list-based `schemas`/`partition-specs` fields; conversion code falls
    /// back to the legacy singular fields when the lists are absent.
    pub(super) struct TableMetadataV1 {
        // Must serialize/deserialize as the literal integer 1.
        pub format_version: VersionNumber<1>,
        // Table uuid was optional in early v1 metadata.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub table_uuid: Option<Uuid>,
        pub location: String,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        /// `schema` is optional to prioritize `schemas` and `current-schema-id`, allowing liberal reading of V1 metadata.
        pub schema: Option<SchemaV1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub schemas: Option<Vec<SchemaV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_schema_id: Option<i32>,
        /// `partition_spec` is optional to prioritize `partition_specs`, aligning with liberal reading of potentially invalid V1 metadata.
        pub partition_spec: Option<Vec<PartitionField>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub partition_specs: Option<Vec<PartitionSpec>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_spec_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub last_partition_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        // May be absent or carry the legacy -1 "no snapshot" sentinel.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        // Sort orders were introduced after the initial v1 format, hence optional.
        pub sort_orders: Option<Vec<SortOrder>>,
        pub default_sort_order_id: Option<i64>,
        // Statistics default to empty and are omitted from output when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
826
    /// Helper to serialize and deserialize the format version.
    ///
    /// The const parameter `V` pins this zero-sized marker to one version
    /// number, so deserializing it doubles as a check that the serialized
    /// `format-version` field equals `V`.
    #[derive(Debug, PartialEq, Eq)]
    pub(crate) struct VersionNumber<const V: u8>;
830
831    impl Serialize for TableMetadata {
832        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
833        where S: serde::Serializer {
834            // we must do a clone here
835            let table_metadata_enum: TableMetadataEnum =
836                self.clone().try_into().map_err(serde::ser::Error::custom)?;
837
838            table_metadata_enum.serialize(serializer)
839        }
840    }
841
842    impl<const V: u8> Serialize for VersionNumber<V> {
843        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
844        where S: serde::Serializer {
845            serializer.serialize_u8(V)
846        }
847    }
848
849    impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
850        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
851        where D: serde::Deserializer<'de> {
852            let value = u8::deserialize(deserializer)?;
853            if value == V {
854                Ok(VersionNumber::<V>)
855            } else {
856                Err(serde::de::Error::custom("Invalid Version"))
857            }
858        }
859    }
860
    impl TryFrom<TableMetadataEnum> for TableMetadata {
        type Error = Error;
        /// Converts the version-tagged serialization enum into the internal
        /// representation by dispatching to the per-version conversion.
        fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
            match value {
                TableMetadataEnum::V3(value) => value.try_into(),
                TableMetadataEnum::V2(value) => value.try_into(),
                TableMetadataEnum::V1(value) => value.try_into(),
            }
        }
    }
871
    impl TryFrom<TableMetadata> for TableMetadataEnum {
        type Error = Error;
        /// Wraps the internal representation in the serialization enum that
        /// matches its `format_version`. Note only the v2 conversion is
        /// infallible (`From`); v1 and v3 use `TryFrom`.
        fn try_from(value: TableMetadata) -> Result<Self, Error> {
            Ok(match value.format_version {
                FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
                FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
                FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
            })
        }
    }
882
    impl TryFrom<TableMetadataV3> for TableMetadata {
        type Error = Error;
        /// Builds the internal [`TableMetadata`] from the v3 serialization
        /// form, validating that the current schema id and default spec id
        /// resolve, and normalizing optional fields to their defaults.
        fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
            let TableMetadataV3 {
                format_version: _,
                shared: value,
                next_row_id,
                encryption_keys,
                snapshots,
            } = value;
            // -1 is the legacy "no current snapshot" sentinel; normalize to None.
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };
            // Index schemas by id, failing on the first schema that doesn't convert.
            let schemas = HashMap::from_iter(
                value
                    .schemas
                    .into_iter()
                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
                    .collect::<Result<Vec<_>, Error>>()?,
            );

            // The current schema id must refer to one of the listed schemas.
            let current_schema: &SchemaRef =
                schemas.get(&value.current_schema_id).ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "No schema exists with the current schema id {}.",
                            value.current_schema_id
                        ),
                    )
                })?;
            let partition_specs = HashMap::from_iter(
                value
                    .partition_specs
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x))),
            );
            let default_spec_id = value.default_spec_id;
            // Resolve the default spec; if it is the well-known unpartitioned
            // spec id but not listed, fall back to the unpartitioned spec.
            let default_spec: PartitionSpecRef = partition_specs
                .get(&value.default_spec_id)
                .map(|spec| (**spec).clone())
                .or_else(|| {
                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
                        .then(PartitionSpec::unpartition_spec)
                })
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            // Binding the partition struct type here also validates that the
            // default spec is compatible with the current schema.
            let default_partition_type = default_spec.partition_type(current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V3,
                table_uuid: value.table_uuid,
                location: value.location,
                last_sequence_number: value.last_sequence_number,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id: value.current_schema_id,
                schemas,
                partition_specs,
                default_partition_type,
                default_spec,
                last_partition_id: value.last_partition_id,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: snapshots
                    .map(|snapshots| {
                        HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
                        )
                    })
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: HashMap::from_iter(
                    value
                        .sort_orders
                        .into_iter()
                        .map(|x| (x.order_id, Arc::new(x))),
                ),
                default_sort_order_id: value.default_sort_order_id,
                // If no refs were serialized, synthesize a "main" branch ref
                // pointing at the current snapshot (when one exists).
                refs: value.refs.unwrap_or_else(|| {
                    if let Some(snapshot_id) = current_snapshot_id {
                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                            snapshot_id,
                            retention: SnapshotRetention::Branch {
                                min_snapshots_to_keep: None,
                                max_snapshot_age_ms: None,
                                max_ref_age_ms: None,
                            },
                        })])
                    } else {
                        HashMap::new()
                    }
                }),
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                encryption_keys: encryption_keys
                    .map(|keys| {
                        HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
                    })
                    .unwrap_or_default(),
                next_row_id,
            };

            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1000
    impl TryFrom<TableMetadataV2> for TableMetadata {
        type Error = Error;
        /// Builds the internal [`TableMetadata`] from the v2 serialization
        /// form. Mirrors the v3 conversion, except v2 carries no encryption
        /// keys and no row lineage (`next_row_id` starts at [`INITIAL_ROW_ID`]).
        fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
            let snapshots = value.snapshots;
            let value = value.shared;
            // -1 is the legacy "no current snapshot" sentinel; normalize to None.
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };
            // Index schemas by id, failing on the first schema that doesn't convert.
            let schemas = HashMap::from_iter(
                value
                    .schemas
                    .into_iter()
                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
                    .collect::<Result<Vec<_>, Error>>()?,
            );

            // The current schema id must refer to one of the listed schemas.
            let current_schema: &SchemaRef =
                schemas.get(&value.current_schema_id).ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "No schema exists with the current schema id {}.",
                            value.current_schema_id
                        ),
                    )
                })?;
            let partition_specs = HashMap::from_iter(
                value
                    .partition_specs
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x))),
            );
            let default_spec_id = value.default_spec_id;
            // Resolve the default spec; if it is the well-known unpartitioned
            // spec id but not listed, fall back to the unpartitioned spec.
            let default_spec: PartitionSpecRef = partition_specs
                .get(&value.default_spec_id)
                .map(|spec| (**spec).clone())
                .or_else(|| {
                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
                        .then(PartitionSpec::unpartition_spec)
                })
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            // Binding the partition struct type here also validates that the
            // default spec is compatible with the current schema.
            let default_partition_type = default_spec.partition_type(current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V2,
                table_uuid: value.table_uuid,
                location: value.location,
                last_sequence_number: value.last_sequence_number,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id: value.current_schema_id,
                schemas,
                partition_specs,
                default_partition_type,
                default_spec,
                last_partition_id: value.last_partition_id,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: snapshots
                    .map(|snapshots| {
                        HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
                        )
                    })
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: HashMap::from_iter(
                    value
                        .sort_orders
                        .into_iter()
                        .map(|x| (x.order_id, Arc::new(x))),
                ),
                default_sort_order_id: value.default_sort_order_id,
                // If no refs were serialized, synthesize a "main" branch ref
                // pointing at the current snapshot (when one exists).
                refs: value.refs.unwrap_or_else(|| {
                    if let Some(snapshot_id) = current_snapshot_id {
                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                            snapshot_id,
                            retention: SnapshotRetention::Branch {
                                min_snapshots_to_keep: None,
                                max_snapshot_age_ms: None,
                                max_ref_age_ms: None,
                            },
                        })])
                    } else {
                        HashMap::new()
                    }
                }),
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID,
            };

            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1109
    impl TryFrom<TableMetadataV1> for TableMetadata {
        type Error = Error;
        /// Builds the internal [`TableMetadata`] from the v1 serialization
        /// form, handling the legacy singular `schema`/`partition_spec`
        /// fields as fallbacks when the list-based fields are absent.
        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
            // -1 is the legacy "no current snapshot" sentinel; normalize to None.
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };

            // Schema resolution, in priority order:
            //   1. `schemas` + `current_schema_id`
            //   2. legacy singular `schema`
            //   3. neither present -> invalid metadata
            let (schemas, current_schema_id, current_schema) =
                if let (Some(schemas_vec), Some(schema_id)) =
                    (&value.schemas, value.current_schema_id)
                {
                    // Option 1: Use 'schemas' + 'current_schema_id'
                    let schema_map = HashMap::from_iter(
                        schemas_vec
                            .clone()
                            .into_iter()
                            .map(|schema| {
                                let schema: Schema = schema.try_into()?;
                                Ok((schema.schema_id(), Arc::new(schema)))
                            })
                            .collect::<Result<Vec<_>, Error>>()?,
                    );

                    let schema = schema_map
                        .get(&schema_id)
                        .ok_or_else(|| {
                            Error::new(
                                ErrorKind::DataInvalid,
                                format!("No schema exists with the current schema id {schema_id}."),
                            )
                        })?
                        .clone();
                    (schema_map, schema_id, schema)
                } else if let Some(schema) = value.schema {
                    // Option 2: Fall back to `schema`
                    let schema: Schema = schema.try_into()?;
                    let schema_id = schema.schema_id();
                    let schema_arc = Arc::new(schema);
                    let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
                    (schema_map, schema_id, schema_arc)
                } else {
                    // Option 3: No valid schema configuration found
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        "No valid schema configuration found in table metadata",
                    ));
                };

            // Prioritize 'partition_specs' over 'partition_spec'
            let partition_specs = if let Some(specs_vec) = value.partition_specs {
                // Option 1: Use 'partition_specs'
                specs_vec
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x)))
                    .collect::<HashMap<_, _>>()
            } else if let Some(partition_spec) = value.partition_spec {
                // Option 2: Fall back to 'partition_spec'. Legacy v1 fields are
                // unbound, so they are bound against the current schema here.
                let spec = PartitionSpec::builder(current_schema.clone())
                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
                    .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
                    .build()?;

                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
            } else {
                // Option 3: Create empty partition spec
                let spec = PartitionSpec::builder(current_schema.clone())
                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
                    .build()?;

                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
            };

            // Get the default_spec_id, prioritizing the explicit value if provided
            let default_spec_id = value
                .default_spec_id
                .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());

            // Get the default spec
            let default_spec: PartitionSpecRef = partition_specs
                .get(&default_spec_id)
                .map(|x| Arc::unwrap_or_clone(x.clone()))
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(&current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V1,
                table_uuid: value.table_uuid.unwrap_or_default(),
                location: value.location,
                // v1 predates sequence numbers; they start at 0.
                last_sequence_number: 0,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id,
                default_spec,
                default_partition_type,
                // When absent, fall back to the highest known spec id.
                last_partition_id: value
                    .last_partition_id
                    .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
                partition_specs,
                schemas,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: value
                    .snapshots
                    .map(|snapshots| {
                        Ok::<_, Error>(HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
                                .collect::<Result<Vec<_>, Error>>()?,
                        ))
                    })
                    .transpose()?
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: match value.sort_orders {
                    Some(sort_orders) => HashMap::from_iter(
                        sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
                    ),
                    None => HashMap::new(),
                },
                default_sort_order_id: value
                    .default_sort_order_id
                    .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
                // v1 has no serialized refs: synthesize a "main" branch ref
                // pointing at the current snapshot (when one exists).
                refs: if let Some(snapshot_id) = current_snapshot_id {
                    HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                        snapshot_id,
                        retention: SnapshotRetention::Branch {
                            min_snapshots_to_keep: None,
                            max_snapshot_age_ms: None,
                            max_ref_age_ms: None,
                        },
                    })])
                } else {
                    HashMap::new()
                },
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID, // v1 has no row lineage
            };

            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1264
1265    impl TryFrom<TableMetadata> for TableMetadataV3 {
1266        type Error = Error;
1267
1268        fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1269            let next_row_id = v.next_row_id;
1270            let encryption_keys = std::mem::take(&mut v.encryption_keys);
1271            let snapshots = std::mem::take(&mut v.snapshots);
1272            let shared = v.into();
1273
1274            Ok(TableMetadataV3 {
1275                format_version: VersionNumber::<3>,
1276                shared,
1277                next_row_id,
1278                encryption_keys: if encryption_keys.is_empty() {
1279                    None
1280                } else {
1281                    Some(encryption_keys.into_values().collect())
1282                },
1283                snapshots: if snapshots.is_empty() {
1284                    None
1285                } else {
1286                    Some(
1287                        snapshots
1288                            .into_values()
1289                            .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1290                            .collect::<Result<_, _>>()?,
1291                    )
1292                },
1293            })
1294        }
1295    }
1296
1297    impl From<TableMetadata> for TableMetadataV2 {
1298        fn from(mut v: TableMetadata) -> Self {
1299            let snapshots = std::mem::take(&mut v.snapshots);
1300            let shared = v.into();
1301
1302            TableMetadataV2 {
1303                format_version: VersionNumber::<2>,
1304                shared,
1305                snapshots: if snapshots.is_empty() {
1306                    None
1307                } else {
1308                    Some(
1309                        snapshots
1310                            .into_values()
1311                            .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1312                            .collect(),
1313                    )
1314                },
1315            }
1316        }
1317    }
1318
1319    impl From<TableMetadata> for TableMetadataV2V3Shared {
1320        fn from(v: TableMetadata) -> Self {
1321            TableMetadataV2V3Shared {
1322                table_uuid: v.table_uuid,
1323                location: v.location,
1324                last_sequence_number: v.last_sequence_number,
1325                last_updated_ms: v.last_updated_ms,
1326                last_column_id: v.last_column_id,
1327                schemas: v
1328                    .schemas
1329                    .into_values()
1330                    .map(|x| {
1331                        Arc::try_unwrap(x)
1332                            .unwrap_or_else(|schema| schema.as_ref().clone())
1333                            .into()
1334                    })
1335                    .collect(),
1336                current_schema_id: v.current_schema_id,
1337                partition_specs: v
1338                    .partition_specs
1339                    .into_values()
1340                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1341                    .collect(),
1342                default_spec_id: v.default_spec.spec_id(),
1343                last_partition_id: v.last_partition_id,
1344                properties: if v.properties.is_empty() {
1345                    None
1346                } else {
1347                    Some(v.properties)
1348                },
1349                current_snapshot_id: v.current_snapshot_id,
1350                snapshot_log: if v.snapshot_log.is_empty() {
1351                    None
1352                } else {
1353                    Some(v.snapshot_log)
1354                },
1355                metadata_log: if v.metadata_log.is_empty() {
1356                    None
1357                } else {
1358                    Some(v.metadata_log)
1359                },
1360                sort_orders: v
1361                    .sort_orders
1362                    .into_values()
1363                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1364                    .collect(),
1365                default_sort_order_id: v.default_sort_order_id,
1366                refs: Some(v.refs),
1367                statistics: v.statistics.into_values().collect(),
1368                partition_statistics: v.partition_statistics.into_values().collect(),
1369            }
1370        }
1371    }
1372
1373    impl TryFrom<TableMetadata> for TableMetadataV1 {
1374        type Error = Error;
1375        fn try_from(v: TableMetadata) -> Result<Self, Error> {
1376            Ok(TableMetadataV1 {
1377                format_version: VersionNumber::<1>,
1378                table_uuid: Some(v.table_uuid),
1379                location: v.location,
1380                last_updated_ms: v.last_updated_ms,
1381                last_column_id: v.last_column_id,
1382                schema: Some(
1383                    v.schemas
1384                        .get(&v.current_schema_id)
1385                        .ok_or(Error::new(
1386                            ErrorKind::Unexpected,
1387                            "current_schema_id not found in schemas",
1388                        ))?
1389                        .as_ref()
1390                        .clone()
1391                        .into(),
1392                ),
1393                schemas: Some(
1394                    v.schemas
1395                        .into_values()
1396                        .map(|x| {
1397                            Arc::try_unwrap(x)
1398                                .unwrap_or_else(|schema| schema.as_ref().clone())
1399                                .into()
1400                        })
1401                        .collect(),
1402                ),
1403                current_schema_id: Some(v.current_schema_id),
1404                partition_spec: Some(v.default_spec.fields().to_vec()),
1405                partition_specs: Some(
1406                    v.partition_specs
1407                        .into_values()
1408                        .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1409                        .collect(),
1410                ),
1411                default_spec_id: Some(v.default_spec.spec_id()),
1412                last_partition_id: Some(v.last_partition_id),
1413                properties: if v.properties.is_empty() {
1414                    None
1415                } else {
1416                    Some(v.properties)
1417                },
1418                current_snapshot_id: v.current_snapshot_id,
1419                snapshots: if v.snapshots.is_empty() {
1420                    None
1421                } else {
1422                    Some(
1423                        v.snapshots
1424                            .into_values()
1425                            .map(|x| Snapshot::clone(&x).into())
1426                            .collect(),
1427                    )
1428                },
1429                snapshot_log: if v.snapshot_log.is_empty() {
1430                    None
1431                } else {
1432                    Some(v.snapshot_log)
1433                },
1434                metadata_log: if v.metadata_log.is_empty() {
1435                    None
1436                } else {
1437                    Some(v.metadata_log)
1438                },
1439                sort_orders: Some(
1440                    v.sort_orders
1441                        .into_values()
1442                        .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1443                        .collect(),
1444                ),
1445                default_sort_order_id: Some(v.default_sort_order_id),
1446                statistics: v.statistics.into_values().collect(),
1447                partition_statistics: v.partition_statistics.into_values().collect(),
1448            })
1449        }
1450    }
1451
1452    fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1453        statistics
1454            .into_iter()
1455            .rev()
1456            .map(|s| (s.snapshot_id, s))
1457            .collect()
1458    }
1459
1460    fn index_partition_statistics(
1461        statistics: Vec<PartitionStatisticsFile>,
1462    ) -> HashMap<i64, PartitionStatisticsFile> {
1463        statistics
1464            .into_iter()
1465            .rev()
1466            .map(|s| (s.snapshot_id, s))
1467            .collect()
1468    }
1469}
1470
// Serialized as its numeric discriminant (1/2/3) via serde_repr.
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
#[repr(u8)]
/// Iceberg format version
pub enum FormatVersion {
    /// Iceberg spec version 1
    V1 = 1u8,
    /// Iceberg spec version 2
    V2 = 2u8,
    /// Iceberg spec version 3
    V3 = 3u8,
}
1482
impl PartialOrd for FormatVersion {
    /// Delegates to [`Ord::cmp`]: format versions are totally ordered, and
    /// this is the canonical delegation form expected by clippy.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
1488
1489impl Ord for FormatVersion {
1490    fn cmp(&self, other: &Self) -> Ordering {
1491        (*self as u8).cmp(&(*other as u8))
1492    }
1493}
1494
1495impl Display for FormatVersion {
1496    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1497        match self {
1498            FormatVersion::V1 => write!(f, "v1"),
1499            FormatVersion::V2 => write!(f, "v2"),
1500            FormatVersion::V3 => write!(f, "v3"),
1501        }
1502    }
1503}
1504
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Encodes changes to the previous metadata files for the table.
///
/// Serialized with kebab-case keys (`metadata-file`, `timestamp-ms`).
pub struct MetadataLog {
    /// Location of the previous metadata file.
    pub metadata_file: String,
    /// Time the new metadata was created, in milliseconds.
    pub timestamp_ms: i64,
}
1514
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// A log of when each snapshot was made.
///
/// Serialized with kebab-case keys (`snapshot-id`, `timestamp-ms`).
pub struct SnapshotLog {
    /// Id of the snapshot.
    pub snapshot_id: i64,
    /// Last updated timestamp in milliseconds; convertible to UTC via
    /// [`SnapshotLog::timestamp`].
    pub timestamp_ms: i64,
}
1524
1525impl SnapshotLog {
1526    /// Returns the last updated timestamp as a DateTime<Utc> with millisecond precision
1527    pub fn timestamp(self) -> Result<DateTime<Utc>> {
1528        timestamp_ms_to_utc(self.timestamp_ms)
1529    }
1530
1531    /// Returns the timestamp in milliseconds
1532    #[inline]
1533    pub fn timestamp_ms(&self) -> i64 {
1534        self.timestamp_ms
1535    }
1536}
1537
1538#[cfg(test)]
1539mod tests {
1540    use std::collections::HashMap;
1541    use std::fs;
1542    use std::io::Write as _;
1543    use std::sync::Arc;
1544
1545    use anyhow::Result;
1546    use base64::Engine as _;
1547    use pretty_assertions::assert_eq;
1548    use tempfile::TempDir;
1549    use uuid::Uuid;
1550
1551    use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1552    use crate::TableCreation;
1553    use crate::io::FileIOBuilder;
1554    use crate::spec::table_metadata::TableMetadata;
1555    use crate::spec::{
1556        BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1557        PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1558        SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1559        Summary, Transform, Type, UnboundPartitionField,
1560    };
1561
1562    fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1563        let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1564        assert_eq!(desered_type, expected_type);
1565
1566        let sered_json = serde_json::to_string(&expected_type).unwrap();
1567        let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1568
1569        assert_eq!(parsed_json_value, desered_type);
1570    }
1571
1572    fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1573        let path = format!("testdata/table_metadata/{file_name}");
1574        let metadata: String = fs::read_to_string(path).unwrap();
1575
1576        serde_json::from_str(&metadata).unwrap()
1577    }
1578
    #[test]
    fn test_table_data_v2() {
        // V2 metadata fixture: two schema fields, a day-transform partition
        // spec, one metadata-log entry, empty refs, and an unsorted order.
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            },
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "sort-orders": [
                    {
                    "order-id": 0,
                    "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Build the in-memory equivalent of the JSON above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "struct_name",
                    Type::Primitive(PrimitiveType::Fixed(1)),
                )),
                Arc::new(NestedField::required(
                    4,
                    "ts",
                    Type::Primitive(PrimitiveType::Timestamp),
                )),
            ])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Round-trip check, then verify serialization matches the fixture
        // JSON exactly (serde_json::Value comparison is key-order agnostic).
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1714
    #[test]
    fn test_table_data_v3() {
        // V3 metadata fixture: adds next-row-id, a snapshot carrying row
        // range + encryption key id, and a top-level encryption-keys list.
        let data = r#"
            {
                "format-version" : 3,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "next-row-id": 5,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "snapshots" : [ {
                    "snapshot-id" : 1,
                    "timestamp-ms" : 1662532818843,
                    "sequence-number" : 0,
                    "first-row-id" : 0,
                    "added-rows" : 4,
                    "key-id" : "key1",
                    "summary" : {
                        "operation" : "append"
                    },
                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                    "schema-id" : 0
                    }
                ],
                "encryption-keys": [
                    {
                        "key-id": "key1",
                        "encrypted-by-id": "KMS",
                        "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
                        "properties": {
                            "p1": "v1"
                        }
                    }
                ],
                "sort-orders": [
                    {
                    "order-id": 0,
                    "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Build the in-memory equivalent of the JSON above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                4,
                "ts",
                Type::Primitive(PrimitiveType::Timestamp),
            ))])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_row_range(0, 4)
            .with_encryption_key_id(Some("key1".to_string()))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
            .with_schema_id(0)
            .build();

        // "c29tZS1lbmNyeXB0aW9uLWtleQ==" is the fixture's base64-encoded
        // key bytes; decode so the struct holds the raw metadata.
        let encryption_key = EncryptedKey::builder()
            .key_id("key1".to_string())
            .encrypted_by_id("KMS".to_string())
            .encrypted_key_metadata(
                base64::prelude::BASE64_STANDARD
                    .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
                    .unwrap(),
            )
            .properties(HashMap::from_iter(vec![(
                "p1".to_string(),
                "v1".to_string(),
            )]))
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V3,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
            next_row_id: 5,
        };

        // Round-trip check, then verify serialization matches the fixture
        // JSON exactly (serde_json::Value comparison is key-order agnostic).
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1890
    #[test]
    fn test_table_data_v1() {
        // V1 metadata fixture using the legacy singular "schema" and
        // "partition-spec" fields, plus snapshot/snapshot-log/refs entries.
        let data = r#"
        {
            "format-version" : 1,
            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
            "location" : "/home/iceberg/warehouse/nyc/taxis",
            "last-updated-ms" : 1662532818843,
            "last-column-id" : 5,
            "schema" : {
              "type" : "struct",
              "schema-id" : 0,
              "fields" : [ {
                "id" : 1,
                "name" : "vendor_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 2,
                "name" : "trip_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 3,
                "name" : "trip_distance",
                "required" : false,
                "type" : "float"
              }, {
                "id" : 4,
                "name" : "fare_amount",
                "required" : false,
                "type" : "double"
              }, {
                "id" : 5,
                "name" : "store_and_fwd_flag",
                "required" : false,
                "type" : "string"
              } ]
            },
            "partition-spec" : [ {
              "name" : "vendor_id",
              "transform" : "identity",
              "source-id" : 1,
              "field-id" : 1000
            } ],
            "last-partition-id" : 1000,
            "default-sort-order-id" : 0,
            "sort-orders" : [ {
              "order-id" : 0,
              "fields" : [ ]
            } ],
            "properties" : {
              "owner" : "root"
            },
            "current-snapshot-id" : 638933773299822130,
            "refs" : {
              "main" : {
                "snapshot-id" : 638933773299822130,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 638933773299822130,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ],
            "snapshot-log" : [ {
              "timestamp-ms" : 1662532818843,
              "snapshot-id" : 638933773299822130
            } ],
            "metadata-log" : [ {
              "timestamp-ms" : 1662532805245,
              "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
            } ]
          }
        "#;

        // Build the in-memory equivalent of the JSON above.
        let schema = Schema::builder()
            .with_fields(vec![
                Arc::new(NestedField::optional(
                    1,
                    "vendor_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    2,
                    "trip_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    3,
                    "trip_distance",
                    Type::Primitive(PrimitiveType::Float),
                )),
                Arc::new(NestedField::optional(
                    4,
                    "fare_amount",
                    Type::Primitive(PrimitiveType::Double),
                )),
                Arc::new(NestedField::optional(
                    5,
                    "store_and_fwd_flag",
                    Type::Primitive(PrimitiveType::String),
                )),
            ])
            .build()
            .unwrap();

        let schema = Arc::new(schema);
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(0)
            .build_unbound()
            .unwrap();

        let snapshot = Snapshot::builder()
            .with_snapshot_id(638933773299822130)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
            last_updated_ms: 1662532818843,
            last_column_id: 5,
            schemas: HashMap::from_iter(vec![(0, schema)]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
            current_snapshot_id: Some(638933773299822130),
            last_sequence_number: 0,
            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
            snapshot_log: vec![SnapshotLog {
                snapshot_id: 638933773299822130,
                timestamp_ms: 1662532818843,
            }],
            metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Note: no JSON-value equality check here — v1 input is normalized
        // on serialization, so only the serde round trip is asserted.
        check_table_metadata_serde(data, expected);
    }
2063
    #[test]
    fn test_table_data_v2_no_snapshots() {
        // V2 metadata fixture without snapshots, properties, or a snapshot
        // log — exercises the defaults for all optional collections.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "refs": {},
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0
        }
        "#;

        // Build the in-memory equivalent of the JSON above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "struct_name",
                Type::Primitive(PrimitiveType::Fixed(1)),
            ))])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Round-trip check, then verify serialization matches the fixture
        // JSON exactly (serde_json::Value comparison is key-order agnostic).
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
2166
    #[test]
    fn test_current_snapshot_id_must_match_main_branch() {
        // Invalid fixture: current-snapshot-id is 1 while the "main" branch
        // ref points at snapshot 2 — deserialization must be rejected.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "current-snapshot-id" : 1,
            "refs" : {
              "main" : {
                "snapshot-id" : 2,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 1,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            },
            {
              "snapshot-id" : 2,
              "timestamp-ms" : 1662532818844,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ]
        }
    "#;

        // Parsing must fail with the specific mismatch message.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot id does not match main branch")
        );
    }
2273
2274    #[test]
2275    fn test_main_without_current() {
2276        let data = r#"
2277        {
2278            "format-version" : 2,
2279            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2280            "location": "s3://b/wh/data.db/table",
2281            "last-sequence-number" : 1,
2282            "last-updated-ms": 1515100955770,
2283            "last-column-id": 1,
2284            "schemas": [
2285                {
2286                    "schema-id" : 1,
2287                    "type" : "struct",
2288                    "fields" :[
2289                        {
2290                            "id": 1,
2291                            "name": "struct_name",
2292                            "required": true,
2293                            "type": "fixed[1]"
2294                        },
2295                        {
2296                            "id": 4,
2297                            "name": "ts",
2298                            "required": true,
2299                            "type": "timestamp"
2300                        }
2301                    ]
2302                }
2303            ],
2304            "current-schema-id" : 1,
2305            "partition-specs": [
2306                {
2307                    "spec-id": 0,
2308                    "fields": [
2309                        {
2310                            "source-id": 4,
2311                            "field-id": 1000,
2312                            "name": "ts_day",
2313                            "transform": "day"
2314                        }
2315                    ]
2316                }
2317            ],
2318            "default-spec-id": 0,
2319            "last-partition-id": 1000,
2320            "properties": {
2321                "commit.retry.num-retries": "1"
2322            },
2323            "metadata-log": [
2324                {
2325                    "metadata-file": "s3://bucket/.../v1.json",
2326                    "timestamp-ms": 1515100
2327                }
2328            ],
2329            "sort-orders": [
2330                {
2331                "order-id": 0,
2332                "fields": []
2333                }
2334            ],
2335            "default-sort-order-id": 0,
2336            "refs" : {
2337              "main" : {
2338                "snapshot-id" : 1,
2339                "type" : "branch"
2340              }
2341            },
2342            "snapshots" : [ {
2343              "snapshot-id" : 1,
2344              "timestamp-ms" : 1662532818843,
2345              "sequence-number" : 0,
2346              "summary" : {
2347                "operation" : "append",
2348                "spark.app.id" : "local-1662532784305",
2349                "added-data-files" : "4",
2350                "added-records" : "4",
2351                "added-files-size" : "6001"
2352              },
2353              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2354              "schema-id" : 0
2355            } ]
2356        }
2357    "#;
2358
2359        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2360        assert!(
2361            err.to_string()
2362                .contains("Current snapshot is not set, but main branch exists")
2363        );
2364    }
2365
2366    #[test]
2367    fn test_branch_snapshot_missing() {
2368        let data = r#"
2369        {
2370            "format-version" : 2,
2371            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2372            "location": "s3://b/wh/data.db/table",
2373            "last-sequence-number" : 1,
2374            "last-updated-ms": 1515100955770,
2375            "last-column-id": 1,
2376            "schemas": [
2377                {
2378                    "schema-id" : 1,
2379                    "type" : "struct",
2380                    "fields" :[
2381                        {
2382                            "id": 1,
2383                            "name": "struct_name",
2384                            "required": true,
2385                            "type": "fixed[1]"
2386                        },
2387                        {
2388                            "id": 4,
2389                            "name": "ts",
2390                            "required": true,
2391                            "type": "timestamp"
2392                        }
2393                    ]
2394                }
2395            ],
2396            "current-schema-id" : 1,
2397            "partition-specs": [
2398                {
2399                    "spec-id": 0,
2400                    "fields": [
2401                        {
2402                            "source-id": 4,
2403                            "field-id": 1000,
2404                            "name": "ts_day",
2405                            "transform": "day"
2406                        }
2407                    ]
2408                }
2409            ],
2410            "default-spec-id": 0,
2411            "last-partition-id": 1000,
2412            "properties": {
2413                "commit.retry.num-retries": "1"
2414            },
2415            "metadata-log": [
2416                {
2417                    "metadata-file": "s3://bucket/.../v1.json",
2418                    "timestamp-ms": 1515100
2419                }
2420            ],
2421            "sort-orders": [
2422                {
2423                "order-id": 0,
2424                "fields": []
2425                }
2426            ],
2427            "default-sort-order-id": 0,
2428            "refs" : {
2429              "main" : {
2430                "snapshot-id" : 1,
2431                "type" : "branch"
2432              },
2433              "foo" : {
2434                "snapshot-id" : 2,
2435                "type" : "branch"
2436              }
2437            },
2438            "snapshots" : [ {
2439              "snapshot-id" : 1,
2440              "timestamp-ms" : 1662532818843,
2441              "sequence-number" : 0,
2442              "summary" : {
2443                "operation" : "append",
2444                "spark.app.id" : "local-1662532784305",
2445                "added-data-files" : "4",
2446                "added-records" : "4",
2447                "added-files-size" : "6001"
2448              },
2449              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2450              "schema-id" : 0
2451            } ]
2452        }
2453    "#;
2454
2455        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2456        assert!(
2457            err.to_string().contains(
2458                "Snapshot for reference foo does not exist in the existing snapshots list"
2459            )
2460        );
2461    }
2462
2463    #[test]
2464    fn test_v2_wrong_max_snapshot_sequence_number() {
2465        let data = r#"
2466        {
2467            "format-version": 2,
2468            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2469            "location": "s3://bucket/test/location",
2470            "last-sequence-number": 1,
2471            "last-updated-ms": 1602638573590,
2472            "last-column-id": 3,
2473            "current-schema-id": 0,
2474            "schemas": [
2475                {
2476                    "type": "struct",
2477                    "schema-id": 0,
2478                    "fields": [
2479                        {
2480                            "id": 1,
2481                            "name": "x",
2482                            "required": true,
2483                            "type": "long"
2484                        }
2485                    ]
2486                }
2487            ],
2488            "default-spec-id": 0,
2489            "partition-specs": [
2490                {
2491                    "spec-id": 0,
2492                    "fields": []
2493                }
2494            ],
2495            "last-partition-id": 1000,
2496            "default-sort-order-id": 0,
2497            "sort-orders": [
2498                {
2499                    "order-id": 0,
2500                    "fields": []
2501                }
2502            ],
2503            "properties": {},
2504            "current-snapshot-id": 3055729675574597004,
2505            "snapshots": [
2506                {
2507                    "snapshot-id": 3055729675574597004,
2508                    "timestamp-ms": 1555100955770,
2509                    "sequence-number": 4,
2510                    "summary": {
2511                        "operation": "append"
2512                    },
2513                    "manifest-list": "s3://a/b/2.avro",
2514                    "schema-id": 0
2515                }
2516            ],
2517            "statistics": [],
2518            "snapshot-log": [],
2519            "metadata-log": []
2520        }
2521    "#;
2522
2523        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2524        assert!(err.to_string().contains(
2525            "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
2526        ));
2527
2528        // Change max sequence number to 4 - should work
2529        let data = data.replace(
2530            r#""last-sequence-number": 1,"#,
2531            r#""last-sequence-number": 4,"#,
2532        );
2533        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2534        assert_eq!(metadata.last_sequence_number, 4);
2535
2536        // Change max sequence number to 5 - should work
2537        let data = data.replace(
2538            r#""last-sequence-number": 4,"#,
2539            r#""last-sequence-number": 5,"#,
2540        );
2541        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2542        assert_eq!(metadata.last_sequence_number, 5);
2543    }
2544
    /// Deserializes V2 metadata that carries a `statistics` entry (a Puffin
    /// statistics file with nested blob metadata) and round-trips it against a
    /// hand-built `TableMetadata` value.
    #[test]
    fn test_statistic_files() {
        // V2 metadata JSON: one schema, one snapshot, and a single statistics
        // file attached to that snapshot. Note there is no "refs" field.
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/stats.puffin",
                    "file-size-in-bytes": 413,
                    "file-footer-size-in-bytes": 42,
                    "blob-metadata": [
                        {
                            "type": "ndv",
                            "snapshot-id": 3055729675574597004,
                            "sequence-number": 1,
                            "fields": [
                                1
                            ]
                        }
                    ]
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

        // Expected schema: a single required `long` column "x" (field id 1).
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        // Expected partition spec: unpartitioned, spec id 0.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        // The single snapshot described by the JSON above.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // Expected in-memory form of the document; `statistics` is keyed by
        // snapshot id.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
                snapshot_id: 3055729675574597004,
                statistics_path: "s3://a/b/stats.puffin".to_string(),
                file_size_in_bytes: 413,
                file_footer_size_in_bytes: 42,
                key_metadata: None,
                blob_metadata: vec![BlobMetadata {
                    snapshot_id: 3055729675574597004,
                    sequence_number: 1,
                    fields: vec![1],
                    r#type: "ndv".to_string(),
                    properties: HashMap::new(),
                }],
            })]),
            partition_statistics: HashMap::new(),
            // Although the JSON carries no "refs" field, parsing is expected
            // to produce a "main" ref for the current snapshot.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Round-trip comparison via the module's serde helper.
        check_table_metadata_serde(data, expected);
    }
2697
    /// Deserializes V2 metadata that carries a `partition-statistics` entry
    /// and round-trips it against a hand-built `TableMetadata` value.
    #[test]
    fn test_partition_statistics_file() {
        // V2 metadata JSON: one schema, one snapshot, and a single partition
        // statistics file for that snapshot. Note there is no "refs" field.
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "partition-statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/partition-stats.parquet",
                    "file-size-in-bytes": 43
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
        "#;

        // Expected schema: a single required `long` column "x" (field id 1).
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        // Expected partition spec: unpartitioned, spec id 0.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        // The single snapshot described by the JSON above.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // Expected in-memory form of the document; `partition_statistics` is
        // keyed by snapshot id.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::from_iter(vec![(
                3055729675574597004,
                PartitionStatisticsFile {
                    snapshot_id: 3055729675574597004,
                    statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
                    file_size_in_bytes: 43,
                },
            )]),
            // Although the JSON carries no "refs" field, parsing is expected
            // to produce a "main" ref for the current snapshot.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Round-trip comparison via the module's serde helper.
        check_table_metadata_serde(data, expected);
    }
2833
2834    #[test]
2835    fn test_invalid_table_uuid() -> Result<()> {
2836        let data = r#"
2837            {
2838                "format-version" : 2,
2839                "table-uuid": "xxxx"
2840            }
2841        "#;
2842        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2843        Ok(())
2844    }
2845
2846    #[test]
2847    fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
2848        let data = r#"
2849            {
2850                "format-version" : 1
2851            }
2852        "#;
2853        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2854        Ok(())
2855    }
2856
    /// Parses the minimal valid V3 metadata fixture file and verifies both the
    /// detected `format-version` and the full expected `TableMetadata` value.
    #[test]
    fn test_table_metadata_v3_valid_minimal() {
        let metadata_str =
            fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();

        // Quick sanity check before the full structural comparison below.
        let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
        assert_eq!(table_metadata.format_version, FormatVersion::V3);

        // Expected schema: three required `long` columns. "x" carries initial
        // and write defaults (a V3 feature), "y" carries a doc comment.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(
                    NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
                        .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
                        .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
                ),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Expected partition spec: identity transform on "x" (source id 1)
        // with partition field id 1000.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Expected sort order 3: ascending identity on "y" (source id 2),
        // then descending bucket(4) on "z" (source id 3).
        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // The minimal fixture has no snapshots, refs, logs, or statistics.
        let expected = TableMetadata {
            format_version: FormatVersion::V3,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: 0, // V3 specific field from the JSON
        };

        // Round-trip comparison via the module's serde helper.
        check_table_metadata_serde(&metadata_str, expected);
    }
2945
    /// Parses the full valid V2 metadata fixture file (two schemas, two
    /// snapshots, a snapshot log, and a `main` branch ref) and verifies it
    /// against a hand-built `TableMetadata` value.
    #[test]
    fn test_table_metadata_v2_file_valid() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();

        // Original schema (id 0): just the required `long` column "x".
        let schema1 = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();

        // Current schema (id 1): adds "y" (with a doc comment) and "z", and
        // declares identifier fields [1, 2].
        let schema2 = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .with_identifier_field_ids(vec![1, 2])
            .build()
            .unwrap();

        // Expected partition spec, bound against the current schema: identity
        // transform on "x" with partition field id 1000.
        let partition_spec = PartitionSpec::builder(schema2.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Expected sort order 3: ascending identity on field 2, then
        // descending bucket(4) on field 3.
        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        // First snapshot: sequence 0, no parent, no explicit schema id.
        let snapshot1 = Snapshot::builder()
            .with_snapshot_id(3051729675574597004)
            .with_timestamp_ms(1515100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        // Second (current) snapshot: child of the first, written with the
        // current schema (id 1).
        let snapshot2 = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_parent_snapshot_id(Some(3051729675574597004))
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_schema_id(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema2).unwrap();
        // Expected in-memory form of the fixture; "main" points at snapshot2.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![
                (3051729675574597004, Arc::new(snapshot1)),
                (3055729675574597004, Arc::new(snapshot2)),
            ]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![
                SnapshotLog {
                    snapshot_id: 3051729675574597004,
                    timestamp_ms: 1515100955770,
                },
                SnapshotLog {
                    snapshot_id: 3055729675574597004,
                    timestamp_ms: 1555100955770,
                },
            ],
            metadata_log: Vec::new(),
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Round-trip comparison via the module's serde helper.
        check_table_metadata_serde(&metadata, expected);
    }
3085
    /// Parse `TableMetadataV2ValidMinimal.json` and compare it against a
    /// hand-built `TableMetadata` via the serde round-trip helper.
    #[test]
    fn test_table_metadata_v2_file_valid_minimal() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

        // Schema 0: three required long columns (x, y, z); `y` carries a doc string.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Spec 0: identity partition on `x`, field id 1000 (first assignable id).
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Sort order 3: ascending identity on `y`, then descending bucket(4) on `z`.
        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // The minimal fixture has no snapshots, refs, logs, statistics, or keys.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3171
    /// Parse `TableMetadataV1Valid.json` and compare it against a hand-built
    /// `TableMetadata`, checking the V1 -> in-memory upgrade defaults.
    #[test]
    fn test_table_metadata_v1_file_valid() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

        // Schema 0: three required long columns (x, y, z); `y` carries a doc string.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Spec 0: identity partition on `x`.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573874,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 0,
            default_sort_order_id: 0,
            // Sort order is added during deserialization for V2 compatibility
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::new(),
            current_snapshot_id: None,
            // V1 files carry no sequence numbers; the in-memory default is 0.
            last_sequence_number: 0,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3241
3242    #[test]
3243    fn test_table_metadata_v1_compat() {
3244        let metadata =
3245            fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
3246
3247        // Deserialize the JSON to verify it works
3248        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3249            .expect("Failed to deserialize TableMetadataV1Compat.json");
3250
3251        // Verify some key fields match
3252        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3253        assert_eq!(
3254            desered_type.uuid(),
3255            Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
3256        );
3257        assert_eq!(
3258            desered_type.location(),
3259            "s3://bucket/warehouse/iceberg/glue.db/table_name"
3260        );
3261        assert_eq!(desered_type.last_updated_ms(), 1727773114005);
3262        assert_eq!(desered_type.current_schema_id(), 0);
3263    }
3264
3265    #[test]
3266    fn test_table_metadata_v1_schemas_without_current_id() {
3267        let metadata = fs::read_to_string(
3268            "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
3269        )
3270        .unwrap();
3271
3272        // Deserialize the JSON - this should succeed by using the 'schema' field instead of 'schemas'
3273        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3274            .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");
3275
3276        // Verify it used the 'schema' field
3277        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3278        assert_eq!(
3279            desered_type.uuid(),
3280            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3281        );
3282
3283        // Get the schema and verify it has the expected fields
3284        let schema = desered_type.current_schema();
3285        assert_eq!(schema.as_struct().fields().len(), 3);
3286        assert_eq!(schema.as_struct().fields()[0].name, "x");
3287        assert_eq!(schema.as_struct().fields()[1].name, "y");
3288        assert_eq!(schema.as_struct().fields()[2].name, "z");
3289    }
3290
3291    #[test]
3292    fn test_table_metadata_v1_no_valid_schema() {
3293        let metadata =
3294            fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
3295                .unwrap();
3296
3297        // Deserialize the JSON - this should fail because neither schemas + current_schema_id nor schema is valid
3298        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3299
3300        assert!(desered.is_err());
3301        let error_message = desered.unwrap_err().to_string();
3302        assert!(
3303            error_message.contains("No valid schema configuration found"),
3304            "Expected error about no valid schema configuration, got: {error_message}"
3305        );
3306    }
3307
3308    #[test]
3309    fn test_table_metadata_v1_partition_specs_without_default_id() {
3310        let metadata = fs::read_to_string(
3311            "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
3312        )
3313        .unwrap();
3314
3315        // Deserialize the JSON - this should succeed by inferring default_spec_id as the max spec ID
3316        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3317            .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");
3318
3319        // Verify basic metadata
3320        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3321        assert_eq!(
3322            desered_type.uuid(),
3323            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3324        );
3325
3326        // Verify partition specs
3327        assert_eq!(desered_type.default_partition_spec_id(), 2); // Should pick the largest spec ID (2)
3328        assert_eq!(desered_type.partition_specs.len(), 2);
3329
3330        // Verify the default spec has the expected fields
3331        let default_spec = &desered_type.default_spec;
3332        assert_eq!(default_spec.spec_id(), 2);
3333        assert_eq!(default_spec.fields().len(), 1);
3334        assert_eq!(default_spec.fields()[0].name, "y");
3335        assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
3336        assert_eq!(default_spec.fields()[0].source_id, 2);
3337    }
3338
3339    #[test]
3340    fn test_table_metadata_v2_schema_not_found() {
3341        let metadata =
3342            fs::read_to_string("testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json")
3343                .unwrap();
3344
3345        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3346
3347        assert_eq!(
3348            desered.unwrap_err().to_string(),
3349            "DataInvalid => No schema exists with the current schema id 2."
3350        )
3351    }
3352
3353    #[test]
3354    fn test_table_metadata_v2_missing_sort_order() {
3355        let metadata =
3356            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
3357                .unwrap();
3358
3359        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3360
3361        assert_eq!(
3362            desered.unwrap_err().to_string(),
3363            "data did not match any variant of untagged enum TableMetadataEnum"
3364        )
3365    }
3366
3367    #[test]
3368    fn test_table_metadata_v2_missing_partition_specs() {
3369        let metadata =
3370            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json")
3371                .unwrap();
3372
3373        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3374
3375        assert_eq!(
3376            desered.unwrap_err().to_string(),
3377            "data did not match any variant of untagged enum TableMetadataEnum"
3378        )
3379    }
3380
3381    #[test]
3382    fn test_table_metadata_v2_missing_last_partition_id() {
3383        let metadata = fs::read_to_string(
3384            "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json",
3385        )
3386        .unwrap();
3387
3388        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3389
3390        assert_eq!(
3391            desered.unwrap_err().to_string(),
3392            "data did not match any variant of untagged enum TableMetadataEnum"
3393        )
3394    }
3395
3396    #[test]
3397    fn test_table_metadata_v2_missing_schemas() {
3398        let metadata =
3399            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSchemas.json")
3400                .unwrap();
3401
3402        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3403
3404        assert_eq!(
3405            desered.unwrap_err().to_string(),
3406            "data did not match any variant of untagged enum TableMetadataEnum"
3407        )
3408    }
3409
3410    #[test]
3411    fn test_table_metadata_v2_unsupported_version() {
3412        let metadata =
3413            fs::read_to_string("testdata/table_metadata/TableMetadataUnsupportedVersion.json")
3414                .unwrap();
3415
3416        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3417
3418        assert_eq!(
3419            desered.unwrap_err().to_string(),
3420            "data did not match any variant of untagged enum TableMetadataEnum"
3421        )
3422    }
3423
3424    #[test]
3425    fn test_order_of_format_version() {
3426        assert!(FormatVersion::V1 < FormatVersion::V2);
3427        assert_eq!(FormatVersion::V1, FormatVersion::V1);
3428        assert_eq!(FormatVersion::V2, FormatVersion::V2);
3429    }
3430
3431    #[test]
3432    fn test_default_partition_spec() {
3433        let default_spec_id = 1234;
3434        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3435        let partition_spec = PartitionSpec::unpartition_spec();
3436        table_meta_data.default_spec = partition_spec.clone().into();
3437        table_meta_data
3438            .partition_specs
3439            .insert(default_spec_id, Arc::new(partition_spec));
3440
3441        assert_eq!(
3442            (*table_meta_data.default_partition_spec().clone()).clone(),
3443            (*table_meta_data
3444                .partition_spec_by_id(default_spec_id)
3445                .unwrap()
3446                .clone())
3447            .clone()
3448        );
3449    }
3450    #[test]
3451    fn test_default_sort_order() {
3452        let default_sort_order_id = 1234;
3453        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3454        table_meta_data.default_sort_order_id = default_sort_order_id;
3455        table_meta_data
3456            .sort_orders
3457            .insert(default_sort_order_id, Arc::new(SortOrder::default()));
3458
3459        assert_eq!(
3460            table_meta_data.default_sort_order(),
3461            table_meta_data
3462                .sort_orders
3463                .get(&default_sort_order_id)
3464                .unwrap()
3465        )
3466    }
3467
    /// Building metadata from a `TableCreation` must seed sane defaults:
    /// one empty schema, an unpartitioned spec 0, and an unsorted order 0.
    #[test]
    fn test_table_metadata_builder_from_table_creation() {
        let table_creation = TableCreation::builder()
            .location("s3://db/table".to_string())
            .name("table".to_string())
            .properties(HashMap::new())
            .schema(Schema::builder().build().unwrap())
            .build();
        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
            .unwrap()
            .build()
            .unwrap()
            .metadata;
        assert_eq!(table_metadata.location, "s3://db/table");
        assert_eq!(table_metadata.schemas.len(), 1);
        // Schema 0 exists but the creation supplied no fields.
        assert_eq!(
            table_metadata
                .schemas
                .get(&0)
                .unwrap()
                .as_struct()
                .fields()
                .len(),
            0
        );
        assert_eq!(table_metadata.properties.len(), 0);
        // Default partition spec: id 0 with no partition fields.
        assert_eq!(
            table_metadata.partition_specs,
            HashMap::from([(
                0,
                Arc::new(
                    PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
                        .with_spec_id(0)
                        .build()
                        .unwrap()
                )
            )])
        );
        // Default sort order: id 0 with no sort fields (unsorted).
        assert_eq!(
            table_metadata.sort_orders,
            HashMap::from([(
                0,
                Arc::new(SortOrder {
                    order_id: 0,
                    fields: vec![]
                })
            )])
        );
    }
3517
3518    #[tokio::test]
3519    async fn test_table_metadata_read_write() {
3520        // Create a temporary directory for our test
3521        let temp_dir = TempDir::new().unwrap();
3522        let temp_path = temp_dir.path().to_str().unwrap();
3523
3524        // Create a FileIO instance
3525        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3526
3527        // Use an existing test metadata from the test files
3528        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3529
3530        // Define the metadata location
3531        let metadata_location = format!("{temp_path}/metadata.json");
3532
3533        // Write the metadata
3534        original_metadata
3535            .write_to(&file_io, &metadata_location)
3536            .await
3537            .unwrap();
3538
3539        // Verify the file exists
3540        assert!(fs::metadata(&metadata_location).is_ok());
3541
3542        // Read the metadata back
3543        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
3544            .await
3545            .unwrap();
3546
3547        // Verify the metadata matches
3548        assert_eq!(read_metadata, original_metadata);
3549    }
3550
3551    #[tokio::test]
3552    async fn test_table_metadata_read_compressed() {
3553        let temp_dir = TempDir::new().unwrap();
3554        let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3555
3556        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3557        let json = serde_json::to_string(&original_metadata).unwrap();
3558
3559        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
3560        encoder.write_all(json.as_bytes()).unwrap();
3561        std::fs::write(&metadata_location, encoder.finish().unwrap())
3562            .expect("failed to write metadata");
3563
3564        // Read the metadata back
3565        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3566        let metadata_location = metadata_location.to_str().unwrap();
3567        let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3568            .await
3569            .unwrap();
3570
3571        // Verify the metadata matches
3572        assert_eq!(read_metadata, original_metadata);
3573    }
3574
3575    #[tokio::test]
3576    async fn test_table_metadata_read_nonexistent_file() {
3577        // Create a FileIO instance
3578        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3579
3580        // Try to read a non-existent file
3581        let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3582
3583        // Verify it returns an error
3584        assert!(result.is_err());
3585    }
3586
    /// `partition_name_exists` must match partition *field* names from every
    /// known spec, and nothing else (not schema column names).
    #[test]
    fn test_partition_name_exists() {
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
                    .into(),
            ])
            .build()
            .unwrap();

        // Spec 1: identity partition named "data_partition".
        let spec1 = PartitionSpec::builder(schema.clone())
            .with_spec_id(1)
            .add_partition_field("data", "data_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        // Spec 2: bucket(16) partition named "partition_bucket".
        let spec2 = PartitionSpec::builder(schema.clone())
            .with_spec_id(2)
            .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
            .unwrap()
            .build()
            .unwrap();

        // Build metadata with these specs
        let metadata = TableMetadataBuilder::new(
            schema,
            spec1.clone().into_unbound(),
            SortOrder::unsorted_order(),
            "s3://test/location".to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .add_partition_spec(spec2.into_unbound())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Names from both the default spec and the added spec must be found.
        assert!(metadata.partition_name_exists("data_partition"));
        assert!(metadata.partition_name_exists("partition_bucket"));

        assert!(!metadata.partition_name_exists("nonexistent_field"));
        assert!(!metadata.partition_name_exists("data")); // schema field name, not partition field name
        assert!(!metadata.partition_name_exists(""));
    }
3635
3636    #[test]
3637    fn test_partition_name_exists_empty_specs() {
3638        // Create metadata with no partition specs (unpartitioned table)
3639        let schema = Schema::builder()
3640            .with_fields(vec![
3641                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3642            ])
3643            .build()
3644            .unwrap();
3645
3646        let metadata = TableMetadataBuilder::new(
3647            schema,
3648            PartitionSpec::unpartition_spec().into_unbound(),
3649            SortOrder::unsorted_order(),
3650            "s3://test/location".to_string(),
3651            FormatVersion::V2,
3652            HashMap::new(),
3653        )
3654        .unwrap()
3655        .build()
3656        .unwrap()
3657        .metadata;
3658
3659        assert!(!metadata.partition_name_exists("any_field"));
3660        assert!(!metadata.partition_name_exists("data"));
3661    }
3662
    /// `name_exists_in_any_schema` must search the current *and* historical
    /// schemas, so columns dropped by evolution are still found.
    #[test]
    fn test_name_exists_in_any_schema() {
        // Create multiple schemas with different fields
        let schema1 = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
            ])
            .build()
            .unwrap();

        // schema2 keeps field1, drops field2, adds field3.
        let schema2 = Schema::builder()
            .with_schema_id(2)
            .with_fields(vec![
                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        // schema1 becomes historical once schema2 is made current.
        let metadata = TableMetadataBuilder::new(
            schema1,
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            "s3://test/location".to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .add_current_schema(schema2)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert!(metadata.name_exists_in_any_schema("field1")); // exists in both schemas
        assert!(metadata.name_exists_in_any_schema("field2")); // exists only in schema1 (historical)
        assert!(metadata.name_exists_in_any_schema("field3")); // exists only in schema2 (current)

        assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
        assert!(!metadata.name_exists_in_any_schema("field4"));
        assert!(!metadata.name_exists_in_any_schema(""));
    }
3707
3708    #[test]
3709    fn test_name_exists_in_any_schema_empty_schemas() {
3710        let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3711
3712        let metadata = TableMetadataBuilder::new(
3713            schema,
3714            PartitionSpec::unpartition_spec().into_unbound(),
3715            SortOrder::unsorted_order(),
3716            "s3://test/location".to_string(),
3717            FormatVersion::V2,
3718            HashMap::new(),
3719        )
3720        .unwrap()
3721        .build()
3722        .unwrap()
3723        .metadata;
3724
3725        assert!(!metadata.name_exists_in_any_schema("any_field"));
3726    }
3727
    /// End-to-end check of the name-lookup helpers across a realistic schema
    /// evolution: initial schema -> evolved schema added as current.
    #[test]
    fn test_helper_methods_multi_version_scenario() {
        // Test a realistic multi-version scenario
        let initial_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(
                    3,
                    "deprecated_field",
                    Type::Primitive(PrimitiveType::String),
                )
                .into(),
            ])
            .build()
            .unwrap();

        let metadata = TableMetadataBuilder::new(
            initial_schema,
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            "s3://test/location".to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap();

        // Evolution step 1: keep everything and add `new_field`.
        let evolved_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(
                    3,
                    "deprecated_field",
                    Type::Primitive(PrimitiveType::String),
                )
                .into(),
                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
                    .into(),
            ])
            .build()
            .unwrap();

        // Then add a third schema that removes the deprecated field
        // NOTE(review): this schema is built but intentionally never added to
        // the builder below — only `evolved_schema` becomes current.
        let _final_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
                    .into(),
                NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
                    .into(),
            ])
            .build()
            .unwrap();

        let final_metadata = metadata
            .add_current_schema(evolved_schema)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        assert!(!final_metadata.partition_name_exists("nonexistent_partition")); // unpartitioned table

        assert!(final_metadata.name_exists_in_any_schema("id")); // exists in both schemas
        assert!(final_metadata.name_exists_in_any_schema("name")); // exists in both schemas
        assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); // exists in both schemas
        assert!(final_metadata.name_exists_in_any_schema("new_field")); // only in current schema
        assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
    }
3799}