// iceberg/spec/table_metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
//! The main struct here is [TableMetadata] which defines the data for a table.
20
21use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::sync::Arc;
26
27use _serde::TableMetadataEnum;
28use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use serde_repr::{Deserialize_repr, Serialize_repr};
31use uuid::Uuid;
32
33use super::snapshot::SnapshotReference;
34pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
35use super::{
36    DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
37    SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
38    TableProperties,
39};
40use crate::compression::CompressionCodec;
41use crate::error::{Result, timestamp_ms_to_utc};
42use crate::io::FileIO;
43use crate::spec::EncryptedKey;
44use crate::{Error, ErrorKind};
45
/// Name of the branch that always tracks the current table state ("main").
static MAIN_BRANCH: &str = "main";
/// Tolerance (in milliseconds) applied when validating that snapshot/metadata
/// log entries are chronological; absorbs small clock skew between writers.
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

/// Sentinel snapshot id meaning "no current snapshot" in serialized metadata;
/// normalized to `None` during `try_normalize`.
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
/// Initial sequence number; v1 tables always report this as the next sequence number.
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;

/// Initial row id for row lineage for new v3 tables and older tables upgrading to v3.
pub const INITIAL_ROW_ID: u64 = 0;
/// Minimum format version that supports row lineage (v3).
pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;
58
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
/// Fields of Iceberg table metadata, covering all supported format versions
/// (see [FormatVersion]).
///
/// We assume that this data structure is always valid, so we will panic when invalid error happens.
/// We check the validity of this data structure when constructing.
pub struct TableMetadata {
    /// Integer Version for the format.
    pub(crate) format_version: FormatVersion,
    /// A UUID that identifies the table
    pub(crate) table_uuid: Uuid,
    /// Location tables base location
    pub(crate) location: String,
    /// The tables highest sequence number
    pub(crate) last_sequence_number: i64,
    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
    pub(crate) last_updated_ms: i64,
    /// An integer; the highest assigned column ID for the table.
    pub(crate) last_column_id: i32,
    /// A list of schemas, stored as objects with schema-id.
    pub(crate) schemas: HashMap<i32, SchemaRef>,
    /// ID of the table’s current schema.
    pub(crate) current_schema_id: i32,
    /// A list of partition specs, stored as full partition spec objects.
    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
    /// ID of the “current” spec that writers should use by default.
    pub(crate) default_spec: PartitionSpecRef,
    /// Partition type of the default partition spec.
    pub(crate) default_partition_type: StructType,
    /// An integer; the highest assigned partition field ID across all partition specs for the table.
    pub(crate) last_partition_id: i32,
    /// A string to string map of table properties. This is used to control settings that
    /// affect reading and writing and is not intended to be used for arbitrary metadata.
    /// For example, commit.retry.num-retries is used to control the number of commit retries.
    pub(crate) properties: HashMap<String, String>,
    /// long ID of the current table snapshot; must be the same as the current
    /// ID of the main branch in refs.
    pub(crate) current_snapshot_id: Option<i64>,
    /// A list of valid snapshots. Valid snapshots are snapshots for which all
    /// data files exist in the file system. A data file must not be deleted
    /// from the file system until the last snapshot in which it was listed is
    /// garbage collected.
    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
    /// to the current snapshot for the table. Each time the current-snapshot-id
    /// is changed, a new entry should be added with the last-updated-ms
    /// and the new current-snapshot-id. When snapshots are expired from
    /// the list of valid snapshots, all entries before a snapshot that has
    /// expired should be removed.
    pub(crate) snapshot_log: Vec<SnapshotLog>,

    /// A list (optional) of timestamp and metadata file location pairs
    /// that encodes changes to the previous metadata files for the table.
    /// Each time a new metadata file is created, a new entry of the
    /// previous metadata file location should be added to the list.
    /// Tables can be configured to remove the oldest metadata log entries and
    /// keep a fixed-size log of the most recent entries after a commit.
    pub(crate) metadata_log: Vec<MetadataLog>,

    /// A list of sort orders, stored as full sort order objects.
    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
    /// Default sort order id of the table. Note that this could be used by
    /// writers, but is not used when reading because reads use the specs
    /// stored in manifest files.
    pub(crate) default_sort_order_id: i64,
    /// A map of snapshot references. The map keys are the unique snapshot reference
    /// names in the table, and the map values are snapshot reference objects.
    /// There is always a main branch reference pointing to the current-snapshot-id
    /// even if the refs map is null.
    pub(crate) refs: HashMap<String, SnapshotReference>,
    /// Mapping of snapshot ids to statistics files.
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    /// Mapping of snapshot ids to partition statistics files.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
    /// Encryption Keys - map of key id to the actual key
    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
    /// Next row id to be assigned for Row Lineage (v3)
    pub(crate) next_row_id: u64,
}
138
impl TableMetadata {
    /// Convert this Table Metadata into a builder for modification.
    ///
    /// `current_file_location` is the location where the current version
    /// of the metadata file is stored. This is used to update the metadata log.
    /// If `current_file_location` is `None`, the metadata log will not be updated.
    /// This should only be used to stage-create tables.
    #[must_use]
    pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
        TableMetadataBuilder::new_from_metadata(self, current_file_location)
    }

    /// Check if a partition field name exists in any partition spec.
    #[inline]
    pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
        self.partition_specs
            .values()
            .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
    }

    /// Check if a field name exists in any schema.
    #[inline]
    pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
        self.schemas
            .values()
            .any(|schema| schema.field_by_name(name).is_some())
    }

    /// Returns format version of this metadata.
    #[inline]
    pub fn format_version(&self) -> FormatVersion {
        self.format_version
    }

    /// Returns uuid of current table.
    #[inline]
    pub fn uuid(&self) -> Uuid {
        self.table_uuid
    }

    /// Returns table location.
    #[inline]
    pub fn location(&self) -> &str {
        self.location.as_str()
    }

    /// Returns last sequence number.
    #[inline]
    pub fn last_sequence_number(&self) -> i64 {
        self.last_sequence_number
    }

    /// Returns the next sequence number for the table.
    ///
    /// For format version 1, it always returns the initial sequence number.
    /// For other versions, it returns the last sequence number incremented by 1.
    #[inline]
    pub fn next_sequence_number(&self) -> i64 {
        match self.format_version {
            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
            _ => self.last_sequence_number + 1,
        }
    }

    /// Returns the last column id.
    #[inline]
    pub fn last_column_id(&self) -> i32 {
        self.last_column_id
    }

    /// Returns the last partition_id
    #[inline]
    pub fn last_partition_id(&self) -> i32 {
        self.last_partition_id
    }

    /// Returns last updated time as a UTC datetime.
    ///
    /// # Errors
    /// Returns an error if `last_updated_ms` is out of the representable range.
    #[inline]
    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.last_updated_ms)
    }

    /// Returns last updated time in milliseconds.
    #[inline]
    pub fn last_updated_ms(&self) -> i64 {
        self.last_updated_ms
    }

    /// Returns an iterator over all schemas.
    #[inline]
    pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
        self.schemas.values()
    }

    /// Lookup schema by id.
    #[inline]
    pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
        self.schemas.get(&schema_id)
    }

    /// Get current schema.
    ///
    /// # Panics
    /// Panics if the current schema id is not present in `schemas`
    /// (prevented by validation in `try_normalize`).
    #[inline]
    pub fn current_schema(&self) -> &SchemaRef {
        self.schema_by_id(self.current_schema_id)
            .expect("Current schema id set, but not found in table metadata")
    }

    /// Get the id of the current schema
    #[inline]
    pub fn current_schema_id(&self) -> SchemaId {
        self.current_schema_id
    }

    /// Returns all partition specs.
    #[inline]
    pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
        self.partition_specs.values()
    }

    /// Lookup partition spec by id.
    #[inline]
    pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
        self.partition_specs.get(&spec_id)
    }

    /// Get default partition spec
    #[inline]
    pub fn default_partition_spec(&self) -> &PartitionSpecRef {
        &self.default_spec
    }

    /// Return the partition type of the default partition spec.
    #[inline]
    pub fn default_partition_type(&self) -> &StructType {
        &self.default_partition_type
    }

    /// Returns spec id of the "current" partition spec.
    #[inline]
    pub fn default_partition_spec_id(&self) -> i32 {
        self.default_spec.spec_id()
    }

    /// Returns all snapshots
    #[inline]
    pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
        self.snapshots.values()
    }

    /// Lookup snapshot by id.
    #[inline]
    pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
        self.snapshots.get(&snapshot_id)
    }

    /// Returns snapshot history.
    #[inline]
    pub fn history(&self) -> &[SnapshotLog] {
        &self.snapshot_log
    }

    /// Returns the metadata log.
    #[inline]
    pub fn metadata_log(&self) -> &[MetadataLog] {
        &self.metadata_log
    }

    /// Get current snapshot, or `None` if the table has no current snapshot.
    ///
    /// # Panics
    /// Panics if a current snapshot id is set but the snapshot is missing
    /// (prevented by validation in `try_normalize`).
    #[inline]
    pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
        self.current_snapshot_id.map(|s| {
            self.snapshot_by_id(s)
                .expect("Current snapshot id has been set, but doesn't exist in metadata")
        })
    }

    /// Get the current snapshot id
    #[inline]
    pub fn current_snapshot_id(&self) -> Option<i64> {
        self.current_snapshot_id
    }

    /// Get the snapshot for a reference
    /// Returns an option if the `ref_name` is not found
    ///
    /// # Panics
    /// Panics if the ref exists but points at a missing snapshot
    /// (prevented by `validate_refs` during normalization).
    #[inline]
    pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
        self.refs.get(ref_name).map(|r| {
            self.snapshot_by_id(r.snapshot_id)
                .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
        })
    }

    /// Return all sort orders.
    #[inline]
    pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
        self.sort_orders.values()
    }

    /// Lookup sort order by id.
    #[inline]
    pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
        self.sort_orders.get(&sort_order_id)
    }

    /// Returns the default sort order.
    ///
    /// # Panics
    /// Panics if the default sort order id is not present in `sort_orders`
    /// (prevented by `try_normalize_sort_order` during normalization).
    #[inline]
    pub fn default_sort_order(&self) -> &SortOrderRef {
        self.sort_orders
            .get(&self.default_sort_order_id)
            .expect("Default order id has been set, but not found in table metadata!")
    }

    /// Returns default sort order id.
    #[inline]
    pub fn default_sort_order_id(&self) -> i64 {
        self.default_sort_order_id
    }

    /// Returns properties of table.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }

    /// Returns typed table properties parsed from the raw properties map with defaults.
    ///
    /// # Errors
    /// Returns a `DataInvalid` error when a property value fails to parse.
    pub fn table_properties(&self) -> Result<TableProperties> {
        TableProperties::try_from(&self.properties).map_err(|e| {
            Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
        })
    }

    /// Return location of statistics files.
    #[inline]
    pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
        self.statistics.values()
    }

    /// Return location of partition statistics files.
    #[inline]
    pub fn partition_statistics_iter(
        &self,
    ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
        self.partition_statistics.values()
    }

    /// Get a statistics file for a snapshot id.
    #[inline]
    pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
        self.statistics.get(&snapshot_id)
    }

    /// Get a partition statistics file for a snapshot id.
    #[inline]
    pub fn partition_statistics_for_snapshot(
        &self,
        snapshot_id: i64,
    ) -> Option<&PartitionStatisticsFile> {
        self.partition_statistics.get(&snapshot_id)
    }

    /// Ensure a `main` branch ref pointing at the current snapshot exists.
    ///
    /// No-op when there is no current snapshot or a `main` ref is already
    /// present. Must run before `validate_refs` (see `try_normalize`).
    fn construct_refs(&mut self) {
        if let Some(current_snapshot_id) = self.current_snapshot_id
            && !self.refs.contains_key(MAIN_BRANCH)
        {
            self.refs
                .insert(MAIN_BRANCH.to_string(), SnapshotReference {
                    snapshot_id: current_snapshot_id,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: None,
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                });
        }
    }

    /// Iterate over all encryption keys
    #[inline]
    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
        self.encryption_keys.values()
    }

    /// Get the encryption key for a given key id
    #[inline]
    pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
        self.encryption_keys.get(key_id)
    }

    /// Get the next row id to be assigned
    #[inline]
    pub fn next_row_id(&self) -> u64 {
        self.next_row_id
    }

    /// Read table metadata from the given location.
    ///
    /// Transparently handles gzip-compressed metadata files by sniffing the
    /// gzip magic bytes; otherwise the content is parsed as plain JSON.
    pub async fn read_from(
        file_io: &FileIO,
        metadata_location: impl AsRef<str>,
    ) -> Result<TableMetadata> {
        let metadata_location = metadata_location.as_ref();
        let input_file = file_io.new_input(metadata_location)?;
        let metadata_content = input_file.read().await?;

        // Check if the file is compressed by looking for the gzip "magic number".
        let metadata = if metadata_content.len() > 2
            && metadata_content[0] == 0x1F
            && metadata_content[1] == 0x8B
        {
            let decompressed_data = CompressionCodec::Gzip
                .decompress(metadata_content.to_vec())
                .map_err(|e| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Trying to read compressed metadata file",
                    )
                    .with_context("file_path", metadata_location)
                    .with_source(e)
                })?;
            serde_json::from_slice(&decompressed_data)?
        } else {
            serde_json::from_slice(&metadata_content)?
        };

        Ok(metadata)
    }

    /// Write table metadata to the given location.
    ///
    /// NOTE(review): always writes uncompressed JSON, regardless of the
    /// location's file extension — confirm callers never expect gzip output.
    pub async fn write_to(
        &self,
        file_io: &FileIO,
        metadata_location: impl AsRef<str>,
    ) -> Result<()> {
        file_io
            .new_output(metadata_location)?
            .write(serde_json::to_vec(self)?.into())
            .await
    }

    /// Normalize this partition spec.
    ///
    /// This is an internal method
    /// meant to be called after constructing table metadata from untrusted sources.
    /// We run this method after json deserialization.
    /// All constructors for `TableMetadata` which are part of `iceberg-rust`
    /// should return normalized `TableMetadata`.
    ///
    /// Step order matters: the current snapshot is normalized and the `main`
    /// ref constructed before refs are validated against the snapshot list.
    pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
        self.validate_current_schema()?;
        self.normalize_current_snapshot()?;
        self.construct_refs();
        self.validate_refs()?;
        self.validate_chronological_snapshot_logs()?;
        self.validate_chronological_metadata_logs()?;
        // Normalize location (remove trailing slash)
        self.location = self.location.trim_end_matches('/').to_string();
        self.validate_snapshot_sequence_number()?;
        self.try_normalize_partition_spec()?;
        self.try_normalize_sort_order()?;
        Ok(self)
    }

    /// If the default partition spec is not present in specs, add it
    fn try_normalize_partition_spec(&mut self) -> Result<()> {
        if self
            .partition_spec_by_id(self.default_spec.spec_id())
            .is_none()
        {
            // unwrap_or_clone + re-wrap stores an Arc independent of `default_spec`.
            self.partition_specs.insert(
                self.default_spec.spec_id(),
                Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
            );
        }

        Ok(())
    }

    /// If the default sort order is unsorted but the sort order is not present, add it
    fn try_normalize_sort_order(&mut self) -> Result<()> {
        // Validate that sort order ID 0 (reserved for unsorted) has no fields
        if let Some(sort_order) = self.sort_order_by_id(SortOrder::UNSORTED_ORDER_ID)
            && !sort_order.fields.is_empty()
        {
            return Err(Error::new(
                ErrorKind::Unexpected,
                format!(
                    "Sort order ID {} is reserved for unsorted order",
                    SortOrder::UNSORTED_ORDER_ID
                ),
            ));
        }

        if self.sort_order_by_id(self.default_sort_order_id).is_some() {
            return Ok(());
        }

        if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "No sort order exists with the default sort order id {}.",
                    self.default_sort_order_id
                ),
            ));
        }

        // Default is the unsorted order but it is missing from the map: add it.
        let sort_order = SortOrder::unsorted_order();
        self.sort_orders
            .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
        Ok(())
    }

    /// Validate the current schema is set and exists.
    fn validate_current_schema(&self) -> Result<()> {
        if self.schema_by_id(self.current_schema_id).is_none() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "No schema exists with the current schema id {}.",
                    self.current_schema_id
                ),
            ));
        }
        Ok(())
    }

    /// If current snapshot is Some(-1) then set it to None.
    fn normalize_current_snapshot(&mut self) -> Result<()> {
        if let Some(current_snapshot_id) = self.current_snapshot_id {
            if current_snapshot_id == EMPTY_SNAPSHOT_ID {
                self.current_snapshot_id = None;
            } else if self.snapshot_by_id(current_snapshot_id).is_none() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
                    ),
                ));
            }
        }
        Ok(())
    }

    /// Validate that all refs are valid (snapshot exists)
    /// and that the `main` ref, if present, agrees with the current snapshot id.
    fn validate_refs(&self) -> Result<()> {
        for (name, snapshot_ref) in self.refs.iter() {
            if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Snapshot for reference {name} does not exist in the existing snapshots list"
                    ),
                ));
            }
        }

        let main_ref = self.refs.get(MAIN_BRANCH);
        if self.current_snapshot_id.is_some() {
            if let Some(main_ref) = main_ref
                && main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default()
            {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Current snapshot id does not match main branch ({:?} != {:?})",
                        self.current_snapshot_id.unwrap_or_default(),
                        main_ref.snapshot_id
                    ),
                ));
            }
        } else if main_ref.is_some() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Current snapshot is not set, but main branch exists",
            ));
        }

        Ok(())
    }

    /// Validate that for V1 Metadata the last_sequence_number is 0,
    /// and that for V2+ no snapshot exceeds the table's last sequence number.
    fn validate_snapshot_sequence_number(&self) -> Result<()> {
        if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Last sequence number must be 0 in v1. Found {}",
                    self.last_sequence_number
                ),
            ));
        }

        if self.format_version >= FormatVersion::V2
            && let Some(snapshot) = self
                .snapshots
                .values()
                .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
                    snapshot.snapshot_id(),
                    snapshot.sequence_number(),
                    self.last_sequence_number
                ),
            ));
        }

        Ok(())
    }

    /// Validate snapshots logs are chronological and last updated is after the last snapshot log.
    fn validate_chronological_snapshot_logs(&self) -> Result<()> {
        for window in self.snapshot_log.windows(2) {
            let (prev, curr) = (&window[0], &window[1]);
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Expected sorted snapshot log entries",
                ));
            }
        }

        if let Some(last) = self.snapshot_log.last() {
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid update timestamp {}: before last snapshot log entry at {}",
                        self.last_updated_ms, last.timestamp_ms
                    ),
                ));
            }
        }
        Ok(())
    }

    /// Validate metadata log entries are chronological and last updated is
    /// after the last metadata log entry (mirrors the snapshot-log check).
    fn validate_chronological_metadata_logs(&self) -> Result<()> {
        for window in self.metadata_log.windows(2) {
            let (prev, curr) = (&window[0], &window[1]);
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Expected sorted metadata log entries",
                ));
            }
        }

        if let Some(last) = self.metadata_log.last() {
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid update timestamp {}: before last metadata log entry at {}",
                        self.last_updated_ms, last.timestamp_ms
                    ),
                ));
            }
        }

        Ok(())
    }
}
709
710pub(super) mod _serde {
711    use std::borrow::BorrowMut;
712    /// This is a helper module that defines types to help with serialization/deserialization.
713    /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct
714    /// and then converted into the [TableMetadata] struct. Serialization works the other way around.
715    /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization.
716    use std::collections::HashMap;
    /// (See the module description above; this duplicate of it was removed.)
721    use std::sync::Arc;
722
723    use serde::{Deserialize, Serialize};
724    use uuid::Uuid;
725
726    use super::{
727        DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
728        TableMetadata,
729    };
730    use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
731    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
732    use crate::spec::{
733        EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
734        PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
735        SortOrder, StatisticsFile,
736    };
737    use crate::{Error, ErrorKind};
738
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    /// Deserialization target that covers every supported format version.
    ///
    /// The enum is untagged, so variants are tried in declaration order
    /// (newest first); each variant pins its `format-version` field through
    /// a `VersionNumber<N>` marker.
    pub(super) enum TableMetadataEnum {
        /// Format version 3 metadata.
        V3(TableMetadataV3),
        /// Format version 2 metadata.
        V2(TableMetadataV2),
        /// Format version 1 metadata.
        V1(TableMetadataV1),
    }
746
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v3 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV3 {
        /// Tag pinning `format-version` to 3.
        pub format_version: VersionNumber<3>,
        /// Fields shared with the v2 representation.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        /// Next row id to assign for row lineage (v3 only).
        pub next_row_id: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub encryption_keys: Option<Vec<EncryptedKey>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV3>>,
    }
760
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Fields common to the v2 and v3 table metadata serialization formats;
    /// flattened into [TableMetadataV2] and [TableMetadataV3].
    pub(super) struct TableMetadataV2V3Shared {
        pub table_uuid: Uuid,
        pub location: String,
        pub last_sequence_number: i64,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        pub schemas: Vec<SchemaV2>,
        pub current_schema_id: i32,
        pub partition_specs: Vec<PartitionSpec>,
        pub default_spec_id: i32,
        pub last_partition_id: i32,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Vec<SortOrder>,
        pub default_sort_order_id: i64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub refs: Option<HashMap<String, SnapshotReference>>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
792
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v2 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV2 {
        /// Always the literal `2`.
        pub format_version: VersionNumber<2>,
        /// Fields shared with the v3 format, flattened into this struct's JSON.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        /// Snapshots in their v2 serialized form; omitted when absent.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV2>>,
    }
803
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v1 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV1 {
        /// Always the literal `1`.
        pub format_version: VersionNumber<1>,
        /// Optional in v1; `Uuid::default()` is substituted on conversion.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub table_uuid: Option<Uuid>,
        pub location: String,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        /// `schema` is optional to prioritize `schemas` and `current-schema-id`, allowing liberal reading of V1 metadata.
        pub schema: Option<SchemaV1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub schemas: Option<Vec<SchemaV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_schema_id: Option<i32>,
        /// `partition_spec` is optional to prioritize `partition_specs`, aligning with liberal reading of potentially invalid V1 metadata.
        pub partition_spec: Option<Vec<PartitionField>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub partition_specs: Option<Vec<PartitionSpec>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_spec_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub last_partition_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        /// Optional in v1; missing sort orders fall back to the unsorted order
        /// during conversion.
        pub sort_orders: Option<Vec<SortOrder>>,
        pub default_sort_order_id: Option<i64>,
        /// Table statistics files; empty lists are omitted from the output.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        /// Partition statistics files; empty lists are omitted from the output.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
845
    /// Helper to serialize and deserialize the format version.
    ///
    /// Zero-sized marker: it serializes as the literal `u8` value `V` and only
    /// deserializes successfully from that exact value, so each versioned
    /// metadata struct rejects payloads tagged with a different version.
    #[derive(Debug, PartialEq, Eq)]
    pub(crate) struct VersionNumber<const V: u8>;
849
850    impl Serialize for TableMetadata {
851        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
852        where S: serde::Serializer {
853            // we must do a clone here
854            let table_metadata_enum: TableMetadataEnum =
855                self.clone().try_into().map_err(serde::ser::Error::custom)?;
856
857            table_metadata_enum.serialize(serializer)
858        }
859    }
860
861    impl<const V: u8> Serialize for VersionNumber<V> {
862        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
863        where S: serde::Serializer {
864            serializer.serialize_u8(V)
865        }
866    }
867
868    impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
869        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
870        where D: serde::Deserializer<'de> {
871            let value = u8::deserialize(deserializer)?;
872            if value == V {
873                Ok(VersionNumber::<V>)
874            } else {
875                Err(serde::de::Error::custom("Invalid Version"))
876            }
877        }
878    }
879
880    impl TryFrom<TableMetadataEnum> for TableMetadata {
881        type Error = Error;
882        fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
883            match value {
884                TableMetadataEnum::V3(value) => value.try_into(),
885                TableMetadataEnum::V2(value) => value.try_into(),
886                TableMetadataEnum::V1(value) => value.try_into(),
887            }
888        }
889    }
890
891    impl TryFrom<TableMetadata> for TableMetadataEnum {
892        type Error = Error;
893        fn try_from(value: TableMetadata) -> Result<Self, Error> {
894            Ok(match value.format_version {
895                FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
896                FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
897                FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
898            })
899        }
900    }
901
    impl TryFrom<TableMetadataV3> for TableMetadata {
        type Error = Error;
        /// Builds the in-memory [`TableMetadata`] from deserialized v3 metadata,
        /// validating that the current schema id and default partition spec id
        /// resolve to entries that were actually serialized.
        fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
            // Split the v3-only fields off; the shared v2/v3 part is rebound to
            // `value` so the field accesses below read from it.
            let TableMetadataV3 {
                format_version: _,
                shared: value,
                next_row_id,
                encryption_keys,
                snapshots,
            } = value;
            // A serialized current snapshot id of -1 means "no current snapshot".
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };
            // Index schemas by id; converting each serialized schema can fail,
            // so collect into a Result first.
            let schemas = HashMap::from_iter(
                value
                    .schemas
                    .into_iter()
                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
                    .collect::<Result<Vec<_>, Error>>()?,
            );

            let current_schema: &SchemaRef =
                schemas.get(&value.current_schema_id).ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "No schema exists with the current schema id {}.",
                            value.current_schema_id
                        ),
                    )
                })?;
            let partition_specs = HashMap::from_iter(
                value
                    .partition_specs
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x))),
            );
            let default_spec_id = value.default_spec_id;
            // Resolve the default spec; when the id is the reserved default but
            // no such spec was serialized, fall back to the unpartitioned spec.
            let default_spec: PartitionSpecRef = partition_specs
                .get(&value.default_spec_id)
                .map(|spec| (**spec).clone())
                .or_else(|| {
                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
                        .then(PartitionSpec::unpartition_spec)
                })
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V3,
                table_uuid: value.table_uuid,
                location: value.location,
                last_sequence_number: value.last_sequence_number,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id: value.current_schema_id,
                schemas,
                partition_specs,
                default_partition_type,
                default_spec,
                last_partition_id: value.last_partition_id,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                // Index snapshots by id; absent means no snapshots.
                snapshots: snapshots
                    .map(|snapshots| {
                        HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
                        )
                    })
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: HashMap::from_iter(
                    value
                        .sort_orders
                        .into_iter()
                        .map(|x| (x.order_id, Arc::new(x))),
                ),
                default_sort_order_id: value.default_sort_order_id,
                // When refs were not serialized, synthesize a "main" branch ref
                // pointing at the current snapshot (if any).
                refs: value.refs.unwrap_or_else(|| {
                    if let Some(snapshot_id) = current_snapshot_id {
                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                            snapshot_id,
                            retention: SnapshotRetention::Branch {
                                min_snapshots_to_keep: None,
                                max_snapshot_age_ms: None,
                                max_ref_age_ms: None,
                            },
                        })])
                    } else {
                        HashMap::new()
                    }
                }),
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                // Index encryption keys by their key id.
                encryption_keys: encryption_keys
                    .map(|keys| {
                        HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
                    })
                    .unwrap_or_default(),
                next_row_id,
            };

            // Normalize the assembled metadata before returning it.
            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1019
    impl TryFrom<TableMetadataV2> for TableMetadata {
        type Error = Error;
        /// Builds the in-memory [`TableMetadata`] from deserialized v2 metadata,
        /// validating that the current schema id and default partition spec id
        /// resolve to entries that were actually serialized.
        fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
            // Split off the v2-only snapshots, then read everything else from
            // the shared v2/v3 part.
            let snapshots = value.snapshots;
            let value = value.shared;
            // A serialized current snapshot id of -1 means "no current snapshot".
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };
            // Index schemas by id; converting each serialized schema can fail,
            // so collect into a Result first.
            let schemas = HashMap::from_iter(
                value
                    .schemas
                    .into_iter()
                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
                    .collect::<Result<Vec<_>, Error>>()?,
            );

            let current_schema: &SchemaRef =
                schemas.get(&value.current_schema_id).ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "No schema exists with the current schema id {}.",
                            value.current_schema_id
                        ),
                    )
                })?;
            let partition_specs = HashMap::from_iter(
                value
                    .partition_specs
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x))),
            );
            let default_spec_id = value.default_spec_id;
            // Resolve the default spec; when the id is the reserved default but
            // no such spec was serialized, fall back to the unpartitioned spec.
            let default_spec: PartitionSpecRef = partition_specs
                .get(&value.default_spec_id)
                .map(|spec| (**spec).clone())
                .or_else(|| {
                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
                        .then(PartitionSpec::unpartition_spec)
                })
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V2,
                table_uuid: value.table_uuid,
                location: value.location,
                last_sequence_number: value.last_sequence_number,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id: value.current_schema_id,
                schemas,
                partition_specs,
                default_partition_type,
                default_spec,
                last_partition_id: value.last_partition_id,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                // Index snapshots by id; absent means no snapshots.
                snapshots: snapshots
                    .map(|snapshots| {
                        HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
                        )
                    })
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: HashMap::from_iter(
                    value
                        .sort_orders
                        .into_iter()
                        .map(|x| (x.order_id, Arc::new(x))),
                ),
                default_sort_order_id: value.default_sort_order_id,
                // When refs were not serialized, synthesize a "main" branch ref
                // pointing at the current snapshot (if any).
                refs: value.refs.unwrap_or_else(|| {
                    if let Some(snapshot_id) = current_snapshot_id {
                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                            snapshot_id,
                            retention: SnapshotRetention::Branch {
                                min_snapshots_to_keep: None,
                                max_snapshot_age_ms: None,
                                max_ref_age_ms: None,
                            },
                        })])
                    } else {
                        HashMap::new()
                    }
                }),
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                // v2 has no encryption-key metadata.
                encryption_keys: HashMap::new(),
                // v2 does not track row lineage; start from the initial row id.
                next_row_id: INITIAL_ROW_ID,
            };

            // Normalize the assembled metadata before returning it.
            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1128
    impl TryFrom<TableMetadataV1> for TableMetadata {
        type Error = Error;
        /// Builds the in-memory [`TableMetadata`] from deserialized v1 metadata.
        ///
        /// v1 predates several required fields, so this conversion reads the
        /// modern plural fields (`schemas`, `partition_specs`) when present and
        /// falls back to the legacy singular fields (`schema`, `partition_spec`)
        /// otherwise.
        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
            // A serialized current snapshot id of -1 means "no current snapshot".
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };

            // Resolve schemas, preferring `schemas` + `current_schema_id` over
            // the legacy single `schema` field.
            let (schemas, current_schema_id, current_schema) =
                if let (Some(schemas_vec), Some(schema_id)) =
                    (&value.schemas, value.current_schema_id)
                {
                    // Option 1: Use 'schemas' + 'current_schema_id'
                    let schema_map = HashMap::from_iter(
                        schemas_vec
                            .clone()
                            .into_iter()
                            .map(|schema| {
                                let schema: Schema = schema.try_into()?;
                                Ok((schema.schema_id(), Arc::new(schema)))
                            })
                            .collect::<Result<Vec<_>, Error>>()?,
                    );

                    let schema = schema_map
                        .get(&schema_id)
                        .ok_or_else(|| {
                            Error::new(
                                ErrorKind::DataInvalid,
                                format!("No schema exists with the current schema id {schema_id}."),
                            )
                        })?
                        .clone();
                    (schema_map, schema_id, schema)
                } else if let Some(schema) = value.schema {
                    // Option 2: Fall back to `schema`
                    let schema: Schema = schema.try_into()?;
                    let schema_id = schema.schema_id();
                    let schema_arc = Arc::new(schema);
                    let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
                    (schema_map, schema_id, schema_arc)
                } else {
                    // Option 3: No valid schema configuration found
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        "No valid schema configuration found in table metadata",
                    ));
                };

            // Prioritize 'partition_specs' over 'partition_spec'
            let partition_specs = if let Some(specs_vec) = value.partition_specs {
                // Option 1: Use 'partition_specs'
                specs_vec
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x)))
                    .collect::<HashMap<_, _>>()
            } else if let Some(partition_spec) = value.partition_spec {
                // Option 2: Fall back to 'partition_spec', binding the legacy
                // fields against the current schema under the default spec id.
                let spec = PartitionSpec::builder(current_schema.clone())
                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
                    .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
                    .build()?;

                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
            } else {
                // Option 3: Create empty partition spec
                let spec = PartitionSpec::builder(current_schema.clone())
                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
                    .build()?;

                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
            };

            // Get the default_spec_id, prioritizing the explicit value if provided
            let default_spec_id = value
                .default_spec_id
                .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());

            // Get the default spec
            let default_spec: PartitionSpecRef = partition_specs
                .get(&default_spec_id)
                .map(|x| Arc::unwrap_or_clone(x.clone()))
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(&current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V1,
                table_uuid: value.table_uuid.unwrap_or_default(),
                location: value.location,
                // v1 metadata carries no last-sequence-number; start at 0.
                last_sequence_number: 0,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id,
                default_spec,
                default_partition_type,
                // When absent, derive the last partition id from the highest
                // serialized spec id.
                last_partition_id: value
                    .last_partition_id
                    .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
                partition_specs,
                schemas,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                // Index snapshots by id; v1 snapshot conversion can fail.
                snapshots: value
                    .snapshots
                    .map(|snapshots| {
                        Ok::<_, Error>(HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
                                .collect::<Result<Vec<_>, Error>>()?,
                        ))
                    })
                    .transpose()?
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: match value.sort_orders {
                    Some(sort_orders) => HashMap::from_iter(
                        sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
                    ),
                    None => HashMap::new(),
                },
                default_sort_order_id: value
                    .default_sort_order_id
                    .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
                // v1 has no serialized refs; synthesize a "main" branch ref
                // pointing at the current snapshot (if any).
                refs: if let Some(snapshot_id) = current_snapshot_id {
                    HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                        snapshot_id,
                        retention: SnapshotRetention::Branch {
                            min_snapshots_to_keep: None,
                            max_snapshot_age_ms: None,
                            max_ref_age_ms: None,
                        },
                    })])
                } else {
                    HashMap::new()
                },
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
                // v1 has no encryption-key metadata.
                encryption_keys: HashMap::new(),
                next_row_id: INITIAL_ROW_ID, // v1 has no row lineage
            };

            // Normalize the assembled metadata before returning it.
            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }
    }
1283
1284    impl TryFrom<TableMetadata> for TableMetadataV3 {
1285        type Error = Error;
1286
1287        fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1288            let next_row_id = v.next_row_id;
1289            let encryption_keys = std::mem::take(&mut v.encryption_keys);
1290            let snapshots = std::mem::take(&mut v.snapshots);
1291            let shared = v.into();
1292
1293            Ok(TableMetadataV3 {
1294                format_version: VersionNumber::<3>,
1295                shared,
1296                next_row_id,
1297                encryption_keys: if encryption_keys.is_empty() {
1298                    None
1299                } else {
1300                    Some(encryption_keys.into_values().collect())
1301                },
1302                snapshots: if snapshots.is_empty() {
1303                    None
1304                } else {
1305                    Some(
1306                        snapshots
1307                            .into_values()
1308                            .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1309                            .collect::<Result<_, _>>()?,
1310                    )
1311                },
1312            })
1313        }
1314    }
1315
1316    impl From<TableMetadata> for TableMetadataV2 {
1317        fn from(mut v: TableMetadata) -> Self {
1318            let snapshots = std::mem::take(&mut v.snapshots);
1319            let shared = v.into();
1320
1321            TableMetadataV2 {
1322                format_version: VersionNumber::<2>,
1323                shared,
1324                snapshots: if snapshots.is_empty() {
1325                    None
1326                } else {
1327                    Some(
1328                        snapshots
1329                            .into_values()
1330                            .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1331                            .collect(),
1332                    )
1333                },
1334            }
1335        }
1336    }
1337
1338    impl From<TableMetadata> for TableMetadataV2V3Shared {
1339        fn from(v: TableMetadata) -> Self {
1340            TableMetadataV2V3Shared {
1341                table_uuid: v.table_uuid,
1342                location: v.location,
1343                last_sequence_number: v.last_sequence_number,
1344                last_updated_ms: v.last_updated_ms,
1345                last_column_id: v.last_column_id,
1346                schemas: v
1347                    .schemas
1348                    .into_values()
1349                    .map(|x| {
1350                        Arc::try_unwrap(x)
1351                            .unwrap_or_else(|schema| schema.as_ref().clone())
1352                            .into()
1353                    })
1354                    .collect(),
1355                current_schema_id: v.current_schema_id,
1356                partition_specs: v
1357                    .partition_specs
1358                    .into_values()
1359                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1360                    .collect(),
1361                default_spec_id: v.default_spec.spec_id(),
1362                last_partition_id: v.last_partition_id,
1363                properties: if v.properties.is_empty() {
1364                    None
1365                } else {
1366                    Some(v.properties)
1367                },
1368                current_snapshot_id: v.current_snapshot_id,
1369                snapshot_log: if v.snapshot_log.is_empty() {
1370                    None
1371                } else {
1372                    Some(v.snapshot_log)
1373                },
1374                metadata_log: if v.metadata_log.is_empty() {
1375                    None
1376                } else {
1377                    Some(v.metadata_log)
1378                },
1379                sort_orders: v
1380                    .sort_orders
1381                    .into_values()
1382                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1383                    .collect(),
1384                default_sort_order_id: v.default_sort_order_id,
1385                refs: Some(v.refs),
1386                statistics: v.statistics.into_values().collect(),
1387                partition_statistics: v.partition_statistics.into_values().collect(),
1388            }
1389        }
1390    }
1391
1392    impl TryFrom<TableMetadata> for TableMetadataV1 {
1393        type Error = Error;
1394        fn try_from(v: TableMetadata) -> Result<Self, Error> {
1395            Ok(TableMetadataV1 {
1396                format_version: VersionNumber::<1>,
1397                table_uuid: Some(v.table_uuid),
1398                location: v.location,
1399                last_updated_ms: v.last_updated_ms,
1400                last_column_id: v.last_column_id,
1401                schema: Some(
1402                    v.schemas
1403                        .get(&v.current_schema_id)
1404                        .ok_or(Error::new(
1405                            ErrorKind::Unexpected,
1406                            "current_schema_id not found in schemas",
1407                        ))?
1408                        .as_ref()
1409                        .clone()
1410                        .into(),
1411                ),
1412                schemas: Some(
1413                    v.schemas
1414                        .into_values()
1415                        .map(|x| {
1416                            Arc::try_unwrap(x)
1417                                .unwrap_or_else(|schema| schema.as_ref().clone())
1418                                .into()
1419                        })
1420                        .collect(),
1421                ),
1422                current_schema_id: Some(v.current_schema_id),
1423                partition_spec: Some(v.default_spec.fields().to_vec()),
1424                partition_specs: Some(
1425                    v.partition_specs
1426                        .into_values()
1427                        .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1428                        .collect(),
1429                ),
1430                default_spec_id: Some(v.default_spec.spec_id()),
1431                last_partition_id: Some(v.last_partition_id),
1432                properties: if v.properties.is_empty() {
1433                    None
1434                } else {
1435                    Some(v.properties)
1436                },
1437                current_snapshot_id: v.current_snapshot_id,
1438                snapshots: if v.snapshots.is_empty() {
1439                    None
1440                } else {
1441                    Some(
1442                        v.snapshots
1443                            .into_values()
1444                            .map(|x| Snapshot::clone(&x).into())
1445                            .collect(),
1446                    )
1447                },
1448                snapshot_log: if v.snapshot_log.is_empty() {
1449                    None
1450                } else {
1451                    Some(v.snapshot_log)
1452                },
1453                metadata_log: if v.metadata_log.is_empty() {
1454                    None
1455                } else {
1456                    Some(v.metadata_log)
1457                },
1458                sort_orders: Some(
1459                    v.sort_orders
1460                        .into_values()
1461                        .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1462                        .collect(),
1463                ),
1464                default_sort_order_id: Some(v.default_sort_order_id),
1465                statistics: v.statistics.into_values().collect(),
1466                partition_statistics: v.partition_statistics.into_values().collect(),
1467            })
1468        }
1469    }
1470
1471    fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1472        statistics
1473            .into_iter()
1474            .rev()
1475            .map(|s| (s.snapshot_id, s))
1476            .collect()
1477    }
1478
1479    fn index_partition_statistics(
1480        statistics: Vec<PartitionStatisticsFile>,
1481    ) -> HashMap<i64, PartitionStatisticsFile> {
1482        statistics
1483            .into_iter()
1484            .rev()
1485            .map(|s| (s.snapshot_id, s))
1486            .collect()
1487    }
1488}
1489
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
#[repr(u8)]
/// Iceberg format version
///
/// The `u8` discriminant is also the on-wire `format-version` value
/// (serialized numerically via `serde_repr`), so each variant's explicit
/// value must match the Iceberg spec. The manual `Ord` impl below orders
/// versions by this numeric value.
pub enum FormatVersion {
    /// Iceberg spec version 1
    V1 = 1u8,
    /// Iceberg spec version 2
    V2 = 2u8,
    /// Iceberg spec version 3
    V3 = 3u8,
}
1501
1502impl PartialOrd for FormatVersion {
1503    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1504        Some(self.cmp(other))
1505    }
1506}
1507
1508impl Ord for FormatVersion {
1509    fn cmp(&self, other: &Self) -> Ordering {
1510        (*self as u8).cmp(&(*other as u8))
1511    }
1512}
1513
1514impl Display for FormatVersion {
1515    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1516        match self {
1517            FormatVersion::V1 => write!(f, "v1"),
1518            FormatVersion::V2 => write!(f, "v2"),
1519            FormatVersion::V3 => write!(f, "v3"),
1520        }
1521    }
1522}
1523
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Encodes changes to the previous metadata files for the table
///
/// Serialized with kebab-case keys (`metadata-file`, `timestamp-ms`) per the
/// Iceberg table-metadata JSON format.
pub struct MetadataLog {
    /// Location of the metadata file this log entry records.
    pub metadata_file: String,
    /// Time new metadata was created, in milliseconds since the Unix epoch.
    pub timestamp_ms: i64,
}
1533
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// A log of when each snapshot was made.
///
/// Serialized with kebab-case keys (`snapshot-id`, `timestamp-ms`) per the
/// Iceberg table-metadata JSON format.
pub struct SnapshotLog {
    /// Id of the snapshot.
    pub snapshot_id: i64,
    /// Last updated timestamp, in milliseconds since the Unix epoch.
    pub timestamp_ms: i64,
}
1543
1544impl SnapshotLog {
1545    /// Returns the last updated timestamp as a DateTime<Utc> with millisecond precision
1546    pub fn timestamp(self) -> Result<DateTime<Utc>> {
1547        timestamp_ms_to_utc(self.timestamp_ms)
1548    }
1549
1550    /// Returns the timestamp in milliseconds
1551    #[inline]
1552    pub fn timestamp_ms(&self) -> i64 {
1553        self.timestamp_ms
1554    }
1555}
1556
1557#[cfg(test)]
1558mod tests {
1559    use std::collections::HashMap;
1560    use std::fs;
1561    use std::sync::Arc;
1562
1563    use anyhow::Result;
1564    use base64::Engine as _;
1565    use pretty_assertions::assert_eq;
1566    use tempfile::TempDir;
1567    use uuid::Uuid;
1568
1569    use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1570    use crate::compression::CompressionCodec;
1571    use crate::io::FileIOBuilder;
1572    use crate::spec::table_metadata::TableMetadata;
1573    use crate::spec::{
1574        BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1575        PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1576        SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1577        Summary, Transform, Type, UnboundPartitionField,
1578    };
1579    use crate::{ErrorKind, TableCreation};
1580
1581    fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1582        let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1583        assert_eq!(desered_type, expected_type);
1584
1585        let sered_json = serde_json::to_string(&expected_type).unwrap();
1586        let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1587
1588        assert_eq!(parsed_json_value, desered_type);
1589    }
1590
1591    fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1592        let path = format!("testdata/table_metadata/{file_name}");
1593        let metadata: String = fs::read_to_string(path).unwrap();
1594
1595        serde_json::from_str(&metadata).unwrap()
1596    }
1597
    /// A v2 metadata document (schemas, partition spec, properties, metadata
    /// log, sort order; no snapshots) round-trips losslessly between JSON and
    /// [`TableMetadata`], and re-serializing reproduces the input JSON.
    #[test]
    fn test_table_data_v2() {
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            },
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "sort-orders": [
                    {
                    "order-id": 0,
                    "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Expected in-memory equivalent of the JSON document above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "struct_name",
                    Type::Primitive(PrimitiveType::Fixed(1)),
                )),
                Arc::new(NestedField::required(
                    4,
                    "ts",
                    Type::Primitive(PrimitiveType::Timestamp),
                )),
            ])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Beyond value equality, the serialized JSON must match the input
        // document exactly (no dropped or renamed fields).
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1733
    /// A v3 metadata document — including v3-only fields (`next-row-id`,
    /// snapshot row ranges, snapshot `key-id`, and `encryption-keys`) —
    /// round-trips losslessly, and re-serializing reproduces the input JSON.
    #[test]
    fn test_table_data_v3() {
        let data = r#"
            {
                "format-version" : 3,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "next-row-id": 5,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "snapshots" : [ {
                    "snapshot-id" : 1,
                    "timestamp-ms" : 1662532818843,
                    "sequence-number" : 0,
                    "first-row-id" : 0,
                    "added-rows" : 4,
                    "key-id" : "key1",
                    "summary" : {
                        "operation" : "append"
                    },
                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                    "schema-id" : 0
                    }
                ],
                "encryption-keys": [
                    {
                        "key-id": "key1",
                        "encrypted-by-id": "KMS",
                        "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
                        "properties": {
                            "p1": "v1"
                        }
                    }
                ],
                "sort-orders": [
                    {
                    "order-id": 0,
                    "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Expected in-memory equivalent of the JSON document above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                4,
                "ts",
                Type::Primitive(PrimitiveType::Timestamp),
            ))])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_row_range(0, 4)
            .with_encryption_key_id(Some("key1".to_string()))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
            .with_schema_id(0)
            .build();

        // "encrypted-key-metadata" is base64 in JSON; the in-memory form
        // holds the decoded bytes.
        let encryption_key = EncryptedKey::builder()
            .key_id("key1".to_string())
            .encrypted_by_id("KMS".to_string())
            .encrypted_key_metadata(
                base64::prelude::BASE64_STANDARD
                    .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
                    .unwrap(),
            )
            .properties(HashMap::from_iter(vec![(
                "p1".to_string(),
                "v1".to_string(),
            )]))
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V3,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
            next_row_id: 5,
        };

        // Beyond value equality, the serialized JSON must match the input
        // document exactly (no dropped or renamed fields).
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1909
    /// A v1 metadata document (singular `schema` / `partition-spec` fields,
    /// snapshots, snapshot log, refs) deserializes into [`TableMetadata`] and
    /// the expected value round-trips through serde.
    ///
    /// Note: unlike the v2/v3 tests, the serialized JSON is not compared to
    /// the input byte-for-byte, because v1 documents are upgraded on read
    /// (e.g. `schema` → `schemas`).
    #[test]
    fn test_table_data_v1() {
        let data = r#"
        {
            "format-version" : 1,
            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
            "location" : "/home/iceberg/warehouse/nyc/taxis",
            "last-updated-ms" : 1662532818843,
            "last-column-id" : 5,
            "schema" : {
              "type" : "struct",
              "schema-id" : 0,
              "fields" : [ {
                "id" : 1,
                "name" : "vendor_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 2,
                "name" : "trip_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 3,
                "name" : "trip_distance",
                "required" : false,
                "type" : "float"
              }, {
                "id" : 4,
                "name" : "fare_amount",
                "required" : false,
                "type" : "double"
              }, {
                "id" : 5,
                "name" : "store_and_fwd_flag",
                "required" : false,
                "type" : "string"
              } ]
            },
            "partition-spec" : [ {
              "name" : "vendor_id",
              "transform" : "identity",
              "source-id" : 1,
              "field-id" : 1000
            } ],
            "last-partition-id" : 1000,
            "default-sort-order-id" : 0,
            "sort-orders" : [ {
              "order-id" : 0,
              "fields" : [ ]
            } ],
            "properties" : {
              "owner" : "root"
            },
            "current-snapshot-id" : 638933773299822130,
            "refs" : {
              "main" : {
                "snapshot-id" : 638933773299822130,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 638933773299822130,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ],
            "snapshot-log" : [ {
              "timestamp-ms" : 1662532818843,
              "snapshot-id" : 638933773299822130
            } ],
            "metadata-log" : [ {
              "timestamp-ms" : 1662532805245,
              "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
            } ]
          }
        "#;

        // Expected in-memory equivalent of the JSON document above.
        let schema = Schema::builder()
            .with_fields(vec![
                Arc::new(NestedField::optional(
                    1,
                    "vendor_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    2,
                    "trip_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    3,
                    "trip_distance",
                    Type::Primitive(PrimitiveType::Float),
                )),
                Arc::new(NestedField::optional(
                    4,
                    "fare_amount",
                    Type::Primitive(PrimitiveType::Double),
                )),
                Arc::new(NestedField::optional(
                    5,
                    "store_and_fwd_flag",
                    Type::Primitive(PrimitiveType::String),
                )),
            ])
            .build()
            .unwrap();

        let schema = Arc::new(schema);
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(0)
            .build_unbound()
            .unwrap();

        let snapshot = Snapshot::builder()
            .with_snapshot_id(638933773299822130)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
            last_updated_ms: 1662532818843,
            last_column_id: 5,
            schemas: HashMap::from_iter(vec![(0, schema)]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
            current_snapshot_id: Some(638933773299822130),
            last_sequence_number: 0,
            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
            snapshot_log: vec![SnapshotLog {
                snapshot_id: 638933773299822130,
                timestamp_ms: 1662532818843,
            }],
            metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2082
    /// A v2 metadata document with no snapshots, snapshot log, or properties
    /// round-trips losslessly, and re-serializing reproduces the input JSON.
    #[test]
    fn test_table_data_v2_no_snapshots() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "refs": {},
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0
        }
        "#;

        // Expected in-memory equivalent of the JSON document above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "struct_name",
                Type::Primitive(PrimitiveType::Fixed(1)),
            ))])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Beyond value equality, the serialized JSON must match the input
        // document exactly (no dropped or renamed fields).
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
2185
    /// Deserialization must fail when `current-snapshot-id` (1) disagrees
    /// with the snapshot id of the `main` branch ref (2).
    #[test]
    fn test_current_snapshot_id_must_match_main_branch() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "current-snapshot-id" : 1,
            "refs" : {
              "main" : {
                "snapshot-id" : 2,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 1,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            },
            {
              "snapshot-id" : 2,
              "timestamp-ms" : 1662532818844,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ]
        }
    "#;

        // Expect a descriptive validation error, not a successful parse.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot id does not match main branch")
        );
    }
2292
    /// A `main` entry in `refs` without a `current-snapshot-id` field is an
    /// inconsistency that must be rejected during deserialization.
    #[test]
    fn test_main_without_current() {
        // V2 metadata whose "refs" map contains a "main" branch pointing at
        // snapshot 1, while "current-snapshot-id" is entirely absent.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
              "main" : {
                "snapshot-id" : 1,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 1,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ]
        }
    "#;

        // Deserialization must fail, and the error message should name the
        // unset current snapshot as the cause.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot is not set, but main branch exists")
        );
    }
2384
    /// Every entry in `refs` must point at a snapshot present in the
    /// `snapshots` list; a dangling branch reference must fail to deserialize.
    #[test]
    fn test_branch_snapshot_missing() {
        // "main" points at snapshot 1 (which exists), but branch "foo" points
        // at snapshot 2, which is absent from the "snapshots" array.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
              "main" : {
                "snapshot-id" : 1,
                "type" : "branch"
              },
              "foo" : {
                "snapshot-id" : 2,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 1,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ]
        }
    "#;

        // The dangling "foo" reference should be called out in the error text.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string().contains(
                "Snapshot for reference foo does not exist in the existing snapshots list"
            )
        );
    }
2481
    /// A snapshot's sequence number may not exceed the table's
    /// `last-sequence-number`; values equal to or above the snapshot's are
    /// accepted.
    #[test]
    fn test_v2_wrong_max_snapshot_sequence_number() {
        // The lone snapshot carries sequence number 4, while the table claims
        // a last-sequence-number of only 1 — an invalid combination.
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 1,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 4,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(err.to_string().contains(
            "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
        ));

        // Raising last-sequence-number to exactly the snapshot's sequence
        // number (4) makes the document valid.
        let data = data.replace(
            r#""last-sequence-number": 1,"#,
            r#""last-sequence-number": 4,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 4);

        // A last-sequence-number strictly above the snapshot's (5 > 4) is
        // accepted as well.
        let data = data.replace(
            r#""last-sequence-number": 4,"#,
            r#""last-sequence-number": 5,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 5);
    }
2563
    /// Round-trip serde of V2 metadata carrying a `statistics` entry (a Puffin
    /// file with blob metadata) against a hand-built expected value.
    #[test]
    fn test_statistic_files() {
        // Minimal V2 metadata plus one statistics file attached to the single
        // snapshot, including one "ndv" blob covering field 1.
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/stats.puffin",
                    "file-size-in-bytes": 413,
                    "file-footer-size-in-bytes": 42,
                    "blob-metadata": [
                        {
                            "type": "ndv",
                            "snapshot-id": 3055729675574597004,
                            "sequence-number": 1,
                            "fields": [
                                1
                            ]
                        }
                    ]
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

        // Build the expected schema / partition spec / snapshot mirroring the
        // JSON above.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // Full expected TableMetadata; the statistics map is keyed by the
        // snapshot id the statistics file belongs to.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
                snapshot_id: 3055729675574597004,
                statistics_path: "s3://a/b/stats.puffin".to_string(),
                file_size_in_bytes: 413,
                file_footer_size_in_bytes: 42,
                key_metadata: None,
                blob_metadata: vec![BlobMetadata {
                    snapshot_id: 3055729675574597004,
                    sequence_number: 1,
                    fields: vec![1],
                    r#type: "ndv".to_string(),
                    properties: HashMap::new(),
                }],
            })]),
            partition_statistics: HashMap::new(),
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Checks deserialization matches `expected` and re-serialization
        // round-trips.
        check_table_metadata_serde(data, expected);
    }
2716
    /// Round-trip serde of V2 metadata carrying a `partition-statistics`
    /// entry against a hand-built expected value.
    #[test]
    fn test_partition_statistics_file() {
        // Minimal V2 metadata plus one partition statistics file attached to
        // the single snapshot.
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "partition-statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/partition-stats.parquet",
                    "file-size-in-bytes": 43
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
        "#;

        // Build the expected schema / partition spec / snapshot mirroring the
        // JSON above.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // Full expected TableMetadata; partition_statistics is keyed by the
        // snapshot id the file belongs to.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::from_iter(vec![(
                3055729675574597004,
                PartitionStatisticsFile {
                    snapshot_id: 3055729675574597004,
                    statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
                    file_size_in_bytes: 43,
                },
            )]),
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Checks deserialization matches `expected` and re-serialization
        // round-trips.
        check_table_metadata_serde(data, expected);
    }
2852
2853    #[test]
2854    fn test_invalid_table_uuid() -> Result<()> {
2855        let data = r#"
2856            {
2857                "format-version" : 2,
2858                "table-uuid": "xxxx"
2859            }
2860        "#;
2861        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2862        Ok(())
2863    }
2864
2865    #[test]
2866    fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
2867        let data = r#"
2868            {
2869                "format-version" : 1
2870            }
2871        "#;
2872        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2873        Ok(())
2874    }
2875
    /// Round-trip serde of the minimal valid V3 example from testdata against
    /// a hand-built expected value (including V3's `next-row-id` field).
    #[test]
    fn test_table_metadata_v3_valid_minimal() {
        let metadata_str =
            fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();

        // Sanity check before the full comparison: the file really is V3.
        let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
        assert_eq!(table_metadata.format_version, FormatVersion::V3);

        // Expected schema: field "x" carries initial/write defaults, "y" has a
        // doc comment, "z" is plain — mirroring the testdata JSON.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(
                    NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
                        .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
                        .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
                ),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Single identity-partition field on "x" with field id 1000.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Sort order 3: ascending identity on "y", then descending bucket(4)
        // on "z".
        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V3,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: 0, // V3 specific field from the JSON
        };

        // Checks deserialization matches `expected` and re-serialization
        // round-trips.
        check_table_metadata_serde(&metadata_str, expected);
    }
2964
    /// Round-trip serde of the full valid V2 example from testdata: two
    /// schemas, two snapshots (parent/child), a snapshot log, and a `main`
    /// branch ref.
    #[test]
    fn test_table_metadata_v2_file_valid() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();

        // Original schema (id 0): just field "x".
        let schema1 = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();

        // Evolved schema (id 1): adds "y" (with doc) and "z", and declares
        // identifier fields [1, 2].
        let schema2 = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .with_identifier_field_ids(vec![1, 2])
            .build()
            .unwrap();

        // Identity partition on "x", bound against the current schema.
        let partition_spec = PartitionSpec::builder(schema2.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Sort order 3: ascending identity on "y", then descending bucket(4)
        // on "z".
        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        // First snapshot: no parent, sequence number 0.
        let snapshot1 = Snapshot::builder()
            .with_snapshot_id(3051729675574597004)
            .with_timestamp_ms(1515100955770)
            .with_sequence_number(0)
            .with_manifest_list("s3://a/b/1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        // Second snapshot: child of the first, written with schema 1.
        let snapshot2 = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_parent_snapshot_id(Some(3051729675574597004))
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_schema_id(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema2).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![
                (3051729675574597004, Arc::new(snapshot1)),
                (3055729675574597004, Arc::new(snapshot2)),
            ]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            // Snapshot log records both snapshots in commit order.
            snapshot_log: vec![
                SnapshotLog {
                    snapshot_id: 3051729675574597004,
                    timestamp_ms: 1515100955770,
                },
                SnapshotLog {
                    snapshot_id: 3055729675574597004,
                    timestamp_ms: 1555100955770,
                },
            ],
            metadata_log: Vec::new(),
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Checks deserialization matches `expected` and re-serialization
        // round-trips.
        check_table_metadata_serde(&metadata, expected);
    }
3104
    #[test]
    fn test_table_metadata_v2_file_valid_minimal() {
        // Parses `TableMetadataV2ValidMinimal.json` and verifies — via the
        // serde round-trip helper — that it deserializes to exactly the
        // `TableMetadata` constructed by hand below.
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

        // Schema 0: three required long columns x, y (with a doc comment) and z.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Spec 0: identity partition on column x (partition field id 1000).
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Sort order 3: ascending identity on y, then descending bucket(4) on z.
        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // The "minimal" fixture carries no snapshots, refs, logs, statistics
        // or encryption keys — all the corresponding maps/vecs are empty.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3190
    #[test]
    fn test_table_metadata_v1_file_valid() {
        // Parses a valid V1 metadata file and verifies — via the serde
        // round-trip helper — that it deserializes to the hand-built
        // `TableMetadata` below, including the V1→V2 compatibility defaults.
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

        // Schema 0: three required long columns x, y (with a doc comment) and z.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Spec 0: identity partition on column x (partition field id 1000).
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // Note: V1 files have no sequence numbers or sort orders of their own;
        // `last_sequence_number` stays 0 and an unsorted order is synthesized.
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573874,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 0,
            default_sort_order_id: 0,
            // Sort order is added during deserialization for V2 compatibility
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::new(),
            current_snapshot_id: None,
            last_sequence_number: 0,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3260
3261    #[test]
3262    fn test_table_metadata_v1_compat() {
3263        let metadata =
3264            fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
3265
3266        // Deserialize the JSON to verify it works
3267        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3268            .expect("Failed to deserialize TableMetadataV1Compat.json");
3269
3270        // Verify some key fields match
3271        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3272        assert_eq!(
3273            desered_type.uuid(),
3274            Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
3275        );
3276        assert_eq!(
3277            desered_type.location(),
3278            "s3://bucket/warehouse/iceberg/glue.db/table_name"
3279        );
3280        assert_eq!(desered_type.last_updated_ms(), 1727773114005);
3281        assert_eq!(desered_type.current_schema_id(), 0);
3282    }
3283
3284    #[test]
3285    fn test_table_metadata_v1_schemas_without_current_id() {
3286        let metadata = fs::read_to_string(
3287            "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
3288        )
3289        .unwrap();
3290
3291        // Deserialize the JSON - this should succeed by using the 'schema' field instead of 'schemas'
3292        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3293            .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");
3294
3295        // Verify it used the 'schema' field
3296        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3297        assert_eq!(
3298            desered_type.uuid(),
3299            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3300        );
3301
3302        // Get the schema and verify it has the expected fields
3303        let schema = desered_type.current_schema();
3304        assert_eq!(schema.as_struct().fields().len(), 3);
3305        assert_eq!(schema.as_struct().fields()[0].name, "x");
3306        assert_eq!(schema.as_struct().fields()[1].name, "y");
3307        assert_eq!(schema.as_struct().fields()[2].name, "z");
3308    }
3309
3310    #[test]
3311    fn test_table_metadata_v1_no_valid_schema() {
3312        let metadata =
3313            fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
3314                .unwrap();
3315
3316        // Deserialize the JSON - this should fail because neither schemas + current_schema_id nor schema is valid
3317        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3318
3319        assert!(desered.is_err());
3320        let error_message = desered.unwrap_err().to_string();
3321        assert!(
3322            error_message.contains("No valid schema configuration found"),
3323            "Expected error about no valid schema configuration, got: {error_message}"
3324        );
3325    }
3326
3327    #[test]
3328    fn test_table_metadata_v1_partition_specs_without_default_id() {
3329        let metadata = fs::read_to_string(
3330            "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
3331        )
3332        .unwrap();
3333
3334        // Deserialize the JSON - this should succeed by inferring default_spec_id as the max spec ID
3335        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3336            .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");
3337
3338        // Verify basic metadata
3339        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3340        assert_eq!(
3341            desered_type.uuid(),
3342            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3343        );
3344
3345        // Verify partition specs
3346        assert_eq!(desered_type.default_partition_spec_id(), 2); // Should pick the largest spec ID (2)
3347        assert_eq!(desered_type.partition_specs.len(), 2);
3348
3349        // Verify the default spec has the expected fields
3350        let default_spec = &desered_type.default_spec;
3351        assert_eq!(default_spec.spec_id(), 2);
3352        assert_eq!(default_spec.fields().len(), 1);
3353        assert_eq!(default_spec.fields()[0].name, "y");
3354        assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
3355        assert_eq!(default_spec.fields()[0].source_id, 2);
3356    }
3357
3358    #[test]
3359    fn test_table_metadata_v2_schema_not_found() {
3360        let metadata =
3361            fs::read_to_string("testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json")
3362                .unwrap();
3363
3364        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3365
3366        assert_eq!(
3367            desered.unwrap_err().to_string(),
3368            "DataInvalid => No schema exists with the current schema id 2."
3369        )
3370    }
3371
3372    #[test]
3373    fn test_table_metadata_v2_missing_sort_order() {
3374        let metadata =
3375            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
3376                .unwrap();
3377
3378        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3379
3380        assert_eq!(
3381            desered.unwrap_err().to_string(),
3382            "data did not match any variant of untagged enum TableMetadataEnum"
3383        )
3384    }
3385
3386    #[test]
3387    fn test_table_metadata_v2_missing_partition_specs() {
3388        let metadata =
3389            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json")
3390                .unwrap();
3391
3392        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3393
3394        assert_eq!(
3395            desered.unwrap_err().to_string(),
3396            "data did not match any variant of untagged enum TableMetadataEnum"
3397        )
3398    }
3399
3400    #[test]
3401    fn test_table_metadata_v2_missing_last_partition_id() {
3402        let metadata = fs::read_to_string(
3403            "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json",
3404        )
3405        .unwrap();
3406
3407        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3408
3409        assert_eq!(
3410            desered.unwrap_err().to_string(),
3411            "data did not match any variant of untagged enum TableMetadataEnum"
3412        )
3413    }
3414
3415    #[test]
3416    fn test_table_metadata_v2_missing_schemas() {
3417        let metadata =
3418            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSchemas.json")
3419                .unwrap();
3420
3421        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3422
3423        assert_eq!(
3424            desered.unwrap_err().to_string(),
3425            "data did not match any variant of untagged enum TableMetadataEnum"
3426        )
3427    }
3428
3429    #[test]
3430    fn test_table_metadata_v2_unsupported_version() {
3431        let metadata =
3432            fs::read_to_string("testdata/table_metadata/TableMetadataUnsupportedVersion.json")
3433                .unwrap();
3434
3435        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3436
3437        assert_eq!(
3438            desered.unwrap_err().to_string(),
3439            "data did not match any variant of untagged enum TableMetadataEnum"
3440        )
3441    }
3442
3443    #[test]
3444    fn test_order_of_format_version() {
3445        assert!(FormatVersion::V1 < FormatVersion::V2);
3446        assert_eq!(FormatVersion::V1, FormatVersion::V1);
3447        assert_eq!(FormatVersion::V2, FormatVersion::V2);
3448    }
3449
3450    #[test]
3451    fn test_default_partition_spec() {
3452        let default_spec_id = 1234;
3453        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3454        let partition_spec = PartitionSpec::unpartition_spec();
3455        table_meta_data.default_spec = partition_spec.clone().into();
3456        table_meta_data
3457            .partition_specs
3458            .insert(default_spec_id, Arc::new(partition_spec));
3459
3460        assert_eq!(
3461            (*table_meta_data.default_partition_spec().clone()).clone(),
3462            (*table_meta_data
3463                .partition_spec_by_id(default_spec_id)
3464                .unwrap()
3465                .clone())
3466            .clone()
3467        );
3468    }
3469    #[test]
3470    fn test_default_sort_order() {
3471        let default_sort_order_id = 1234;
3472        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3473        table_meta_data.default_sort_order_id = default_sort_order_id;
3474        table_meta_data
3475            .sort_orders
3476            .insert(default_sort_order_id, Arc::new(SortOrder::default()));
3477
3478        assert_eq!(
3479            table_meta_data.default_sort_order(),
3480            table_meta_data
3481                .sort_orders
3482                .get(&default_sort_order_id)
3483                .unwrap()
3484        )
3485    }
3486
3487    #[test]
3488    fn test_table_metadata_builder_from_table_creation() {
3489        let table_creation = TableCreation::builder()
3490            .location("s3://db/table".to_string())
3491            .name("table".to_string())
3492            .properties(HashMap::new())
3493            .schema(Schema::builder().build().unwrap())
3494            .build();
3495        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
3496            .unwrap()
3497            .build()
3498            .unwrap()
3499            .metadata;
3500        assert_eq!(table_metadata.location, "s3://db/table");
3501        assert_eq!(table_metadata.schemas.len(), 1);
3502        assert_eq!(
3503            table_metadata
3504                .schemas
3505                .get(&0)
3506                .unwrap()
3507                .as_struct()
3508                .fields()
3509                .len(),
3510            0
3511        );
3512        assert_eq!(table_metadata.properties.len(), 0);
3513        assert_eq!(
3514            table_metadata.partition_specs,
3515            HashMap::from([(
3516                0,
3517                Arc::new(
3518                    PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
3519                        .with_spec_id(0)
3520                        .build()
3521                        .unwrap()
3522                )
3523            )])
3524        );
3525        assert_eq!(
3526            table_metadata.sort_orders,
3527            HashMap::from([(
3528                0,
3529                Arc::new(SortOrder {
3530                    order_id: 0,
3531                    fields: vec![]
3532                })
3533            )])
3534        );
3535    }
3536
3537    #[tokio::test]
3538    async fn test_table_metadata_read_write() {
3539        // Create a temporary directory for our test
3540        let temp_dir = TempDir::new().unwrap();
3541        let temp_path = temp_dir.path().to_str().unwrap();
3542
3543        // Create a FileIO instance
3544        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3545
3546        // Use an existing test metadata from the test files
3547        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3548
3549        // Define the metadata location
3550        let metadata_location = format!("{temp_path}/metadata.json");
3551
3552        // Write the metadata
3553        original_metadata
3554            .write_to(&file_io, &metadata_location)
3555            .await
3556            .unwrap();
3557
3558        // Verify the file exists
3559        assert!(fs::metadata(&metadata_location).is_ok());
3560
3561        // Read the metadata back
3562        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
3563            .await
3564            .unwrap();
3565
3566        // Verify the metadata matches
3567        assert_eq!(read_metadata, original_metadata);
3568    }
3569
3570    #[tokio::test]
3571    async fn test_table_metadata_read_compressed() {
3572        let temp_dir = TempDir::new().unwrap();
3573        let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3574
3575        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3576        let json = serde_json::to_string(&original_metadata).unwrap();
3577
3578        let compressed = CompressionCodec::Gzip
3579            .compress(json.into_bytes())
3580            .expect("failed to compress metadata");
3581        std::fs::write(&metadata_location, &compressed).expect("failed to write metadata");
3582
3583        // Read the metadata back
3584        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3585        let metadata_location = metadata_location.to_str().unwrap();
3586        let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3587            .await
3588            .unwrap();
3589
3590        // Verify the metadata matches
3591        assert_eq!(read_metadata, original_metadata);
3592    }
3593
3594    #[tokio::test]
3595    async fn test_table_metadata_read_nonexistent_file() {
3596        // Create a FileIO instance
3597        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3598
3599        // Try to read a non-existent file
3600        let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3601
3602        // Verify it returns an error
3603        assert!(result.is_err());
3604    }
3605
3606    #[test]
3607    fn test_partition_name_exists() {
3608        let schema = Schema::builder()
3609            .with_fields(vec![
3610                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3611                NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3612                    .into(),
3613            ])
3614            .build()
3615            .unwrap();
3616
3617        let spec1 = PartitionSpec::builder(schema.clone())
3618            .with_spec_id(1)
3619            .add_partition_field("data", "data_partition", Transform::Identity)
3620            .unwrap()
3621            .build()
3622            .unwrap();
3623
3624        let spec2 = PartitionSpec::builder(schema.clone())
3625            .with_spec_id(2)
3626            .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3627            .unwrap()
3628            .build()
3629            .unwrap();
3630
3631        // Build metadata with these specs
3632        let metadata = TableMetadataBuilder::new(
3633            schema,
3634            spec1.clone().into_unbound(),
3635            SortOrder::unsorted_order(),
3636            "s3://test/location".to_string(),
3637            FormatVersion::V2,
3638            HashMap::new(),
3639        )
3640        .unwrap()
3641        .add_partition_spec(spec2.into_unbound())
3642        .unwrap()
3643        .build()
3644        .unwrap()
3645        .metadata;
3646
3647        assert!(metadata.partition_name_exists("data_partition"));
3648        assert!(metadata.partition_name_exists("partition_bucket"));
3649
3650        assert!(!metadata.partition_name_exists("nonexistent_field"));
3651        assert!(!metadata.partition_name_exists("data")); // schema field name, not partition field name
3652        assert!(!metadata.partition_name_exists(""));
3653    }
3654
3655    #[test]
3656    fn test_partition_name_exists_empty_specs() {
3657        // Create metadata with no partition specs (unpartitioned table)
3658        let schema = Schema::builder()
3659            .with_fields(vec![
3660                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3661            ])
3662            .build()
3663            .unwrap();
3664
3665        let metadata = TableMetadataBuilder::new(
3666            schema,
3667            PartitionSpec::unpartition_spec().into_unbound(),
3668            SortOrder::unsorted_order(),
3669            "s3://test/location".to_string(),
3670            FormatVersion::V2,
3671            HashMap::new(),
3672        )
3673        .unwrap()
3674        .build()
3675        .unwrap()
3676        .metadata;
3677
3678        assert!(!metadata.partition_name_exists("any_field"));
3679        assert!(!metadata.partition_name_exists("data"));
3680    }
3681
3682    #[test]
3683    fn test_name_exists_in_any_schema() {
3684        // Create multiple schemas with different fields
3685        let schema1 = Schema::builder()
3686            .with_schema_id(1)
3687            .with_fields(vec![
3688                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3689                NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3690            ])
3691            .build()
3692            .unwrap();
3693
3694        let schema2 = Schema::builder()
3695            .with_schema_id(2)
3696            .with_fields(vec![
3697                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3698                NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3699            ])
3700            .build()
3701            .unwrap();
3702
3703        let metadata = TableMetadataBuilder::new(
3704            schema1,
3705            PartitionSpec::unpartition_spec().into_unbound(),
3706            SortOrder::unsorted_order(),
3707            "s3://test/location".to_string(),
3708            FormatVersion::V2,
3709            HashMap::new(),
3710        )
3711        .unwrap()
3712        .add_current_schema(schema2)
3713        .unwrap()
3714        .build()
3715        .unwrap()
3716        .metadata;
3717
3718        assert!(metadata.name_exists_in_any_schema("field1")); // exists in both schemas
3719        assert!(metadata.name_exists_in_any_schema("field2")); // exists only in schema1 (historical)
3720        assert!(metadata.name_exists_in_any_schema("field3")); // exists only in schema2 (current)
3721
3722        assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3723        assert!(!metadata.name_exists_in_any_schema("field4"));
3724        assert!(!metadata.name_exists_in_any_schema(""));
3725    }
3726
3727    #[test]
3728    fn test_name_exists_in_any_schema_empty_schemas() {
3729        let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3730
3731        let metadata = TableMetadataBuilder::new(
3732            schema,
3733            PartitionSpec::unpartition_spec().into_unbound(),
3734            SortOrder::unsorted_order(),
3735            "s3://test/location".to_string(),
3736            FormatVersion::V2,
3737            HashMap::new(),
3738        )
3739        .unwrap()
3740        .build()
3741        .unwrap()
3742        .metadata;
3743
3744        assert!(!metadata.name_exists_in_any_schema("any_field"));
3745    }
3746
3747    #[test]
3748    fn test_helper_methods_multi_version_scenario() {
3749        // Test a realistic multi-version scenario
3750        let initial_schema = Schema::builder()
3751            .with_fields(vec![
3752                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3753                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3754                NestedField::required(
3755                    3,
3756                    "deprecated_field",
3757                    Type::Primitive(PrimitiveType::String),
3758                )
3759                .into(),
3760            ])
3761            .build()
3762            .unwrap();
3763
3764        let metadata = TableMetadataBuilder::new(
3765            initial_schema,
3766            PartitionSpec::unpartition_spec().into_unbound(),
3767            SortOrder::unsorted_order(),
3768            "s3://test/location".to_string(),
3769            FormatVersion::V2,
3770            HashMap::new(),
3771        )
3772        .unwrap();
3773
3774        let evolved_schema = Schema::builder()
3775            .with_fields(vec![
3776                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3777                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3778                NestedField::required(
3779                    3,
3780                    "deprecated_field",
3781                    Type::Primitive(PrimitiveType::String),
3782                )
3783                .into(),
3784                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3785                    .into(),
3786            ])
3787            .build()
3788            .unwrap();
3789
3790        // Then add a third schema that removes the deprecated field
3791        let _final_schema = Schema::builder()
3792            .with_fields(vec![
3793                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3794                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3795                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3796                    .into(),
3797                NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3798                    .into(),
3799            ])
3800            .build()
3801            .unwrap();
3802
3803        let final_metadata = metadata
3804            .add_current_schema(evolved_schema)
3805            .unwrap()
3806            .build()
3807            .unwrap()
3808            .metadata;
3809
3810        assert!(!final_metadata.partition_name_exists("nonexistent_partition")); // unpartitioned table
3811
3812        assert!(final_metadata.name_exists_in_any_schema("id")); // exists in both schemas
3813        assert!(final_metadata.name_exists_in_any_schema("name")); // exists in both schemas
3814        assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); // exists in both schemas
3815        assert!(final_metadata.name_exists_in_any_schema("new_field")); // only in current schema
3816        assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3817    }
3818
    #[test]
    fn test_invalid_sort_order_id_zero_with_fields() {
        // Sort order id 0 is reserved for the unsorted order, so an inline V2
        // metadata document declaring order-id 0 *with* sort fields must be
        // rejected during deserialization.
        let metadata = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 111,
            "last-updated-ms": 1600000000000,
            "last-column-id": 3,
            "current-schema-id": 1,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 1,
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"},
                        {"id": 2, "name": "y", "required": true, "type": "long"}
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [{"spec-id": 0, "fields": []}],
            "last-partition-id": 999,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": [
                        {
                            "transform": "identity",
                            "source-id": 1,
                            "direction": "asc",
                            "null-order": "nulls-first"
                        }
                    ]
                }
            ],
            "properties": {},
            "current-snapshot-id": -1,
            "snapshots": []
        }
        "#;

        let result: Result<TableMetadata, serde_json::Error> = serde_json::from_str(metadata);

        // Should fail because sort order ID 0 is reserved for unsorted order and cannot have fields
        assert!(
            result.is_err(),
            "Parsing should fail for sort order ID 0 with fields"
        );
    }
3871
3872    #[test]
3873    fn test_table_properties_with_defaults() {
3874        use crate::spec::TableProperties;
3875
3876        let schema = Schema::builder()
3877            .with_fields(vec![
3878                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3879            ])
3880            .build()
3881            .unwrap();
3882
3883        let metadata = TableMetadataBuilder::new(
3884            schema,
3885            PartitionSpec::unpartition_spec().into_unbound(),
3886            SortOrder::unsorted_order(),
3887            "s3://test/location".to_string(),
3888            FormatVersion::V2,
3889            HashMap::new(),
3890        )
3891        .unwrap()
3892        .build()
3893        .unwrap()
3894        .metadata;
3895
3896        let props = metadata.table_properties().unwrap();
3897
3898        assert_eq!(
3899            props.commit_num_retries,
3900            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
3901        );
3902        assert_eq!(
3903            props.write_target_file_size_bytes,
3904            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
3905        );
3906    }
3907
3908    #[test]
3909    fn test_table_properties_with_custom_values() {
3910        use crate::spec::TableProperties;
3911
3912        let schema = Schema::builder()
3913            .with_fields(vec![
3914                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3915            ])
3916            .build()
3917            .unwrap();
3918
3919        let properties = HashMap::from([
3920            (
3921                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
3922                "10".to_string(),
3923            ),
3924            (
3925                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
3926                "1024".to_string(),
3927            ),
3928        ]);
3929
3930        let metadata = TableMetadataBuilder::new(
3931            schema,
3932            PartitionSpec::unpartition_spec().into_unbound(),
3933            SortOrder::unsorted_order(),
3934            "s3://test/location".to_string(),
3935            FormatVersion::V2,
3936            properties,
3937        )
3938        .unwrap()
3939        .build()
3940        .unwrap()
3941        .metadata;
3942
3943        let props = metadata.table_properties().unwrap();
3944
3945        assert_eq!(props.commit_num_retries, 10);
3946        assert_eq!(props.write_target_file_size_bytes, 1024);
3947    }
3948
3949    #[test]
3950    fn test_table_properties_with_invalid_value() {
3951        let schema = Schema::builder()
3952            .with_fields(vec![
3953                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3954            ])
3955            .build()
3956            .unwrap();
3957
3958        let properties = HashMap::from([(
3959            "commit.retry.num-retries".to_string(),
3960            "not_a_number".to_string(),
3961        )]);
3962
3963        let metadata = TableMetadataBuilder::new(
3964            schema,
3965            PartitionSpec::unpartition_spec().into_unbound(),
3966            SortOrder::unsorted_order(),
3967            "s3://test/location".to_string(),
3968            FormatVersion::V2,
3969            properties,
3970        )
3971        .unwrap()
3972        .build()
3973        .unwrap()
3974        .metadata;
3975
3976        let err = metadata.table_properties().unwrap_err();
3977        assert_eq!(err.kind(), ErrorKind::DataInvalid);
3978        assert!(err.message().contains("Invalid table properties"));
3979    }
3980
3981    #[test]
3982    fn test_v2_to_v3_upgrade_preserves_existing_snapshots_without_row_lineage() {
3983        // Create a v2 table metadata
3984        let schema = Schema::builder()
3985            .with_fields(vec![
3986                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3987            ])
3988            .build()
3989            .unwrap();
3990
3991        let v2_metadata = TableMetadataBuilder::new(
3992            schema,
3993            PartitionSpec::unpartition_spec().into_unbound(),
3994            SortOrder::unsorted_order(),
3995            "s3://bucket/test/location".to_string(),
3996            FormatVersion::V2,
3997            HashMap::new(),
3998        )
3999        .unwrap()
4000        .build()
4001        .unwrap()
4002        .metadata;
4003
4004        // Add a v2 snapshot
4005        let snapshot = Snapshot::builder()
4006            .with_snapshot_id(1)
4007            .with_timestamp_ms(v2_metadata.last_updated_ms + 1)
4008            .with_sequence_number(1)
4009            .with_schema_id(0)
4010            .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
4011            .with_summary(Summary {
4012                operation: Operation::Append,
4013                additional_properties: HashMap::from([(
4014                    "added-data-files".to_string(),
4015                    "1".to_string(),
4016                )]),
4017            })
4018            .build();
4019
4020        let v2_with_snapshot = v2_metadata
4021            .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
4022            .add_snapshot(snapshot)
4023            .unwrap()
4024            .set_ref("main", SnapshotReference {
4025                snapshot_id: 1,
4026                retention: SnapshotRetention::Branch {
4027                    min_snapshots_to_keep: None,
4028                    max_snapshot_age_ms: None,
4029                    max_ref_age_ms: None,
4030                },
4031            })
4032            .unwrap()
4033            .build()
4034            .unwrap()
4035            .metadata;
4036
4037        // Verify v2 serialization works fine
4038        let v2_json = serde_json::to_string(&v2_with_snapshot);
4039        assert!(v2_json.is_ok(), "v2 serialization should work");
4040
4041        // Upgrade to v3
4042        let v3_metadata = v2_with_snapshot
4043            .into_builder(Some("s3://bucket/test/metadata/v00002.json".to_string()))
4044            .upgrade_format_version(FormatVersion::V3)
4045            .unwrap()
4046            .build()
4047            .unwrap()
4048            .metadata;
4049
4050        assert_eq!(v3_metadata.format_version, FormatVersion::V3);
4051        assert_eq!(v3_metadata.next_row_id, INITIAL_ROW_ID);
4052        assert_eq!(v3_metadata.snapshots.len(), 1);
4053
4054        // Verify the snapshot has no row_range
4055        let snapshot = v3_metadata.snapshots.values().next().unwrap();
4056        assert!(
4057            snapshot.row_range().is_none(),
4058            "Snapshot should have no row_range after upgrade"
4059        );
4060
4061        // Try to serialize v3 metadata - this should now work
4062        let v3_json = serde_json::to_string(&v3_metadata);
4063        assert!(
4064            v3_json.is_ok(),
4065            "v3 serialization should work for upgraded tables"
4066        );
4067
4068        // Verify we can deserialize it back
4069        let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
4070        assert_eq!(deserialized.format_version, FormatVersion::V3);
4071        assert_eq!(deserialized.snapshots.len(), 1);
4072
4073        // Verify the deserialized snapshot still has no row_range
4074        let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
4075        assert!(
4076            deserialized_snapshot.row_range().is_none(),
4077            "Deserialized snapshot should have no row_range"
4078        );
4079    }
4080
4081    #[test]
4082    fn test_v3_snapshot_with_row_lineage_serialization() {
4083        // Create a v3 table metadata
4084        let schema = Schema::builder()
4085            .with_fields(vec![
4086                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4087            ])
4088            .build()
4089            .unwrap();
4090
4091        let v3_metadata = TableMetadataBuilder::new(
4092            schema,
4093            PartitionSpec::unpartition_spec().into_unbound(),
4094            SortOrder::unsorted_order(),
4095            "s3://bucket/test/location".to_string(),
4096            FormatVersion::V3,
4097            HashMap::new(),
4098        )
4099        .unwrap()
4100        .build()
4101        .unwrap()
4102        .metadata;
4103
4104        // Add a v3 snapshot with row lineage
4105        let snapshot = Snapshot::builder()
4106            .with_snapshot_id(1)
4107            .with_timestamp_ms(v3_metadata.last_updated_ms + 1)
4108            .with_sequence_number(1)
4109            .with_schema_id(0)
4110            .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
4111            .with_summary(Summary {
4112                operation: Operation::Append,
4113                additional_properties: HashMap::from([(
4114                    "added-data-files".to_string(),
4115                    "1".to_string(),
4116                )]),
4117            })
4118            .with_row_range(100, 50) // first_row_id=100, added_rows=50
4119            .build();
4120
4121        let v3_with_snapshot = v3_metadata
4122            .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
4123            .add_snapshot(snapshot)
4124            .unwrap()
4125            .set_ref("main", SnapshotReference {
4126                snapshot_id: 1,
4127                retention: SnapshotRetention::Branch {
4128                    min_snapshots_to_keep: None,
4129                    max_snapshot_age_ms: None,
4130                    max_ref_age_ms: None,
4131                },
4132            })
4133            .unwrap()
4134            .build()
4135            .unwrap()
4136            .metadata;
4137
4138        // Verify the snapshot has row_range
4139        let snapshot = v3_with_snapshot.snapshots.values().next().unwrap();
4140        assert!(
4141            snapshot.row_range().is_some(),
4142            "Snapshot should have row_range"
4143        );
4144        let (first_row_id, added_rows) = snapshot.row_range().unwrap();
4145        assert_eq!(first_row_id, 100);
4146        assert_eq!(added_rows, 50);
4147
4148        // Serialize v3 metadata - this should work
4149        let v3_json = serde_json::to_string(&v3_with_snapshot);
4150        assert!(
4151            v3_json.is_ok(),
4152            "v3 serialization should work for snapshots with row lineage"
4153        );
4154
4155        // Verify we can deserialize it back
4156        let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
4157        assert_eq!(deserialized.format_version, FormatVersion::V3);
4158        assert_eq!(deserialized.snapshots.len(), 1);
4159
4160        // Verify the deserialized snapshot has the correct row_range
4161        let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
4162        assert!(
4163            deserialized_snapshot.row_range().is_some(),
4164            "Deserialized snapshot should have row_range"
4165        );
4166        let (deserialized_first_row_id, deserialized_added_rows) =
4167            deserialized_snapshot.row_range().unwrap();
4168        assert_eq!(deserialized_first_row_id, 100);
4169        assert_eq!(deserialized_added_rows, 50);
4170    }
4171}