iceberg/spec/
table_metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
//! The main struct here is [TableMetadata] which defines the data for a table.
20
21use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::sync::Arc;
26
27use _serde::TableMetadataEnum;
28use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use serde_repr::{Deserialize_repr, Serialize_repr};
31use uuid::Uuid;
32
33use super::snapshot::SnapshotReference;
34pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
35use super::{
36    DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
37    SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
38    TableProperties, parse_metadata_file_compression,
39};
40use crate::catalog::MetadataLocation;
41use crate::compression::CompressionCodec;
42use crate::error::{Result, timestamp_ms_to_utc};
43use crate::io::FileIO;
44use crate::spec::EncryptedKey;
45use crate::{Error, ErrorKind};
46
/// Name of the branch ref that must track the current table snapshot.
static MAIN_BRANCH: &str = "main";
/// Clock-skew tolerance (in milliseconds) used when validating that snapshot
/// and metadata log entries are in chronological order; commits can happen
/// concurrently from different machines with slightly different clocks.
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

/// Legacy sentinel snapshot id meaning "no current snapshot"; normalized to `None`.
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
/// Initial table sequence number; v1 tables always report this as the next sequence number.
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;

/// Initial row id for row lineage for new v3 tables and older tables upgrading to v3.
pub const INITIAL_ROW_ID: u64 = 0;
/// Minimum format version that supports row lineage (v3).
pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;
59
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
/// In-memory representation of Iceberg table metadata (format versions 1 through 3).
///
/// We assume that this data structure is always valid, so we will panic when invalid error happens.
/// We check the validity of this data structure when constructing.
pub struct TableMetadata {
    /// Integer Version for the format.
    pub(crate) format_version: FormatVersion,
    /// A UUID that identifies the table
    pub(crate) table_uuid: Uuid,
    /// Location tables base location
    pub(crate) location: String,
    /// The tables highest sequence number
    pub(crate) last_sequence_number: i64,
    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
    pub(crate) last_updated_ms: i64,
    /// An integer; the highest assigned column ID for the table.
    pub(crate) last_column_id: i32,
    /// A list of schemas, stored as objects with schema-id.
    pub(crate) schemas: HashMap<i32, SchemaRef>,
    /// ID of the table’s current schema.
    pub(crate) current_schema_id: i32,
    /// A list of partition specs, stored as full partition spec objects.
    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
    /// ID of the “current” spec that writers should use by default.
    pub(crate) default_spec: PartitionSpecRef,
    /// Partition type of the default partition spec.
    pub(crate) default_partition_type: StructType,
    /// An integer; the highest assigned partition field ID across all partition specs for the table.
    pub(crate) last_partition_id: i32,
    /// A string to string map of table properties. This is used to control settings that
    /// affect reading and writing and is not intended to be used for arbitrary metadata.
    /// For example, commit.retry.num-retries is used to control the number of commit retries.
    pub(crate) properties: HashMap<String, String>,
    /// long ID of the current table snapshot; must be the same as the current
    /// ID of the main branch in refs.
    pub(crate) current_snapshot_id: Option<i64>,
    /// A list of valid snapshots. Valid snapshots are snapshots for which all
    /// data files exist in the file system. A data file must not be deleted
    /// from the file system until the last snapshot in which it was listed is
    /// garbage collected.
    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
    /// to the current snapshot for the table. Each time the current-snapshot-id
    /// is changed, a new entry should be added with the last-updated-ms
    /// and the new current-snapshot-id. When snapshots are expired from
    /// the list of valid snapshots, all entries before a snapshot that has
    /// expired should be removed.
    pub(crate) snapshot_log: Vec<SnapshotLog>,

    /// A list (optional) of timestamp and metadata file location pairs
    /// that encodes changes to the previous metadata files for the table.
    /// Each time a new metadata file is created, a new entry of the
    /// previous metadata file location should be added to the list.
    /// Tables can be configured to remove the oldest metadata log entries and
    /// keep a fixed-size log of the most recent entries after a commit.
    pub(crate) metadata_log: Vec<MetadataLog>,

    /// A list of sort orders, stored as full sort order objects.
    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
    /// Default sort order id of the table. Note that this could be used by
    /// writers, but is not used when reading because reads use the specs
    /// stored in manifest files.
    pub(crate) default_sort_order_id: i64,
    /// A map of snapshot references. The map keys are the unique snapshot reference
    /// names in the table, and the map values are snapshot reference objects.
    /// There is always a main branch reference pointing to the current-snapshot-id
    /// even if the refs map is null.
    pub(crate) refs: HashMap<String, SnapshotReference>,
    /// Mapping of snapshot ids to statistics files.
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    /// Mapping of snapshot ids to partition statistics files.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
    /// Encryption Keys - map of key id to the actual key
    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
    /// Next row id to be assigned for Row Lineage (v3)
    pub(crate) next_row_id: u64,
}
139
140impl TableMetadata {
141    /// Convert this Table Metadata into a builder for modification.
142    ///
143    /// `current_file_location` is the location where the current version
144    /// of the metadata file is stored. This is used to update the metadata log.
145    /// If `current_file_location` is `None`, the metadata log will not be updated.
146    /// This should only be used to stage-create tables.
147    #[must_use]
148    pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
149        TableMetadataBuilder::new_from_metadata(self, current_file_location)
150    }
151
152    /// Check if a partition field name exists in any partition spec.
153    #[inline]
154    pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
155        self.partition_specs
156            .values()
157            .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
158    }
159
160    /// Check if a field name exists in any schema.
161    #[inline]
162    pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
163        self.schemas
164            .values()
165            .any(|schema| schema.field_by_name(name).is_some())
166    }
167
    /// Returns format version of this metadata.
    #[inline]
    pub fn format_version(&self) -> FormatVersion {
        self.format_version
    }

    /// Returns uuid of current table.
    #[inline]
    pub fn uuid(&self) -> Uuid {
        self.table_uuid
    }

    /// Returns table location.
    #[inline]
    pub fn location(&self) -> &str {
        self.location.as_str()
    }

    /// Returns last sequence number.
    #[inline]
    pub fn last_sequence_number(&self) -> i64 {
        self.last_sequence_number
    }

    /// Returns the next sequence number for the table.
    ///
    /// For format version 1, it always returns the initial sequence number.
    /// For other versions, it returns the last sequence number incremented by 1.
    #[inline]
    pub fn next_sequence_number(&self) -> i64 {
        match self.format_version {
            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
            _ => self.last_sequence_number + 1,
        }
    }

    /// Returns the last column id.
    #[inline]
    pub fn last_column_id(&self) -> i32 {
        self.last_column_id
    }

    /// Returns the last partition_id
    #[inline]
    pub fn last_partition_id(&self) -> i32 {
        self.last_partition_id
    }

    /// Returns last updated time as a UTC datetime.
    ///
    /// # Errors
    ///
    /// Returns an error if `last_updated_ms` cannot be converted to a valid
    /// UTC timestamp (e.g. out of representable range).
    #[inline]
    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.last_updated_ms)
    }

    /// Returns last updated time in milliseconds.
    #[inline]
    pub fn last_updated_ms(&self) -> i64 {
        self.last_updated_ms
    }
227
    /// Returns an iterator over all schemas.
    #[inline]
    pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
        self.schemas.values()
    }

    /// Lookup schema by id.
    #[inline]
    pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
        self.schemas.get(&schema_id)
    }

    /// Get current schema
    ///
    /// # Panics
    ///
    /// Panics if `current_schema_id` is missing from `schemas`; normalization
    /// at construction time (`validate_current_schema`) rules this out.
    #[inline]
    pub fn current_schema(&self) -> &SchemaRef {
        self.schema_by_id(self.current_schema_id)
            .expect("Current schema id set, but not found in table metadata")
    }

    /// Get the id of the current schema
    #[inline]
    pub fn current_schema_id(&self) -> SchemaId {
        self.current_schema_id
    }

    /// Returns all partition specs.
    #[inline]
    pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
        self.partition_specs.values()
    }

    /// Lookup partition spec by id.
    #[inline]
    pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
        self.partition_specs.get(&spec_id)
    }

    /// Get default partition spec
    #[inline]
    pub fn default_partition_spec(&self) -> &PartitionSpecRef {
        &self.default_spec
    }

    /// Return the partition type of the default partition spec.
    #[inline]
    pub fn default_partition_type(&self) -> &StructType {
        &self.default_partition_type
    }

    #[inline]
    /// Returns spec id of the "current" partition spec.
    pub fn default_partition_spec_id(&self) -> i32 {
        self.default_spec.spec_id()
    }
282
    /// Returns all snapshots
    #[inline]
    pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
        self.snapshots.values()
    }

    /// Lookup snapshot by id.
    #[inline]
    pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
        self.snapshots.get(&snapshot_id)
    }

    /// Returns snapshot history.
    #[inline]
    pub fn history(&self) -> &[SnapshotLog] {
        &self.snapshot_log
    }

    /// Returns the metadata log.
    #[inline]
    pub fn metadata_log(&self) -> &[MetadataLog] {
        &self.metadata_log
    }

    /// Get current snapshot
    ///
    /// # Panics
    ///
    /// Panics if `current_snapshot_id` is set but absent from `snapshots`;
    /// normalization at construction time rules this out.
    #[inline]
    pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
        self.current_snapshot_id.map(|s| {
            self.snapshot_by_id(s)
                .expect("Current snapshot id has been set, but doesn't exist in metadata")
        })
    }

    /// Get the current snapshot id
    #[inline]
    pub fn current_snapshot_id(&self) -> Option<i64> {
        self.current_snapshot_id
    }

    /// Get the snapshot for a reference.
    /// Returns `None` if the `ref_name` is not found.
    ///
    /// # Panics
    ///
    /// Panics if the ref points to a snapshot id that is missing from
    /// `snapshots`; `validate_refs` rules this out for normalized metadata.
    #[inline]
    pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
        self.refs.get(ref_name).map(|r| {
            self.snapshot_by_id(r.snapshot_id)
                .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
        })
    }
331
    /// Return all sort orders.
    #[inline]
    pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
        self.sort_orders.values()
    }

    /// Lookup sort order by id.
    #[inline]
    pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
        self.sort_orders.get(&sort_order_id)
    }

    /// Returns the default sort order.
    ///
    /// # Panics
    ///
    /// Panics if `default_sort_order_id` is missing from `sort_orders`;
    /// normalization at construction time (`try_normalize_sort_order`) rules
    /// this out.
    #[inline]
    pub fn default_sort_order(&self) -> &SortOrderRef {
        self.sort_orders
            .get(&self.default_sort_order_id)
            .expect("Default order id has been set, but not found in table metadata!")
    }

    /// Returns default sort order id.
    #[inline]
    pub fn default_sort_order_id(&self) -> i64 {
        self.default_sort_order_id
    }

    /// Returns properties of table.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }
363
    /// Returns the metadata compression codec from table properties.
    ///
    /// Returns `CompressionCodec::None` if compression is disabled or not configured.
    /// Returns `CompressionCodec::Gzip` if gzip compression is enabled.
    ///
    /// # Errors
    ///
    /// Returns an error if the compression codec property has an invalid value.
    pub fn metadata_compression_codec(&self) -> Result<CompressionCodec> {
        parse_metadata_file_compression(&self.properties)
    }

    /// Returns typed table properties parsed from the raw properties map with defaults.
    ///
    /// # Errors
    ///
    /// Returns a `DataInvalid` error (with the parse failure as source) if the
    /// raw properties cannot be converted into `TableProperties`.
    pub fn table_properties(&self) -> Result<TableProperties> {
        TableProperties::try_from(&self.properties).map_err(|e| {
            Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
        })
    }

    /// Return an iterator over all statistics files.
    #[inline]
    pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
        self.statistics.values()
    }

    /// Return an iterator over all partition statistics files.
    #[inline]
    pub fn partition_statistics_iter(
        &self,
    ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
        self.partition_statistics.values()
    }

    /// Get a statistics file for a snapshot id.
    #[inline]
    pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
        self.statistics.get(&snapshot_id)
    }

    /// Get a partition statistics file for a snapshot id.
    #[inline]
    pub fn partition_statistics_for_snapshot(
        &self,
        snapshot_id: i64,
    ) -> Option<&PartitionStatisticsFile> {
        self.partition_statistics.get(&snapshot_id)
    }
411
412    fn construct_refs(&mut self) {
413        if let Some(current_snapshot_id) = self.current_snapshot_id
414            && !self.refs.contains_key(MAIN_BRANCH)
415        {
416            self.refs
417                .insert(MAIN_BRANCH.to_string(), SnapshotReference {
418                    snapshot_id: current_snapshot_id,
419                    retention: SnapshotRetention::Branch {
420                        min_snapshots_to_keep: None,
421                        max_snapshot_age_ms: None,
422                        max_ref_age_ms: None,
423                    },
424                });
425        }
426    }
427
    /// Iterate over all encryption keys
    #[inline]
    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
        self.encryption_keys.values()
    }

    /// Get the encryption key for a given key id
    #[inline]
    pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
        self.encryption_keys.get(key_id)
    }

    /// Get the next row id to be assigned for row lineage (v3 tables).
    #[inline]
    pub fn next_row_id(&self) -> u64 {
        self.next_row_id
    }
445
446    /// Read table metadata from the given location.
447    pub async fn read_from(
448        file_io: &FileIO,
449        metadata_location: impl AsRef<str>,
450    ) -> Result<TableMetadata> {
451        let metadata_location = metadata_location.as_ref();
452        let input_file = file_io.new_input(metadata_location)?;
453        let metadata_content = input_file.read().await?;
454
455        // Check if the file is compressed by looking for the gzip "magic number".
456        let metadata = if metadata_content.len() > 2
457            && metadata_content[0] == 0x1F
458            && metadata_content[1] == 0x8B
459        {
460            let decompressed_data = CompressionCodec::Gzip
461                .decompress(metadata_content.to_vec())
462                .map_err(|e| {
463                    Error::new(
464                        ErrorKind::DataInvalid,
465                        "Trying to read compressed metadata file",
466                    )
467                    .with_context("file_path", metadata_location)
468                    .with_source(e)
469                })?;
470            serde_json::from_slice(&decompressed_data)?
471        } else {
472            serde_json::from_slice(&metadata_content)?
473        };
474
475        Ok(metadata)
476    }
477
    /// Write table metadata to the given location.
    ///
    /// The JSON payload is compressed with the codec configured in the table
    /// properties, which must match the codec encoded in `metadata_location`.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails, if the property codec and the
    /// location codec disagree, if the codec is unsupported, or if the write
    /// itself fails.
    pub async fn write_to(
        &self,
        file_io: &FileIO,
        metadata_location: &MetadataLocation,
    ) -> Result<()> {
        let json_data = serde_json::to_vec(self)?;

        // Check if compression codec from properties matches the one in metadata_location
        let codec = parse_metadata_file_compression(&self.properties)?;

        if codec != metadata_location.compression_codec() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Compression codec mismatch: metadata_location has {:?}, but table properties specify {:?}",
                    metadata_location.compression_codec(),
                    codec
                ),
            ));
        }

        // Apply compression based on codec
        let data_to_write = match codec {
            CompressionCodec::Gzip => codec.compress(json_data)?,
            CompressionCodec::None => json_data,
            _ => {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!("Unsupported metadata compression codec: {codec:?}"),
                ));
            }
        };

        file_io
            .new_output(metadata_location.to_string())?
            .write(data_to_write.into())
            .await
    }
517
    /// Normalize and validate this table metadata.
    ///
    /// This is an internal method
    /// meant to be called after constructing table metadata from untrusted sources.
    /// We run this method after json deserialization.
    /// All constructors for `TableMetadata` which are part of `iceberg-rust`
    /// should return normalized `TableMetadata`.
    ///
    /// NOTE(review): the call order below matters. `normalize_current_snapshot`
    /// clears the legacy `-1` sentinel before `construct_refs` reads
    /// `current_snapshot_id` to synthesize the `main` ref, and `validate_refs`
    /// runs after `construct_refs` so the synthesized ref is validated too.
    pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
        self.validate_current_schema()?;
        self.normalize_current_snapshot()?;
        self.construct_refs();
        self.validate_refs()?;
        self.validate_chronological_snapshot_logs()?;
        self.validate_chronological_metadata_logs()?;
        // Normalize location (remove trailing slash)
        self.location = self.location.trim_end_matches('/').to_string();
        self.validate_snapshot_sequence_number()?;
        self.try_normalize_partition_spec()?;
        self.try_normalize_sort_order()?;
        Ok(self)
    }
539
540    /// If the default partition spec is not present in specs, add it
541    fn try_normalize_partition_spec(&mut self) -> Result<()> {
542        if self
543            .partition_spec_by_id(self.default_spec.spec_id())
544            .is_none()
545        {
546            self.partition_specs.insert(
547                self.default_spec.spec_id(),
548                Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
549            );
550        }
551
552        Ok(())
553    }
554
555    /// If the default sort order is unsorted but the sort order is not present, add it
556    fn try_normalize_sort_order(&mut self) -> Result<()> {
557        // Validate that sort order ID 0 (reserved for unsorted) has no fields
558        if let Some(sort_order) = self.sort_order_by_id(SortOrder::UNSORTED_ORDER_ID)
559            && !sort_order.fields.is_empty()
560        {
561            return Err(Error::new(
562                ErrorKind::Unexpected,
563                format!(
564                    "Sort order ID {} is reserved for unsorted order",
565                    SortOrder::UNSORTED_ORDER_ID
566                ),
567            ));
568        }
569
570        if self.sort_order_by_id(self.default_sort_order_id).is_some() {
571            return Ok(());
572        }
573
574        if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
575            return Err(Error::new(
576                ErrorKind::DataInvalid,
577                format!(
578                    "No sort order exists with the default sort order id {}.",
579                    self.default_sort_order_id
580                ),
581            ));
582        }
583
584        let sort_order = SortOrder::unsorted_order();
585        self.sort_orders
586            .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
587        Ok(())
588    }
589
590    /// Validate the current schema is set and exists.
591    fn validate_current_schema(&self) -> Result<()> {
592        if self.schema_by_id(self.current_schema_id).is_none() {
593            return Err(Error::new(
594                ErrorKind::DataInvalid,
595                format!(
596                    "No schema exists with the current schema id {}.",
597                    self.current_schema_id
598                ),
599            ));
600        }
601        Ok(())
602    }
603
604    /// If current snapshot is Some(-1) then set it to None.
605    fn normalize_current_snapshot(&mut self) -> Result<()> {
606        if let Some(current_snapshot_id) = self.current_snapshot_id {
607            if current_snapshot_id == EMPTY_SNAPSHOT_ID {
608                self.current_snapshot_id = None;
609            } else if self.snapshot_by_id(current_snapshot_id).is_none() {
610                return Err(Error::new(
611                    ErrorKind::DataInvalid,
612                    format!(
613                        "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
614                    ),
615                ));
616            }
617        }
618        Ok(())
619    }
620
621    /// Validate that all refs are valid (snapshot exists)
622    fn validate_refs(&self) -> Result<()> {
623        for (name, snapshot_ref) in self.refs.iter() {
624            if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
625                return Err(Error::new(
626                    ErrorKind::DataInvalid,
627                    format!(
628                        "Snapshot for reference {name} does not exist in the existing snapshots list"
629                    ),
630                ));
631            }
632        }
633
634        let main_ref = self.refs.get(MAIN_BRANCH);
635        if self.current_snapshot_id.is_some() {
636            if let Some(main_ref) = main_ref
637                && main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default()
638            {
639                return Err(Error::new(
640                    ErrorKind::DataInvalid,
641                    format!(
642                        "Current snapshot id does not match main branch ({:?} != {:?})",
643                        self.current_snapshot_id.unwrap_or_default(),
644                        main_ref.snapshot_id
645                    ),
646                ));
647            }
648        } else if main_ref.is_some() {
649            return Err(Error::new(
650                ErrorKind::DataInvalid,
651                "Current snapshot is not set, but main branch exists",
652            ));
653        }
654
655        Ok(())
656    }
657
658    /// Validate that for V1 Metadata the last_sequence_number is 0
659    fn validate_snapshot_sequence_number(&self) -> Result<()> {
660        if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
661            return Err(Error::new(
662                ErrorKind::DataInvalid,
663                format!(
664                    "Last sequence number must be 0 in v1. Found {}",
665                    self.last_sequence_number
666                ),
667            ));
668        }
669
670        if self.format_version >= FormatVersion::V2
671            && let Some(snapshot) = self
672                .snapshots
673                .values()
674                .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
675        {
676            return Err(Error::new(
677                ErrorKind::DataInvalid,
678                format!(
679                    "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
680                    snapshot.snapshot_id(),
681                    snapshot.sequence_number(),
682                    self.last_sequence_number
683                ),
684            ));
685        }
686
687        Ok(())
688    }
689
690    /// Validate snapshots logs are chronological and last updated is after the last snapshot log.
691    fn validate_chronological_snapshot_logs(&self) -> Result<()> {
692        for window in self.snapshot_log.windows(2) {
693            let (prev, curr) = (&window[0], &window[1]);
694            // commits can happen concurrently from different machines.
695            // A tolerance helps us avoid failure for small clock skew
696            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
697                return Err(Error::new(
698                    ErrorKind::DataInvalid,
699                    "Expected sorted snapshot log entries",
700                ));
701            }
702        }
703
704        if let Some(last) = self.snapshot_log.last() {
705            // commits can happen concurrently from different machines.
706            // A tolerance helps us avoid failure for small clock skew
707            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
708                return Err(Error::new(
709                    ErrorKind::DataInvalid,
710                    format!(
711                        "Invalid update timestamp {}: before last snapshot log entry at {}",
712                        self.last_updated_ms, last.timestamp_ms
713                    ),
714                ));
715            }
716        }
717        Ok(())
718    }
719
720    fn validate_chronological_metadata_logs(&self) -> Result<()> {
721        for window in self.metadata_log.windows(2) {
722            let (prev, curr) = (&window[0], &window[1]);
723            // commits can happen concurrently from different machines.
724            // A tolerance helps us avoid failure for small clock skew
725            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
726                return Err(Error::new(
727                    ErrorKind::DataInvalid,
728                    "Expected sorted metadata log entries",
729                ));
730            }
731        }
732
733        if let Some(last) = self.metadata_log.last() {
734            // commits can happen concurrently from different machines.
735            // A tolerance helps us avoid failure for small clock skew
736            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
737                return Err(Error::new(
738                    ErrorKind::DataInvalid,
739                    format!(
740                        "Invalid update timestamp {}: before last metadata log entry at {}",
741                        self.last_updated_ms, last.timestamp_ms
742                    ),
743                ));
744            }
745        }
746
747        Ok(())
748    }
749}
750
751pub(super) mod _serde {
752    use std::borrow::BorrowMut;
    /// This is a helper module that defines types to help with serialization/deserialization.
    /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct
    /// and then converted into the [TableMetadata] struct. Serialization works the other way around.
    /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization.
    use std::collections::HashMap;
762    use std::sync::Arc;
763
764    use serde::{Deserialize, Serialize};
765    use uuid::Uuid;
766
767    use super::{
768        DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
769        TableMetadata,
770    };
771    use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
772    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
773    use crate::spec::{
774        EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
775        PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
776        SortOrder, StatisticsFile,
777    };
778    use crate::{Error, ErrorKind};
779
    /// Version-discriminated wrapper used to (de)serialize [`TableMetadata`].
    ///
    /// `#[serde(untagged)]` tries the variants in declaration order, so the
    /// newest format (v3) is attempted first and v1 last.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    pub(super) enum TableMetadataEnum {
        V3(TableMetadataV3),
        V2(TableMetadataV2),
        V1(TableMetadataV1),
    }
787
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v3 table metadata for serialization/deserialization.
    /// Fields common with v2 live in the flattened [`TableMetadataV2V3Shared`].
    pub(super) struct TableMetadataV3 {
        pub format_version: VersionNumber<3>,
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        pub next_row_id: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub encryption_keys: Option<Vec<EncryptedKey>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV3>>,
    }
801
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Fields shared by the v2 and v3 table metadata layouts; flattened into
    /// [TableMetadataV2] and [TableMetadataV3] for serialization/deserialization.
    pub(super) struct TableMetadataV2V3Shared {
        pub table_uuid: Uuid,
        pub location: String,
        pub last_sequence_number: i64,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        pub schemas: Vec<SchemaV2>,
        pub current_schema_id: i32,
        pub partition_specs: Vec<PartitionSpec>,
        pub default_spec_id: i32,
        pub last_partition_id: i32,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Vec<SortOrder>,
        pub default_sort_order_id: i64,
        /// Branch/tag references. When absent on read, a `main` branch ref is
        /// synthesized from `current_snapshot_id` during conversion.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub refs: Option<HashMap<String, SnapshotReference>>,
        /// Statistics files; indexed by snapshot id during conversion.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
833
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v2 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV2 {
        /// Must (de)serialize as the literal `2`.
        pub format_version: VersionNumber<2>,
        /// Fields common to the v2 and v3 layouts, flattened in place.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        /// Snapshots in the v2 wire format.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV2>>,
    }
844
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v1 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV1 {
        pub format_version: VersionNumber<1>,
        /// Optional in v1; a default (nil) UUID is substituted on read when absent.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub table_uuid: Option<Uuid>,
        pub location: String,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        /// `schema` is optional to prioritize `schemas` and `current-schema-id`, allowing liberal reading of V1 metadata.
        pub schema: Option<SchemaV1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub schemas: Option<Vec<SchemaV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_schema_id: Option<i32>,
        /// `partition_spec` is optional to prioritize `partition_specs`, aligning with liberal reading of potentially invalid V1 metadata.
        pub partition_spec: Option<Vec<PartitionField>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub partition_specs: Option<Vec<PartitionSpec>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_spec_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub last_partition_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        // NOTE(review): unlike the options above, these two fields lack
        // `skip_serializing_if`, so `None` serializes as an explicit `null` —
        // confirm this asymmetry is intentional.
        pub sort_orders: Option<Vec<SortOrder>>,
        pub default_sort_order_id: Option<i64>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
886
    /// Helper to serialize and deserialize the format version.
    ///
    /// A zero-sized type whose const parameter `V` pins the accepted version:
    /// it serializes as the literal `V` and fails deserialization for any
    /// other number, which lets [TableMetadataEnum] discriminate versions.
    #[derive(Debug, PartialEq, Eq)]
    pub(crate) struct VersionNumber<const V: u8>;
890
891    impl Serialize for TableMetadata {
892        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
893        where S: serde::Serializer {
894            // we must do a clone here
895            let table_metadata_enum: TableMetadataEnum =
896                self.clone().try_into().map_err(serde::ser::Error::custom)?;
897
898            table_metadata_enum.serialize(serializer)
899        }
900    }
901
902    impl<const V: u8> Serialize for VersionNumber<V> {
903        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
904        where S: serde::Serializer {
905            serializer.serialize_u8(V)
906        }
907    }
908
909    impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
910        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
911        where D: serde::Deserializer<'de> {
912            let value = u8::deserialize(deserializer)?;
913            if value == V {
914                Ok(VersionNumber::<V>)
915            } else {
916                Err(serde::de::Error::custom("Invalid Version"))
917            }
918        }
919    }
920
921    impl TryFrom<TableMetadataEnum> for TableMetadata {
922        type Error = Error;
923        fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
924            match value {
925                TableMetadataEnum::V3(value) => value.try_into(),
926                TableMetadataEnum::V2(value) => value.try_into(),
927                TableMetadataEnum::V1(value) => value.try_into(),
928            }
929        }
930    }
931
932    impl TryFrom<TableMetadata> for TableMetadataEnum {
933        type Error = Error;
934        fn try_from(value: TableMetadata) -> Result<Self, Error> {
935            Ok(match value.format_version {
936                FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
937                FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
938                FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
939            })
940        }
941    }
942
943    impl TryFrom<TableMetadataV3> for TableMetadata {
944        type Error = Error;
945        fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
946            let TableMetadataV3 {
947                format_version: _,
948                shared: value,
949                next_row_id,
950                encryption_keys,
951                snapshots,
952            } = value;
953            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
954                None
955            } else {
956                value.current_snapshot_id
957            };
958            let schemas = HashMap::from_iter(
959                value
960                    .schemas
961                    .into_iter()
962                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
963                    .collect::<Result<Vec<_>, Error>>()?,
964            );
965
966            let current_schema: &SchemaRef =
967                schemas.get(&value.current_schema_id).ok_or_else(|| {
968                    Error::new(
969                        ErrorKind::DataInvalid,
970                        format!(
971                            "No schema exists with the current schema id {}.",
972                            value.current_schema_id
973                        ),
974                    )
975                })?;
976            let partition_specs = HashMap::from_iter(
977                value
978                    .partition_specs
979                    .into_iter()
980                    .map(|x| (x.spec_id(), Arc::new(x))),
981            );
982            let default_spec_id = value.default_spec_id;
983            let default_spec: PartitionSpecRef = partition_specs
984                .get(&value.default_spec_id)
985                .map(|spec| (**spec).clone())
986                .or_else(|| {
987                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
988                        .then(PartitionSpec::unpartition_spec)
989                })
990                .ok_or_else(|| {
991                    Error::new(
992                        ErrorKind::DataInvalid,
993                        format!("Default partition spec {default_spec_id} not found"),
994                    )
995                })?
996                .into();
997            let default_partition_type = default_spec.partition_type(current_schema)?;
998
999            let mut metadata = TableMetadata {
1000                format_version: FormatVersion::V3,
1001                table_uuid: value.table_uuid,
1002                location: value.location,
1003                last_sequence_number: value.last_sequence_number,
1004                last_updated_ms: value.last_updated_ms,
1005                last_column_id: value.last_column_id,
1006                current_schema_id: value.current_schema_id,
1007                schemas,
1008                partition_specs,
1009                default_partition_type,
1010                default_spec,
1011                last_partition_id: value.last_partition_id,
1012                properties: value.properties.unwrap_or_default(),
1013                current_snapshot_id,
1014                snapshots: snapshots
1015                    .map(|snapshots| {
1016                        HashMap::from_iter(
1017                            snapshots
1018                                .into_iter()
1019                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
1020                        )
1021                    })
1022                    .unwrap_or_default(),
1023                snapshot_log: value.snapshot_log.unwrap_or_default(),
1024                metadata_log: value.metadata_log.unwrap_or_default(),
1025                sort_orders: HashMap::from_iter(
1026                    value
1027                        .sort_orders
1028                        .into_iter()
1029                        .map(|x| (x.order_id, Arc::new(x))),
1030                ),
1031                default_sort_order_id: value.default_sort_order_id,
1032                refs: value.refs.unwrap_or_else(|| {
1033                    if let Some(snapshot_id) = current_snapshot_id {
1034                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1035                            snapshot_id,
1036                            retention: SnapshotRetention::Branch {
1037                                min_snapshots_to_keep: None,
1038                                max_snapshot_age_ms: None,
1039                                max_ref_age_ms: None,
1040                            },
1041                        })])
1042                    } else {
1043                        HashMap::new()
1044                    }
1045                }),
1046                statistics: index_statistics(value.statistics),
1047                partition_statistics: index_partition_statistics(value.partition_statistics),
1048                encryption_keys: encryption_keys
1049                    .map(|keys| {
1050                        HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
1051                    })
1052                    .unwrap_or_default(),
1053                next_row_id,
1054            };
1055
1056            metadata.borrow_mut().try_normalize()?;
1057            Ok(metadata)
1058        }
1059    }
1060
1061    impl TryFrom<TableMetadataV2> for TableMetadata {
1062        type Error = Error;
1063        fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
1064            let snapshots = value.snapshots;
1065            let value = value.shared;
1066            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1067                None
1068            } else {
1069                value.current_snapshot_id
1070            };
1071            let schemas = HashMap::from_iter(
1072                value
1073                    .schemas
1074                    .into_iter()
1075                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
1076                    .collect::<Result<Vec<_>, Error>>()?,
1077            );
1078
1079            let current_schema: &SchemaRef =
1080                schemas.get(&value.current_schema_id).ok_or_else(|| {
1081                    Error::new(
1082                        ErrorKind::DataInvalid,
1083                        format!(
1084                            "No schema exists with the current schema id {}.",
1085                            value.current_schema_id
1086                        ),
1087                    )
1088                })?;
1089            let partition_specs = HashMap::from_iter(
1090                value
1091                    .partition_specs
1092                    .into_iter()
1093                    .map(|x| (x.spec_id(), Arc::new(x))),
1094            );
1095            let default_spec_id = value.default_spec_id;
1096            let default_spec: PartitionSpecRef = partition_specs
1097                .get(&value.default_spec_id)
1098                .map(|spec| (**spec).clone())
1099                .or_else(|| {
1100                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
1101                        .then(PartitionSpec::unpartition_spec)
1102                })
1103                .ok_or_else(|| {
1104                    Error::new(
1105                        ErrorKind::DataInvalid,
1106                        format!("Default partition spec {default_spec_id} not found"),
1107                    )
1108                })?
1109                .into();
1110            let default_partition_type = default_spec.partition_type(current_schema)?;
1111
1112            let mut metadata = TableMetadata {
1113                format_version: FormatVersion::V2,
1114                table_uuid: value.table_uuid,
1115                location: value.location,
1116                last_sequence_number: value.last_sequence_number,
1117                last_updated_ms: value.last_updated_ms,
1118                last_column_id: value.last_column_id,
1119                current_schema_id: value.current_schema_id,
1120                schemas,
1121                partition_specs,
1122                default_partition_type,
1123                default_spec,
1124                last_partition_id: value.last_partition_id,
1125                properties: value.properties.unwrap_or_default(),
1126                current_snapshot_id,
1127                snapshots: snapshots
1128                    .map(|snapshots| {
1129                        HashMap::from_iter(
1130                            snapshots
1131                                .into_iter()
1132                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
1133                        )
1134                    })
1135                    .unwrap_or_default(),
1136                snapshot_log: value.snapshot_log.unwrap_or_default(),
1137                metadata_log: value.metadata_log.unwrap_or_default(),
1138                sort_orders: HashMap::from_iter(
1139                    value
1140                        .sort_orders
1141                        .into_iter()
1142                        .map(|x| (x.order_id, Arc::new(x))),
1143                ),
1144                default_sort_order_id: value.default_sort_order_id,
1145                refs: value.refs.unwrap_or_else(|| {
1146                    if let Some(snapshot_id) = current_snapshot_id {
1147                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1148                            snapshot_id,
1149                            retention: SnapshotRetention::Branch {
1150                                min_snapshots_to_keep: None,
1151                                max_snapshot_age_ms: None,
1152                                max_ref_age_ms: None,
1153                            },
1154                        })])
1155                    } else {
1156                        HashMap::new()
1157                    }
1158                }),
1159                statistics: index_statistics(value.statistics),
1160                partition_statistics: index_partition_statistics(value.partition_statistics),
1161                encryption_keys: HashMap::new(),
1162                next_row_id: INITIAL_ROW_ID,
1163            };
1164
1165            metadata.borrow_mut().try_normalize()?;
1166            Ok(metadata)
1167        }
1168    }
1169
1170    impl TryFrom<TableMetadataV1> for TableMetadata {
1171        type Error = Error;
1172        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1173            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1174                None
1175            } else {
1176                value.current_snapshot_id
1177            };
1178
1179            let (schemas, current_schema_id, current_schema) =
1180                if let (Some(schemas_vec), Some(schema_id)) =
1181                    (&value.schemas, value.current_schema_id)
1182                {
1183                    // Option 1: Use 'schemas' + 'current_schema_id'
1184                    let schema_map = HashMap::from_iter(
1185                        schemas_vec
1186                            .clone()
1187                            .into_iter()
1188                            .map(|schema| {
1189                                let schema: Schema = schema.try_into()?;
1190                                Ok((schema.schema_id(), Arc::new(schema)))
1191                            })
1192                            .collect::<Result<Vec<_>, Error>>()?,
1193                    );
1194
1195                    let schema = schema_map
1196                        .get(&schema_id)
1197                        .ok_or_else(|| {
1198                            Error::new(
1199                                ErrorKind::DataInvalid,
1200                                format!("No schema exists with the current schema id {schema_id}."),
1201                            )
1202                        })?
1203                        .clone();
1204                    (schema_map, schema_id, schema)
1205                } else if let Some(schema) = value.schema {
1206                    // Option 2: Fall back to `schema`
1207                    let schema: Schema = schema.try_into()?;
1208                    let schema_id = schema.schema_id();
1209                    let schema_arc = Arc::new(schema);
1210                    let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1211                    (schema_map, schema_id, schema_arc)
1212                } else {
1213                    // Option 3: No valid schema configuration found
1214                    return Err(Error::new(
1215                        ErrorKind::DataInvalid,
1216                        "No valid schema configuration found in table metadata",
1217                    ));
1218                };
1219
1220            // Prioritize 'partition_specs' over 'partition_spec'
1221            let partition_specs = if let Some(specs_vec) = value.partition_specs {
1222                // Option 1: Use 'partition_specs'
1223                specs_vec
1224                    .into_iter()
1225                    .map(|x| (x.spec_id(), Arc::new(x)))
1226                    .collect::<HashMap<_, _>>()
1227            } else if let Some(partition_spec) = value.partition_spec {
1228                // Option 2: Fall back to 'partition_spec'
1229                let spec = PartitionSpec::builder(current_schema.clone())
1230                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1231                    .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1232                    .build()?;
1233
1234                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1235            } else {
1236                // Option 3: Create empty partition spec
1237                let spec = PartitionSpec::builder(current_schema.clone())
1238                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1239                    .build()?;
1240
1241                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1242            };
1243
1244            // Get the default_spec_id, prioritizing the explicit value if provided
1245            let default_spec_id = value
1246                .default_spec_id
1247                .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1248
1249            // Get the default spec
1250            let default_spec: PartitionSpecRef = partition_specs
1251                .get(&default_spec_id)
1252                .map(|x| Arc::unwrap_or_clone(x.clone()))
1253                .ok_or_else(|| {
1254                    Error::new(
1255                        ErrorKind::DataInvalid,
1256                        format!("Default partition spec {default_spec_id} not found"),
1257                    )
1258                })?
1259                .into();
1260            let default_partition_type = default_spec.partition_type(&current_schema)?;
1261
1262            let mut metadata = TableMetadata {
1263                format_version: FormatVersion::V1,
1264                table_uuid: value.table_uuid.unwrap_or_default(),
1265                location: value.location,
1266                last_sequence_number: 0,
1267                last_updated_ms: value.last_updated_ms,
1268                last_column_id: value.last_column_id,
1269                current_schema_id,
1270                default_spec,
1271                default_partition_type,
1272                last_partition_id: value
1273                    .last_partition_id
1274                    .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1275                partition_specs,
1276                schemas,
1277                properties: value.properties.unwrap_or_default(),
1278                current_snapshot_id,
1279                snapshots: value
1280                    .snapshots
1281                    .map(|snapshots| {
1282                        Ok::<_, Error>(HashMap::from_iter(
1283                            snapshots
1284                                .into_iter()
1285                                .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1286                                .collect::<Result<Vec<_>, Error>>()?,
1287                        ))
1288                    })
1289                    .transpose()?
1290                    .unwrap_or_default(),
1291                snapshot_log: value.snapshot_log.unwrap_or_default(),
1292                metadata_log: value.metadata_log.unwrap_or_default(),
1293                sort_orders: match value.sort_orders {
1294                    Some(sort_orders) => HashMap::from_iter(
1295                        sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1296                    ),
1297                    None => HashMap::new(),
1298                },
1299                default_sort_order_id: value
1300                    .default_sort_order_id
1301                    .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1302                refs: if let Some(snapshot_id) = current_snapshot_id {
1303                    HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1304                        snapshot_id,
1305                        retention: SnapshotRetention::Branch {
1306                            min_snapshots_to_keep: None,
1307                            max_snapshot_age_ms: None,
1308                            max_ref_age_ms: None,
1309                        },
1310                    })])
1311                } else {
1312                    HashMap::new()
1313                },
1314                statistics: index_statistics(value.statistics),
1315                partition_statistics: index_partition_statistics(value.partition_statistics),
1316                encryption_keys: HashMap::new(),
1317                next_row_id: INITIAL_ROW_ID, // v1 has no row lineage
1318            };
1319
1320            metadata.borrow_mut().try_normalize()?;
1321            Ok(metadata)
1322        }
1323    }
1324
1325    impl TryFrom<TableMetadata> for TableMetadataV3 {
1326        type Error = Error;
1327
1328        fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1329            let next_row_id = v.next_row_id;
1330            let encryption_keys = std::mem::take(&mut v.encryption_keys);
1331            let snapshots = std::mem::take(&mut v.snapshots);
1332            let shared = v.into();
1333
1334            Ok(TableMetadataV3 {
1335                format_version: VersionNumber::<3>,
1336                shared,
1337                next_row_id,
1338                encryption_keys: if encryption_keys.is_empty() {
1339                    None
1340                } else {
1341                    Some(encryption_keys.into_values().collect())
1342                },
1343                snapshots: if snapshots.is_empty() {
1344                    None
1345                } else {
1346                    Some(
1347                        snapshots
1348                            .into_values()
1349                            .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1350                            .collect::<Result<_, _>>()?,
1351                    )
1352                },
1353            })
1354        }
1355    }
1356
1357    impl From<TableMetadata> for TableMetadataV2 {
1358        fn from(mut v: TableMetadata) -> Self {
1359            let snapshots = std::mem::take(&mut v.snapshots);
1360            let shared = v.into();
1361
1362            TableMetadataV2 {
1363                format_version: VersionNumber::<2>,
1364                shared,
1365                snapshots: if snapshots.is_empty() {
1366                    None
1367                } else {
1368                    Some(
1369                        snapshots
1370                            .into_values()
1371                            .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1372                            .collect(),
1373                    )
1374                },
1375            }
1376        }
1377    }
1378
1379    impl From<TableMetadata> for TableMetadataV2V3Shared {
1380        fn from(v: TableMetadata) -> Self {
1381            TableMetadataV2V3Shared {
1382                table_uuid: v.table_uuid,
1383                location: v.location,
1384                last_sequence_number: v.last_sequence_number,
1385                last_updated_ms: v.last_updated_ms,
1386                last_column_id: v.last_column_id,
1387                schemas: v
1388                    .schemas
1389                    .into_values()
1390                    .map(|x| {
1391                        Arc::try_unwrap(x)
1392                            .unwrap_or_else(|schema| schema.as_ref().clone())
1393                            .into()
1394                    })
1395                    .collect(),
1396                current_schema_id: v.current_schema_id,
1397                partition_specs: v
1398                    .partition_specs
1399                    .into_values()
1400                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1401                    .collect(),
1402                default_spec_id: v.default_spec.spec_id(),
1403                last_partition_id: v.last_partition_id,
1404                properties: if v.properties.is_empty() {
1405                    None
1406                } else {
1407                    Some(v.properties)
1408                },
1409                current_snapshot_id: v.current_snapshot_id,
1410                snapshot_log: if v.snapshot_log.is_empty() {
1411                    None
1412                } else {
1413                    Some(v.snapshot_log)
1414                },
1415                metadata_log: if v.metadata_log.is_empty() {
1416                    None
1417                } else {
1418                    Some(v.metadata_log)
1419                },
1420                sort_orders: v
1421                    .sort_orders
1422                    .into_values()
1423                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1424                    .collect(),
1425                default_sort_order_id: v.default_sort_order_id,
1426                refs: Some(v.refs),
1427                statistics: v.statistics.into_values().collect(),
1428                partition_statistics: v.partition_statistics.into_values().collect(),
1429            }
1430        }
1431    }
1432
1433    impl TryFrom<TableMetadata> for TableMetadataV1 {
1434        type Error = Error;
1435        fn try_from(v: TableMetadata) -> Result<Self, Error> {
1436            Ok(TableMetadataV1 {
1437                format_version: VersionNumber::<1>,
1438                table_uuid: Some(v.table_uuid),
1439                location: v.location,
1440                last_updated_ms: v.last_updated_ms,
1441                last_column_id: v.last_column_id,
1442                schema: Some(
1443                    v.schemas
1444                        .get(&v.current_schema_id)
1445                        .ok_or(Error::new(
1446                            ErrorKind::Unexpected,
1447                            "current_schema_id not found in schemas",
1448                        ))?
1449                        .as_ref()
1450                        .clone()
1451                        .into(),
1452                ),
1453                schemas: Some(
1454                    v.schemas
1455                        .into_values()
1456                        .map(|x| {
1457                            Arc::try_unwrap(x)
1458                                .unwrap_or_else(|schema| schema.as_ref().clone())
1459                                .into()
1460                        })
1461                        .collect(),
1462                ),
1463                current_schema_id: Some(v.current_schema_id),
1464                partition_spec: Some(v.default_spec.fields().to_vec()),
1465                partition_specs: Some(
1466                    v.partition_specs
1467                        .into_values()
1468                        .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1469                        .collect(),
1470                ),
1471                default_spec_id: Some(v.default_spec.spec_id()),
1472                last_partition_id: Some(v.last_partition_id),
1473                properties: if v.properties.is_empty() {
1474                    None
1475                } else {
1476                    Some(v.properties)
1477                },
1478                current_snapshot_id: v.current_snapshot_id,
1479                snapshots: if v.snapshots.is_empty() {
1480                    None
1481                } else {
1482                    Some(
1483                        v.snapshots
1484                            .into_values()
1485                            .map(|x| Snapshot::clone(&x).into())
1486                            .collect(),
1487                    )
1488                },
1489                snapshot_log: if v.snapshot_log.is_empty() {
1490                    None
1491                } else {
1492                    Some(v.snapshot_log)
1493                },
1494                metadata_log: if v.metadata_log.is_empty() {
1495                    None
1496                } else {
1497                    Some(v.metadata_log)
1498                },
1499                sort_orders: Some(
1500                    v.sort_orders
1501                        .into_values()
1502                        .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1503                        .collect(),
1504                ),
1505                default_sort_order_id: Some(v.default_sort_order_id),
1506                statistics: v.statistics.into_values().collect(),
1507                partition_statistics: v.partition_statistics.into_values().collect(),
1508            })
1509        }
1510    }
1511
1512    fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1513        statistics
1514            .into_iter()
1515            .rev()
1516            .map(|s| (s.snapshot_id, s))
1517            .collect()
1518    }
1519
1520    fn index_partition_statistics(
1521        statistics: Vec<PartitionStatisticsFile>,
1522    ) -> HashMap<i64, PartitionStatisticsFile> {
1523        statistics
1524            .into_iter()
1525            .rev()
1526            .map(|s| (s.snapshot_id, s))
1527            .collect()
1528    }
1529}
1530
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
#[repr(u8)]
/// Iceberg format version
///
/// The `repr(u8)` discriminants are the version numbers themselves, and
/// `serde_repr` (de)serializes the enum as that raw integer — this is what
/// lands in the `format-version` field of serialized table metadata.
pub enum FormatVersion {
    /// Iceberg spec version 1
    V1 = 1u8,
    /// Iceberg spec version 2
    V2 = 2u8,
    /// Iceberg spec version 3
    V3 = 3u8,
}
1542
impl PartialOrd for FormatVersion {
    /// Delegates to [`Ord::cmp`], the canonical pattern when a type
    /// implements both traits — it guarantees the partial order is always
    /// consistent with the total order defined below.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
1548
1549impl Ord for FormatVersion {
1550    fn cmp(&self, other: &Self) -> Ordering {
1551        (*self as u8).cmp(&(*other as u8))
1552    }
1553}
1554
1555impl Display for FormatVersion {
1556    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1557        match self {
1558            FormatVersion::V1 => write!(f, "v1"),
1559            FormatVersion::V2 => write!(f, "v2"),
1560            FormatVersion::V3 => write!(f, "v3"),
1561        }
1562    }
1563}
1564
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Encodes changes to the previous metadata files for the table
///
/// Serialized with kebab-case keys (`metadata-file`, `timestamp-ms`).
pub struct MetadataLog {
    /// Location of the previous metadata file for the log entry.
    pub metadata_file: String,
    /// Time new metadata was created, in milliseconds since the Unix epoch.
    pub timestamp_ms: i64,
}
1574
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// A log of when each snapshot was made.
///
/// Serialized with kebab-case keys (`snapshot-id`, `timestamp-ms`).
pub struct SnapshotLog {
    /// Id of the snapshot.
    pub snapshot_id: i64,
    /// Last updated timestamp, in milliseconds since the Unix epoch.
    pub timestamp_ms: i64,
}
1584
impl SnapshotLog {
    /// Returns the last updated timestamp as a DateTime<Utc> with millisecond precision
    ///
    /// Note: this takes `self` by value and consumes the log entry; clone
    /// first if the entry is still needed.
    /// NOTE(review): `timestamp_ms_to_utc` is fallible — presumably for
    /// timestamps outside the representable range; confirm in error module.
    pub fn timestamp(self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.timestamp_ms)
    }

    /// Returns the timestamp in milliseconds
    #[inline]
    pub fn timestamp_ms(&self) -> i64 {
        self.timestamp_ms
    }
}
1597
1598#[cfg(test)]
1599mod tests {
1600    use std::collections::HashMap;
1601    use std::fs;
1602    use std::sync::Arc;
1603
1604    use anyhow::Result;
1605    use base64::Engine as _;
1606    use pretty_assertions::assert_eq;
1607    use tempfile::TempDir;
1608    use uuid::Uuid;
1609
1610    use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1611    use crate::catalog::MetadataLocation;
1612    use crate::compression::CompressionCodec;
1613    use crate::io::FileIO;
1614    use crate::spec::table_metadata::TableMetadata;
1615    use crate::spec::{
1616        BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1617        PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1618        SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1619        Summary, TableProperties, Transform, Type, UnboundPartitionField,
1620    };
1621    use crate::{ErrorKind, TableCreation};
1622
1623    fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1624        let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1625        assert_eq!(desered_type, expected_type);
1626
1627        let sered_json = serde_json::to_string(&expected_type).unwrap();
1628        let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1629
1630        assert_eq!(parsed_json_value, desered_type);
1631    }
1632
1633    fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1634        let path = format!("testdata/table_metadata/{file_name}");
1635        let metadata: String = fs::read_to_string(path).unwrap();
1636
1637        serde_json::from_str(&metadata).unwrap()
1638    }
1639
    /// Parses a hand-written v2 metadata document, checks it equals the same
    /// metadata built through the public builders, and verifies both the serde
    /// round trip and the exact serialized JSON shape.
    #[test]
    fn test_table_data_v2() {
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            },
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "sort-orders": [
                    {
                    "order-id": 0,
                    "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Schema matching the "schemas" entry with schema-id 1 above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "struct_name",
                    Type::Primitive(PrimitiveType::Fixed(1)),
                )),
                Arc::new(NestedField::required(
                    4,
                    "ts",
                    Type::Primitive(PrimitiveType::Timestamp),
                )),
            ])
            .build()
            .unwrap();

        // Partition spec matching the "partition-specs" entry (spec-id 0).
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // The full metadata value that the JSON document should decode to.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        // Beyond the round trip, the serialized form must match the input
        // document structurally (same keys and values as JSON values).
        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1775
    /// Parses a hand-written v3 metadata document — including the v3-only
    /// fields (`next-row-id`, snapshot row ranges, `key-id`, and
    /// `encryption-keys`) — compares it against builder-constructed metadata,
    /// and verifies the serde round trip and exact serialized JSON shape.
    #[test]
    fn test_table_data_v3() {
        let data = r#"
            {
                "format-version" : 3,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "next-row-id": 5,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "snapshots" : [ {
                    "snapshot-id" : 1,
                    "timestamp-ms" : 1662532818843,
                    "sequence-number" : 0,
                    "first-row-id" : 0,
                    "added-rows" : 4,
                    "key-id" : "key1",
                    "summary" : {
                        "operation" : "append"
                    },
                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                    "schema-id" : 0
                    }
                ],
                "encryption-keys": [
                    {
                        "key-id": "key1",
                        "encrypted-by-id": "KMS",
                        "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
                        "properties": {
                            "p1": "v1"
                        }
                    }
                ],
                "sort-orders": [
                    {
                    "order-id": 0,
                    "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Schema matching the "schemas" entry with schema-id 1 above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                4,
                "ts",
                Type::Primitive(PrimitiveType::Timestamp),
            ))])
            .build()
            .unwrap();

        // Partition spec matching the "partition-specs" entry (spec-id 0).
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Snapshot carrying the v3-only row range and encryption key id.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_row_range(0, 4)
            .with_encryption_key_id(Some("key1".to_string()))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
            .with_schema_id(0)
            .build();

        // The "encrypted-key-metadata" JSON field is base64; the in-memory
        // representation holds the decoded bytes.
        let encryption_key = EncryptedKey::builder()
            .key_id("key1".to_string())
            .encrypted_by_id("KMS".to_string())
            .encrypted_key_metadata(
                base64::prelude::BASE64_STANDARD
                    .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
                    .unwrap(),
            )
            .properties(HashMap::from_iter(vec![(
                "p1".to_string(),
                "v1".to_string(),
            )]))
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // The full metadata value that the JSON document should decode to.
        let expected = TableMetadata {
            format_version: FormatVersion::V3,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
            next_row_id: 5,
        };

        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        // Beyond the round trip, the serialized form must match the input
        // document structurally (same keys and values as JSON values).
        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1951
    /// Parses a v1 metadata document (which uses the legacy singular
    /// `schema`/`partition-spec` fields rather than the v2 lists) and checks
    /// it deserializes to the expected builder-constructed metadata, including
    /// the snapshot, refs, snapshot log, and metadata log.
    #[test]
    fn test_table_data_v1() {
        let data = r#"
        {
            "format-version" : 1,
            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
            "location" : "/home/iceberg/warehouse/nyc/taxis",
            "last-updated-ms" : 1662532818843,
            "last-column-id" : 5,
            "schema" : {
              "type" : "struct",
              "schema-id" : 0,
              "fields" : [ {
                "id" : 1,
                "name" : "vendor_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 2,
                "name" : "trip_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 3,
                "name" : "trip_distance",
                "required" : false,
                "type" : "float"
              }, {
                "id" : 4,
                "name" : "fare_amount",
                "required" : false,
                "type" : "double"
              }, {
                "id" : 5,
                "name" : "store_and_fwd_flag",
                "required" : false,
                "type" : "string"
              } ]
            },
            "partition-spec" : [ {
              "name" : "vendor_id",
              "transform" : "identity",
              "source-id" : 1,
              "field-id" : 1000
            } ],
            "last-partition-id" : 1000,
            "default-sort-order-id" : 0,
            "sort-orders" : [ {
              "order-id" : 0,
              "fields" : [ ]
            } ],
            "properties" : {
              "owner" : "root"
            },
            "current-snapshot-id" : 638933773299822130,
            "refs" : {
              "main" : {
                "snapshot-id" : 638933773299822130,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 638933773299822130,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ],
            "snapshot-log" : [ {
              "timestamp-ms" : 1662532818843,
              "snapshot-id" : 638933773299822130
            } ],
            "metadata-log" : [ {
              "timestamp-ms" : 1662532805245,
              "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
            } ]
          }
        "#;

        // Schema matching the singular "schema" field (schema-id 0) above.
        let schema = Schema::builder()
            .with_fields(vec![
                Arc::new(NestedField::optional(
                    1,
                    "vendor_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    2,
                    "trip_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    3,
                    "trip_distance",
                    Type::Primitive(PrimitiveType::Float),
                )),
                Arc::new(NestedField::optional(
                    4,
                    "fare_amount",
                    Type::Primitive(PrimitiveType::Double),
                )),
                Arc::new(NestedField::optional(
                    5,
                    "store_and_fwd_flag",
                    Type::Primitive(PrimitiveType::String),
                )),
            ])
            .build()
            .unwrap();

        let schema = Arc::new(schema);
        // Partition spec matching the legacy "partition-spec" field list.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(0)
            .build_unbound()
            .unwrap();

        let snapshot = Snapshot::builder()
            .with_snapshot_id(638933773299822130)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // The full metadata value that the JSON document should decode to.
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
            last_updated_ms: 1662532818843,
            last_column_id: 5,
            schemas: HashMap::from_iter(vec![(0, schema)]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
            current_snapshot_id: Some(638933773299822130),
            last_sequence_number: 0,
            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
            snapshot_log: vec![SnapshotLog {
                snapshot_id: 638933773299822130,
                timestamp_ms: 1662532818843,
            }],
            metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2124
    /// Parses a v2 metadata document that carries no snapshots, no snapshot
    /// log, and no properties, verifying the optional fields default
    /// correctly; also checks the serde round trip and exact serialized JSON.
    #[test]
    fn test_table_data_v2_no_snapshots() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "refs": {},
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0
        }
        "#;

        // Schema matching the "schemas" entry with schema-id 1 above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "struct_name",
                Type::Primitive(PrimitiveType::Fixed(1)),
            ))])
            .build()
            .unwrap();

        // Unpartitioned spec (spec-id 0, no fields).
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // The full metadata value that the JSON document should decode to;
        // snapshot-related and property fields are their empty defaults.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        // Beyond the round trip, the serialized form must match the input
        // document structurally (same keys and values as JSON values).
        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
2227
    // Deserialization must reject metadata where the top-level
    // `current-snapshot-id` disagrees with the snapshot id recorded for the
    // `main` branch ref — the two are required to stay in sync.
    #[test]
    fn test_current_snapshot_id_must_match_main_branch() {
        // V2 fixture: `current-snapshot-id` is 1 but `refs.main.snapshot-id`
        // is 2; both snapshots 1 and 2 exist, so only the mismatch is at fault.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "current-snapshot-id" : 1,
            "refs" : {
              "main" : {
                "snapshot-id" : 2,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 1,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            },
            {
              "snapshot-id" : 2,
              "timestamp-ms" : 1662532818844,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ]
        }
    "#;

        // Deserialization itself must fail with the specific mismatch message.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot id does not match main branch")
        );
    }
2334
    // Deserialization must reject metadata that declares a `main` branch ref
    // while leaving the top-level `current-snapshot-id` unset — if `main`
    // exists, a current snapshot must be set to match it.
    #[test]
    fn test_main_without_current() {
        // V2 fixture: `refs.main` points at snapshot 1, but the JSON contains
        // no `current-snapshot-id` field at all.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
              "main" : {
                "snapshot-id" : 1,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 1,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ]
        }
    "#;

        // The parser must surface the missing-current-snapshot inconsistency.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot is not set, but main branch exists")
        );
    }
2426
    // Every branch ref must point at a snapshot that exists in the
    // `snapshots` list; a dangling ref must fail deserialization.
    #[test]
    fn test_branch_snapshot_missing() {
        // V2 fixture: ref `foo` points at snapshot 2, but only snapshot 1 is
        // present in `snapshots`. `main` -> 1 is valid and should not trip.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
              "main" : {
                "snapshot-id" : 1,
                "type" : "branch"
              },
              "foo" : {
                "snapshot-id" : 2,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 1,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ]
        }
    "#;

        // The error must name the dangling ref (`foo`).
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string().contains(
                "Snapshot for reference foo does not exist in the existing snapshots list"
            )
        );
    }
2523
    // A snapshot's `sequence-number` may never exceed the table's
    // `last-sequence-number`; equal or larger `last-sequence-number` values
    // must be accepted.
    #[test]
    fn test_v2_wrong_max_snapshot_sequence_number() {
        // Fixture: `last-sequence-number` is 1, but the only snapshot claims
        // sequence number 4 — this must be rejected on deserialization.
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 1,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 4,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(err.to_string().contains(
            "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
        ));

        // Raise last-sequence-number to 4 (equal to the snapshot's) - should work
        let data = data.replace(
            r#""last-sequence-number": 1,"#,
            r#""last-sequence-number": 4,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 4);

        // Raise last-sequence-number to 5 (greater than the snapshot's) - should
        // work too. Note: this replace targets the "4" written by the previous
        // replace, so the two steps are order-dependent.
        let data = data.replace(
            r#""last-sequence-number": 4,"#,
            r#""last-sequence-number": 5,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 5);
    }
2605
    // Round-trips V2 metadata carrying a `statistics` entry (a Puffin file
    // with one "ndv" blob) and checks the parsed `TableMetadata` field by
    // field against a hand-built expected value.
    #[test]
    fn test_statistic_files() {
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/stats.puffin",
                    "file-size-in-bytes": 413,
                    "file-footer-size-in-bytes": 42,
                    "blob-metadata": [
                        {
                            "type": "ndv",
                            "snapshot-id": 3055729675574597004,
                            "sequence-number": 1,
                            "fields": [
                                1
                            ]
                        }
                    ]
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

        // Build the expected schema / spec / snapshot mirroring the fixture.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            // Statistics files are keyed by the snapshot id they describe.
            statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
                snapshot_id: 3055729675574597004,
                statistics_path: "s3://a/b/stats.puffin".to_string(),
                file_size_in_bytes: 413,
                file_footer_size_in_bytes: 42,
                key_metadata: None,
                blob_metadata: vec![BlobMetadata {
                    snapshot_id: 3055729675574597004,
                    sequence_number: 1,
                    fields: vec![1],
                    r#type: "ndv".to_string(),
                    properties: HashMap::new(),
                }],
            })]),
            partition_statistics: HashMap::new(),
            // A `main` branch ref is synthesized from `current-snapshot-id`.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2758
    // Round-trips V2 metadata carrying a `partition-statistics` entry and
    // checks the parsed `TableMetadata` against a hand-built expected value.
    #[test]
    fn test_partition_statistics_file() {
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "partition-statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/partition-stats.parquet",
                    "file-size-in-bytes": 43
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
        "#;

        // Build the expected schema / spec / snapshot mirroring the fixture.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::new(),
            // Partition statistics files are keyed by their snapshot id.
            partition_statistics: HashMap::from_iter(vec![(
                3055729675574597004,
                PartitionStatisticsFile {
                    snapshot_id: 3055729675574597004,
                    statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
                    file_size_in_bytes: 43,
                },
            )]),
            // A `main` branch ref is synthesized from `current-snapshot-id`.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2894
2895    #[test]
2896    fn test_invalid_table_uuid() -> Result<()> {
2897        let data = r#"
2898            {
2899                "format-version" : 2,
2900                "table-uuid": "xxxx"
2901            }
2902        "#;
2903        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2904        Ok(())
2905    }
2906
2907    #[test]
2908    fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
2909        let data = r#"
2910            {
2911                "format-version" : 1
2912            }
2913        "#;
2914        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2915        Ok(())
2916    }
2917
    // Round-trips the minimal valid V3 metadata testdata file and checks the
    // parsed `TableMetadata` field by field against a hand-built expected
    // value.
    #[test]
    fn test_table_metadata_v3_valid_minimal() {
        let metadata_str =
            fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();

        let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
        assert_eq!(table_metadata.format_version, FormatVersion::V3);

        // Schema 0: three required long columns; `x` carries initial/write
        // defaults (a V3 feature) and `y` carries a doc string.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(
                    NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
                        .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
                        .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
                ),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Spec 0 partitions by identity on column `x`.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Sort order 3: ascending identity on `y`, descending bucket(4) on `z`.
        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V3,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: 0, // V3 specific field from the JSON
        };

        check_table_metadata_serde(&metadata_str, expected);
    }
3006
3007    #[test]
3008    fn test_table_metadata_v2_file_valid() {
3009        let metadata =
3010            fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();
3011
3012        let schema1 = Schema::builder()
3013            .with_schema_id(0)
3014            .with_fields(vec![Arc::new(NestedField::required(
3015                1,
3016                "x",
3017                Type::Primitive(PrimitiveType::Long),
3018            ))])
3019            .build()
3020            .unwrap();
3021
3022        let schema2 = Schema::builder()
3023            .with_schema_id(1)
3024            .with_fields(vec![
3025                Arc::new(NestedField::required(
3026                    1,
3027                    "x",
3028                    Type::Primitive(PrimitiveType::Long),
3029                )),
3030                Arc::new(
3031                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
3032                        .with_doc("comment"),
3033                ),
3034                Arc::new(NestedField::required(
3035                    3,
3036                    "z",
3037                    Type::Primitive(PrimitiveType::Long),
3038                )),
3039            ])
3040            .with_identifier_field_ids(vec![1, 2])
3041            .build()
3042            .unwrap();
3043
3044        let partition_spec = PartitionSpec::builder(schema2.clone())
3045            .with_spec_id(0)
3046            .add_unbound_field(UnboundPartitionField {
3047                name: "x".to_string(),
3048                transform: Transform::Identity,
3049                source_id: 1,
3050                field_id: Some(1000),
3051            })
3052            .unwrap()
3053            .build()
3054            .unwrap();
3055
3056        let sort_order = SortOrder::builder()
3057            .with_order_id(3)
3058            .with_sort_field(SortField {
3059                source_id: 2,
3060                transform: Transform::Identity,
3061                direction: SortDirection::Ascending,
3062                null_order: NullOrder::First,
3063            })
3064            .with_sort_field(SortField {
3065                source_id: 3,
3066                transform: Transform::Bucket(4),
3067                direction: SortDirection::Descending,
3068                null_order: NullOrder::Last,
3069            })
3070            .build_unbound()
3071            .unwrap();
3072
3073        let snapshot1 = Snapshot::builder()
3074            .with_snapshot_id(3051729675574597004)
3075            .with_timestamp_ms(1515100955770)
3076            .with_sequence_number(0)
3077            .with_manifest_list("s3://a/b/1.avro")
3078            .with_summary(Summary {
3079                operation: Operation::Append,
3080                additional_properties: HashMap::new(),
3081            })
3082            .build();
3083
3084        let snapshot2 = Snapshot::builder()
3085            .with_snapshot_id(3055729675574597004)
3086            .with_parent_snapshot_id(Some(3051729675574597004))
3087            .with_timestamp_ms(1555100955770)
3088            .with_sequence_number(1)
3089            .with_schema_id(1)
3090            .with_manifest_list("s3://a/b/2.avro")
3091            .with_summary(Summary {
3092                operation: Operation::Append,
3093                additional_properties: HashMap::new(),
3094            })
3095            .build();
3096
3097        let default_partition_type = partition_spec.partition_type(&schema2).unwrap();
3098        let expected = TableMetadata {
3099            format_version: FormatVersion::V2,
3100            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
3101            location: "s3://bucket/test/location".to_string(),
3102            last_updated_ms: 1602638573590,
3103            last_column_id: 3,
3104            schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
3105            current_schema_id: 1,
3106            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3107            default_spec: Arc::new(partition_spec),
3108            default_partition_type,
3109            last_partition_id: 1000,
3110            default_sort_order_id: 3,
3111            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
3112            snapshots: HashMap::from_iter(vec![
3113                (3051729675574597004, Arc::new(snapshot1)),
3114                (3055729675574597004, Arc::new(snapshot2)),
3115            ]),
3116            current_snapshot_id: Some(3055729675574597004),
3117            last_sequence_number: 34,
3118            properties: HashMap::new(),
3119            snapshot_log: vec![
3120                SnapshotLog {
3121                    snapshot_id: 3051729675574597004,
3122                    timestamp_ms: 1515100955770,
3123                },
3124                SnapshotLog {
3125                    snapshot_id: 3055729675574597004,
3126                    timestamp_ms: 1555100955770,
3127                },
3128            ],
3129            metadata_log: Vec::new(),
3130            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
3131                snapshot_id: 3055729675574597004,
3132                retention: SnapshotRetention::Branch {
3133                    min_snapshots_to_keep: None,
3134                    max_snapshot_age_ms: None,
3135                    max_ref_age_ms: None,
3136                },
3137            })]),
3138            statistics: HashMap::new(),
3139            partition_statistics: HashMap::new(),
3140            encryption_keys: HashMap::new(),
3141            next_row_id: INITIAL_ROW_ID,
3142        };
3143
3144        check_table_metadata_serde(&metadata, expected);
3145    }
3146
    #[test]
    fn test_table_metadata_v2_file_valid_minimal() {
        // Deserializes a minimal V2 metadata fixture (no snapshots, refs,
        // logs or statistics) and round-trips it against a hand-built
        // `TableMetadata` via `check_table_metadata_serde`.
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

        // Schema 0: three required long columns; "y" carries a doc string.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Spec 0: identity partition on column "x" (partition field id 1000).
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Sort order 3: ascending identity on "y", descending bucket(4) on "z".
        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // The minimal fixture carries no snapshots/refs/logs, hence the empty
        // maps and `current_snapshot_id: None` below.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3232
    #[test]
    fn test_table_metadata_v1_file_valid() {
        // Deserializes a valid V1 metadata fixture and round-trips it against
        // a hand-built `TableMetadata` via `check_table_metadata_serde`.
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

        // Schema 0: three required long columns; "y" carries a doc string.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Spec 0: identity partition on column "x" (partition field id 1000).
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // V1 fixtures carry no sequence numbers or row ids, so the
        // version-introduced fields keep their defaults below.
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573874,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 0,
            default_sort_order_id: 0,
            // Sort order is added during deserialization for V2 compatibility
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::new(),
            current_snapshot_id: None,
            last_sequence_number: 0,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3302
3303    #[test]
3304    fn test_table_metadata_v1_compat() {
3305        let metadata =
3306            fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
3307
3308        // Deserialize the JSON to verify it works
3309        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3310            .expect("Failed to deserialize TableMetadataV1Compat.json");
3311
3312        // Verify some key fields match
3313        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3314        assert_eq!(
3315            desered_type.uuid(),
3316            Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
3317        );
3318        assert_eq!(
3319            desered_type.location(),
3320            "s3://bucket/warehouse/iceberg/glue.db/table_name"
3321        );
3322        assert_eq!(desered_type.last_updated_ms(), 1727773114005);
3323        assert_eq!(desered_type.current_schema_id(), 0);
3324    }
3325
3326    #[test]
3327    fn test_table_metadata_v1_schemas_without_current_id() {
3328        let metadata = fs::read_to_string(
3329            "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
3330        )
3331        .unwrap();
3332
3333        // Deserialize the JSON - this should succeed by using the 'schema' field instead of 'schemas'
3334        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3335            .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");
3336
3337        // Verify it used the 'schema' field
3338        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3339        assert_eq!(
3340            desered_type.uuid(),
3341            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3342        );
3343
3344        // Get the schema and verify it has the expected fields
3345        let schema = desered_type.current_schema();
3346        assert_eq!(schema.as_struct().fields().len(), 3);
3347        assert_eq!(schema.as_struct().fields()[0].name, "x");
3348        assert_eq!(schema.as_struct().fields()[1].name, "y");
3349        assert_eq!(schema.as_struct().fields()[2].name, "z");
3350    }
3351
3352    #[test]
3353    fn test_table_metadata_v1_no_valid_schema() {
3354        let metadata =
3355            fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
3356                .unwrap();
3357
3358        // Deserialize the JSON - this should fail because neither schemas + current_schema_id nor schema is valid
3359        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3360
3361        assert!(desered.is_err());
3362        let error_message = desered.unwrap_err().to_string();
3363        assert!(
3364            error_message.contains("No valid schema configuration found"),
3365            "Expected error about no valid schema configuration, got: {error_message}"
3366        );
3367    }
3368
3369    #[test]
3370    fn test_table_metadata_v1_partition_specs_without_default_id() {
3371        let metadata = fs::read_to_string(
3372            "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
3373        )
3374        .unwrap();
3375
3376        // Deserialize the JSON - this should succeed by inferring default_spec_id as the max spec ID
3377        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3378            .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");
3379
3380        // Verify basic metadata
3381        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3382        assert_eq!(
3383            desered_type.uuid(),
3384            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3385        );
3386
3387        // Verify partition specs
3388        assert_eq!(desered_type.default_partition_spec_id(), 2); // Should pick the largest spec ID (2)
3389        assert_eq!(desered_type.partition_specs.len(), 2);
3390
3391        // Verify the default spec has the expected fields
3392        let default_spec = &desered_type.default_spec;
3393        assert_eq!(default_spec.spec_id(), 2);
3394        assert_eq!(default_spec.fields().len(), 1);
3395        assert_eq!(default_spec.fields()[0].name, "y");
3396        assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
3397        assert_eq!(default_spec.fields()[0].source_id, 2);
3398    }
3399
3400    #[test]
3401    fn test_table_metadata_v2_schema_not_found() {
3402        let metadata =
3403            fs::read_to_string("testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json")
3404                .unwrap();
3405
3406        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3407
3408        assert_eq!(
3409            desered.unwrap_err().to_string(),
3410            "DataInvalid => No schema exists with the current schema id 2."
3411        )
3412    }
3413
3414    #[test]
3415    fn test_table_metadata_v2_missing_sort_order() {
3416        let metadata =
3417            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
3418                .unwrap();
3419
3420        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3421
3422        assert_eq!(
3423            desered.unwrap_err().to_string(),
3424            "data did not match any variant of untagged enum TableMetadataEnum"
3425        )
3426    }
3427
3428    #[test]
3429    fn test_table_metadata_v2_missing_partition_specs() {
3430        let metadata =
3431            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json")
3432                .unwrap();
3433
3434        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3435
3436        assert_eq!(
3437            desered.unwrap_err().to_string(),
3438            "data did not match any variant of untagged enum TableMetadataEnum"
3439        )
3440    }
3441
3442    #[test]
3443    fn test_table_metadata_v2_missing_last_partition_id() {
3444        let metadata = fs::read_to_string(
3445            "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json",
3446        )
3447        .unwrap();
3448
3449        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3450
3451        assert_eq!(
3452            desered.unwrap_err().to_string(),
3453            "data did not match any variant of untagged enum TableMetadataEnum"
3454        )
3455    }
3456
3457    #[test]
3458    fn test_table_metadata_v2_missing_schemas() {
3459        let metadata =
3460            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSchemas.json")
3461                .unwrap();
3462
3463        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3464
3465        assert_eq!(
3466            desered.unwrap_err().to_string(),
3467            "data did not match any variant of untagged enum TableMetadataEnum"
3468        )
3469    }
3470
3471    #[test]
3472    fn test_table_metadata_v2_unsupported_version() {
3473        let metadata =
3474            fs::read_to_string("testdata/table_metadata/TableMetadataUnsupportedVersion.json")
3475                .unwrap();
3476
3477        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3478
3479        assert_eq!(
3480            desered.unwrap_err().to_string(),
3481            "data did not match any variant of untagged enum TableMetadataEnum"
3482        )
3483    }
3484
3485    #[test]
3486    fn test_order_of_format_version() {
3487        assert!(FormatVersion::V1 < FormatVersion::V2);
3488        assert_eq!(FormatVersion::V1, FormatVersion::V1);
3489        assert_eq!(FormatVersion::V2, FormatVersion::V2);
3490    }
3491
3492    #[test]
3493    fn test_default_partition_spec() {
3494        let default_spec_id = 1234;
3495        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3496        let partition_spec = PartitionSpec::unpartition_spec();
3497        table_meta_data.default_spec = partition_spec.clone().into();
3498        table_meta_data
3499            .partition_specs
3500            .insert(default_spec_id, Arc::new(partition_spec));
3501
3502        assert_eq!(
3503            (*table_meta_data.default_partition_spec().clone()).clone(),
3504            (*table_meta_data
3505                .partition_spec_by_id(default_spec_id)
3506                .unwrap()
3507                .clone())
3508            .clone()
3509        );
3510    }
3511    #[test]
3512    fn test_default_sort_order() {
3513        let default_sort_order_id = 1234;
3514        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3515        table_meta_data.default_sort_order_id = default_sort_order_id;
3516        table_meta_data
3517            .sort_orders
3518            .insert(default_sort_order_id, Arc::new(SortOrder::default()));
3519
3520        assert_eq!(
3521            table_meta_data.default_sort_order(),
3522            table_meta_data
3523                .sort_orders
3524                .get(&default_sort_order_id)
3525                .unwrap()
3526        )
3527    }
3528
3529    #[test]
3530    fn test_table_metadata_builder_from_table_creation() {
3531        let table_creation = TableCreation::builder()
3532            .location("s3://db/table".to_string())
3533            .name("table".to_string())
3534            .properties(HashMap::new())
3535            .schema(Schema::builder().build().unwrap())
3536            .build();
3537        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
3538            .unwrap()
3539            .build()
3540            .unwrap()
3541            .metadata;
3542        assert_eq!(table_metadata.location, "s3://db/table");
3543        assert_eq!(table_metadata.schemas.len(), 1);
3544        assert_eq!(
3545            table_metadata
3546                .schemas
3547                .get(&0)
3548                .unwrap()
3549                .as_struct()
3550                .fields()
3551                .len(),
3552            0
3553        );
3554        assert_eq!(table_metadata.properties.len(), 0);
3555        assert_eq!(
3556            table_metadata.partition_specs,
3557            HashMap::from([(
3558                0,
3559                Arc::new(
3560                    PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
3561                        .with_spec_id(0)
3562                        .build()
3563                        .unwrap()
3564                )
3565            )])
3566        );
3567        assert_eq!(
3568            table_metadata.sort_orders,
3569            HashMap::from([(
3570                0,
3571                Arc::new(SortOrder {
3572                    order_id: 0,
3573                    fields: vec![]
3574                })
3575            )])
3576        );
3577    }
3578
3579    #[tokio::test]
3580    async fn test_table_metadata_read_write() {
3581        // Create a temporary directory for our test
3582        let temp_dir = TempDir::new().unwrap();
3583        let temp_path = temp_dir.path().to_str().unwrap();
3584
3585        // Create a FileIO instance
3586        let file_io = FileIO::new_with_fs();
3587
3588        // Use an existing test metadata from the test files
3589        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3590
3591        // Define the metadata location
3592        let metadata_location = MetadataLocation::new_with_metadata(temp_path, &original_metadata);
3593        let metadata_location_str = metadata_location.to_string();
3594
3595        // Write the metadata
3596        original_metadata
3597            .write_to(&file_io, &metadata_location)
3598            .await
3599            .unwrap();
3600
3601        // Verify the file exists
3602        assert!(fs::metadata(&metadata_location_str).is_ok());
3603
3604        // Read the metadata back
3605        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location_str)
3606            .await
3607            .unwrap();
3608
3609        // Verify the metadata matches
3610        assert_eq!(read_metadata, original_metadata);
3611    }
3612
3613    #[tokio::test]
3614    async fn test_table_metadata_read_compressed() {
3615        let temp_dir = TempDir::new().unwrap();
3616        let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3617
3618        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3619        let json = serde_json::to_string(&original_metadata).unwrap();
3620
3621        let compressed = CompressionCodec::Gzip
3622            .compress(json.into_bytes())
3623            .expect("failed to compress metadata");
3624        std::fs::write(&metadata_location, &compressed).expect("failed to write metadata");
3625
3626        // Read the metadata back
3627        let file_io = FileIO::new_with_fs();
3628        let metadata_location = metadata_location.to_str().unwrap();
3629        let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3630            .await
3631            .unwrap();
3632
3633        // Verify the metadata matches
3634        assert_eq!(read_metadata, original_metadata);
3635    }
3636
3637    #[tokio::test]
3638    async fn test_table_metadata_read_nonexistent_file() {
3639        // Create a FileIO instance
3640        let file_io = FileIO::new_with_fs();
3641
3642        // Try to read a non-existent file
3643        let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3644
3645        // Verify it returns an error
3646        assert!(result.is_err());
3647    }
3648
    #[tokio::test]
    async fn test_table_metadata_write_with_gzip_compression() {
        // End-to-end gzip path: setting the compression-codec table property
        // must (1) give the metadata location a .gz extension, (2) produce a
        // genuinely gzip-encoded file on write, and (3) round-trip on read.
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().to_str().unwrap();
        let file_io = FileIO::new_with_fs();

        // Get a test metadata and add gzip compression property
        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");

        // Modify properties to enable gzip compression (using mixed case to test case-insensitive matching)
        let mut props = original_metadata.properties.clone();
        props.insert(
            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
            "GziP".to_string(),
        );
        // Use builder to create new metadata with updated properties.
        // Re-asserting the original UUID keeps everything but the properties
        // identical to the starting metadata.
        let compressed_metadata =
            TableMetadataBuilder::new_from_metadata(original_metadata.clone(), None)
                .assign_uuid(original_metadata.table_uuid)
                .set_properties(props.clone())
                .unwrap()
                .build()
                .unwrap()
                .metadata;

        // Create MetadataLocation with compression codec from metadata
        let metadata_location =
            MetadataLocation::new_with_metadata(temp_path, &compressed_metadata);
        let metadata_location_str = metadata_location.to_string();

        // Verify the location has the .gz extension
        assert!(metadata_location_str.contains(".gz.metadata.json"));

        // Write the metadata with compression
        compressed_metadata
            .write_to(&file_io, &metadata_location)
            .await
            .unwrap();

        // Verify the compressed file exists
        assert!(std::path::Path::new(&metadata_location_str).exists());

        // Read the raw file and check it's gzip compressed
        let raw_content = std::fs::read(&metadata_location_str).unwrap();
        assert!(raw_content.len() > 2);
        assert_eq!(raw_content[0], 0x1F); // gzip magic number
        assert_eq!(raw_content[1], 0x8B); // gzip magic number

        // Read the metadata back using the compressed location
        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location_str)
            .await
            .unwrap();

        // Verify the complete round-trip: read metadata should match what we wrote
        assert_eq!(read_metadata, compressed_metadata);
    }
3705
3706    #[test]
3707    fn test_partition_name_exists() {
3708        let schema = Schema::builder()
3709            .with_fields(vec![
3710                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3711                NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3712                    .into(),
3713            ])
3714            .build()
3715            .unwrap();
3716
3717        let spec1 = PartitionSpec::builder(schema.clone())
3718            .with_spec_id(1)
3719            .add_partition_field("data", "data_partition", Transform::Identity)
3720            .unwrap()
3721            .build()
3722            .unwrap();
3723
3724        let spec2 = PartitionSpec::builder(schema.clone())
3725            .with_spec_id(2)
3726            .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3727            .unwrap()
3728            .build()
3729            .unwrap();
3730
3731        // Build metadata with these specs
3732        let metadata = TableMetadataBuilder::new(
3733            schema,
3734            spec1.clone().into_unbound(),
3735            SortOrder::unsorted_order(),
3736            "s3://test/location".to_string(),
3737            FormatVersion::V2,
3738            HashMap::new(),
3739        )
3740        .unwrap()
3741        .add_partition_spec(spec2.into_unbound())
3742        .unwrap()
3743        .build()
3744        .unwrap()
3745        .metadata;
3746
3747        assert!(metadata.partition_name_exists("data_partition"));
3748        assert!(metadata.partition_name_exists("partition_bucket"));
3749
3750        assert!(!metadata.partition_name_exists("nonexistent_field"));
3751        assert!(!metadata.partition_name_exists("data")); // schema field name, not partition field name
3752        assert!(!metadata.partition_name_exists(""));
3753    }
3754
3755    #[test]
3756    fn test_partition_name_exists_empty_specs() {
3757        // Create metadata with no partition specs (unpartitioned table)
3758        let schema = Schema::builder()
3759            .with_fields(vec![
3760                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3761            ])
3762            .build()
3763            .unwrap();
3764
3765        let metadata = TableMetadataBuilder::new(
3766            schema,
3767            PartitionSpec::unpartition_spec().into_unbound(),
3768            SortOrder::unsorted_order(),
3769            "s3://test/location".to_string(),
3770            FormatVersion::V2,
3771            HashMap::new(),
3772        )
3773        .unwrap()
3774        .build()
3775        .unwrap()
3776        .metadata;
3777
3778        assert!(!metadata.partition_name_exists("any_field"));
3779        assert!(!metadata.partition_name_exists("data"));
3780    }
3781
3782    #[test]
3783    fn test_name_exists_in_any_schema() {
3784        // Create multiple schemas with different fields
3785        let schema1 = Schema::builder()
3786            .with_schema_id(1)
3787            .with_fields(vec![
3788                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3789                NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3790            ])
3791            .build()
3792            .unwrap();
3793
3794        let schema2 = Schema::builder()
3795            .with_schema_id(2)
3796            .with_fields(vec![
3797                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3798                NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3799            ])
3800            .build()
3801            .unwrap();
3802
3803        let metadata = TableMetadataBuilder::new(
3804            schema1,
3805            PartitionSpec::unpartition_spec().into_unbound(),
3806            SortOrder::unsorted_order(),
3807            "s3://test/location".to_string(),
3808            FormatVersion::V2,
3809            HashMap::new(),
3810        )
3811        .unwrap()
3812        .add_current_schema(schema2)
3813        .unwrap()
3814        .build()
3815        .unwrap()
3816        .metadata;
3817
3818        assert!(metadata.name_exists_in_any_schema("field1")); // exists in both schemas
3819        assert!(metadata.name_exists_in_any_schema("field2")); // exists only in schema1 (historical)
3820        assert!(metadata.name_exists_in_any_schema("field3")); // exists only in schema2 (current)
3821
3822        assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3823        assert!(!metadata.name_exists_in_any_schema("field4"));
3824        assert!(!metadata.name_exists_in_any_schema(""));
3825    }
3826
3827    #[test]
3828    fn test_name_exists_in_any_schema_empty_schemas() {
3829        let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3830
3831        let metadata = TableMetadataBuilder::new(
3832            schema,
3833            PartitionSpec::unpartition_spec().into_unbound(),
3834            SortOrder::unsorted_order(),
3835            "s3://test/location".to_string(),
3836            FormatVersion::V2,
3837            HashMap::new(),
3838        )
3839        .unwrap()
3840        .build()
3841        .unwrap()
3842        .metadata;
3843
3844        assert!(!metadata.name_exists_in_any_schema("any_field"));
3845    }
3846
3847    #[test]
3848    fn test_helper_methods_multi_version_scenario() {
3849        // Test a realistic multi-version scenario
3850        let initial_schema = Schema::builder()
3851            .with_fields(vec![
3852                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3853                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3854                NestedField::required(
3855                    3,
3856                    "deprecated_field",
3857                    Type::Primitive(PrimitiveType::String),
3858                )
3859                .into(),
3860            ])
3861            .build()
3862            .unwrap();
3863
3864        let metadata = TableMetadataBuilder::new(
3865            initial_schema,
3866            PartitionSpec::unpartition_spec().into_unbound(),
3867            SortOrder::unsorted_order(),
3868            "s3://test/location".to_string(),
3869            FormatVersion::V2,
3870            HashMap::new(),
3871        )
3872        .unwrap();
3873
3874        let evolved_schema = Schema::builder()
3875            .with_fields(vec![
3876                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3877                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3878                NestedField::required(
3879                    3,
3880                    "deprecated_field",
3881                    Type::Primitive(PrimitiveType::String),
3882                )
3883                .into(),
3884                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3885                    .into(),
3886            ])
3887            .build()
3888            .unwrap();
3889
3890        // Then add a third schema that removes the deprecated field
3891        let _final_schema = Schema::builder()
3892            .with_fields(vec![
3893                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3894                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3895                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3896                    .into(),
3897                NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3898                    .into(),
3899            ])
3900            .build()
3901            .unwrap();
3902
3903        let final_metadata = metadata
3904            .add_current_schema(evolved_schema)
3905            .unwrap()
3906            .build()
3907            .unwrap()
3908            .metadata;
3909
3910        assert!(!final_metadata.partition_name_exists("nonexistent_partition")); // unpartitioned table
3911
3912        assert!(final_metadata.name_exists_in_any_schema("id")); // exists in both schemas
3913        assert!(final_metadata.name_exists_in_any_schema("name")); // exists in both schemas
3914        assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); // exists in both schemas
3915        assert!(final_metadata.name_exists_in_any_schema("new_field")); // only in current schema
3916        assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3917    }
3918
    #[test]
    fn test_invalid_sort_order_id_zero_with_fields() {
        // Sort order id 0 is reserved for the unsorted order, which must have an
        // empty field list. This metadata declares order-id 0 *with* a field, so
        // deserialization is expected to reject it.
        let metadata = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 111,
            "last-updated-ms": 1600000000000,
            "last-column-id": 3,
            "current-schema-id": 1,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 1,
                    "fields": [
                        {"id": 1, "name": "x", "required": true, "type": "long"},
                        {"id": 2, "name": "y", "required": true, "type": "long"}
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [{"spec-id": 0, "fields": []}],
            "last-partition-id": 999,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": [
                        {
                            "transform": "identity",
                            "source-id": 1,
                            "direction": "asc",
                            "null-order": "nulls-first"
                        }
                    ]
                }
            ],
            "properties": {},
            "current-snapshot-id": -1,
            "snapshots": []
        }
        "#;

        let result: Result<TableMetadata, serde_json::Error> = serde_json::from_str(metadata);

        // Should fail because sort order ID 0 is reserved for unsorted order and cannot have fields
        assert!(
            result.is_err(),
            "Parsing should fail for sort order ID 0 with fields"
        );
    }
3971
3972    #[test]
3973    fn test_table_properties_with_defaults() {
3974        use crate::spec::TableProperties;
3975
3976        let schema = Schema::builder()
3977            .with_fields(vec![
3978                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3979            ])
3980            .build()
3981            .unwrap();
3982
3983        let metadata = TableMetadataBuilder::new(
3984            schema,
3985            PartitionSpec::unpartition_spec().into_unbound(),
3986            SortOrder::unsorted_order(),
3987            "s3://test/location".to_string(),
3988            FormatVersion::V2,
3989            HashMap::new(),
3990        )
3991        .unwrap()
3992        .build()
3993        .unwrap()
3994        .metadata;
3995
3996        let props = metadata.table_properties().unwrap();
3997
3998        assert_eq!(
3999            props.commit_num_retries,
4000            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
4001        );
4002        assert_eq!(
4003            props.write_target_file_size_bytes,
4004            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
4005        );
4006    }
4007
4008    #[test]
4009    fn test_table_properties_with_custom_values() {
4010        use crate::spec::TableProperties;
4011
4012        let schema = Schema::builder()
4013            .with_fields(vec![
4014                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4015            ])
4016            .build()
4017            .unwrap();
4018
4019        let properties = HashMap::from([
4020            (
4021                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
4022                "10".to_string(),
4023            ),
4024            (
4025                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
4026                "1024".to_string(),
4027            ),
4028        ]);
4029
4030        let metadata = TableMetadataBuilder::new(
4031            schema,
4032            PartitionSpec::unpartition_spec().into_unbound(),
4033            SortOrder::unsorted_order(),
4034            "s3://test/location".to_string(),
4035            FormatVersion::V2,
4036            properties,
4037        )
4038        .unwrap()
4039        .build()
4040        .unwrap()
4041        .metadata;
4042
4043        let props = metadata.table_properties().unwrap();
4044
4045        assert_eq!(props.commit_num_retries, 10);
4046        assert_eq!(props.write_target_file_size_bytes, 1024);
4047    }
4048
4049    #[test]
4050    fn test_table_properties_with_invalid_value() {
4051        let schema = Schema::builder()
4052            .with_fields(vec![
4053                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4054            ])
4055            .build()
4056            .unwrap();
4057
4058        let properties = HashMap::from([(
4059            "commit.retry.num-retries".to_string(),
4060            "not_a_number".to_string(),
4061        )]);
4062
4063        let metadata = TableMetadataBuilder::new(
4064            schema,
4065            PartitionSpec::unpartition_spec().into_unbound(),
4066            SortOrder::unsorted_order(),
4067            "s3://test/location".to_string(),
4068            FormatVersion::V2,
4069            properties,
4070        )
4071        .unwrap()
4072        .build()
4073        .unwrap()
4074        .metadata;
4075
4076        let err = metadata.table_properties().unwrap_err();
4077        assert_eq!(err.kind(), ErrorKind::DataInvalid);
4078        assert!(err.message().contains("Invalid table properties"));
4079    }
4080
    #[test]
    fn test_v2_to_v3_upgrade_preserves_existing_snapshots_without_row_lineage() {
        // Upgrading a v2 table (whose snapshots predate row lineage) to v3 must
        // keep those snapshots serializable even though they carry no row_range,
        // and the upgraded metadata must round-trip through JSON.
        // Create a v2 table metadata
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let v2_metadata = TableMetadataBuilder::new(
            schema,
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            "s3://bucket/test/location".to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Add a v2 snapshot (note: no row_range — v2 has no row lineage)
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(v2_metadata.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([(
                    "added-data-files".to_string(),
                    "1".to_string(),
                )]),
            })
            .build();

        // Attach the snapshot and point the "main" branch at it.
        let v2_with_snapshot = v2_metadata
            .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
            .add_snapshot(snapshot)
            .unwrap()
            .set_ref("main", SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Verify v2 serialization works fine
        let v2_json = serde_json::to_string(&v2_with_snapshot);
        assert!(v2_json.is_ok(), "v2 serialization should work");

        // Upgrade to v3
        let v3_metadata = v2_with_snapshot
            .into_builder(Some("s3://bucket/test/metadata/v00002.json".to_string()))
            .upgrade_format_version(FormatVersion::V3)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // The upgrade bumps the format version, initializes next_row_id, and
        // keeps the pre-existing snapshot.
        assert_eq!(v3_metadata.format_version, FormatVersion::V3);
        assert_eq!(v3_metadata.next_row_id, INITIAL_ROW_ID);
        assert_eq!(v3_metadata.snapshots.len(), 1);

        // Verify the snapshot has no row_range
        let snapshot = v3_metadata.snapshots.values().next().unwrap();
        assert!(
            snapshot.row_range().is_none(),
            "Snapshot should have no row_range after upgrade"
        );

        // Try to serialize v3 metadata - this should now work
        let v3_json = serde_json::to_string(&v3_metadata);
        assert!(
            v3_json.is_ok(),
            "v3 serialization should work for upgraded tables"
        );

        // Verify we can deserialize it back
        let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
        assert_eq!(deserialized.format_version, FormatVersion::V3);
        assert_eq!(deserialized.snapshots.len(), 1);

        // Verify the deserialized snapshot still has no row_range
        let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
        assert!(
            deserialized_snapshot.row_range().is_none(),
            "Deserialized snapshot should have no row_range"
        );
    }
4180
    #[test]
    fn test_v3_snapshot_with_row_lineage_serialization() {
        // A native v3 snapshot carrying a row_range (row lineage) must survive
        // a full JSON serialize/deserialize round trip with its
        // (first_row_id, added_rows) pair intact.
        // Create a v3 table metadata
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let v3_metadata = TableMetadataBuilder::new(
            schema,
            PartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            "s3://bucket/test/location".to_string(),
            FormatVersion::V3,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        // Add a v3 snapshot with row lineage
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(v3_metadata.last_updated_ms + 1)
            .with_sequence_number(1)
            .with_schema_id(0)
            .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::from([(
                    "added-data-files".to_string(),
                    "1".to_string(),
                )]),
            })
            .with_row_range(100, 50) // first_row_id=100, added_rows=50
            .build();

        // Attach the snapshot and point the "main" branch at it.
        let v3_with_snapshot = v3_metadata
            .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
            .add_snapshot(snapshot)
            .unwrap()
            .set_ref("main", SnapshotReference {
                snapshot_id: 1,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        // Verify the snapshot has row_range
        let snapshot = v3_with_snapshot.snapshots.values().next().unwrap();
        assert!(
            snapshot.row_range().is_some(),
            "Snapshot should have row_range"
        );
        let (first_row_id, added_rows) = snapshot.row_range().unwrap();
        assert_eq!(first_row_id, 100);
        assert_eq!(added_rows, 50);

        // Serialize v3 metadata - this should work
        let v3_json = serde_json::to_string(&v3_with_snapshot);
        assert!(
            v3_json.is_ok(),
            "v3 serialization should work for snapshots with row lineage"
        );

        // Verify we can deserialize it back
        let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
        assert_eq!(deserialized.format_version, FormatVersion::V3);
        assert_eq!(deserialized.snapshots.len(), 1);

        // Verify the deserialized snapshot has the correct row_range
        let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
        assert!(
            deserialized_snapshot.row_range().is_some(),
            "Deserialized snapshot should have row_range"
        );
        let (deserialized_first_row_id, deserialized_added_rows) =
            deserialized_snapshot.row_range().unwrap();
        assert_eq!(deserialized_first_row_id, 100);
        assert_eq!(deserialized_added_rows, 50);
    }
4271}