// iceberg/spec/table_metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
19//! The main struct here is [TableMetadataV2] which defines the data for a table.
20
21use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::sync::Arc;
26
27use _serde::TableMetadataEnum;
28use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use serde_repr::{Deserialize_repr, Serialize_repr};
31use uuid::Uuid;
32
33use super::snapshot::SnapshotReference;
34pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
35use super::{
36    DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
37    SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
38    TableProperties, parse_metadata_file_compression,
39};
40use crate::catalog::MetadataLocation;
41use crate::compression::CompressionCodec;
42use crate::error::{Result, timestamp_ms_to_utc};
43use crate::io::FileIO;
44use crate::spec::EncryptedKey;
45use crate::{Error, ErrorKind};
46
/// Name of the branch ref that must always track the current table state.
static MAIN_BRANCH: &str = "main";
/// Clock-skew tolerance (in milliseconds) applied when validating that
/// snapshot/metadata log timestamps are chronological; commits may come from
/// different machines with slightly differing clocks.
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;

/// Sentinel value used by the Java implementation and older metadata files
/// to represent a missing/empty current snapshot ID. During deserialization,
/// this value is normalized to `None`.
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
/// Initial sequence number; v1 tables always report this as their next
/// sequence number (see `TableMetadata::next_sequence_number`).
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;

/// Initial row id for row lineage for new v3 tables and older tables upgrading to v3.
pub const INITIAL_ROW_ID: u64 = 0;
/// Minimum format version that supports row lineage (v3).
pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;
62
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
/// In-memory representation of [table metadata](https://iceberg.apache.org/spec/#table-metadata),
/// deserialized from any supported format version via [`TableMetadataEnum`].
///
/// We assume that this data structure is always valid, so we will panic when invalid error happens.
/// We check the validity of this data structure when constructing.
pub struct TableMetadata {
    /// Integer Version for the format.
    pub(crate) format_version: FormatVersion,
    /// A UUID that identifies the table
    pub(crate) table_uuid: Uuid,
    /// Location tables base location (normalized to have no trailing slash)
    pub(crate) location: String,
    /// The tables highest sequence number
    pub(crate) last_sequence_number: i64,
    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
    pub(crate) last_updated_ms: i64,
    /// An integer; the highest assigned column ID for the table.
    pub(crate) last_column_id: i32,
    /// A list of schemas, keyed by schema-id.
    pub(crate) schemas: HashMap<i32, SchemaRef>,
    /// ID of the table’s current schema.
    pub(crate) current_schema_id: i32,
    /// A list of partition specs, keyed by spec-id.
    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
    /// The “current” spec that writers should use by default.
    pub(crate) default_spec: PartitionSpecRef,
    /// Partition type of the default partition spec.
    pub(crate) default_partition_type: StructType,
    /// An integer; the highest assigned partition field ID across all partition specs for the table.
    pub(crate) last_partition_id: i32,
    /// A string to string map of table properties. This is used to control settings that
    /// affect reading and writing and is not intended to be used for arbitrary metadata.
    /// For example, commit.retry.num-retries is used to control the number of commit retries.
    pub(crate) properties: HashMap<String, String>,
    /// long ID of the current table snapshot; must be the same as the current
    /// ID of the main branch in refs.
    pub(crate) current_snapshot_id: Option<i64>,
    /// A list of valid snapshots, keyed by snapshot id. Valid snapshots are
    /// snapshots for which all data files exist in the file system. A data
    /// file must not be deleted from the file system until the last snapshot
    /// in which it was listed is garbage collected.
    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
    /// to the current snapshot for the table. Each time the current-snapshot-id
    /// is changed, a new entry should be added with the last-updated-ms
    /// and the new current-snapshot-id. When snapshots are expired from
    /// the list of valid snapshots, all entries before a snapshot that has
    /// expired should be removed.
    pub(crate) snapshot_log: Vec<SnapshotLog>,

    /// A list (optional) of timestamp and metadata file location pairs
    /// that encodes changes to the previous metadata files for the table.
    /// Each time a new metadata file is created, a new entry of the
    /// previous metadata file location should be added to the list.
    /// Tables can be configured to remove the oldest metadata log entries and
    /// keep a fixed-size log of the most recent entries after a commit.
    pub(crate) metadata_log: Vec<MetadataLog>,

    /// A list of sort orders, keyed by sort-order-id.
    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
    /// Default sort order id of the table. Note that this could be used by
    /// writers, but is not used when reading because reads use the specs
    /// stored in manifest files.
    pub(crate) default_sort_order_id: i64,
    /// A map of snapshot references. The map keys are the unique snapshot reference
    /// names in the table, and the map values are snapshot reference objects.
    /// There is always a main branch reference pointing to the current-snapshot-id
    /// even if the refs map is null.
    pub(crate) refs: HashMap<String, SnapshotReference>,
    /// Mapping of snapshot ids to statistics files.
    pub(crate) statistics: HashMap<i64, StatisticsFile>,
    /// Mapping of snapshot ids to partition statistics files.
    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
    /// Encryption Keys - map of key id to the actual key
    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
    /// Next row id to be assigned for Row Lineage (v3)
    pub(crate) next_row_id: u64,
}
142
impl TableMetadata {
    /// Convert this Table Metadata into a builder for modification.
    ///
    /// `current_file_location` is the location where the current version
    /// of the metadata file is stored. This is used to update the metadata log.
    /// If `current_file_location` is `None`, the metadata log will not be updated.
    /// This should only be used to stage-create tables.
    #[must_use]
    pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
        TableMetadataBuilder::new_from_metadata(self, current_file_location)
    }

    /// Check if a partition field name exists in any partition spec.
    #[inline]
    pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
        self.partition_specs
            .values()
            .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
    }

    /// Check if a field name exists in any schema.
    #[inline]
    pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
        self.schemas
            .values()
            .any(|schema| schema.field_by_name(name).is_some())
    }

    /// Returns format version of this metadata.
    #[inline]
    pub fn format_version(&self) -> FormatVersion {
        self.format_version
    }

    /// Returns uuid of current table.
    #[inline]
    pub fn uuid(&self) -> Uuid {
        self.table_uuid
    }

    /// Returns table location.
    #[inline]
    pub fn location(&self) -> &str {
        self.location.as_str()
    }

    /// Returns last sequence number.
    #[inline]
    pub fn last_sequence_number(&self) -> i64 {
        self.last_sequence_number
    }

    /// Returns the next sequence number for the table.
    ///
    /// For format version 1, it always returns the initial sequence number.
    /// For other versions, it returns the last sequence number incremented by 1.
    #[inline]
    pub fn next_sequence_number(&self) -> i64 {
        match self.format_version {
            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
            _ => self.last_sequence_number + 1,
        }
    }

    /// Returns the last column id.
    #[inline]
    pub fn last_column_id(&self) -> i32 {
        self.last_column_id
    }

    /// Returns the last partition_id
    #[inline]
    pub fn last_partition_id(&self) -> i32 {
        self.last_partition_id
    }

    /// Returns last updated time.
    ///
    /// # Errors
    /// Returns an error if `last_updated_ms` cannot be represented as a UTC timestamp.
    #[inline]
    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
        timestamp_ms_to_utc(self.last_updated_ms)
    }

    /// Returns last updated time in milliseconds.
    #[inline]
    pub fn last_updated_ms(&self) -> i64 {
        self.last_updated_ms
    }

    /// Returns an iterator over all schemas (in arbitrary map order).
    #[inline]
    pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
        self.schemas.values()
    }

    /// Lookup schema by id.
    #[inline]
    pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
        self.schemas.get(&schema_id)
    }

    /// Get current schema
    ///
    /// # Panics
    /// Panics if the current schema id is not present in `schemas`;
    /// `try_normalize` validates this invariant at construction time.
    #[inline]
    pub fn current_schema(&self) -> &SchemaRef {
        self.schema_by_id(self.current_schema_id)
            .expect("Current schema id set, but not found in table metadata")
    }

    /// Get the id of the current schema
    #[inline]
    pub fn current_schema_id(&self) -> SchemaId {
        self.current_schema_id
    }

    /// Returns an iterator over all partition specs (in arbitrary map order).
    #[inline]
    pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
        self.partition_specs.values()
    }

    /// Lookup partition spec by id.
    #[inline]
    pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
        self.partition_specs.get(&spec_id)
    }

    /// Get default partition spec
    #[inline]
    pub fn default_partition_spec(&self) -> &PartitionSpecRef {
        &self.default_spec
    }

    /// Return the partition type of the default partition spec.
    #[inline]
    pub fn default_partition_type(&self) -> &StructType {
        &self.default_partition_type
    }

    #[inline]
    /// Returns spec id of the "current" partition spec.
    pub fn default_partition_spec_id(&self) -> i32 {
        self.default_spec.spec_id()
    }

    /// Returns an iterator over all snapshots (in arbitrary map order).
    #[inline]
    pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
        self.snapshots.values()
    }

    /// Lookup snapshot by id.
    #[inline]
    pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
        self.snapshots.get(&snapshot_id)
    }

    /// Returns snapshot history.
    #[inline]
    pub fn history(&self) -> &[SnapshotLog] {
        &self.snapshot_log
    }

    /// Returns the metadata log.
    #[inline]
    pub fn metadata_log(&self) -> &[MetadataLog] {
        &self.metadata_log
    }

    /// Get current snapshot
    ///
    /// # Panics
    /// Panics if `current_snapshot_id` is set but the snapshot is missing;
    /// `try_normalize` validates this invariant at construction time.
    #[inline]
    pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
        self.current_snapshot_id.map(|s| {
            self.snapshot_by_id(s)
                .expect("Current snapshot id has been set, but doesn't exist in metadata")
        })
    }

    /// Get the current snapshot id
    #[inline]
    pub fn current_snapshot_id(&self) -> Option<i64> {
        self.current_snapshot_id
    }

    /// Get the snapshot for a reference
    /// Returns an option if the `ref_name` is not found
    ///
    /// # Panics
    /// Panics if the ref exists but points at a missing snapshot;
    /// `validate_refs` rules this out for normalized metadata.
    #[inline]
    pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
        self.refs.get(ref_name).map(|r| {
            self.snapshot_by_id(r.snapshot_id)
                .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
        })
    }

    /// Returns an iterator over all sort orders (in arbitrary map order).
    #[inline]
    pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
        self.sort_orders.values()
    }

    /// Lookup sort order by id.
    #[inline]
    pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
        self.sort_orders.get(&sort_order_id)
    }

    /// Returns the default sort order of the table.
    ///
    /// # Panics
    /// Panics if `default_sort_order_id` is not present in `sort_orders`;
    /// `try_normalize_sort_order` validates this invariant at construction time.
    #[inline]
    pub fn default_sort_order(&self) -> &SortOrderRef {
        self.sort_orders
            .get(&self.default_sort_order_id)
            .expect("Default order id has been set, but not found in table metadata!")
    }

    /// Returns default sort order id.
    #[inline]
    pub fn default_sort_order_id(&self) -> i64 {
        self.default_sort_order_id
    }

    /// Returns properties of table.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, String> {
        &self.properties
    }

    /// Returns the metadata compression codec from table properties.
    ///
    /// Returns `CompressionCodec::None` if compression is disabled or not configured.
    /// Returns `CompressionCodec::Gzip` if gzip compression is enabled.
    ///
    /// # Errors
    ///
    /// Returns an error if the compression codec property has an invalid value.
    pub fn metadata_compression_codec(&self) -> Result<CompressionCodec> {
        parse_metadata_file_compression(&self.properties)
    }

    /// Returns typed table properties parsed from the raw properties map with defaults.
    ///
    /// # Errors
    /// Returns `ErrorKind::DataInvalid` if any property value fails to parse.
    pub fn table_properties(&self) -> Result<TableProperties> {
        TableProperties::try_from(&self.properties).map_err(|e| {
            Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
        })
    }

    /// Return location of statistics files.
    #[inline]
    pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
        self.statistics.values()
    }

    /// Return location of partition statistics files.
    #[inline]
    pub fn partition_statistics_iter(
        &self,
    ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
        self.partition_statistics.values()
    }

    /// Get a statistics file for a snapshot id.
    #[inline]
    pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
        self.statistics.get(&snapshot_id)
    }

    /// Get a partition statistics file for a snapshot id.
    #[inline]
    pub fn partition_statistics_for_snapshot(
        &self,
        snapshot_id: i64,
    ) -> Option<&PartitionStatisticsFile> {
        self.partition_statistics.get(&snapshot_id)
    }

    /// If a current snapshot is set but no `main` branch ref exists, create
    /// one pointing at the current snapshot with default branch retention.
    /// This upholds the invariant that `main` always tracks current-snapshot-id.
    fn construct_refs(&mut self) {
        if let Some(current_snapshot_id) = self.current_snapshot_id
            && !self.refs.contains_key(MAIN_BRANCH)
        {
            self.refs
                .insert(MAIN_BRANCH.to_string(), SnapshotReference {
                    snapshot_id: current_snapshot_id,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: None,
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                });
        }
    }

    /// Iterate over all encryption keys
    #[inline]
    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
        self.encryption_keys.values()
    }

    /// Get the encryption key for a given key id
    #[inline]
    pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
        self.encryption_keys.get(key_id)
    }

    /// Get the next row id to be assigned
    #[inline]
    pub fn next_row_id(&self) -> u64 {
        self.next_row_id
    }

    /// Read table metadata from the given location.
    ///
    /// Transparently handles gzip-compressed metadata files by sniffing the
    /// gzip magic bytes before JSON-decoding.
    ///
    /// # Errors
    /// Returns an error if the file cannot be read, decompressed, or parsed.
    pub async fn read_from(
        file_io: &FileIO,
        metadata_location: impl AsRef<str>,
    ) -> Result<TableMetadata> {
        let metadata_location = metadata_location.as_ref();
        let input_file = file_io.new_input(metadata_location)?;
        let metadata_content = input_file.read().await?;

        // Check if the file is compressed by looking for the gzip "magic number" (0x1F 0x8B).
        let metadata = if metadata_content.len() > 2
            && metadata_content[0] == 0x1F
            && metadata_content[1] == 0x8B
        {
            let decompressed_data = CompressionCodec::gzip_default()
                .decompress(metadata_content.to_vec())
                .map_err(|e| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        "Trying to read compressed metadata file",
                    )
                    .with_context("file_path", metadata_location)
                    .with_source(e)
                })?;
            serde_json::from_slice(&decompressed_data)?
        } else {
            serde_json::from_slice(&metadata_content)?
        };

        Ok(metadata)
    }

    /// Write table metadata to the given location.
    ///
    /// # Errors
    /// Returns `ErrorKind::DataInvalid` if the codec implied by the file name
    /// in `metadata_location` disagrees with the table properties, or if the
    /// configured codec is unsupported for metadata files.
    pub async fn write_to(
        &self,
        file_io: &FileIO,
        metadata_location: &MetadataLocation,
    ) -> Result<()> {
        let json_data = serde_json::to_vec(self)?;

        // Check if compression codec from properties matches the one in metadata_location
        let codec = parse_metadata_file_compression(&self.properties)?;

        if codec != metadata_location.compression_codec() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Compression codec mismatch: metadata_location has {:?}, but table properties specify {:?}",
                    metadata_location.compression_codec(),
                    codec
                ),
            ));
        }

        // Apply compression based on codec
        let data_to_write = match codec {
            CompressionCodec::Gzip(_) => codec.compress(json_data)?,
            CompressionCodec::None => json_data,
            _ => {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!("Unsupported metadata compression codec: {codec:?}"),
                ));
            }
        };

        file_io
            .new_output(metadata_location.to_string())?
            .write(data_to_write.into())
            .await
    }

    /// Normalize this partition spec.
    ///
    /// This is an internal method
    /// meant to be called after constructing table metadata from untrusted sources.
    /// We run this method after json deserialization.
    /// All constructors for `TableMetadata` which are part of `iceberg-rust`
    /// should return normalized `TableMetadata`.
    ///
    /// NOTE(review): the order of these steps matters — e.g. `construct_refs`
    /// must run before `validate_refs` so a synthesized `main` ref is validated.
    pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
        self.validate_current_schema()?;
        self.normalize_current_snapshot()?;
        self.construct_refs();
        self.validate_refs()?;
        self.validate_chronological_snapshot_logs()?;
        self.validate_chronological_metadata_logs()?;
        // Normalize location (remove trailing slash)
        self.location = self.location.trim_end_matches('/').to_string();
        self.validate_snapshot_sequence_number()?;
        self.try_normalize_partition_spec()?;
        self.try_normalize_sort_order()?;
        Ok(self)
    }

    /// If the default partition spec is not present in specs, add it
    fn try_normalize_partition_spec(&mut self) -> Result<()> {
        if self
            .partition_spec_by_id(self.default_spec.spec_id())
            .is_none()
        {
            self.partition_specs.insert(
                self.default_spec.spec_id(),
                Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
            );
        }

        Ok(())
    }

    /// If the default sort order is unsorted but the sort order is not present, add it
    fn try_normalize_sort_order(&mut self) -> Result<()> {
        // Validate that sort order ID 0 (reserved for unsorted) has no fields
        if let Some(sort_order) = self.sort_order_by_id(SortOrder::UNSORTED_ORDER_ID)
            && !sort_order.fields.is_empty()
        {
            return Err(Error::new(
                ErrorKind::Unexpected,
                format!(
                    "Sort order ID {} is reserved for unsorted order",
                    SortOrder::UNSORTED_ORDER_ID
                ),
            ));
        }

        if self.sort_order_by_id(self.default_sort_order_id).is_some() {
            return Ok(());
        }

        if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "No sort order exists with the default sort order id {}.",
                    self.default_sort_order_id
                ),
            ));
        }

        // Default is the reserved unsorted id but the order is missing: add it.
        let sort_order = SortOrder::unsorted_order();
        self.sort_orders
            .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
        Ok(())
    }

    /// Validate the current schema is set and exists.
    fn validate_current_schema(&self) -> Result<()> {
        if self.schema_by_id(self.current_schema_id).is_none() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "No schema exists with the current schema id {}.",
                    self.current_schema_id
                ),
            ));
        }
        Ok(())
    }

    /// If current snapshot is Some(-1) then set it to None.
    fn normalize_current_snapshot(&mut self) -> Result<()> {
        if let Some(current_snapshot_id) = self.current_snapshot_id {
            if current_snapshot_id == EMPTY_SNAPSHOT_ID {
                // -1 is the legacy "no current snapshot" sentinel.
                self.current_snapshot_id = None;
            } else if self.snapshot_by_id(current_snapshot_id).is_none() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
                    ),
                ));
            }
        }
        Ok(())
    }

    /// Validate that all refs are valid (snapshot exists)
    /// and that the `main` ref, if present, agrees with `current_snapshot_id`.
    fn validate_refs(&self) -> Result<()> {
        for (name, snapshot_ref) in self.refs.iter() {
            if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Snapshot for reference {name} does not exist in the existing snapshots list"
                    ),
                ));
            }
        }

        let main_ref = self.refs.get(MAIN_BRANCH);
        if self.current_snapshot_id.is_some() {
            if let Some(main_ref) = main_ref
                && main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default()
            {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Current snapshot id does not match main branch ({:?} != {:?})",
                        self.current_snapshot_id.unwrap_or_default(),
                        main_ref.snapshot_id
                    ),
                ));
            }
        } else if main_ref.is_some() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Current snapshot is not set, but main branch exists",
            ));
        }

        Ok(())
    }

    /// Validate that for V1 Metadata the last_sequence_number is 0,
    /// and that for V2+ no snapshot exceeds the table's last sequence number.
    fn validate_snapshot_sequence_number(&self) -> Result<()> {
        if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Last sequence number must be 0 in v1. Found {}",
                    self.last_sequence_number
                ),
            ));
        }

        if self.format_version >= FormatVersion::V2
            && let Some(snapshot) = self
                .snapshots
                .values()
                .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
        {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
                    snapshot.snapshot_id(),
                    snapshot.sequence_number(),
                    self.last_sequence_number
                ),
            ));
        }

        Ok(())
    }

    /// Validate snapshots logs are chronological and last updated is after the last snapshot log.
    fn validate_chronological_snapshot_logs(&self) -> Result<()> {
        for window in self.snapshot_log.windows(2) {
            let (prev, curr) = (&window[0], &window[1]);
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Expected sorted snapshot log entries",
                ));
            }
        }

        if let Some(last) = self.snapshot_log.last() {
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid update timestamp {}: before last snapshot log entry at {}",
                        self.last_updated_ms, last.timestamp_ms
                    ),
                ));
            }
        }
        Ok(())
    }

    /// Validate metadata logs are chronological and last updated is after the last metadata log.
    fn validate_chronological_metadata_logs(&self) -> Result<()> {
        for window in self.metadata_log.windows(2) {
            let (prev, curr) = (&window[0], &window[1]);
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Expected sorted metadata log entries",
                ));
            }
        }

        if let Some(last) = self.metadata_log.last() {
            // commits can happen concurrently from different machines.
            // A tolerance helps us avoid failure for small clock skew
            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Invalid update timestamp {}: before last metadata log entry at {}",
                        self.last_updated_ms, last.timestamp_ms
                    ),
                ));
            }
        }

        Ok(())
    }
}
753
754pub(super) mod _serde {
755    use std::borrow::BorrowMut;
756    /// This is a helper module that defines types to help with serialization/deserialization.
757    /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct
758    /// and then converted into the [TableMetadata] struct. Serialization works the other way around.
759    /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization.
760    use std::collections::HashMap;
761    /// This is a helper module that defines types to help with serialization/deserialization.
762    /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct
763    /// and then converted into the [TableMetadata] struct. Serialization works the other way around.
764    /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization.
765    use std::sync::Arc;
766
767    use serde::{Deserialize, Serialize};
768    use uuid::Uuid;
769
770    use super::{
771        DEFAULT_PARTITION_SPEC_ID, EMPTY_SNAPSHOT_ID, FormatVersion, MAIN_BRANCH, MetadataLog,
772        SnapshotLog, TableMetadata,
773    };
774    use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
775    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
776    use crate::spec::{
777        EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
778        PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
779        SortOrder, StatisticsFile,
780    };
781    use crate::{Error, ErrorKind};
782
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(untagged)]
    /// Untagged union over the per-format-version (de)serialization structs.
    ///
    /// With `#[serde(untagged)]`, serde tries the variants in declaration
    /// order, so the newest (most specific) version must be listed first.
    pub(super) enum TableMetadataEnum {
        V3(TableMetadataV3),
        V2(TableMetadataV2),
        V1(TableMetadataV1),
    }
790
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v3 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV3 {
        // Only the literal 3 deserializes successfully (see `VersionNumber`).
        pub format_version: VersionNumber<3>,
        // Fields shared with the v2 format, flattened into this object.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        // Row-lineage counter; v1/v2 conversions initialize it to INITIAL_ROW_ID.
        pub next_row_id: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub encryption_keys: Option<Vec<EncryptedKey>>,
        // v3 uses its own snapshot wire struct, hence snapshots live here
        // rather than in the shared part.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV3>>,
    }
804
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Fields common to the v2 and v3 table-metadata formats, flattened into
    /// both [`TableMetadataV2`] and [`TableMetadataV3`] for serialization/deserialization.
    pub(super) struct TableMetadataV2V3Shared {
        pub table_uuid: Uuid,
        pub location: String,
        pub last_sequence_number: i64,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        pub schemas: Vec<SchemaV2>,
        pub current_schema_id: i32,
        pub partition_specs: Vec<PartitionSpec>,
        pub default_spec_id: i32,
        pub last_partition_id: i32,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Vec<SortOrder>,
        pub default_sort_order_id: i64,
        // When absent, the conversions synthesize a `main` branch ref from
        // `current_snapshot_id`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub refs: Option<HashMap<String, SnapshotReference>>,
        // Defaults to empty and is omitted from output when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
836
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v2 table metadata for serialization/deserialization
    pub(super) struct TableMetadataV2 {
        // Only the literal 2 deserializes successfully (see `VersionNumber`).
        pub format_version: VersionNumber<2>,
        // Fields shared with the v3 format, flattened into this object.
        #[serde(flatten)]
        pub shared: TableMetadataV2V3Shared,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV2>>,
    }
847
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v1 table metadata for serialization/deserialization
    ///
    /// Many fields are optional here because v1 predates them; the conversion
    /// into [`TableMetadata`] fills in defaults where the spec allows it.
    pub(super) struct TableMetadataV1 {
        // Only the literal 1 deserializes successfully (see `VersionNumber`).
        pub format_version: VersionNumber<1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub table_uuid: Option<Uuid>,
        pub location: String,
        pub last_updated_ms: i64,
        pub last_column_id: i32,
        /// `schema` is optional to prioritize `schemas` and `current-schema-id`, allowing liberal reading of V1 metadata.
        pub schema: Option<SchemaV1>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub schemas: Option<Vec<SchemaV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_schema_id: Option<i32>,
        /// `partition_spec` is optional to prioritize `partition_specs`, aligning with liberal reading of potentially invalid V1 metadata.
        pub partition_spec: Option<Vec<PartitionField>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub partition_specs: Option<Vec<PartitionSpec>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub default_spec_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub last_partition_id: Option<i32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub properties: Option<HashMap<String, String>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub current_snapshot_id: Option<i64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshots: Option<Vec<SnapshotV1>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub snapshot_log: Option<Vec<SnapshotLog>>,
        #[serde(skip_serializing_if = "Option::is_none")]
        pub metadata_log: Option<Vec<MetadataLog>>,
        pub sort_orders: Option<Vec<SortOrder>>,
        pub default_sort_order_id: Option<i64>,
        // Defaults to empty and is omitted from output when empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub statistics: Vec<StatisticsFile>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        pub partition_statistics: Vec<PartitionStatisticsFile>,
    }
889
    /// Helper to serialize and deserialize the format version.
    ///
    /// The const parameter `V` pins the expected version, so each versioned
    /// metadata struct only (de)serializes its own `format-version` value.
    #[derive(Debug, PartialEq, Eq)]
    pub(crate) struct VersionNumber<const V: u8>;
893
894    impl Serialize for TableMetadata {
895        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
896        where S: serde::Serializer {
897            // we must do a clone here
898            let table_metadata_enum: TableMetadataEnum =
899                self.clone().try_into().map_err(serde::ser::Error::custom)?;
900
901            table_metadata_enum.serialize(serializer)
902        }
903    }
904
    impl<const V: u8> Serialize for VersionNumber<V> {
        /// Emits the compile-time version number `V` as a plain integer.
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where S: serde::Serializer {
            serializer.serialize_u8(V)
        }
    }
911
912    impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
913        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
914        where D: serde::Deserializer<'de> {
915            let value = u8::deserialize(deserializer)?;
916            if value == V {
917                Ok(VersionNumber::<V>)
918            } else {
919                Err(serde::de::Error::custom("Invalid Version"))
920            }
921        }
922    }
923
    impl TryFrom<TableMetadataEnum> for TableMetadata {
        type Error = Error;
        /// Converts any deserialized wire format into the internal
        /// [`TableMetadata`], delegating to the per-version conversions.
        fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
            match value {
                TableMetadataEnum::V3(value) => value.try_into(),
                TableMetadataEnum::V2(value) => value.try_into(),
                TableMetadataEnum::V1(value) => value.try_into(),
            }
        }
    }
934
935    impl TryFrom<TableMetadata> for TableMetadataEnum {
936        type Error = Error;
937        fn try_from(value: TableMetadata) -> Result<Self, Error> {
938            Ok(match value.format_version {
939                FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
940                FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
941                FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
942            })
943        }
944    }
945
946    impl TryFrom<TableMetadataV3> for TableMetadata {
947        type Error = Error;
948        fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
949            let TableMetadataV3 {
950                format_version: _,
951                shared: value,
952                next_row_id,
953                encryption_keys,
954                snapshots,
955            } = value;
956            let current_snapshot_id = if value.current_snapshot_id == Some(EMPTY_SNAPSHOT_ID) {
957                None
958            } else {
959                value.current_snapshot_id
960            };
961            let schemas = HashMap::from_iter(
962                value
963                    .schemas
964                    .into_iter()
965                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
966                    .collect::<Result<Vec<_>, Error>>()?,
967            );
968
969            let current_schema: &SchemaRef =
970                schemas.get(&value.current_schema_id).ok_or_else(|| {
971                    Error::new(
972                        ErrorKind::DataInvalid,
973                        format!(
974                            "No schema exists with the current schema id {}.",
975                            value.current_schema_id
976                        ),
977                    )
978                })?;
979            let partition_specs = HashMap::from_iter(
980                value
981                    .partition_specs
982                    .into_iter()
983                    .map(|x| (x.spec_id(), Arc::new(x))),
984            );
985            let default_spec_id = value.default_spec_id;
986            let default_spec: PartitionSpecRef = partition_specs
987                .get(&value.default_spec_id)
988                .map(|spec| (**spec).clone())
989                .or_else(|| {
990                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
991                        .then(PartitionSpec::unpartition_spec)
992                })
993                .ok_or_else(|| {
994                    Error::new(
995                        ErrorKind::DataInvalid,
996                        format!("Default partition spec {default_spec_id} not found"),
997                    )
998                })?
999                .into();
1000            let default_partition_type = default_spec.partition_type(current_schema)?;
1001
1002            let mut metadata = TableMetadata {
1003                format_version: FormatVersion::V3,
1004                table_uuid: value.table_uuid,
1005                location: value.location,
1006                last_sequence_number: value.last_sequence_number,
1007                last_updated_ms: value.last_updated_ms,
1008                last_column_id: value.last_column_id,
1009                current_schema_id: value.current_schema_id,
1010                schemas,
1011                partition_specs,
1012                default_partition_type,
1013                default_spec,
1014                last_partition_id: value.last_partition_id,
1015                properties: value.properties.unwrap_or_default(),
1016                current_snapshot_id,
1017                snapshots: snapshots
1018                    .map(|snapshots| {
1019                        HashMap::from_iter(
1020                            snapshots
1021                                .into_iter()
1022                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
1023                        )
1024                    })
1025                    .unwrap_or_default(),
1026                snapshot_log: value.snapshot_log.unwrap_or_default(),
1027                metadata_log: value.metadata_log.unwrap_or_default(),
1028                sort_orders: HashMap::from_iter(
1029                    value
1030                        .sort_orders
1031                        .into_iter()
1032                        .map(|x| (x.order_id, Arc::new(x))),
1033                ),
1034                default_sort_order_id: value.default_sort_order_id,
1035                refs: value.refs.unwrap_or_else(|| {
1036                    if let Some(snapshot_id) = current_snapshot_id {
1037                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1038                            snapshot_id,
1039                            retention: SnapshotRetention::Branch {
1040                                min_snapshots_to_keep: None,
1041                                max_snapshot_age_ms: None,
1042                                max_ref_age_ms: None,
1043                            },
1044                        })])
1045                    } else {
1046                        HashMap::new()
1047                    }
1048                }),
1049                statistics: index_statistics(value.statistics),
1050                partition_statistics: index_partition_statistics(value.partition_statistics),
1051                encryption_keys: encryption_keys
1052                    .map(|keys| {
1053                        HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
1054                    })
1055                    .unwrap_or_default(),
1056                next_row_id,
1057            };
1058
1059            metadata.borrow_mut().try_normalize()?;
1060            Ok(metadata)
1061        }
1062    }
1063
1064    impl TryFrom<TableMetadataV2> for TableMetadata {
1065        type Error = Error;
1066        fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
1067            let snapshots = value.snapshots;
1068            let value = value.shared;
1069            let current_snapshot_id = if value.current_snapshot_id == Some(EMPTY_SNAPSHOT_ID) {
1070                None
1071            } else {
1072                value.current_snapshot_id
1073            };
1074            let schemas = HashMap::from_iter(
1075                value
1076                    .schemas
1077                    .into_iter()
1078                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
1079                    .collect::<Result<Vec<_>, Error>>()?,
1080            );
1081
1082            let current_schema: &SchemaRef =
1083                schemas.get(&value.current_schema_id).ok_or_else(|| {
1084                    Error::new(
1085                        ErrorKind::DataInvalid,
1086                        format!(
1087                            "No schema exists with the current schema id {}.",
1088                            value.current_schema_id
1089                        ),
1090                    )
1091                })?;
1092            let partition_specs = HashMap::from_iter(
1093                value
1094                    .partition_specs
1095                    .into_iter()
1096                    .map(|x| (x.spec_id(), Arc::new(x))),
1097            );
1098            let default_spec_id = value.default_spec_id;
1099            let default_spec: PartitionSpecRef = partition_specs
1100                .get(&value.default_spec_id)
1101                .map(|spec| (**spec).clone())
1102                .or_else(|| {
1103                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
1104                        .then(PartitionSpec::unpartition_spec)
1105                })
1106                .ok_or_else(|| {
1107                    Error::new(
1108                        ErrorKind::DataInvalid,
1109                        format!("Default partition spec {default_spec_id} not found"),
1110                    )
1111                })?
1112                .into();
1113            let default_partition_type = default_spec.partition_type(current_schema)?;
1114
1115            let mut metadata = TableMetadata {
1116                format_version: FormatVersion::V2,
1117                table_uuid: value.table_uuid,
1118                location: value.location,
1119                last_sequence_number: value.last_sequence_number,
1120                last_updated_ms: value.last_updated_ms,
1121                last_column_id: value.last_column_id,
1122                current_schema_id: value.current_schema_id,
1123                schemas,
1124                partition_specs,
1125                default_partition_type,
1126                default_spec,
1127                last_partition_id: value.last_partition_id,
1128                properties: value.properties.unwrap_or_default(),
1129                current_snapshot_id,
1130                snapshots: snapshots
1131                    .map(|snapshots| {
1132                        HashMap::from_iter(
1133                            snapshots
1134                                .into_iter()
1135                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
1136                        )
1137                    })
1138                    .unwrap_or_default(),
1139                snapshot_log: value.snapshot_log.unwrap_or_default(),
1140                metadata_log: value.metadata_log.unwrap_or_default(),
1141                sort_orders: HashMap::from_iter(
1142                    value
1143                        .sort_orders
1144                        .into_iter()
1145                        .map(|x| (x.order_id, Arc::new(x))),
1146                ),
1147                default_sort_order_id: value.default_sort_order_id,
1148                refs: value.refs.unwrap_or_else(|| {
1149                    if let Some(snapshot_id) = current_snapshot_id {
1150                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1151                            snapshot_id,
1152                            retention: SnapshotRetention::Branch {
1153                                min_snapshots_to_keep: None,
1154                                max_snapshot_age_ms: None,
1155                                max_ref_age_ms: None,
1156                            },
1157                        })])
1158                    } else {
1159                        HashMap::new()
1160                    }
1161                }),
1162                statistics: index_statistics(value.statistics),
1163                partition_statistics: index_partition_statistics(value.partition_statistics),
1164                encryption_keys: HashMap::new(),
1165                next_row_id: INITIAL_ROW_ID,
1166            };
1167
1168            metadata.borrow_mut().try_normalize()?;
1169            Ok(metadata)
1170        }
1171    }
1172
1173    impl TryFrom<TableMetadataV1> for TableMetadata {
1174        type Error = Error;
1175        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1176            let current_snapshot_id = if value.current_snapshot_id == Some(EMPTY_SNAPSHOT_ID) {
1177                None
1178            } else {
1179                value.current_snapshot_id
1180            };
1181
1182            let (schemas, current_schema_id, current_schema) =
1183                if let (Some(schemas_vec), Some(schema_id)) =
1184                    (&value.schemas, value.current_schema_id)
1185                {
1186                    // Option 1: Use 'schemas' + 'current_schema_id'
1187                    let schema_map = HashMap::from_iter(
1188                        schemas_vec
1189                            .clone()
1190                            .into_iter()
1191                            .map(|schema| {
1192                                let schema: Schema = schema.try_into()?;
1193                                Ok((schema.schema_id(), Arc::new(schema)))
1194                            })
1195                            .collect::<Result<Vec<_>, Error>>()?,
1196                    );
1197
1198                    let schema = schema_map
1199                        .get(&schema_id)
1200                        .ok_or_else(|| {
1201                            Error::new(
1202                                ErrorKind::DataInvalid,
1203                                format!("No schema exists with the current schema id {schema_id}."),
1204                            )
1205                        })?
1206                        .clone();
1207                    (schema_map, schema_id, schema)
1208                } else if let Some(schema) = value.schema {
1209                    // Option 2: Fall back to `schema`
1210                    let schema: Schema = schema.try_into()?;
1211                    let schema_id = schema.schema_id();
1212                    let schema_arc = Arc::new(schema);
1213                    let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1214                    (schema_map, schema_id, schema_arc)
1215                } else {
1216                    // Option 3: No valid schema configuration found
1217                    return Err(Error::new(
1218                        ErrorKind::DataInvalid,
1219                        "No valid schema configuration found in table metadata",
1220                    ));
1221                };
1222
1223            // Prioritize 'partition_specs' over 'partition_spec'
1224            let partition_specs = if let Some(specs_vec) = value.partition_specs {
1225                // Option 1: Use 'partition_specs'
1226                specs_vec
1227                    .into_iter()
1228                    .map(|x| (x.spec_id(), Arc::new(x)))
1229                    .collect::<HashMap<_, _>>()
1230            } else if let Some(partition_spec) = value.partition_spec {
1231                // Option 2: Fall back to 'partition_spec'
1232                let spec = PartitionSpec::builder(current_schema.clone())
1233                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1234                    .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1235                    .build()?;
1236
1237                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1238            } else {
1239                // Option 3: Create empty partition spec
1240                let spec = PartitionSpec::builder(current_schema.clone())
1241                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1242                    .build()?;
1243
1244                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1245            };
1246
1247            // Get the default_spec_id, prioritizing the explicit value if provided
1248            let default_spec_id = value
1249                .default_spec_id
1250                .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1251
1252            // Get the default spec
1253            let default_spec: PartitionSpecRef = partition_specs
1254                .get(&default_spec_id)
1255                .map(|x| Arc::unwrap_or_clone(x.clone()))
1256                .ok_or_else(|| {
1257                    Error::new(
1258                        ErrorKind::DataInvalid,
1259                        format!("Default partition spec {default_spec_id} not found"),
1260                    )
1261                })?
1262                .into();
1263            let default_partition_type = default_spec.partition_type(&current_schema)?;
1264
1265            let mut metadata = TableMetadata {
1266                format_version: FormatVersion::V1,
1267                table_uuid: value.table_uuid.unwrap_or_default(),
1268                location: value.location,
1269                last_sequence_number: 0,
1270                last_updated_ms: value.last_updated_ms,
1271                last_column_id: value.last_column_id,
1272                current_schema_id,
1273                default_spec,
1274                default_partition_type,
1275                last_partition_id: value
1276                    .last_partition_id
1277                    .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1278                partition_specs,
1279                schemas,
1280                properties: value.properties.unwrap_or_default(),
1281                current_snapshot_id,
1282                snapshots: value
1283                    .snapshots
1284                    .map(|snapshots| {
1285                        Ok::<_, Error>(HashMap::from_iter(
1286                            snapshots
1287                                .into_iter()
1288                                .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1289                                .collect::<Result<Vec<_>, Error>>()?,
1290                        ))
1291                    })
1292                    .transpose()?
1293                    .unwrap_or_default(),
1294                snapshot_log: value.snapshot_log.unwrap_or_default(),
1295                metadata_log: value.metadata_log.unwrap_or_default(),
1296                sort_orders: match value.sort_orders {
1297                    Some(sort_orders) => HashMap::from_iter(
1298                        sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1299                    ),
1300                    None => HashMap::new(),
1301                },
1302                default_sort_order_id: value
1303                    .default_sort_order_id
1304                    .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1305                refs: if let Some(snapshot_id) = current_snapshot_id {
1306                    HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1307                        snapshot_id,
1308                        retention: SnapshotRetention::Branch {
1309                            min_snapshots_to_keep: None,
1310                            max_snapshot_age_ms: None,
1311                            max_ref_age_ms: None,
1312                        },
1313                    })])
1314                } else {
1315                    HashMap::new()
1316                },
1317                statistics: index_statistics(value.statistics),
1318                partition_statistics: index_partition_statistics(value.partition_statistics),
1319                encryption_keys: HashMap::new(),
1320                next_row_id: INITIAL_ROW_ID, // v1 has no row lineage
1321            };
1322
1323            metadata.borrow_mut().try_normalize()?;
1324            Ok(metadata)
1325        }
1326    }
1327
1328    impl TryFrom<TableMetadata> for TableMetadataV3 {
1329        type Error = Error;
1330
1331        fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1332            let next_row_id = v.next_row_id;
1333            let encryption_keys = std::mem::take(&mut v.encryption_keys);
1334            let snapshots = std::mem::take(&mut v.snapshots);
1335            let shared = v.into();
1336
1337            Ok(TableMetadataV3 {
1338                format_version: VersionNumber::<3>,
1339                shared,
1340                next_row_id,
1341                encryption_keys: if encryption_keys.is_empty() {
1342                    None
1343                } else {
1344                    Some(encryption_keys.into_values().collect())
1345                },
1346                snapshots: if snapshots.is_empty() {
1347                    None
1348                } else {
1349                    Some(
1350                        snapshots
1351                            .into_values()
1352                            .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1353                            .collect::<Result<_, _>>()?,
1354                    )
1355                },
1356            })
1357        }
1358    }
1359
1360    impl From<TableMetadata> for TableMetadataV2 {
1361        fn from(mut v: TableMetadata) -> Self {
1362            let snapshots = std::mem::take(&mut v.snapshots);
1363            let shared = v.into();
1364
1365            TableMetadataV2 {
1366                format_version: VersionNumber::<2>,
1367                shared,
1368                snapshots: if snapshots.is_empty() {
1369                    None
1370                } else {
1371                    Some(
1372                        snapshots
1373                            .into_values()
1374                            .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1375                            .collect(),
1376                    )
1377                },
1378            }
1379        }
1380    }
1381
1382    impl From<TableMetadata> for TableMetadataV2V3Shared {
1383        fn from(v: TableMetadata) -> Self {
1384            TableMetadataV2V3Shared {
1385                table_uuid: v.table_uuid,
1386                location: v.location,
1387                last_sequence_number: v.last_sequence_number,
1388                last_updated_ms: v.last_updated_ms,
1389                last_column_id: v.last_column_id,
1390                schemas: v
1391                    .schemas
1392                    .into_values()
1393                    .map(|x| {
1394                        Arc::try_unwrap(x)
1395                            .unwrap_or_else(|schema| schema.as_ref().clone())
1396                            .into()
1397                    })
1398                    .collect(),
1399                current_schema_id: v.current_schema_id,
1400                partition_specs: v
1401                    .partition_specs
1402                    .into_values()
1403                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1404                    .collect(),
1405                default_spec_id: v.default_spec.spec_id(),
1406                last_partition_id: v.last_partition_id,
1407                properties: if v.properties.is_empty() {
1408                    None
1409                } else {
1410                    Some(v.properties)
1411                },
1412                current_snapshot_id: v.current_snapshot_id,
1413                snapshot_log: if v.snapshot_log.is_empty() {
1414                    None
1415                } else {
1416                    Some(v.snapshot_log)
1417                },
1418                metadata_log: if v.metadata_log.is_empty() {
1419                    None
1420                } else {
1421                    Some(v.metadata_log)
1422                },
1423                sort_orders: v
1424                    .sort_orders
1425                    .into_values()
1426                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1427                    .collect(),
1428                default_sort_order_id: v.default_sort_order_id,
1429                refs: Some(v.refs),
1430                statistics: v.statistics.into_values().collect(),
1431                partition_statistics: v.partition_statistics.into_values().collect(),
1432            }
1433        }
1434    }
1435
1436    impl TryFrom<TableMetadata> for TableMetadataV1 {
1437        type Error = Error;
1438        fn try_from(v: TableMetadata) -> Result<Self, Error> {
1439            Ok(TableMetadataV1 {
1440                format_version: VersionNumber::<1>,
1441                table_uuid: Some(v.table_uuid),
1442                location: v.location,
1443                last_updated_ms: v.last_updated_ms,
1444                last_column_id: v.last_column_id,
1445                schema: Some(
1446                    v.schemas
1447                        .get(&v.current_schema_id)
1448                        .ok_or(Error::new(
1449                            ErrorKind::Unexpected,
1450                            "current_schema_id not found in schemas",
1451                        ))?
1452                        .as_ref()
1453                        .clone()
1454                        .into(),
1455                ),
1456                schemas: Some(
1457                    v.schemas
1458                        .into_values()
1459                        .map(|x| {
1460                            Arc::try_unwrap(x)
1461                                .unwrap_or_else(|schema| schema.as_ref().clone())
1462                                .into()
1463                        })
1464                        .collect(),
1465                ),
1466                current_schema_id: Some(v.current_schema_id),
1467                partition_spec: Some(v.default_spec.fields().to_vec()),
1468                partition_specs: Some(
1469                    v.partition_specs
1470                        .into_values()
1471                        .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1472                        .collect(),
1473                ),
1474                default_spec_id: Some(v.default_spec.spec_id()),
1475                last_partition_id: Some(v.last_partition_id),
1476                properties: if v.properties.is_empty() {
1477                    None
1478                } else {
1479                    Some(v.properties)
1480                },
1481                current_snapshot_id: v.current_snapshot_id,
1482                snapshots: if v.snapshots.is_empty() {
1483                    None
1484                } else {
1485                    Some(
1486                        v.snapshots
1487                            .into_values()
1488                            .map(|x| Snapshot::clone(&x).into())
1489                            .collect(),
1490                    )
1491                },
1492                snapshot_log: if v.snapshot_log.is_empty() {
1493                    None
1494                } else {
1495                    Some(v.snapshot_log)
1496                },
1497                metadata_log: if v.metadata_log.is_empty() {
1498                    None
1499                } else {
1500                    Some(v.metadata_log)
1501                },
1502                sort_orders: Some(
1503                    v.sort_orders
1504                        .into_values()
1505                        .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1506                        .collect(),
1507                ),
1508                default_sort_order_id: Some(v.default_sort_order_id),
1509                statistics: v.statistics.into_values().collect(),
1510                partition_statistics: v.partition_statistics.into_values().collect(),
1511            })
1512        }
1513    }
1514
1515    fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1516        statistics
1517            .into_iter()
1518            .rev()
1519            .map(|s| (s.snapshot_id, s))
1520            .collect()
1521    }
1522
1523    fn index_partition_statistics(
1524        statistics: Vec<PartitionStatisticsFile>,
1525    ) -> HashMap<i64, PartitionStatisticsFile> {
1526        statistics
1527            .into_iter()
1528            .rev()
1529            .map(|s| (s.snapshot_id, s))
1530            .collect()
1531    }
1532}
1533
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
#[repr(u8)]
/// Iceberg format version
///
/// Serialized by `serde_repr` as the numeric discriminant, so each variant's
/// `u8` value must equal the spec's `format-version` number exactly.
pub enum FormatVersion {
    /// Iceberg spec version 1
    V1 = 1u8,
    /// Iceberg spec version 2
    V2 = 2u8,
    /// Iceberg spec version 3
    V3 = 3u8,
}
1545
1546impl PartialOrd for FormatVersion {
1547    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1548        Some(self.cmp(other))
1549    }
1550}
1551
1552impl Ord for FormatVersion {
1553    fn cmp(&self, other: &Self) -> Ordering {
1554        (*self as u8).cmp(&(*other as u8))
1555    }
1556}
1557
1558impl Display for FormatVersion {
1559    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1560        match self {
1561            FormatVersion::V1 => write!(f, "v1"),
1562            FormatVersion::V2 => write!(f, "v2"),
1563            FormatVersion::V3 => write!(f, "v3"),
1564        }
1565    }
1566}
1567
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Encodes changes to the previous metadata files for the table
pub struct MetadataLog {
    /// Location of the previous metadata file (serialized as `metadata-file`).
    pub metadata_file: String,
    /// Time new metadata was created, in milliseconds
    /// (serialized as `timestamp-ms`).
    pub timestamp_ms: i64,
}
1577
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// A log of when each snapshot was made.
pub struct SnapshotLog {
    /// Id of the snapshot (serialized as `snapshot-id`).
    pub snapshot_id: i64,
    /// Last updated timestamp in milliseconds
    /// (serialized as `timestamp-ms`).
    pub timestamp_ms: i64,
}
1587
1588impl SnapshotLog {
1589    /// Returns the last updated timestamp as a DateTime<Utc> with millisecond precision
1590    pub fn timestamp(self) -> Result<DateTime<Utc>> {
1591        timestamp_ms_to_utc(self.timestamp_ms)
1592    }
1593
1594    /// Returns the timestamp in milliseconds
1595    #[inline]
1596    pub fn timestamp_ms(&self) -> i64 {
1597        self.timestamp_ms
1598    }
1599}
1600
1601#[cfg(test)]
1602mod tests {
1603    use std::collections::HashMap;
1604    use std::fs;
1605    use std::sync::Arc;
1606
1607    use anyhow::Result;
1608    use base64::Engine as _;
1609    use pretty_assertions::assert_eq;
1610    use tempfile::TempDir;
1611    use uuid::Uuid;
1612
1613    use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1614    use crate::catalog::MetadataLocation;
1615    use crate::compression::CompressionCodec;
1616    use crate::io::FileIO;
1617    use crate::spec::table_metadata::TableMetadata;
1618    use crate::spec::{
1619        BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1620        PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1621        SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1622        Summary, TableProperties, Transform, Type, UnboundPartitionField,
1623    };
1624    use crate::{ErrorKind, TableCreation};
1625
1626    fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1627        let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1628        assert_eq!(desered_type, expected_type);
1629
1630        let sered_json = serde_json::to_string(&expected_type).unwrap();
1631        let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1632
1633        assert_eq!(parsed_json_value, desered_type);
1634    }
1635
1636    fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1637        let path = format!("testdata/table_metadata/{file_name}");
1638        let metadata: String = fs::read_to_string(path).unwrap();
1639
1640        serde_json::from_str(&metadata).unwrap()
1641    }
1642
    // Round-trips a minimal v2 metadata document (no snapshots) and checks
    // both directions: JSON -> TableMetadata and TableMetadata -> JSON must
    // agree with the hand-built `expected` value exactly.
    #[test]
    fn test_table_data_v2() {
        let data = r#"
            {
                "format-version" : 2,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 1,
                                "name": "struct_name",
                                "required": true,
                                "type": "fixed[1]"
                            },
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "sort-orders": [
                    {
                    "order-id": 0,
                    "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Schema matching the "schemas" entry in the JSON above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "struct_name",
                    Type::Primitive(PrimitiveType::Fixed(1)),
                )),
                Arc::new(NestedField::required(
                    4,
                    "ts",
                    Type::Primitive(PrimitiveType::Timestamp),
                )),
            ])
            .build()
            .unwrap();

        // Partition spec matching "partition-specs": a single day transform
        // over the `ts` column.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // Expected in-memory representation of the JSON document.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Serde round trip in both directions.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        // Exact JSON-value equality: serialization must not add, drop, or
        // rename any field relative to the input document.
        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1778
    // Round-trips a v3 metadata document exercising the v3-only fields:
    // `next-row-id`, per-snapshot row ranges / encryption key ids, and the
    // top-level `encryption-keys` list.
    #[test]
    fn test_table_data_v3() {
        let data = r#"
            {
                "format-version" : 3,
                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
                "location": "s3://b/wh/data.db/table",
                "last-sequence-number" : 1,
                "last-updated-ms": 1515100955770,
                "last-column-id": 1,
                "next-row-id": 5,
                "schemas": [
                    {
                        "schema-id" : 1,
                        "type" : "struct",
                        "fields" :[
                            {
                                "id": 4,
                                "name": "ts",
                                "required": true,
                                "type": "timestamp"
                            }
                        ]
                    }
                ],
                "current-schema-id" : 1,
                "partition-specs": [
                    {
                        "spec-id": 0,
                        "fields": [
                            {
                                "source-id": 4,
                                "field-id": 1000,
                                "name": "ts_day",
                                "transform": "day"
                            }
                        ]
                    }
                ],
                "default-spec-id": 0,
                "last-partition-id": 1000,
                "properties": {
                    "commit.retry.num-retries": "1"
                },
                "metadata-log": [
                    {
                        "metadata-file": "s3://bucket/.../v1.json",
                        "timestamp-ms": 1515100
                    }
                ],
                "refs": {},
                "snapshots" : [ {
                    "snapshot-id" : 1,
                    "timestamp-ms" : 1662532818843,
                    "sequence-number" : 0,
                    "first-row-id" : 0,
                    "added-rows" : 4,
                    "key-id" : "key1",
                    "summary" : {
                        "operation" : "append"
                    },
                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
                    "schema-id" : 0
                    }
                ],
                "encryption-keys": [
                    {
                        "key-id": "key1",
                        "encrypted-by-id": "KMS",
                        "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
                        "properties": {
                            "p1": "v1"
                        }
                    }
                ],
                "sort-orders": [
                    {
                    "order-id": 0,
                    "fields": []
                    }
                ],
                "default-sort-order-id": 0
            }
        "#;

        // Schema matching the single-field "schemas" entry above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                4,
                "ts",
                Type::Primitive(PrimitiveType::Timestamp),
            ))])
            .build()
            .unwrap();

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "ts_day".to_string(),
                transform: Transform::Day,
                source_id: 4,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Snapshot carrying the v3 extras: row range (first-row-id/added-rows)
        // and an encryption key id.
        let snapshot = Snapshot::builder()
            .with_snapshot_id(1)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_row_range(0, 4)
            .with_encryption_key_id(Some("key1".to_string()))
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
            .with_schema_id(0)
            .build();

        // Expected decoded form of the base64 "encrypted-key-metadata" value.
        let encryption_key = EncryptedKey::builder()
            .key_id("key1".to_string())
            .encrypted_by_id("KMS".to_string())
            .encrypted_key_metadata(
                base64::prelude::BASE64_STANDARD
                    .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
                    .unwrap(),
            )
            .properties(HashMap::from_iter(vec![(
                "p1".to_string(),
                "v1".to_string(),
            )]))
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // Expected in-memory representation of the JSON document.
        let expected = TableMetadata {
            format_version: FormatVersion::V3,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::from_iter(vec![(
                "commit.retry.num-retries".to_string(),
                "1".to_string(),
            )]),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
            next_row_id: 5,
        };

        // Serde round trip in both directions.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        // Exact JSON-value equality with the input document.
        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
1954
    // Parses a v1 metadata document (inline "schema"/"partition-spec" layout)
    // and checks the serde round trip. Unlike the v2/v3 tests, no exact
    // JSON-value comparison is done: v1 input is upgraded to the generic
    // representation on read, so re-serialization is not byte-identical.
    #[test]
    fn test_table_data_v1() {
        let data = r#"
        {
            "format-version" : 1,
            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
            "location" : "/home/iceberg/warehouse/nyc/taxis",
            "last-updated-ms" : 1662532818843,
            "last-column-id" : 5,
            "schema" : {
              "type" : "struct",
              "schema-id" : 0,
              "fields" : [ {
                "id" : 1,
                "name" : "vendor_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 2,
                "name" : "trip_id",
                "required" : false,
                "type" : "long"
              }, {
                "id" : 3,
                "name" : "trip_distance",
                "required" : false,
                "type" : "float"
              }, {
                "id" : 4,
                "name" : "fare_amount",
                "required" : false,
                "type" : "double"
              }, {
                "id" : 5,
                "name" : "store_and_fwd_flag",
                "required" : false,
                "type" : "string"
              } ]
            },
            "partition-spec" : [ {
              "name" : "vendor_id",
              "transform" : "identity",
              "source-id" : 1,
              "field-id" : 1000
            } ],
            "last-partition-id" : 1000,
            "default-sort-order-id" : 0,
            "sort-orders" : [ {
              "order-id" : 0,
              "fields" : [ ]
            } ],
            "properties" : {
              "owner" : "root"
            },
            "current-snapshot-id" : 638933773299822130,
            "refs" : {
              "main" : {
                "snapshot-id" : 638933773299822130,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 638933773299822130,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ],
            "snapshot-log" : [ {
              "timestamp-ms" : 1662532818843,
              "snapshot-id" : 638933773299822130
            } ],
            "metadata-log" : [ {
              "timestamp-ms" : 1662532805245,
              "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
            } ]
          }
        "#;

        // Schema matching the inline "schema" object (all fields optional).
        let schema = Schema::builder()
            .with_fields(vec![
                Arc::new(NestedField::optional(
                    1,
                    "vendor_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    2,
                    "trip_id",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(NestedField::optional(
                    3,
                    "trip_distance",
                    Type::Primitive(PrimitiveType::Float),
                )),
                Arc::new(NestedField::optional(
                    4,
                    "fare_amount",
                    Type::Primitive(PrimitiveType::Double),
                )),
                Arc::new(NestedField::optional(
                    5,
                    "store_and_fwd_flag",
                    Type::Primitive(PrimitiveType::String),
                )),
            ])
            .build()
            .unwrap();

        let schema = Arc::new(schema);
        // Identity partition on vendor_id, matching "partition-spec".
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let sort_order = SortOrder::builder()
            .with_order_id(0)
            .build_unbound()
            .unwrap();

        let snapshot = Snapshot::builder()
            .with_snapshot_id(638933773299822130)
            .with_timestamp_ms(1662532818843)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // Expected in-memory representation; note the implicit "main" branch
        // ref pointing at the current snapshot.
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
            last_updated_ms: 1662532818843,
            last_column_id: 5,
            schemas: HashMap::from_iter(vec![(0, schema)]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
            current_snapshot_id: Some(638933773299822130),
            last_sequence_number: 0,
            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
            snapshot_log: vec![SnapshotLog {
                snapshot_id: 638933773299822130,
                timestamp_ms: 1662532818843,
            }],
            metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2127
    // Round-trips a v2 document with no snapshots, no sort fields, and an
    // empty partition spec; "properties" is absent so it must default to an
    // empty map on read and be omitted again on write.
    #[test]
    fn test_table_data_v2_no_snapshots() {
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "refs": {},
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0
        }
        "#;

        // Schema matching the single-field "schemas" entry above.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "struct_name",
                Type::Primitive(PrimitiveType::Fixed(1)),
            ))])
            .build()
            .unwrap();

        // Unpartitioned spec (empty "fields" list in the JSON).
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // Expected in-memory representation of the JSON document.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://b/wh/data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
            current_schema_id: 1,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: partition_spec.into(),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![MetadataLog {
                metadata_file: "s3://bucket/.../v1.json".to_string(),
                timestamp_ms: 1515100,
            }],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        // Serde round trip in both directions.
        let expected_json_value = serde_json::to_value(&expected).unwrap();
        check_table_metadata_serde(data, expected);

        // Exact JSON-value equality with the input document.
        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
        assert_eq!(json_value, expected_json_value);
    }
2230
    #[test]
    fn test_current_snapshot_id_must_match_main_branch() {
        // Invalid metadata: `current-snapshot-id` is 1, but the `main` branch
        // ref points at snapshot 2. The two must agree, so deserialization has
        // to reject this document even though both snapshots exist.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "current-snapshot-id" : 1,
            "refs" : {
              "main" : {
                "snapshot-id" : 2,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 1,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            },
            {
              "snapshot-id" : 2,
              "timestamp-ms" : 1662532818844,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ]
        }
    "#;

        // Parsing must fail with the specific id-mismatch validation error.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot id does not match main branch")
        );
    }
2337
    #[test]
    fn test_main_without_current() {
        // Invalid metadata: a `main` branch ref exists (pointing at snapshot 1)
        // but `current-snapshot-id` is absent. The validator requires a current
        // snapshot whenever `main` is present, so parsing must fail.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
              "main" : {
                "snapshot-id" : 1,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 1,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ]
        }
    "#;

        // Parsing must fail with the missing-current-snapshot validation error.
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string()
                .contains("Current snapshot is not set, but main branch exists")
        );
    }
2429
    #[test]
    fn test_branch_snapshot_missing() {
        // Invalid metadata: the `foo` branch ref points at snapshot 2, but the
        // `snapshots` list only contains snapshot 1. Every ref must resolve to
        // an existing snapshot, so parsing must fail.
        let data = r#"
        {
            "format-version" : 2,
            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
            "location": "s3://b/wh/data.db/table",
            "last-sequence-number" : 1,
            "last-updated-ms": 1515100955770,
            "last-column-id": 1,
            "schemas": [
                {
                    "schema-id" : 1,
                    "type" : "struct",
                    "fields" :[
                        {
                            "id": 1,
                            "name": "struct_name",
                            "required": true,
                            "type": "fixed[1]"
                        },
                        {
                            "id": 4,
                            "name": "ts",
                            "required": true,
                            "type": "timestamp"
                        }
                    ]
                }
            ],
            "current-schema-id" : 1,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": [
                        {
                            "source-id": 4,
                            "field-id": 1000,
                            "name": "ts_day",
                            "transform": "day"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "last-partition-id": 1000,
            "properties": {
                "commit.retry.num-retries": "1"
            },
            "metadata-log": [
                {
                    "metadata-file": "s3://bucket/.../v1.json",
                    "timestamp-ms": 1515100
                }
            ],
            "sort-orders": [
                {
                "order-id": 0,
                "fields": []
                }
            ],
            "default-sort-order-id": 0,
            "refs" : {
              "main" : {
                "snapshot-id" : 1,
                "type" : "branch"
              },
              "foo" : {
                "snapshot-id" : 2,
                "type" : "branch"
              }
            },
            "snapshots" : [ {
              "snapshot-id" : 1,
              "timestamp-ms" : 1662532818843,
              "sequence-number" : 0,
              "summary" : {
                "operation" : "append",
                "spark.app.id" : "local-1662532784305",
                "added-data-files" : "4",
                "added-records" : "4",
                "added-files-size" : "6001"
              },
              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
              "schema-id" : 0
            } ]
        }
    "#;

        // Parsing must fail naming the dangling ref (`foo`).
        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(
            err.to_string().contains(
                "Snapshot for reference foo does not exist in the existing snapshots list"
            )
        );
    }
2526
    #[test]
    fn test_v2_wrong_max_snapshot_sequence_number() {
        // Invalid metadata: the snapshot carries sequence-number 4, which is
        // greater than the table-level `last-sequence-number` of 1. Validation
        // must reject a snapshot ahead of the table's last sequence number.
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 1,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 4,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
        assert!(err.to_string().contains(
            "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
        ));

        // Raise `last-sequence-number` to 4 (equal to the snapshot's) - should work
        let data = data.replace(
            r#""last-sequence-number": 1,"#,
            r#""last-sequence-number": 4,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 4);

        // Raise `last-sequence-number` to 5 (greater than the snapshot's) - should work
        // (note this replace runs on the string produced by the previous one)
        let data = data.replace(
            r#""last-sequence-number": 4,"#,
            r#""last-sequence-number": 5,"#,
        );
        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
        assert_eq!(metadata.last_sequence_number, 5);
    }
2608
    #[test]
    fn test_statistic_files() {
        // Round-trips V2 metadata containing a `statistics` entry (a Puffin
        // statistics file with one "ndv" blob) and checks the deserialized
        // `TableMetadata` against a hand-built expected value.
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/stats.puffin",
                    "file-size-in-bytes": 413,
                    "file-footer-size-in-bytes": 42,
                    "blob-metadata": [
                        {
                            "type": "ndv",
                            "snapshot-id": 3055729675574597004,
                            "sequence-number": 1,
                            "fields": [
                                1
                            ]
                        }
                    ]
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
    "#;

        // Build the expected in-memory objects mirroring the JSON above.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            // The `statistics` map is keyed by snapshot id.
            statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
                snapshot_id: 3055729675574597004,
                statistics_path: "s3://a/b/stats.puffin".to_string(),
                file_size_in_bytes: 413,
                file_footer_size_in_bytes: 42,
                key_metadata: None,
                blob_metadata: vec![BlobMetadata {
                    snapshot_id: 3055729675574597004,
                    sequence_number: 1,
                    fields: vec![1],
                    r#type: "ndv".to_string(),
                    properties: HashMap::new(),
                }],
            })]),
            partition_statistics: HashMap::new(),
            // `refs` is absent from the JSON; a `main` ref for the current
            // snapshot is synthesized during deserialization.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2761
    #[test]
    fn test_partition_statistics_file() {
        // Round-trips V2 metadata containing a `partition-statistics` entry
        // and checks the deserialized `TableMetadata` against a hand-built
        // expected value.
        let data = r#"
        {
            "format-version": 2,
            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
            "location": "s3://bucket/test/location",
            "last-sequence-number": 34,
            "last-updated-ms": 1602638573590,
            "last-column-id": 3,
            "current-schema-id": 0,
            "schemas": [
                {
                    "type": "struct",
                    "schema-id": 0,
                    "fields": [
                        {
                            "id": 1,
                            "name": "x",
                            "required": true,
                            "type": "long"
                        }
                    ]
                }
            ],
            "default-spec-id": 0,
            "partition-specs": [
                {
                    "spec-id": 0,
                    "fields": []
                }
            ],
            "last-partition-id": 1000,
            "default-sort-order-id": 0,
            "sort-orders": [
                {
                    "order-id": 0,
                    "fields": []
                }
            ],
            "properties": {},
            "current-snapshot-id": 3055729675574597004,
            "snapshots": [
                {
                    "snapshot-id": 3055729675574597004,
                    "timestamp-ms": 1555100955770,
                    "sequence-number": 1,
                    "summary": {
                        "operation": "append"
                    },
                    "manifest-list": "s3://a/b/2.avro",
                    "schema-id": 0
                }
            ],
            "partition-statistics": [
                {
                    "snapshot-id": 3055729675574597004,
                    "statistics-path": "s3://a/b/partition-stats.parquet",
                    "file-size-in-bytes": 43
                }
            ],
            "snapshot-log": [],
            "metadata-log": []
        }
        "#;

        // Build the expected in-memory objects mirroring the JSON above.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![Arc::new(NestedField::required(
                1,
                "x",
                Type::Primitive(PrimitiveType::Long),
            ))])
            .build()
            .unwrap();
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let snapshot = Snapshot::builder()
            .with_snapshot_id(3055729675574597004)
            .with_timestamp_ms(1555100955770)
            .with_sequence_number(1)
            .with_manifest_list("s3://a/b/2.avro")
            .with_schema_id(0)
            .with_summary(Summary {
                operation: Operation::Append,
                additional_properties: HashMap::new(),
            })
            .build();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
            current_snapshot_id: Some(3055729675574597004),
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            statistics: HashMap::new(),
            // The `partition_statistics` map is keyed by snapshot id.
            partition_statistics: HashMap::from_iter(vec![(
                3055729675574597004,
                PartitionStatisticsFile {
                    snapshot_id: 3055729675574597004,
                    statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
                    file_size_in_bytes: 43,
                },
            )]),
            // `refs` is absent from the JSON; a `main` ref for the current
            // snapshot is synthesized during deserialization.
            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
                snapshot_id: 3055729675574597004,
                retention: SnapshotRetention::Branch {
                    min_snapshots_to_keep: None,
                    max_snapshot_age_ms: None,
                    max_ref_age_ms: None,
                },
            })]),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(data, expected);
    }
2897
2898    #[test]
2899    fn test_invalid_table_uuid() -> Result<()> {
2900        let data = r#"
2901            {
2902                "format-version" : 2,
2903                "table-uuid": "xxxx"
2904            }
2905        "#;
2906        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2907        Ok(())
2908    }
2909
2910    #[test]
2911    fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
2912        let data = r#"
2913            {
2914                "format-version" : 1
2915            }
2916        "#;
2917        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2918        Ok(())
2919    }
2920
    #[test]
    fn test_table_metadata_v3_valid_minimal() {
        // Loads a minimal valid V3 metadata fixture from testdata and checks
        // the parsed `TableMetadata` against a hand-built expected value.
        let metadata_str =
            fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();

        let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
        assert_eq!(table_metadata.format_version, FormatVersion::V3);

        // Expected schema: field `x` carries initial/write defaults (a V3
        // feature), `y` has a doc comment, `z` is plain.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(
                    NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
                        .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
                        .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
                ),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Expected partition spec: identity partition on `x`.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Expected sort order: asc-nulls-first on `y`, desc-nulls-last on bucket(4) of `z`.
        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V3,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: 0, // V3-specific field, taken verbatim from the JSON fixture
        };

        check_table_metadata_serde(&metadata_str, expected);
    }
3009
3010    #[test]
3011    fn test_table_metadata_v2_file_valid() {
3012        let metadata =
3013            fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();
3014
3015        let schema1 = Schema::builder()
3016            .with_schema_id(0)
3017            .with_fields(vec![Arc::new(NestedField::required(
3018                1,
3019                "x",
3020                Type::Primitive(PrimitiveType::Long),
3021            ))])
3022            .build()
3023            .unwrap();
3024
3025        let schema2 = Schema::builder()
3026            .with_schema_id(1)
3027            .with_fields(vec![
3028                Arc::new(NestedField::required(
3029                    1,
3030                    "x",
3031                    Type::Primitive(PrimitiveType::Long),
3032                )),
3033                Arc::new(
3034                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
3035                        .with_doc("comment"),
3036                ),
3037                Arc::new(NestedField::required(
3038                    3,
3039                    "z",
3040                    Type::Primitive(PrimitiveType::Long),
3041                )),
3042            ])
3043            .with_identifier_field_ids(vec![1, 2])
3044            .build()
3045            .unwrap();
3046
3047        let partition_spec = PartitionSpec::builder(schema2.clone())
3048            .with_spec_id(0)
3049            .add_unbound_field(UnboundPartitionField {
3050                name: "x".to_string(),
3051                transform: Transform::Identity,
3052                source_id: 1,
3053                field_id: Some(1000),
3054            })
3055            .unwrap()
3056            .build()
3057            .unwrap();
3058
3059        let sort_order = SortOrder::builder()
3060            .with_order_id(3)
3061            .with_sort_field(SortField {
3062                source_id: 2,
3063                transform: Transform::Identity,
3064                direction: SortDirection::Ascending,
3065                null_order: NullOrder::First,
3066            })
3067            .with_sort_field(SortField {
3068                source_id: 3,
3069                transform: Transform::Bucket(4),
3070                direction: SortDirection::Descending,
3071                null_order: NullOrder::Last,
3072            })
3073            .build_unbound()
3074            .unwrap();
3075
3076        let snapshot1 = Snapshot::builder()
3077            .with_snapshot_id(3051729675574597004)
3078            .with_timestamp_ms(1515100955770)
3079            .with_sequence_number(0)
3080            .with_manifest_list("s3://a/b/1.avro")
3081            .with_summary(Summary {
3082                operation: Operation::Append,
3083                additional_properties: HashMap::new(),
3084            })
3085            .build();
3086
3087        let snapshot2 = Snapshot::builder()
3088            .with_snapshot_id(3055729675574597004)
3089            .with_parent_snapshot_id(Some(3051729675574597004))
3090            .with_timestamp_ms(1555100955770)
3091            .with_sequence_number(1)
3092            .with_schema_id(1)
3093            .with_manifest_list("s3://a/b/2.avro")
3094            .with_summary(Summary {
3095                operation: Operation::Append,
3096                additional_properties: HashMap::new(),
3097            })
3098            .build();
3099
3100        let default_partition_type = partition_spec.partition_type(&schema2).unwrap();
3101        let expected = TableMetadata {
3102            format_version: FormatVersion::V2,
3103            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
3104            location: "s3://bucket/test/location".to_string(),
3105            last_updated_ms: 1602638573590,
3106            last_column_id: 3,
3107            schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
3108            current_schema_id: 1,
3109            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3110            default_spec: Arc::new(partition_spec),
3111            default_partition_type,
3112            last_partition_id: 1000,
3113            default_sort_order_id: 3,
3114            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
3115            snapshots: HashMap::from_iter(vec![
3116                (3051729675574597004, Arc::new(snapshot1)),
3117                (3055729675574597004, Arc::new(snapshot2)),
3118            ]),
3119            current_snapshot_id: Some(3055729675574597004),
3120            last_sequence_number: 34,
3121            properties: HashMap::new(),
3122            snapshot_log: vec![
3123                SnapshotLog {
3124                    snapshot_id: 3051729675574597004,
3125                    timestamp_ms: 1515100955770,
3126                },
3127                SnapshotLog {
3128                    snapshot_id: 3055729675574597004,
3129                    timestamp_ms: 1555100955770,
3130                },
3131            ],
3132            metadata_log: Vec::new(),
3133            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
3134                snapshot_id: 3055729675574597004,
3135                retention: SnapshotRetention::Branch {
3136                    min_snapshots_to_keep: None,
3137                    max_snapshot_age_ms: None,
3138                    max_ref_age_ms: None,
3139                },
3140            })]),
3141            statistics: HashMap::new(),
3142            partition_statistics: HashMap::new(),
3143            encryption_keys: HashMap::new(),
3144            next_row_id: INITIAL_ROW_ID,
3145        };
3146
3147        check_table_metadata_serde(&metadata, expected);
3148    }
3149
    /// Deserializes `TableMetadataV2ValidMinimal.json` — a V2 file with no
    /// snapshots, refs, or logs — and round-trips it against a hand-built
    /// expected value via `check_table_metadata_serde`.
    #[test]
    fn test_table_metadata_v2_file_valid_minimal() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();

        // Schema 0: three required long columns `x`, `y` (with a doc string), `z`,
        // mirroring the single `schemas` entry in the JSON file.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Spec 0: identity partition on `x`, with partition field id 1000.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        // Sort order 3: ascending identity on `y`, then descending bucket(4) on `z`.
        let sort_order = SortOrder::builder()
            .with_order_id(3)
            .with_sort_field(SortField {
                source_id: 2,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            })
            .build_unbound()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        // The minimal file carries no snapshots, refs, logs, or statistics, so the
        // corresponding maps and vectors are all empty.
        let expected = TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573590,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_partition_type,
            default_spec: Arc::new(partition_spec),
            last_partition_id: 1000,
            default_sort_order_id: 3,
            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 34,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3235
    /// Deserializes `TableMetadataV1Valid.json` (a format-version-1 file) and
    /// round-trips it against a hand-built expected value, checking the V1→V2
    /// compatibility defaults (sequence number 0, injected unsorted order).
    #[test]
    fn test_table_metadata_v1_file_valid() {
        let metadata =
            fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();

        // Schema 0: three required long columns `x`, `y` (with a doc string), `z`.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                Arc::new(NestedField::required(
                    1,
                    "x",
                    Type::Primitive(PrimitiveType::Long),
                )),
                Arc::new(
                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
                        .with_doc("comment"),
                ),
                Arc::new(NestedField::required(
                    3,
                    "z",
                    Type::Primitive(PrimitiveType::Long),
                )),
            ])
            .build()
            .unwrap();

        // Spec 0: identity partition on `x`, with partition field id 1000.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_unbound_field(UnboundPartitionField {
                name: "x".to_string(),
                transform: Transform::Identity,
                source_id: 1,
                field_id: Some(1000),
            })
            .unwrap()
            .build()
            .unwrap();

        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
        let expected = TableMetadata {
            format_version: FormatVersion::V1,
            table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
            location: "s3://bucket/test/location".to_string(),
            last_updated_ms: 1602638573874,
            last_column_id: 3,
            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
            current_schema_id: 0,
            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
            default_spec: Arc::new(partition_spec),
            default_partition_type,
            last_partition_id: 0,
            default_sort_order_id: 0,
            // Sort order is added during deserialization for V2 compatibility
            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
            snapshots: HashMap::new(),
            current_snapshot_id: None,
            // V1 files have no sequence numbers; the reader defaults to 0.
            last_sequence_number: 0,
            properties: HashMap::new(),
            snapshot_log: vec![],
            metadata_log: Vec::new(),
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
            next_row_id: INITIAL_ROW_ID,
        };

        check_table_metadata_serde(&metadata, expected);
    }
3305
3306    #[test]
3307    fn test_empty_snapshot_id_is_normalized_to_none() {
3308        let metadata =
3309            fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();
3310        let deserialized: TableMetadata = serde_json::from_str(&metadata).unwrap();
3311        assert_eq!(
3312            deserialized.current_snapshot_id(),
3313            None,
3314            "current_snapshot_id of -1 should be deserialized as None"
3315        );
3316    }
3317
3318    #[test]
3319    fn test_table_metadata_v1_compat() {
3320        let metadata =
3321            fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
3322
3323        // Deserialize the JSON to verify it works
3324        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3325            .expect("Failed to deserialize TableMetadataV1Compat.json");
3326
3327        // Verify some key fields match
3328        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3329        assert_eq!(
3330            desered_type.uuid(),
3331            Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
3332        );
3333        assert_eq!(
3334            desered_type.location(),
3335            "s3://bucket/warehouse/iceberg/glue.db/table_name"
3336        );
3337        assert_eq!(desered_type.last_updated_ms(), 1727773114005);
3338        assert_eq!(desered_type.current_schema_id(), 0);
3339    }
3340
3341    #[test]
3342    fn test_table_metadata_v1_schemas_without_current_id() {
3343        let metadata = fs::read_to_string(
3344            "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
3345        )
3346        .unwrap();
3347
3348        // Deserialize the JSON - this should succeed by using the 'schema' field instead of 'schemas'
3349        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3350            .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");
3351
3352        // Verify it used the 'schema' field
3353        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3354        assert_eq!(
3355            desered_type.uuid(),
3356            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3357        );
3358
3359        // Get the schema and verify it has the expected fields
3360        let schema = desered_type.current_schema();
3361        assert_eq!(schema.as_struct().fields().len(), 3);
3362        assert_eq!(schema.as_struct().fields()[0].name, "x");
3363        assert_eq!(schema.as_struct().fields()[1].name, "y");
3364        assert_eq!(schema.as_struct().fields()[2].name, "z");
3365    }
3366
3367    #[test]
3368    fn test_table_metadata_v1_no_valid_schema() {
3369        let metadata =
3370            fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
3371                .unwrap();
3372
3373        // Deserialize the JSON - this should fail because neither schemas + current_schema_id nor schema is valid
3374        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3375
3376        assert!(desered.is_err());
3377        let error_message = desered.unwrap_err().to_string();
3378        assert!(
3379            error_message.contains("No valid schema configuration found"),
3380            "Expected error about no valid schema configuration, got: {error_message}"
3381        );
3382    }
3383
3384    #[test]
3385    fn test_table_metadata_v1_partition_specs_without_default_id() {
3386        let metadata = fs::read_to_string(
3387            "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
3388        )
3389        .unwrap();
3390
3391        // Deserialize the JSON - this should succeed by inferring default_spec_id as the max spec ID
3392        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3393            .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");
3394
3395        // Verify basic metadata
3396        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3397        assert_eq!(
3398            desered_type.uuid(),
3399            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3400        );
3401
3402        // Verify partition specs
3403        assert_eq!(desered_type.default_partition_spec_id(), 2); // Should pick the largest spec ID (2)
3404        assert_eq!(desered_type.partition_specs.len(), 2);
3405
3406        // Verify the default spec has the expected fields
3407        let default_spec = &desered_type.default_spec;
3408        assert_eq!(default_spec.spec_id(), 2);
3409        assert_eq!(default_spec.fields().len(), 1);
3410        assert_eq!(default_spec.fields()[0].name, "y");
3411        assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
3412        assert_eq!(default_spec.fields()[0].source_id, 2);
3413    }
3414
3415    #[test]
3416    fn test_table_metadata_v2_schema_not_found() {
3417        let metadata =
3418            fs::read_to_string("testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json")
3419                .unwrap();
3420
3421        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3422
3423        assert_eq!(
3424            desered.unwrap_err().to_string(),
3425            "DataInvalid => No schema exists with the current schema id 2."
3426        )
3427    }
3428
3429    #[test]
3430    fn test_table_metadata_v2_missing_sort_order() {
3431        let metadata =
3432            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
3433                .unwrap();
3434
3435        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3436
3437        assert_eq!(
3438            desered.unwrap_err().to_string(),
3439            "data did not match any variant of untagged enum TableMetadataEnum"
3440        )
3441    }
3442
3443    #[test]
3444    fn test_table_metadata_v2_missing_partition_specs() {
3445        let metadata =
3446            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json")
3447                .unwrap();
3448
3449        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3450
3451        assert_eq!(
3452            desered.unwrap_err().to_string(),
3453            "data did not match any variant of untagged enum TableMetadataEnum"
3454        )
3455    }
3456
3457    #[test]
3458    fn test_table_metadata_v2_missing_last_partition_id() {
3459        let metadata = fs::read_to_string(
3460            "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json",
3461        )
3462        .unwrap();
3463
3464        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3465
3466        assert_eq!(
3467            desered.unwrap_err().to_string(),
3468            "data did not match any variant of untagged enum TableMetadataEnum"
3469        )
3470    }
3471
3472    #[test]
3473    fn test_table_metadata_v2_missing_schemas() {
3474        let metadata =
3475            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSchemas.json")
3476                .unwrap();
3477
3478        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3479
3480        assert_eq!(
3481            desered.unwrap_err().to_string(),
3482            "data did not match any variant of untagged enum TableMetadataEnum"
3483        )
3484    }
3485
3486    #[test]
3487    fn test_table_metadata_v2_unsupported_version() {
3488        let metadata =
3489            fs::read_to_string("testdata/table_metadata/TableMetadataUnsupportedVersion.json")
3490                .unwrap();
3491
3492        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3493
3494        assert_eq!(
3495            desered.unwrap_err().to_string(),
3496            "data did not match any variant of untagged enum TableMetadataEnum"
3497        )
3498    }
3499
3500    #[test]
3501    fn test_order_of_format_version() {
3502        assert!(FormatVersion::V1 < FormatVersion::V2);
3503        assert_eq!(FormatVersion::V1, FormatVersion::V1);
3504        assert_eq!(FormatVersion::V2, FormatVersion::V2);
3505    }
3506
3507    #[test]
3508    fn test_default_partition_spec() {
3509        let default_spec_id = 1234;
3510        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3511        let partition_spec = PartitionSpec::unpartition_spec();
3512        table_meta_data.default_spec = partition_spec.clone().into();
3513        table_meta_data
3514            .partition_specs
3515            .insert(default_spec_id, Arc::new(partition_spec));
3516
3517        assert_eq!(
3518            (*table_meta_data.default_partition_spec().clone()).clone(),
3519            (*table_meta_data
3520                .partition_spec_by_id(default_spec_id)
3521                .unwrap()
3522                .clone())
3523            .clone()
3524        );
3525    }
3526    #[test]
3527    fn test_default_sort_order() {
3528        let default_sort_order_id = 1234;
3529        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3530        table_meta_data.default_sort_order_id = default_sort_order_id;
3531        table_meta_data
3532            .sort_orders
3533            .insert(default_sort_order_id, Arc::new(SortOrder::default()));
3534
3535        assert_eq!(
3536            table_meta_data.default_sort_order(),
3537            table_meta_data
3538                .sort_orders
3539                .get(&default_sort_order_id)
3540                .unwrap()
3541        )
3542    }
3543
3544    #[test]
3545    fn test_table_metadata_builder_from_table_creation() {
3546        let table_creation = TableCreation::builder()
3547            .location("s3://db/table".to_string())
3548            .name("table".to_string())
3549            .properties(HashMap::new())
3550            .schema(Schema::builder().build().unwrap())
3551            .build();
3552        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
3553            .unwrap()
3554            .build()
3555            .unwrap()
3556            .metadata;
3557        assert_eq!(table_metadata.location, "s3://db/table");
3558        assert_eq!(table_metadata.schemas.len(), 1);
3559        assert_eq!(
3560            table_metadata
3561                .schemas
3562                .get(&0)
3563                .unwrap()
3564                .as_struct()
3565                .fields()
3566                .len(),
3567            0
3568        );
3569        assert_eq!(table_metadata.properties.len(), 0);
3570        assert_eq!(
3571            table_metadata.partition_specs,
3572            HashMap::from([(
3573                0,
3574                Arc::new(
3575                    PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
3576                        .with_spec_id(0)
3577                        .build()
3578                        .unwrap()
3579                )
3580            )])
3581        );
3582        assert_eq!(
3583            table_metadata.sort_orders,
3584            HashMap::from([(
3585                0,
3586                Arc::new(SortOrder {
3587                    order_id: 0,
3588                    fields: vec![]
3589                })
3590            )])
3591        );
3592    }
3593
3594    #[tokio::test]
3595    async fn test_table_metadata_read_write() {
3596        // Create a temporary directory for our test
3597        let temp_dir = TempDir::new().unwrap();
3598        let temp_path = temp_dir.path().to_str().unwrap();
3599
3600        // Create a FileIO instance
3601        let file_io = FileIO::new_with_fs();
3602
3603        // Use an existing test metadata from the test files
3604        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3605
3606        // Define the metadata location
3607        let metadata_location = MetadataLocation::new_with_metadata(temp_path, &original_metadata);
3608        let metadata_location_str = metadata_location.to_string();
3609
3610        // Write the metadata
3611        original_metadata
3612            .write_to(&file_io, &metadata_location)
3613            .await
3614            .unwrap();
3615
3616        // Verify the file exists
3617        assert!(fs::metadata(&metadata_location_str).is_ok());
3618
3619        // Read the metadata back
3620        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location_str)
3621            .await
3622            .unwrap();
3623
3624        // Verify the metadata matches
3625        assert_eq!(read_metadata, original_metadata);
3626    }
3627
3628    #[tokio::test]
3629    async fn test_table_metadata_read_compressed() {
3630        let temp_dir = TempDir::new().unwrap();
3631        let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3632
3633        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3634        let json = serde_json::to_string(&original_metadata).unwrap();
3635
3636        let compressed = CompressionCodec::gzip_default()
3637            .compress(json.into_bytes())
3638            .expect("failed to compress metadata");
3639        std::fs::write(&metadata_location, &compressed).expect("failed to write metadata");
3640
3641        // Read the metadata back
3642        let file_io = FileIO::new_with_fs();
3643        let metadata_location = metadata_location.to_str().unwrap();
3644        let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3645            .await
3646            .unwrap();
3647
3648        // Verify the metadata matches
3649        assert_eq!(read_metadata, original_metadata);
3650    }
3651
3652    #[tokio::test]
3653    async fn test_table_metadata_read_nonexistent_file() {
3654        // Create a FileIO instance
3655        let file_io = FileIO::new_with_fs();
3656
3657        // Try to read a non-existent file
3658        let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3659
3660        // Verify it returns an error
3661        assert!(result.is_err());
3662    }
3663
    /// End-to-end gzip round-trip: enabling the metadata compression codec table
    /// property (with mixed-case value, to exercise case-insensitive matching)
    /// must produce a `.gz.metadata.json` location whose bytes are real gzip
    /// output and which `read_from` can decode back to the same metadata.
    #[tokio::test]
    async fn test_table_metadata_write_with_gzip_compression() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().to_str().unwrap();
        let file_io = FileIO::new_with_fs();

        // Get a test metadata and add gzip compression property
        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");

        // Modify properties to enable gzip compression (using mixed case to test case-insensitive matching)
        let mut props = original_metadata.properties.clone();
        props.insert(
            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
            "GziP".to_string(),
        );
        // Use builder to create new metadata with updated properties
        let compressed_metadata =
            TableMetadataBuilder::new_from_metadata(original_metadata.clone(), None)
                .assign_uuid(original_metadata.table_uuid)
                .set_properties(props.clone())
                .unwrap()
                .build()
                .unwrap()
                .metadata;

        // Create MetadataLocation with compression codec from metadata
        let metadata_location =
            MetadataLocation::new_with_metadata(temp_path, &compressed_metadata);
        let metadata_location_str = metadata_location.to_string();

        // Verify the location has the .gz extension
        assert!(metadata_location_str.contains(".gz.metadata.json"));

        // Write the metadata with compression
        compressed_metadata
            .write_to(&file_io, &metadata_location)
            .await
            .unwrap();

        // Verify the compressed file exists
        assert!(std::path::Path::new(&metadata_location_str).exists());

        // Read the raw file and check it's gzip compressed
        // (RFC 1952: every gzip stream begins with the two bytes 0x1F 0x8B).
        let raw_content = std::fs::read(&metadata_location_str).unwrap();
        assert!(raw_content.len() > 2);
        assert_eq!(raw_content[0], 0x1F); // gzip magic number
        assert_eq!(raw_content[1], 0x8B); // gzip magic number

        // Read the metadata back using the compressed location
        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location_str)
            .await
            .unwrap();

        // Verify the complete round-trip: read metadata should match what we wrote
        assert_eq!(read_metadata, compressed_metadata);
    }
3720
3721    #[test]
3722    fn test_partition_name_exists() {
3723        let schema = Schema::builder()
3724            .with_fields(vec![
3725                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3726                NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3727                    .into(),
3728            ])
3729            .build()
3730            .unwrap();
3731
3732        let spec1 = PartitionSpec::builder(schema.clone())
3733            .with_spec_id(1)
3734            .add_partition_field("data", "data_partition", Transform::Identity)
3735            .unwrap()
3736            .build()
3737            .unwrap();
3738
3739        let spec2 = PartitionSpec::builder(schema.clone())
3740            .with_spec_id(2)
3741            .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3742            .unwrap()
3743            .build()
3744            .unwrap();
3745
3746        // Build metadata with these specs
3747        let metadata = TableMetadataBuilder::new(
3748            schema,
3749            spec1.clone().into_unbound(),
3750            SortOrder::unsorted_order(),
3751            "s3://test/location".to_string(),
3752            FormatVersion::V2,
3753            HashMap::new(),
3754        )
3755        .unwrap()
3756        .add_partition_spec(spec2.into_unbound())
3757        .unwrap()
3758        .build()
3759        .unwrap()
3760        .metadata;
3761
3762        assert!(metadata.partition_name_exists("data_partition"));
3763        assert!(metadata.partition_name_exists("partition_bucket"));
3764
3765        assert!(!metadata.partition_name_exists("nonexistent_field"));
3766        assert!(!metadata.partition_name_exists("data")); // schema field name, not partition field name
3767        assert!(!metadata.partition_name_exists(""));
3768    }
3769
3770    #[test]
3771    fn test_partition_name_exists_empty_specs() {
3772        // Create metadata with no partition specs (unpartitioned table)
3773        let schema = Schema::builder()
3774            .with_fields(vec![
3775                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3776            ])
3777            .build()
3778            .unwrap();
3779
3780        let metadata = TableMetadataBuilder::new(
3781            schema,
3782            PartitionSpec::unpartition_spec().into_unbound(),
3783            SortOrder::unsorted_order(),
3784            "s3://test/location".to_string(),
3785            FormatVersion::V2,
3786            HashMap::new(),
3787        )
3788        .unwrap()
3789        .build()
3790        .unwrap()
3791        .metadata;
3792
3793        assert!(!metadata.partition_name_exists("any_field"));
3794        assert!(!metadata.partition_name_exists("data"));
3795    }
3796
3797    #[test]
3798    fn test_name_exists_in_any_schema() {
3799        // Create multiple schemas with different fields
3800        let schema1 = Schema::builder()
3801            .with_schema_id(1)
3802            .with_fields(vec![
3803                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3804                NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3805            ])
3806            .build()
3807            .unwrap();
3808
3809        let schema2 = Schema::builder()
3810            .with_schema_id(2)
3811            .with_fields(vec![
3812                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3813                NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3814            ])
3815            .build()
3816            .unwrap();
3817
3818        let metadata = TableMetadataBuilder::new(
3819            schema1,
3820            PartitionSpec::unpartition_spec().into_unbound(),
3821            SortOrder::unsorted_order(),
3822            "s3://test/location".to_string(),
3823            FormatVersion::V2,
3824            HashMap::new(),
3825        )
3826        .unwrap()
3827        .add_current_schema(schema2)
3828        .unwrap()
3829        .build()
3830        .unwrap()
3831        .metadata;
3832
3833        assert!(metadata.name_exists_in_any_schema("field1")); // exists in both schemas
3834        assert!(metadata.name_exists_in_any_schema("field2")); // exists only in schema1 (historical)
3835        assert!(metadata.name_exists_in_any_schema("field3")); // exists only in schema2 (current)
3836
3837        assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3838        assert!(!metadata.name_exists_in_any_schema("field4"));
3839        assert!(!metadata.name_exists_in_any_schema(""));
3840    }
3841
3842    #[test]
3843    fn test_name_exists_in_any_schema_empty_schemas() {
3844        let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3845
3846        let metadata = TableMetadataBuilder::new(
3847            schema,
3848            PartitionSpec::unpartition_spec().into_unbound(),
3849            SortOrder::unsorted_order(),
3850            "s3://test/location".to_string(),
3851            FormatVersion::V2,
3852            HashMap::new(),
3853        )
3854        .unwrap()
3855        .build()
3856        .unwrap()
3857        .metadata;
3858
3859        assert!(!metadata.name_exists_in_any_schema("any_field"));
3860    }
3861
3862    #[test]
3863    fn test_helper_methods_multi_version_scenario() {
3864        // Test a realistic multi-version scenario
3865        let initial_schema = Schema::builder()
3866            .with_fields(vec![
3867                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3868                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3869                NestedField::required(
3870                    3,
3871                    "deprecated_field",
3872                    Type::Primitive(PrimitiveType::String),
3873                )
3874                .into(),
3875            ])
3876            .build()
3877            .unwrap();
3878
3879        let metadata = TableMetadataBuilder::new(
3880            initial_schema,
3881            PartitionSpec::unpartition_spec().into_unbound(),
3882            SortOrder::unsorted_order(),
3883            "s3://test/location".to_string(),
3884            FormatVersion::V2,
3885            HashMap::new(),
3886        )
3887        .unwrap();
3888
3889        let evolved_schema = Schema::builder()
3890            .with_fields(vec![
3891                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3892                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3893                NestedField::required(
3894                    3,
3895                    "deprecated_field",
3896                    Type::Primitive(PrimitiveType::String),
3897                )
3898                .into(),
3899                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3900                    .into(),
3901            ])
3902            .build()
3903            .unwrap();
3904
3905        // Then add a third schema that removes the deprecated field
3906        let _final_schema = Schema::builder()
3907            .with_fields(vec![
3908                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3909                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3910                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3911                    .into(),
3912                NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3913                    .into(),
3914            ])
3915            .build()
3916            .unwrap();
3917
3918        let final_metadata = metadata
3919            .add_current_schema(evolved_schema)
3920            .unwrap()
3921            .build()
3922            .unwrap()
3923            .metadata;
3924
3925        assert!(!final_metadata.partition_name_exists("nonexistent_partition")); // unpartitioned table
3926
3927        assert!(final_metadata.name_exists_in_any_schema("id")); // exists in both schemas
3928        assert!(final_metadata.name_exists_in_any_schema("name")); // exists in both schemas
3929        assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); // exists in both schemas
3930        assert!(final_metadata.name_exists_in_any_schema("new_field")); // only in current schema
3931        assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3932    }
3933
3934    #[test]
3935    fn test_invalid_sort_order_id_zero_with_fields() {
3936        let metadata = r#"
3937        {
3938            "format-version": 2,
3939            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
3940            "location": "s3://bucket/test/location",
3941            "last-sequence-number": 111,
3942            "last-updated-ms": 1600000000000,
3943            "last-column-id": 3,
3944            "current-schema-id": 1,
3945            "schemas": [
3946                {
3947                    "type": "struct",
3948                    "schema-id": 1,
3949                    "fields": [
3950                        {"id": 1, "name": "x", "required": true, "type": "long"},
3951                        {"id": 2, "name": "y", "required": true, "type": "long"}
3952                    ]
3953                }
3954            ],
3955            "default-spec-id": 0,
3956            "partition-specs": [{"spec-id": 0, "fields": []}],
3957            "last-partition-id": 999,
3958            "default-sort-order-id": 0,
3959            "sort-orders": [
3960                {
3961                    "order-id": 0,
3962                    "fields": [
3963                        {
3964                            "transform": "identity",
3965                            "source-id": 1,
3966                            "direction": "asc",
3967                            "null-order": "nulls-first"
3968                        }
3969                    ]
3970                }
3971            ],
3972            "properties": {},
3973            "current-snapshot-id": -1,
3974            "snapshots": []
3975        }
3976        "#;
3977
3978        let result: Result<TableMetadata, serde_json::Error> = serde_json::from_str(metadata);
3979
3980        // Should fail because sort order ID 0 is reserved for unsorted order and cannot have fields
3981        assert!(
3982            result.is_err(),
3983            "Parsing should fail for sort order ID 0 with fields"
3984        );
3985    }
3986
3987    #[test]
3988    fn test_table_properties_with_defaults() {
3989        use crate::spec::TableProperties;
3990
3991        let schema = Schema::builder()
3992            .with_fields(vec![
3993                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3994            ])
3995            .build()
3996            .unwrap();
3997
3998        let metadata = TableMetadataBuilder::new(
3999            schema,
4000            PartitionSpec::unpartition_spec().into_unbound(),
4001            SortOrder::unsorted_order(),
4002            "s3://test/location".to_string(),
4003            FormatVersion::V2,
4004            HashMap::new(),
4005        )
4006        .unwrap()
4007        .build()
4008        .unwrap()
4009        .metadata;
4010
4011        let props = metadata.table_properties().unwrap();
4012
4013        assert_eq!(
4014            props.commit_num_retries,
4015            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
4016        );
4017        assert_eq!(
4018            props.write_target_file_size_bytes,
4019            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
4020        );
4021    }
4022
4023    #[test]
4024    fn test_table_properties_with_custom_values() {
4025        use crate::spec::TableProperties;
4026
4027        let schema = Schema::builder()
4028            .with_fields(vec![
4029                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4030            ])
4031            .build()
4032            .unwrap();
4033
4034        let properties = HashMap::from([
4035            (
4036                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
4037                "10".to_string(),
4038            ),
4039            (
4040                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
4041                "1024".to_string(),
4042            ),
4043        ]);
4044
4045        let metadata = TableMetadataBuilder::new(
4046            schema,
4047            PartitionSpec::unpartition_spec().into_unbound(),
4048            SortOrder::unsorted_order(),
4049            "s3://test/location".to_string(),
4050            FormatVersion::V2,
4051            properties,
4052        )
4053        .unwrap()
4054        .build()
4055        .unwrap()
4056        .metadata;
4057
4058        let props = metadata.table_properties().unwrap();
4059
4060        assert_eq!(props.commit_num_retries, 10);
4061        assert_eq!(props.write_target_file_size_bytes, 1024);
4062    }
4063
4064    #[test]
4065    fn test_table_properties_with_invalid_value() {
4066        let schema = Schema::builder()
4067            .with_fields(vec![
4068                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4069            ])
4070            .build()
4071            .unwrap();
4072
4073        let properties = HashMap::from([(
4074            "commit.retry.num-retries".to_string(),
4075            "not_a_number".to_string(),
4076        )]);
4077
4078        let metadata = TableMetadataBuilder::new(
4079            schema,
4080            PartitionSpec::unpartition_spec().into_unbound(),
4081            SortOrder::unsorted_order(),
4082            "s3://test/location".to_string(),
4083            FormatVersion::V2,
4084            properties,
4085        )
4086        .unwrap()
4087        .build()
4088        .unwrap()
4089        .metadata;
4090
4091        let err = metadata.table_properties().unwrap_err();
4092        assert_eq!(err.kind(), ErrorKind::DataInvalid);
4093        assert!(err.message().contains("Invalid table properties"));
4094    }
4095
4096    #[test]
4097    fn test_v2_to_v3_upgrade_preserves_existing_snapshots_without_row_lineage() {
4098        // Create a v2 table metadata
4099        let schema = Schema::builder()
4100            .with_fields(vec![
4101                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4102            ])
4103            .build()
4104            .unwrap();
4105
4106        let v2_metadata = TableMetadataBuilder::new(
4107            schema,
4108            PartitionSpec::unpartition_spec().into_unbound(),
4109            SortOrder::unsorted_order(),
4110            "s3://bucket/test/location".to_string(),
4111            FormatVersion::V2,
4112            HashMap::new(),
4113        )
4114        .unwrap()
4115        .build()
4116        .unwrap()
4117        .metadata;
4118
4119        // Add a v2 snapshot
4120        let snapshot = Snapshot::builder()
4121            .with_snapshot_id(1)
4122            .with_timestamp_ms(v2_metadata.last_updated_ms + 1)
4123            .with_sequence_number(1)
4124            .with_schema_id(0)
4125            .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
4126            .with_summary(Summary {
4127                operation: Operation::Append,
4128                additional_properties: HashMap::from([(
4129                    "added-data-files".to_string(),
4130                    "1".to_string(),
4131                )]),
4132            })
4133            .build();
4134
4135        let v2_with_snapshot = v2_metadata
4136            .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
4137            .add_snapshot(snapshot)
4138            .unwrap()
4139            .set_ref("main", SnapshotReference {
4140                snapshot_id: 1,
4141                retention: SnapshotRetention::Branch {
4142                    min_snapshots_to_keep: None,
4143                    max_snapshot_age_ms: None,
4144                    max_ref_age_ms: None,
4145                },
4146            })
4147            .unwrap()
4148            .build()
4149            .unwrap()
4150            .metadata;
4151
4152        // Verify v2 serialization works fine
4153        let v2_json = serde_json::to_string(&v2_with_snapshot);
4154        assert!(v2_json.is_ok(), "v2 serialization should work");
4155
4156        // Upgrade to v3
4157        let v3_metadata = v2_with_snapshot
4158            .into_builder(Some("s3://bucket/test/metadata/v00002.json".to_string()))
4159            .upgrade_format_version(FormatVersion::V3)
4160            .unwrap()
4161            .build()
4162            .unwrap()
4163            .metadata;
4164
4165        assert_eq!(v3_metadata.format_version, FormatVersion::V3);
4166        assert_eq!(v3_metadata.next_row_id, INITIAL_ROW_ID);
4167        assert_eq!(v3_metadata.snapshots.len(), 1);
4168
4169        // Verify the snapshot has no row_range
4170        let snapshot = v3_metadata.snapshots.values().next().unwrap();
4171        assert!(
4172            snapshot.row_range().is_none(),
4173            "Snapshot should have no row_range after upgrade"
4174        );
4175
4176        // Try to serialize v3 metadata - this should now work
4177        let v3_json = serde_json::to_string(&v3_metadata);
4178        assert!(
4179            v3_json.is_ok(),
4180            "v3 serialization should work for upgraded tables"
4181        );
4182
4183        // Verify we can deserialize it back
4184        let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
4185        assert_eq!(deserialized.format_version, FormatVersion::V3);
4186        assert_eq!(deserialized.snapshots.len(), 1);
4187
4188        // Verify the deserialized snapshot still has no row_range
4189        let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
4190        assert!(
4191            deserialized_snapshot.row_range().is_none(),
4192            "Deserialized snapshot should have no row_range"
4193        );
4194    }
4195
4196    #[test]
4197    fn test_v3_snapshot_with_row_lineage_serialization() {
4198        // Create a v3 table metadata
4199        let schema = Schema::builder()
4200            .with_fields(vec![
4201                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
4202            ])
4203            .build()
4204            .unwrap();
4205
4206        let v3_metadata = TableMetadataBuilder::new(
4207            schema,
4208            PartitionSpec::unpartition_spec().into_unbound(),
4209            SortOrder::unsorted_order(),
4210            "s3://bucket/test/location".to_string(),
4211            FormatVersion::V3,
4212            HashMap::new(),
4213        )
4214        .unwrap()
4215        .build()
4216        .unwrap()
4217        .metadata;
4218
4219        // Add a v3 snapshot with row lineage
4220        let snapshot = Snapshot::builder()
4221            .with_snapshot_id(1)
4222            .with_timestamp_ms(v3_metadata.last_updated_ms + 1)
4223            .with_sequence_number(1)
4224            .with_schema_id(0)
4225            .with_manifest_list("s3://bucket/test/metadata/snap-1.avro")
4226            .with_summary(Summary {
4227                operation: Operation::Append,
4228                additional_properties: HashMap::from([(
4229                    "added-data-files".to_string(),
4230                    "1".to_string(),
4231                )]),
4232            })
4233            .with_row_range(100, 50) // first_row_id=100, added_rows=50
4234            .build();
4235
4236        let v3_with_snapshot = v3_metadata
4237            .into_builder(Some("s3://bucket/test/metadata/v00001.json".to_string()))
4238            .add_snapshot(snapshot)
4239            .unwrap()
4240            .set_ref("main", SnapshotReference {
4241                snapshot_id: 1,
4242                retention: SnapshotRetention::Branch {
4243                    min_snapshots_to_keep: None,
4244                    max_snapshot_age_ms: None,
4245                    max_ref_age_ms: None,
4246                },
4247            })
4248            .unwrap()
4249            .build()
4250            .unwrap()
4251            .metadata;
4252
4253        // Verify the snapshot has row_range
4254        let snapshot = v3_with_snapshot.snapshots.values().next().unwrap();
4255        assert!(
4256            snapshot.row_range().is_some(),
4257            "Snapshot should have row_range"
4258        );
4259        let (first_row_id, added_rows) = snapshot.row_range().unwrap();
4260        assert_eq!(first_row_id, 100);
4261        assert_eq!(added_rows, 50);
4262
4263        // Serialize v3 metadata - this should work
4264        let v3_json = serde_json::to_string(&v3_with_snapshot);
4265        assert!(
4266            v3_json.is_ok(),
4267            "v3 serialization should work for snapshots with row lineage"
4268        );
4269
4270        // Verify we can deserialize it back
4271        let deserialized: TableMetadata = serde_json::from_str(&v3_json.unwrap()).unwrap();
4272        assert_eq!(deserialized.format_version, FormatVersion::V3);
4273        assert_eq!(deserialized.snapshots.len(), 1);
4274
4275        // Verify the deserialized snapshot has the correct row_range
4276        let deserialized_snapshot = deserialized.snapshots.values().next().unwrap();
4277        assert!(
4278            deserialized_snapshot.row_range().is_some(),
4279            "Deserialized snapshot should have row_range"
4280        );
4281        let (deserialized_first_row_id, deserialized_added_rows) =
4282            deserialized_snapshot.row_range().unwrap();
4283        assert_eq!(deserialized_first_row_id, 100);
4284        assert_eq!(deserialized_added_rows, 50);
4285    }
4286}