iceberg/spec/table_metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
//! The main struct here is [TableMetadata] which defines the data for a table.
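//!
//! Table metadata can be parsed straight from its JSON representation, since
//! [TableMetadata] implements `Deserialize`. A minimal sketch (the `metadata_json`
//! string is assumed to hold an abbreviated, illustrative metadata document):
//!
//! ```ignore
//! let metadata: TableMetadata = serde_json::from_str(metadata_json)?;
//! assert_eq!(metadata.format_version(), FormatVersion::V2);
//! ```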
20
21use std::cmp::Ordering;
22use std::collections::HashMap;
23use std::fmt::{Display, Formatter};
24use std::hash::Hash;
25use std::io::Read as _;
26use std::sync::Arc;
27
28use _serde::TableMetadataEnum;
29use chrono::{DateTime, Utc};
30use flate2::read::GzDecoder;
31use serde::{Deserialize, Serialize};
32use serde_repr::{Deserialize_repr, Serialize_repr};
33use uuid::Uuid;
34
35use super::snapshot::SnapshotReference;
36pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
37use super::{
38    DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
39    SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
40    TableProperties,
41};
42use crate::error::{Result, timestamp_ms_to_utc};
43use crate::io::FileIO;
44use crate::spec::EncryptedKey;
45use crate::{Error, ErrorKind};
46
47static MAIN_BRANCH: &str = "main";
48pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
49
50pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
51pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
52
53/// Initial row id for row lineage for new v3 tables and older tables upgrading to v3.
54pub const INITIAL_ROW_ID: u64 = 0;
55/// Minimum format version that supports row lineage (v3).
56pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
57/// Reference to [`TableMetadata`].
58pub type TableMetadataRef = Arc<TableMetadata>;
59
60#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
61#[serde(try_from = "TableMetadataEnum")]
/// Fields of the table metadata, covering all supported format versions.
///
/// We assume that this data structure is always valid, so we panic if an invalid state is encountered.
/// Validity is checked when the structure is constructed.
66pub struct TableMetadata {
67    /// Integer Version for the format.
68    pub(crate) format_version: FormatVersion,
69    /// A UUID that identifies the table
70    pub(crate) table_uuid: Uuid,
    /// The table's base location
72    pub(crate) location: String,
    /// The table's highest assigned sequence number
74    pub(crate) last_sequence_number: i64,
75    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
76    pub(crate) last_updated_ms: i64,
77    /// An integer; the highest assigned column ID for the table.
78    pub(crate) last_column_id: i32,
79    /// A list of schemas, stored as objects with schema-id.
80    pub(crate) schemas: HashMap<i32, SchemaRef>,
81    /// ID of the table’s current schema.
82    pub(crate) current_schema_id: i32,
83    /// A list of partition specs, stored as full partition spec objects.
84    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
85    /// ID of the “current” spec that writers should use by default.
86    pub(crate) default_spec: PartitionSpecRef,
87    /// Partition type of the default partition spec.
88    pub(crate) default_partition_type: StructType,
89    /// An integer; the highest assigned partition field ID across all partition specs for the table.
90    pub(crate) last_partition_id: i32,
    /// A string to string map of table properties. This is used to control settings that
92    /// affect reading and writing and is not intended to be used for arbitrary metadata.
93    /// For example, commit.retry.num-retries is used to control the number of commit retries.
94    pub(crate) properties: HashMap<String, String>,
    /// ID (long) of the current table snapshot; must be the same as the current
    /// ID of the main branch in refs.
97    pub(crate) current_snapshot_id: Option<i64>,
    /// A list of valid snapshots. Valid snapshots are snapshots for which all
99    /// data files exist in the file system. A data file must not be deleted
100    /// from the file system until the last snapshot in which it was listed is
101    /// garbage collected.
102    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
103    /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
104    /// to the current snapshot for the table. Each time the current-snapshot-id
105    /// is changed, a new entry should be added with the last-updated-ms
106    /// and the new current-snapshot-id. When snapshots are expired from
107    /// the list of valid snapshots, all entries before a snapshot that has
108    /// expired should be removed.
109    pub(crate) snapshot_log: Vec<SnapshotLog>,
110
111    /// A list (optional) of timestamp and metadata file location pairs
112    /// that encodes changes to the previous metadata files for the table.
113    /// Each time a new metadata file is created, a new entry of the
114    /// previous metadata file location should be added to the list.
115    /// Tables can be configured to remove the oldest metadata log entries and
116    /// keep a fixed-size log of the most recent entries after a commit.
117    pub(crate) metadata_log: Vec<MetadataLog>,
118
119    /// A list of sort orders, stored as full sort order objects.
120    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
121    /// Default sort order id of the table. Note that this could be used by
122    /// writers, but is not used when reading because reads use the specs
123    /// stored in manifest files.
124    pub(crate) default_sort_order_id: i64,
125    /// A map of snapshot references. The map keys are the unique snapshot reference
126    /// names in the table, and the map values are snapshot reference objects.
127    /// There is always a main branch reference pointing to the current-snapshot-id
128    /// even if the refs map is null.
129    pub(crate) refs: HashMap<String, SnapshotReference>,
130    /// Mapping of snapshot ids to statistics files.
131    pub(crate) statistics: HashMap<i64, StatisticsFile>,
132    /// Mapping of snapshot ids to partition statistics files.
133    pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
134    /// Encryption Keys - map of key id to the actual key
135    pub(crate) encryption_keys: HashMap<String, EncryptedKey>,
136    /// Next row id to be assigned for Row Lineage (v3)
137    pub(crate) next_row_id: u64,
138}
139
140impl TableMetadata {
141    /// Convert this Table Metadata into a builder for modification.
142    ///
143    /// `current_file_location` is the location where the current version
144    /// of the metadata file is stored. This is used to update the metadata log.
145    /// If `current_file_location` is `None`, the metadata log will not be updated.
146    /// This should only be used to stage-create tables.
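    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an existing `metadata` value; the metadata
    /// location below is purely illustrative:
    ///
    /// ```ignore
    /// let result = metadata
    ///     .into_builder(Some(
    ///         "s3://bucket/table/metadata/00002.metadata.json".to_string(),
    ///     ))
    ///     .build()?;
    /// let updated_metadata = result.metadata;
    /// ```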
147    #[must_use]
148    pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
149        TableMetadataBuilder::new_from_metadata(self, current_file_location)
150    }
151
152    /// Check if a partition field name exists in any partition spec.
153    #[inline]
154    pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
155        self.partition_specs
156            .values()
157            .any(|spec| spec.fields().iter().any(|pf| pf.name == name))
158    }
159
160    /// Check if a field name exists in any schema.
161    #[inline]
162    pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
163        self.schemas
164            .values()
165            .any(|schema| schema.field_by_name(name).is_some())
166    }
167
168    /// Returns format version of this metadata.
169    #[inline]
170    pub fn format_version(&self) -> FormatVersion {
171        self.format_version
172    }
173
174    /// Returns uuid of current table.
175    #[inline]
176    pub fn uuid(&self) -> Uuid {
177        self.table_uuid
178    }
179
180    /// Returns table location.
181    #[inline]
182    pub fn location(&self) -> &str {
183        self.location.as_str()
184    }
185
186    /// Returns last sequence number.
187    #[inline]
188    pub fn last_sequence_number(&self) -> i64 {
189        self.last_sequence_number
190    }
191
192    /// Returns the next sequence number for the table.
193    ///
194    /// For format version 1, it always returns the initial sequence number.
195    /// For other versions, it returns the last sequence number incremented by 1.
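    ///
    /// For example, a v2 table whose last sequence number is 41 gets 42 here,
    /// while a v1 table always gets the initial sequence number 0.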
196    #[inline]
197    pub fn next_sequence_number(&self) -> i64 {
198        match self.format_version {
199            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
200            _ => self.last_sequence_number + 1,
201        }
202    }
203
204    /// Returns the last column id.
205    #[inline]
206    pub fn last_column_id(&self) -> i32 {
207        self.last_column_id
208    }
209
210    /// Returns the last partition_id
211    #[inline]
212    pub fn last_partition_id(&self) -> i32 {
213        self.last_partition_id
214    }
215
216    /// Returns last updated time.
217    #[inline]
218    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
219        timestamp_ms_to_utc(self.last_updated_ms)
220    }
221
222    /// Returns last updated time in milliseconds.
223    #[inline]
224    pub fn last_updated_ms(&self) -> i64 {
225        self.last_updated_ms
226    }
227
    /// Returns an iterator over all schemas.
229    #[inline]
230    pub fn schemas_iter(&self) -> impl ExactSizeIterator<Item = &SchemaRef> {
231        self.schemas.values()
232    }
233
234    /// Lookup schema by id.
235    #[inline]
236    pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
237        self.schemas.get(&schema_id)
238    }
239
240    /// Get current schema
241    #[inline]
242    pub fn current_schema(&self) -> &SchemaRef {
243        self.schema_by_id(self.current_schema_id)
244            .expect("Current schema id set, but not found in table metadata")
245    }
246
247    /// Get the id of the current schema
248    #[inline]
249    pub fn current_schema_id(&self) -> SchemaId {
250        self.current_schema_id
251    }
252
253    /// Returns all partition specs.
254    #[inline]
255    pub fn partition_specs_iter(&self) -> impl ExactSizeIterator<Item = &PartitionSpecRef> {
256        self.partition_specs.values()
257    }
258
259    /// Lookup partition spec by id.
260    #[inline]
261    pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> {
262        self.partition_specs.get(&spec_id)
263    }
264
265    /// Get default partition spec
266    #[inline]
267    pub fn default_partition_spec(&self) -> &PartitionSpecRef {
268        &self.default_spec
269    }
270
271    /// Return the partition type of the default partition spec.
272    #[inline]
273    pub fn default_partition_type(&self) -> &StructType {
274        &self.default_partition_type
275    }
276
277    #[inline]
278    /// Returns spec id of the "current" partition spec.
279    pub fn default_partition_spec_id(&self) -> i32 {
280        self.default_spec.spec_id()
281    }
282
283    /// Returns all snapshots
284    #[inline]
285    pub fn snapshots(&self) -> impl ExactSizeIterator<Item = &SnapshotRef> {
286        self.snapshots.values()
287    }
288
289    /// Lookup snapshot by id.
290    #[inline]
291    pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
292        self.snapshots.get(&snapshot_id)
293    }
294
295    /// Returns snapshot history.
296    #[inline]
297    pub fn history(&self) -> &[SnapshotLog] {
298        &self.snapshot_log
299    }
300
301    /// Returns the metadata log.
302    #[inline]
303    pub fn metadata_log(&self) -> &[MetadataLog] {
304        &self.metadata_log
305    }
306
307    /// Get current snapshot
308    #[inline]
309    pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
310        self.current_snapshot_id.map(|s| {
311            self.snapshot_by_id(s)
312                .expect("Current snapshot id has been set, but doesn't exist in metadata")
313        })
314    }
315
316    /// Get the current snapshot id
317    #[inline]
318    pub fn current_snapshot_id(&self) -> Option<i64> {
319        self.current_snapshot_id
320    }
321
    /// Get the snapshot for a reference.
    /// Returns `None` if `ref_name` is not found.
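    ///
    /// A minimal sketch, assuming an existing `metadata` value:
    ///
    /// ```ignore
    /// if let Some(snapshot) = metadata.snapshot_for_ref("main") {
    ///     println!("main points at snapshot {}", snapshot.snapshot_id());
    /// }
    /// ```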
324    #[inline]
325    pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> {
326        self.refs.get(ref_name).map(|r| {
327            self.snapshot_by_id(r.snapshot_id)
328                .unwrap_or_else(|| panic!("Snapshot id of ref {ref_name} doesn't exist"))
329        })
330    }
331
332    /// Return all sort orders.
333    #[inline]
334    pub fn sort_orders_iter(&self) -> impl ExactSizeIterator<Item = &SortOrderRef> {
335        self.sort_orders.values()
336    }
337
338    /// Lookup sort order by id.
339    #[inline]
340    pub fn sort_order_by_id(&self, sort_order_id: i64) -> Option<&SortOrderRef> {
341        self.sort_orders.get(&sort_order_id)
342    }
343
    /// Returns the default sort order.
345    #[inline]
346    pub fn default_sort_order(&self) -> &SortOrderRef {
347        self.sort_orders
348            .get(&self.default_sort_order_id)
349            .expect("Default order id has been set, but not found in table metadata!")
350    }
351
352    /// Returns default sort order id.
353    #[inline]
354    pub fn default_sort_order_id(&self) -> i64 {
355        self.default_sort_order_id
356    }
357
358    /// Returns properties of table.
359    #[inline]
360    pub fn properties(&self) -> &HashMap<String, String> {
361        &self.properties
362    }
363
364    /// Returns typed table properties parsed from the raw properties map with defaults.
365    pub fn table_properties(&self) -> Result<TableProperties> {
366        TableProperties::try_from(&self.properties).map_err(|e| {
367            Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
368        })
369    }
370
    /// Returns an iterator over all statistics files.
372    #[inline]
373    pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
374        self.statistics.values()
375    }
376
    /// Returns an iterator over all partition statistics files.
378    #[inline]
379    pub fn partition_statistics_iter(
380        &self,
381    ) -> impl ExactSizeIterator<Item = &PartitionStatisticsFile> {
382        self.partition_statistics.values()
383    }
384
385    /// Get a statistics file for a snapshot id.
386    #[inline]
387    pub fn statistics_for_snapshot(&self, snapshot_id: i64) -> Option<&StatisticsFile> {
388        self.statistics.get(&snapshot_id)
389    }
390
391    /// Get a partition statistics file for a snapshot id.
392    #[inline]
393    pub fn partition_statistics_for_snapshot(
394        &self,
395        snapshot_id: i64,
396    ) -> Option<&PartitionStatisticsFile> {
397        self.partition_statistics.get(&snapshot_id)
398    }
399
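    /// Ensure that a `main` branch reference exists and points at the current
    /// snapshot id whenever one is set.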
400    fn construct_refs(&mut self) {
401        if let Some(current_snapshot_id) = self.current_snapshot_id
402            && !self.refs.contains_key(MAIN_BRANCH)
403        {
404            self.refs
405                .insert(MAIN_BRANCH.to_string(), SnapshotReference {
406                    snapshot_id: current_snapshot_id,
407                    retention: SnapshotRetention::Branch {
408                        min_snapshots_to_keep: None,
409                        max_snapshot_age_ms: None,
410                        max_ref_age_ms: None,
411                    },
412                });
413        }
414    }
415
416    /// Iterate over all encryption keys
417    #[inline]
418    pub fn encryption_keys_iter(&self) -> impl ExactSizeIterator<Item = &EncryptedKey> {
419        self.encryption_keys.values()
420    }
421
422    /// Get the encryption key for a given key id
423    #[inline]
424    pub fn encryption_key(&self, key_id: &str) -> Option<&EncryptedKey> {
425        self.encryption_keys.get(key_id)
426    }
427
428    /// Get the next row id to be assigned
429    #[inline]
430    pub fn next_row_id(&self) -> u64 {
431        self.next_row_id
432    }
433
434    /// Read table metadata from the given location.
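    ///
    /// A minimal sketch, assuming a local-filesystem `FileIO`; the path is
    /// purely illustrative:
    ///
    /// ```ignore
    /// let path = "/tmp/warehouse/table/metadata/00000.metadata.json";
    /// let file_io = FileIO::from_path(path)?.build()?;
    /// let metadata = TableMetadata::read_from(&file_io, path).await?;
    /// ```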
435    pub async fn read_from(
436        file_io: &FileIO,
437        metadata_location: impl AsRef<str>,
438    ) -> Result<TableMetadata> {
439        let metadata_location = metadata_location.as_ref();
440        let input_file = file_io.new_input(metadata_location)?;
441        let metadata_content = input_file.read().await?;
442
443        // Check if the file is compressed by looking for the gzip "magic number".
444        let metadata = if metadata_content.len() > 2
445            && metadata_content[0] == 0x1F
446            && metadata_content[1] == 0x8B
447        {
448            let mut decoder = GzDecoder::new(metadata_content.as_ref());
449            let mut decompressed_data = Vec::new();
450            decoder.read_to_end(&mut decompressed_data).map_err(|e| {
451                Error::new(
452                    ErrorKind::DataInvalid,
                    "Failed to decompress gzip-compressed metadata file",
454                )
455                .with_context("file_path", metadata_location)
456                .with_source(e)
457            })?;
458            serde_json::from_slice(&decompressed_data)?
459        } else {
460            serde_json::from_slice(&metadata_content)?
461        };
462
463        Ok(metadata)
464    }
465
466    /// Write table metadata to the given location.
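    ///
    /// A minimal sketch, assuming an existing `file_io` and `metadata`; the path
    /// is purely illustrative:
    ///
    /// ```ignore
    /// metadata
    ///     .write_to(&file_io, "/tmp/warehouse/table/metadata/00001.metadata.json")
    ///     .await?;
    /// ```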
467    pub async fn write_to(
468        &self,
469        file_io: &FileIO,
470        metadata_location: impl AsRef<str>,
471    ) -> Result<()> {
472        file_io
473            .new_output(metadata_location)?
474            .write(serde_json::to_vec(self)?.into())
475            .await
476    }
477
    /// Normalize this table metadata.
    ///
    /// This is an internal method meant to be called after constructing table
    /// metadata from untrusted sources, and it runs after JSON deserialization.
    /// All constructors for `TableMetadata` that are part of `iceberg-rust`
    /// should return normalized `TableMetadata`.
485    pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
486        self.validate_current_schema()?;
487        self.normalize_current_snapshot()?;
488        self.construct_refs();
489        self.validate_refs()?;
490        self.validate_chronological_snapshot_logs()?;
491        self.validate_chronological_metadata_logs()?;
492        // Normalize location (remove trailing slash)
493        self.location = self.location.trim_end_matches('/').to_string();
494        self.validate_snapshot_sequence_number()?;
495        self.try_normalize_partition_spec()?;
496        self.try_normalize_sort_order()?;
497        Ok(self)
498    }
499
500    /// If the default partition spec is not present in specs, add it
501    fn try_normalize_partition_spec(&mut self) -> Result<()> {
502        if self
503            .partition_spec_by_id(self.default_spec.spec_id())
504            .is_none()
505        {
506            self.partition_specs.insert(
507                self.default_spec.spec_id(),
508                Arc::new(Arc::unwrap_or_clone(self.default_spec.clone())),
509            );
510        }
511
512        Ok(())
513    }
514
    /// If the default sort order id refers to the unsorted order but that order is not present, add it.
516    fn try_normalize_sort_order(&mut self) -> Result<()> {
517        // Validate that sort order ID 0 (reserved for unsorted) has no fields
518        if let Some(sort_order) = self.sort_order_by_id(SortOrder::UNSORTED_ORDER_ID)
519            && !sort_order.fields.is_empty()
520        {
521            return Err(Error::new(
522                ErrorKind::Unexpected,
523                format!(
524                    "Sort order ID {} is reserved for unsorted order",
525                    SortOrder::UNSORTED_ORDER_ID
526                ),
527            ));
528        }
529
530        if self.sort_order_by_id(self.default_sort_order_id).is_some() {
531            return Ok(());
532        }
533
534        if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
535            return Err(Error::new(
536                ErrorKind::DataInvalid,
537                format!(
538                    "No sort order exists with the default sort order id {}.",
539                    self.default_sort_order_id
540                ),
541            ));
542        }
543
544        let sort_order = SortOrder::unsorted_order();
545        self.sort_orders
546            .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
547        Ok(())
548    }
549
550    /// Validate the current schema is set and exists.
551    fn validate_current_schema(&self) -> Result<()> {
552        if self.schema_by_id(self.current_schema_id).is_none() {
553            return Err(Error::new(
554                ErrorKind::DataInvalid,
555                format!(
556                    "No schema exists with the current schema id {}.",
557                    self.current_schema_id
558                ),
559            ));
560        }
561        Ok(())
562    }
563
564    /// If current snapshot is Some(-1) then set it to None.
565    fn normalize_current_snapshot(&mut self) -> Result<()> {
566        if let Some(current_snapshot_id) = self.current_snapshot_id {
567            if current_snapshot_id == EMPTY_SNAPSHOT_ID {
568                self.current_snapshot_id = None;
569            } else if self.snapshot_by_id(current_snapshot_id).is_none() {
570                return Err(Error::new(
571                    ErrorKind::DataInvalid,
572                    format!(
573                        "Snapshot for current snapshot id {current_snapshot_id} does not exist in the existing snapshots list"
574                    ),
575                ));
576            }
577        }
578        Ok(())
579    }
580
581    /// Validate that all refs are valid (snapshot exists)
582    fn validate_refs(&self) -> Result<()> {
583        for (name, snapshot_ref) in self.refs.iter() {
584            if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
585                return Err(Error::new(
586                    ErrorKind::DataInvalid,
587                    format!(
588                        "Snapshot for reference {name} does not exist in the existing snapshots list"
589                    ),
590                ));
591            }
592        }
593
594        let main_ref = self.refs.get(MAIN_BRANCH);
595        if self.current_snapshot_id.is_some() {
596            if let Some(main_ref) = main_ref
597                && main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default()
598            {
599                return Err(Error::new(
600                    ErrorKind::DataInvalid,
601                    format!(
602                        "Current snapshot id does not match main branch ({:?} != {:?})",
603                        self.current_snapshot_id.unwrap_or_default(),
604                        main_ref.snapshot_id
605                    ),
606                ));
607            }
608        } else if main_ref.is_some() {
609            return Err(Error::new(
610                ErrorKind::DataInvalid,
611                "Current snapshot is not set, but main branch exists",
612            ));
613        }
614
615        Ok(())
616    }
617
618    /// Validate that for V1 Metadata the last_sequence_number is 0
619    fn validate_snapshot_sequence_number(&self) -> Result<()> {
620        if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
621            return Err(Error::new(
622                ErrorKind::DataInvalid,
623                format!(
624                    "Last sequence number must be 0 in v1. Found {}",
625                    self.last_sequence_number
626                ),
627            ));
628        }
629
630        if self.format_version >= FormatVersion::V2
631            && let Some(snapshot) = self
632                .snapshots
633                .values()
634                .find(|snapshot| snapshot.sequence_number() > self.last_sequence_number)
635        {
636            return Err(Error::new(
637                ErrorKind::DataInvalid,
638                format!(
639                    "Invalid snapshot with id {} and sequence number {} greater than last sequence number {}",
640                    snapshot.snapshot_id(),
641                    snapshot.sequence_number(),
642                    self.last_sequence_number
643                ),
644            ));
645        }
646
647        Ok(())
648    }
649
    /// Validate that snapshot logs are chronological and that the last update is not before the last snapshot log entry.
651    fn validate_chronological_snapshot_logs(&self) -> Result<()> {
652        for window in self.snapshot_log.windows(2) {
653            let (prev, curr) = (&window[0], &window[1]);
654            // commits can happen concurrently from different machines.
655            // A tolerance helps us avoid failure for small clock skew
656            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
657                return Err(Error::new(
658                    ErrorKind::DataInvalid,
659                    "Expected sorted snapshot log entries",
660                ));
661            }
662        }
663
664        if let Some(last) = self.snapshot_log.last() {
665            // commits can happen concurrently from different machines.
666            // A tolerance helps us avoid failure for small clock skew
667            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
668                return Err(Error::new(
669                    ErrorKind::DataInvalid,
670                    format!(
671                        "Invalid update timestamp {}: before last snapshot log entry at {}",
672                        self.last_updated_ms, last.timestamp_ms
673                    ),
674                ));
675            }
676        }
677        Ok(())
678    }
679
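    /// Validate that metadata logs are chronological and that the last update is not before the last metadata log entry.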
680    fn validate_chronological_metadata_logs(&self) -> Result<()> {
681        for window in self.metadata_log.windows(2) {
682            let (prev, curr) = (&window[0], &window[1]);
683            // commits can happen concurrently from different machines.
684            // A tolerance helps us avoid failure for small clock skew
685            if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS {
686                return Err(Error::new(
687                    ErrorKind::DataInvalid,
688                    "Expected sorted metadata log entries",
689                ));
690            }
691        }
692
693        if let Some(last) = self.metadata_log.last() {
694            // commits can happen concurrently from different machines.
695            // A tolerance helps us avoid failure for small clock skew
696            if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS {
697                return Err(Error::new(
698                    ErrorKind::DataInvalid,
699                    format!(
700                        "Invalid update timestamp {}: before last metadata log entry at {}",
701                        self.last_updated_ms, last.timestamp_ms
702                    ),
703                ));
704            }
705        }
706
707        Ok(())
708    }
709}
710
pub(super) mod _serde {
    //! This is a helper module that defines types to help with serialization/deserialization.
    //! For deserialization the input first gets read into one of the version-specific structs
    //! ([TableMetadataV1], [TableMetadataV2] or [TableMetadataV3]) and is then converted into the
    //! [TableMetadata] struct. Serialization works the other way around.
    //! The version-specific structs are internal and are only used for serialization and deserialization.

    use std::borrow::BorrowMut;
    use std::collections::HashMap;
    use std::sync::Arc;
723
724    use serde::{Deserialize, Serialize};
725    use uuid::Uuid;
726
727    use super::{
728        DEFAULT_PARTITION_SPEC_ID, FormatVersion, MAIN_BRANCH, MetadataLog, SnapshotLog,
729        TableMetadata,
730    };
731    use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
732    use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2, SnapshotV3};
733    use crate::spec::{
734        EncryptedKey, INITIAL_ROW_ID, PartitionField, PartitionSpec, PartitionSpecRef,
735        PartitionStatisticsFile, Schema, SchemaRef, Snapshot, SnapshotReference, SnapshotRetention,
736        SortOrder, StatisticsFile,
737    };
738    use crate::{Error, ErrorKind};
739
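    /// Untagged wrapper that dispatches (de)serialization to the version-specific
    /// structs below; the `format-version` field, matched through [VersionNumber],
    /// decides which variant applies.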
740    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
741    #[serde(untagged)]
742    pub(super) enum TableMetadataEnum {
743        V3(TableMetadataV3),
744        V2(TableMetadataV2),
745        V1(TableMetadataV1),
746    }
747
748    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
749    #[serde(rename_all = "kebab-case")]
    /// Defines the structure of a v3 table metadata for serialization/deserialization
751    pub(super) struct TableMetadataV3 {
752        pub format_version: VersionNumber<3>,
753        #[serde(flatten)]
754        pub shared: TableMetadataV2V3Shared,
755        pub next_row_id: u64,
756        #[serde(skip_serializing_if = "Option::is_none")]
757        pub encryption_keys: Option<Vec<EncryptedKey>>,
758        #[serde(skip_serializing_if = "Option::is_none")]
759        pub snapshots: Option<Vec<SnapshotV3>>,
760    }
761
762    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
763    #[serde(rename_all = "kebab-case")]
    /// Defines the fields shared between v2 and v3 table metadata for serialization/deserialization
765    pub(super) struct TableMetadataV2V3Shared {
766        pub table_uuid: Uuid,
767        pub location: String,
768        pub last_sequence_number: i64,
769        pub last_updated_ms: i64,
770        pub last_column_id: i32,
771        pub schemas: Vec<SchemaV2>,
772        pub current_schema_id: i32,
773        pub partition_specs: Vec<PartitionSpec>,
774        pub default_spec_id: i32,
775        pub last_partition_id: i32,
776        #[serde(skip_serializing_if = "Option::is_none")]
777        pub properties: Option<HashMap<String, String>>,
778        #[serde(skip_serializing_if = "Option::is_none")]
779        pub current_snapshot_id: Option<i64>,
780        #[serde(skip_serializing_if = "Option::is_none")]
781        pub snapshot_log: Option<Vec<SnapshotLog>>,
782        #[serde(skip_serializing_if = "Option::is_none")]
783        pub metadata_log: Option<Vec<MetadataLog>>,
784        pub sort_orders: Vec<SortOrder>,
785        pub default_sort_order_id: i64,
786        #[serde(skip_serializing_if = "Option::is_none")]
787        pub refs: Option<HashMap<String, SnapshotReference>>,
788        #[serde(default, skip_serializing_if = "Vec::is_empty")]
789        pub statistics: Vec<StatisticsFile>,
790        #[serde(default, skip_serializing_if = "Vec::is_empty")]
791        pub partition_statistics: Vec<PartitionStatisticsFile>,
792    }
793
794    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
795    #[serde(rename_all = "kebab-case")]
796    /// Defines the structure of a v2 table metadata for serialization/deserialization
797    pub(super) struct TableMetadataV2 {
798        pub format_version: VersionNumber<2>,
799        #[serde(flatten)]
800        pub shared: TableMetadataV2V3Shared,
801        #[serde(skip_serializing_if = "Option::is_none")]
802        pub snapshots: Option<Vec<SnapshotV2>>,
803    }
804
805    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
806    #[serde(rename_all = "kebab-case")]
807    /// Defines the structure of a v1 table metadata for serialization/deserialization
808    pub(super) struct TableMetadataV1 {
809        pub format_version: VersionNumber<1>,
810        #[serde(skip_serializing_if = "Option::is_none")]
811        pub table_uuid: Option<Uuid>,
812        pub location: String,
813        pub last_updated_ms: i64,
814        pub last_column_id: i32,
815        /// `schema` is optional to prioritize `schemas` and `current-schema-id`, allowing liberal reading of V1 metadata.
816        pub schema: Option<SchemaV1>,
817        #[serde(skip_serializing_if = "Option::is_none")]
818        pub schemas: Option<Vec<SchemaV1>>,
819        #[serde(skip_serializing_if = "Option::is_none")]
820        pub current_schema_id: Option<i32>,
821        /// `partition_spec` is optional to prioritize `partition_specs`, aligning with liberal reading of potentially invalid V1 metadata.
822        pub partition_spec: Option<Vec<PartitionField>>,
823        #[serde(skip_serializing_if = "Option::is_none")]
824        pub partition_specs: Option<Vec<PartitionSpec>>,
825        #[serde(skip_serializing_if = "Option::is_none")]
826        pub default_spec_id: Option<i32>,
827        #[serde(skip_serializing_if = "Option::is_none")]
828        pub last_partition_id: Option<i32>,
829        #[serde(skip_serializing_if = "Option::is_none")]
830        pub properties: Option<HashMap<String, String>>,
831        #[serde(skip_serializing_if = "Option::is_none")]
832        pub current_snapshot_id: Option<i64>,
833        #[serde(skip_serializing_if = "Option::is_none")]
834        pub snapshots: Option<Vec<SnapshotV1>>,
835        #[serde(skip_serializing_if = "Option::is_none")]
836        pub snapshot_log: Option<Vec<SnapshotLog>>,
837        #[serde(skip_serializing_if = "Option::is_none")]
838        pub metadata_log: Option<Vec<MetadataLog>>,
839        pub sort_orders: Option<Vec<SortOrder>>,
840        pub default_sort_order_id: Option<i64>,
841        #[serde(default, skip_serializing_if = "Vec::is_empty")]
842        pub statistics: Vec<StatisticsFile>,
843        #[serde(default, skip_serializing_if = "Vec::is_empty")]
844        pub partition_statistics: Vec<PartitionStatisticsFile>,
845    }
846
847    /// Helper to serialize and deserialize the format version.
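    ///
    /// A small sketch of the behaviour: serializing `VersionNumber::<2>` writes the
    /// JSON number `2`, and deserializing any other number into `VersionNumber<2>`
    /// fails, which lets the untagged [TableMetadataEnum] pick the matching
    /// version-specific struct.
    ///
    /// ```ignore
    /// assert_eq!(serde_json::to_string(&VersionNumber::<2>).unwrap(), "2");
    /// assert!(serde_json::from_str::<VersionNumber<2>>("3").is_err());
    /// ```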
848    #[derive(Debug, PartialEq, Eq)]
849    pub(crate) struct VersionNumber<const V: u8>;
850
851    impl Serialize for TableMetadata {
852        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
853        where S: serde::Serializer {
            // We must clone here because converting into the serde enum consumes the value.
855            let table_metadata_enum: TableMetadataEnum =
856                self.clone().try_into().map_err(serde::ser::Error::custom)?;
857
858            table_metadata_enum.serialize(serializer)
859        }
860    }
861
862    impl<const V: u8> Serialize for VersionNumber<V> {
863        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
864        where S: serde::Serializer {
865            serializer.serialize_u8(V)
866        }
867    }
868
869    impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
870        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
871        where D: serde::Deserializer<'de> {
872            let value = u8::deserialize(deserializer)?;
873            if value == V {
874                Ok(VersionNumber::<V>)
875            } else {
876                Err(serde::de::Error::custom("Invalid Version"))
877            }
878        }
879    }
880
881    impl TryFrom<TableMetadataEnum> for TableMetadata {
882        type Error = Error;
883        fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
884            match value {
885                TableMetadataEnum::V3(value) => value.try_into(),
886                TableMetadataEnum::V2(value) => value.try_into(),
887                TableMetadataEnum::V1(value) => value.try_into(),
888            }
889        }
890    }
891
892    impl TryFrom<TableMetadata> for TableMetadataEnum {
893        type Error = Error;
894        fn try_from(value: TableMetadata) -> Result<Self, Error> {
895            Ok(match value.format_version {
896                FormatVersion::V3 => TableMetadataEnum::V3(value.try_into()?),
897                FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
898                FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
899            })
900        }
901    }
902
903    impl TryFrom<TableMetadataV3> for TableMetadata {
904        type Error = Error;
905        fn try_from(value: TableMetadataV3) -> Result<Self, self::Error> {
906            let TableMetadataV3 {
907                format_version: _,
908                shared: value,
909                next_row_id,
910                encryption_keys,
911                snapshots,
912            } = value;
913            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
914                None
915            } else {
916                value.current_snapshot_id
917            };
918            let schemas = HashMap::from_iter(
919                value
920                    .schemas
921                    .into_iter()
922                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
923                    .collect::<Result<Vec<_>, Error>>()?,
924            );
925
926            let current_schema: &SchemaRef =
927                schemas.get(&value.current_schema_id).ok_or_else(|| {
928                    Error::new(
929                        ErrorKind::DataInvalid,
930                        format!(
931                            "No schema exists with the current schema id {}.",
932                            value.current_schema_id
933                        ),
934                    )
935                })?;
936            let partition_specs = HashMap::from_iter(
937                value
938                    .partition_specs
939                    .into_iter()
940                    .map(|x| (x.spec_id(), Arc::new(x))),
941            );
942            let default_spec_id = value.default_spec_id;
943            let default_spec: PartitionSpecRef = partition_specs
944                .get(&value.default_spec_id)
945                .map(|spec| (**spec).clone())
946                .or_else(|| {
947                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
948                        .then(PartitionSpec::unpartition_spec)
949                })
950                .ok_or_else(|| {
951                    Error::new(
952                        ErrorKind::DataInvalid,
953                        format!("Default partition spec {default_spec_id} not found"),
954                    )
955                })?
956                .into();
957            let default_partition_type = default_spec.partition_type(current_schema)?;
958
959            let mut metadata = TableMetadata {
960                format_version: FormatVersion::V3,
961                table_uuid: value.table_uuid,
962                location: value.location,
963                last_sequence_number: value.last_sequence_number,
964                last_updated_ms: value.last_updated_ms,
965                last_column_id: value.last_column_id,
966                current_schema_id: value.current_schema_id,
967                schemas,
968                partition_specs,
969                default_partition_type,
970                default_spec,
971                last_partition_id: value.last_partition_id,
972                properties: value.properties.unwrap_or_default(),
973                current_snapshot_id,
974                snapshots: snapshots
975                    .map(|snapshots| {
976                        HashMap::from_iter(
977                            snapshots
978                                .into_iter()
979                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
980                        )
981                    })
982                    .unwrap_or_default(),
983                snapshot_log: value.snapshot_log.unwrap_or_default(),
984                metadata_log: value.metadata_log.unwrap_or_default(),
985                sort_orders: HashMap::from_iter(
986                    value
987                        .sort_orders
988                        .into_iter()
989                        .map(|x| (x.order_id, Arc::new(x))),
990                ),
991                default_sort_order_id: value.default_sort_order_id,
992                refs: value.refs.unwrap_or_else(|| {
993                    if let Some(snapshot_id) = current_snapshot_id {
994                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
995                            snapshot_id,
996                            retention: SnapshotRetention::Branch {
997                                min_snapshots_to_keep: None,
998                                max_snapshot_age_ms: None,
999                                max_ref_age_ms: None,
1000                            },
1001                        })])
1002                    } else {
1003                        HashMap::new()
1004                    }
1005                }),
1006                statistics: index_statistics(value.statistics),
1007                partition_statistics: index_partition_statistics(value.partition_statistics),
1008                encryption_keys: encryption_keys
1009                    .map(|keys| {
1010                        HashMap::from_iter(keys.into_iter().map(|key| (key.key_id.clone(), key)))
1011                    })
1012                    .unwrap_or_default(),
1013                next_row_id,
1014            };
1015
1016            metadata.borrow_mut().try_normalize()?;
1017            Ok(metadata)
1018        }
1019    }
1020
1021    impl TryFrom<TableMetadataV2> for TableMetadata {
1022        type Error = Error;
1023        fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
1024            let snapshots = value.snapshots;
1025            let value = value.shared;
1026            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1027                None
1028            } else {
1029                value.current_snapshot_id
1030            };
1031            let schemas = HashMap::from_iter(
1032                value
1033                    .schemas
1034                    .into_iter()
1035                    .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
1036                    .collect::<Result<Vec<_>, Error>>()?,
1037            );
1038
1039            let current_schema: &SchemaRef =
1040                schemas.get(&value.current_schema_id).ok_or_else(|| {
1041                    Error::new(
1042                        ErrorKind::DataInvalid,
1043                        format!(
1044                            "No schema exists with the current schema id {}.",
1045                            value.current_schema_id
1046                        ),
1047                    )
1048                })?;
1049            let partition_specs = HashMap::from_iter(
1050                value
1051                    .partition_specs
1052                    .into_iter()
1053                    .map(|x| (x.spec_id(), Arc::new(x))),
1054            );
1055            let default_spec_id = value.default_spec_id;
1056            let default_spec: PartitionSpecRef = partition_specs
1057                .get(&value.default_spec_id)
1058                .map(|spec| (**spec).clone())
1059                .or_else(|| {
1060                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
1061                        .then(PartitionSpec::unpartition_spec)
1062                })
1063                .ok_or_else(|| {
1064                    Error::new(
1065                        ErrorKind::DataInvalid,
1066                        format!("Default partition spec {default_spec_id} not found"),
1067                    )
1068                })?
1069                .into();
1070            let default_partition_type = default_spec.partition_type(current_schema)?;
1071
1072            let mut metadata = TableMetadata {
1073                format_version: FormatVersion::V2,
1074                table_uuid: value.table_uuid,
1075                location: value.location,
1076                last_sequence_number: value.last_sequence_number,
1077                last_updated_ms: value.last_updated_ms,
1078                last_column_id: value.last_column_id,
1079                current_schema_id: value.current_schema_id,
1080                schemas,
1081                partition_specs,
1082                default_partition_type,
1083                default_spec,
1084                last_partition_id: value.last_partition_id,
1085                properties: value.properties.unwrap_or_default(),
1086                current_snapshot_id,
1087                snapshots: snapshots
1088                    .map(|snapshots| {
1089                        HashMap::from_iter(
1090                            snapshots
1091                                .into_iter()
1092                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
1093                        )
1094                    })
1095                    .unwrap_or_default(),
1096                snapshot_log: value.snapshot_log.unwrap_or_default(),
1097                metadata_log: value.metadata_log.unwrap_or_default(),
1098                sort_orders: HashMap::from_iter(
1099                    value
1100                        .sort_orders
1101                        .into_iter()
1102                        .map(|x| (x.order_id, Arc::new(x))),
1103                ),
1104                default_sort_order_id: value.default_sort_order_id,
1105                refs: value.refs.unwrap_or_else(|| {
1106                    if let Some(snapshot_id) = current_snapshot_id {
1107                        HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1108                            snapshot_id,
1109                            retention: SnapshotRetention::Branch {
1110                                min_snapshots_to_keep: None,
1111                                max_snapshot_age_ms: None,
1112                                max_ref_age_ms: None,
1113                            },
1114                        })])
1115                    } else {
1116                        HashMap::new()
1117                    }
1118                }),
1119                statistics: index_statistics(value.statistics),
1120                partition_statistics: index_partition_statistics(value.partition_statistics),
1121                encryption_keys: HashMap::new(),
1122                next_row_id: INITIAL_ROW_ID,
1123            };
1124
1125            metadata.borrow_mut().try_normalize()?;
1126            Ok(metadata)
1127        }
1128    }
1129
1130    impl TryFrom<TableMetadataV1> for TableMetadata {
1131        type Error = Error;
1132        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
1133            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
1134                None
1135            } else {
1136                value.current_snapshot_id
1137            };
1138
1139            let (schemas, current_schema_id, current_schema) =
1140                if let (Some(schemas_vec), Some(schema_id)) =
1141                    (&value.schemas, value.current_schema_id)
1142                {
1143                    // Option 1: Use 'schemas' + 'current_schema_id'
1144                    let schema_map = HashMap::from_iter(
1145                        schemas_vec
1146                            .clone()
1147                            .into_iter()
1148                            .map(|schema| {
1149                                let schema: Schema = schema.try_into()?;
1150                                Ok((schema.schema_id(), Arc::new(schema)))
1151                            })
1152                            .collect::<Result<Vec<_>, Error>>()?,
1153                    );
1154
1155                    let schema = schema_map
1156                        .get(&schema_id)
1157                        .ok_or_else(|| {
1158                            Error::new(
1159                                ErrorKind::DataInvalid,
1160                                format!("No schema exists with the current schema id {schema_id}."),
1161                            )
1162                        })?
1163                        .clone();
1164                    (schema_map, schema_id, schema)
1165                } else if let Some(schema) = value.schema {
1166                    // Option 2: Fall back to `schema`
1167                    let schema: Schema = schema.try_into()?;
1168                    let schema_id = schema.schema_id();
1169                    let schema_arc = Arc::new(schema);
1170                    let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
1171                    (schema_map, schema_id, schema_arc)
1172                } else {
1173                    // Option 3: No valid schema configuration found
1174                    return Err(Error::new(
1175                        ErrorKind::DataInvalid,
1176                        "No valid schema configuration found in table metadata",
1177                    ));
1178                };
1179
1180            // Prioritize 'partition_specs' over 'partition_spec'
1181            let partition_specs = if let Some(specs_vec) = value.partition_specs {
1182                // Option 1: Use 'partition_specs'
1183                specs_vec
1184                    .into_iter()
1185                    .map(|x| (x.spec_id(), Arc::new(x)))
1186                    .collect::<HashMap<_, _>>()
1187            } else if let Some(partition_spec) = value.partition_spec {
1188                // Option 2: Fall back to 'partition_spec'
1189                let spec = PartitionSpec::builder(current_schema.clone())
1190                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1191                    .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
1192                    .build()?;
1193
1194                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1195            } else {
1196                // Option 3: Create empty partition spec
1197                let spec = PartitionSpec::builder(current_schema.clone())
1198                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
1199                    .build()?;
1200
1201                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
1202            };
1203
1204            // Get the default_spec_id, prioritizing the explicit value if provided
1205            let default_spec_id = value
1206                .default_spec_id
1207                .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
1208
1209            // Get the default spec
1210            let default_spec: PartitionSpecRef = partition_specs
1211                .get(&default_spec_id)
1212                .map(|x| Arc::unwrap_or_clone(x.clone()))
1213                .ok_or_else(|| {
1214                    Error::new(
1215                        ErrorKind::DataInvalid,
1216                        format!("Default partition spec {default_spec_id} not found"),
1217                    )
1218                })?
1219                .into();
1220            let default_partition_type = default_spec.partition_type(&current_schema)?;
1221
1222            let mut metadata = TableMetadata {
1223                format_version: FormatVersion::V1,
1224                table_uuid: value.table_uuid.unwrap_or_default(),
1225                location: value.location,
1226                last_sequence_number: 0,
1227                last_updated_ms: value.last_updated_ms,
1228                last_column_id: value.last_column_id,
1229                current_schema_id,
1230                default_spec,
1231                default_partition_type,
1232                last_partition_id: value
1233                    .last_partition_id
1234                    .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
1235                partition_specs,
1236                schemas,
1237                properties: value.properties.unwrap_or_default(),
1238                current_snapshot_id,
1239                snapshots: value
1240                    .snapshots
1241                    .map(|snapshots| {
1242                        Ok::<_, Error>(HashMap::from_iter(
1243                            snapshots
1244                                .into_iter()
1245                                .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
1246                                .collect::<Result<Vec<_>, Error>>()?,
1247                        ))
1248                    })
1249                    .transpose()?
1250                    .unwrap_or_default(),
1251                snapshot_log: value.snapshot_log.unwrap_or_default(),
1252                metadata_log: value.metadata_log.unwrap_or_default(),
1253                sort_orders: match value.sort_orders {
1254                    Some(sort_orders) => HashMap::from_iter(
1255                        sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
1256                    ),
1257                    None => HashMap::new(),
1258                },
1259                default_sort_order_id: value
1260                    .default_sort_order_id
1261                    .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
1262                refs: if let Some(snapshot_id) = current_snapshot_id {
1263                    HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
1264                        snapshot_id,
1265                        retention: SnapshotRetention::Branch {
1266                            min_snapshots_to_keep: None,
1267                            max_snapshot_age_ms: None,
1268                            max_ref_age_ms: None,
1269                        },
1270                    })])
1271                } else {
1272                    HashMap::new()
1273                },
1274                statistics: index_statistics(value.statistics),
1275                partition_statistics: index_partition_statistics(value.partition_statistics),
1276                encryption_keys: HashMap::new(),
1277                next_row_id: INITIAL_ROW_ID, // v1 has no row lineage
1278            };
1279
1280            metadata.borrow_mut().try_normalize()?;
1281            Ok(metadata)
1282        }
1283    }
1284
1285    impl TryFrom<TableMetadata> for TableMetadataV3 {
1286        type Error = Error;
1287
1288        fn try_from(mut v: TableMetadata) -> Result<Self, Self::Error> {
1289            let next_row_id = v.next_row_id;
1290            let encryption_keys = std::mem::take(&mut v.encryption_keys);
1291            let snapshots = std::mem::take(&mut v.snapshots);
1292            let shared = v.into();
1293
1294            Ok(TableMetadataV3 {
1295                format_version: VersionNumber::<3>,
1296                shared,
1297                next_row_id,
1298                encryption_keys: if encryption_keys.is_empty() {
1299                    None
1300                } else {
1301                    Some(encryption_keys.into_values().collect())
1302                },
1303                snapshots: if snapshots.is_empty() {
1304                    None
1305                } else {
1306                    Some(
1307                        snapshots
1308                            .into_values()
1309                            .map(|s| SnapshotV3::try_from(Arc::unwrap_or_clone(s)))
1310                            .collect::<Result<_, _>>()?,
1311                    )
1312                },
1313            })
1314        }
1315    }
1316
1317    impl From<TableMetadata> for TableMetadataV2 {
1318        fn from(mut v: TableMetadata) -> Self {
1319            let snapshots = std::mem::take(&mut v.snapshots);
1320            let shared = v.into();
1321
1322            TableMetadataV2 {
1323                format_version: VersionNumber::<2>,
1324                shared,
1325                snapshots: if snapshots.is_empty() {
1326                    None
1327                } else {
1328                    Some(
1329                        snapshots
1330                            .into_values()
1331                            .map(|s| SnapshotV2::from(Arc::unwrap_or_clone(s)))
1332                            .collect(),
1333                    )
1334                },
1335            }
1336        }
1337    }
1338
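    // Fields common to the v2 and v3 serialization forms. Empty collections such
    // as properties, snapshot-log, and metadata-log are written as absent (None)
    // rather than as empty lists.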
1339    impl From<TableMetadata> for TableMetadataV2V3Shared {
1340        fn from(v: TableMetadata) -> Self {
1341            TableMetadataV2V3Shared {
1342                table_uuid: v.table_uuid,
1343                location: v.location,
1344                last_sequence_number: v.last_sequence_number,
1345                last_updated_ms: v.last_updated_ms,
1346                last_column_id: v.last_column_id,
1347                schemas: v
1348                    .schemas
1349                    .into_values()
1350                    .map(|x| {
1351                        Arc::try_unwrap(x)
1352                            .unwrap_or_else(|schema| schema.as_ref().clone())
1353                            .into()
1354                    })
1355                    .collect(),
1356                current_schema_id: v.current_schema_id,
1357                partition_specs: v
1358                    .partition_specs
1359                    .into_values()
1360                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1361                    .collect(),
1362                default_spec_id: v.default_spec.spec_id(),
1363                last_partition_id: v.last_partition_id,
1364                properties: if v.properties.is_empty() {
1365                    None
1366                } else {
1367                    Some(v.properties)
1368                },
1369                current_snapshot_id: v.current_snapshot_id,
1370                snapshot_log: if v.snapshot_log.is_empty() {
1371                    None
1372                } else {
1373                    Some(v.snapshot_log)
1374                },
1375                metadata_log: if v.metadata_log.is_empty() {
1376                    None
1377                } else {
1378                    Some(v.metadata_log)
1379                },
1380                sort_orders: v
1381                    .sort_orders
1382                    .into_values()
1383                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1384                    .collect(),
1385                default_sort_order_id: v.default_sort_order_id,
1386                refs: Some(v.refs),
1387                statistics: v.statistics.into_values().collect(),
1388                partition_statistics: v.partition_statistics.into_values().collect(),
1389            }
1390        }
1391    }
1392
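    // v1 writes the current schema and the default partition spec fields inline
    // ("schema" / "partition-spec") in addition to the full lists, and most other
    // fields are optional in this format.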
1393    impl TryFrom<TableMetadata> for TableMetadataV1 {
1394        type Error = Error;
1395        fn try_from(v: TableMetadata) -> Result<Self, Error> {
1396            Ok(TableMetadataV1 {
1397                format_version: VersionNumber::<1>,
1398                table_uuid: Some(v.table_uuid),
1399                location: v.location,
1400                last_updated_ms: v.last_updated_ms,
1401                last_column_id: v.last_column_id,
1402                schema: Some(
1403                    v.schemas
1404                        .get(&v.current_schema_id)
1405                        .ok_or(Error::new(
1406                            ErrorKind::Unexpected,
1407                            "current_schema_id not found in schemas",
1408                        ))?
1409                        .as_ref()
1410                        .clone()
1411                        .into(),
1412                ),
1413                schemas: Some(
1414                    v.schemas
1415                        .into_values()
1416                        .map(|x| {
1417                            Arc::try_unwrap(x)
1418                                .unwrap_or_else(|schema| schema.as_ref().clone())
1419                                .into()
1420                        })
1421                        .collect(),
1422                ),
1423                current_schema_id: Some(v.current_schema_id),
1424                partition_spec: Some(v.default_spec.fields().to_vec()),
1425                partition_specs: Some(
1426                    v.partition_specs
1427                        .into_values()
1428                        .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
1429                        .collect(),
1430                ),
1431                default_spec_id: Some(v.default_spec.spec_id()),
1432                last_partition_id: Some(v.last_partition_id),
1433                properties: if v.properties.is_empty() {
1434                    None
1435                } else {
1436                    Some(v.properties)
1437                },
1438                current_snapshot_id: v.current_snapshot_id,
1439                snapshots: if v.snapshots.is_empty() {
1440                    None
1441                } else {
1442                    Some(
1443                        v.snapshots
1444                            .into_values()
1445                            .map(|x| Snapshot::clone(&x).into())
1446                            .collect(),
1447                    )
1448                },
1449                snapshot_log: if v.snapshot_log.is_empty() {
1450                    None
1451                } else {
1452                    Some(v.snapshot_log)
1453                },
1454                metadata_log: if v.metadata_log.is_empty() {
1455                    None
1456                } else {
1457                    Some(v.metadata_log)
1458                },
1459                sort_orders: Some(
1460                    v.sort_orders
1461                        .into_values()
1462                        .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
1463                        .collect(),
1464                ),
1465                default_sort_order_id: Some(v.default_sort_order_id),
1466                statistics: v.statistics.into_values().collect(),
1467                partition_statistics: v.partition_statistics.into_values().collect(),
1468            })
1469        }
1470    }
1471
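    // Index statistics files by snapshot id. The list is reversed before collecting
    // so that, if a snapshot id appears more than once, the first entry in the
    // original list wins; `index_partition_statistics` below follows the same rule.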
1472    fn index_statistics(statistics: Vec<StatisticsFile>) -> HashMap<i64, StatisticsFile> {
1473        statistics
1474            .into_iter()
1475            .rev()
1476            .map(|s| (s.snapshot_id, s))
1477            .collect()
1478    }
1479
1480    fn index_partition_statistics(
1481        statistics: Vec<PartitionStatisticsFile>,
1482    ) -> HashMap<i64, PartitionStatisticsFile> {
1483        statistics
1484            .into_iter()
1485            .rev()
1486            .map(|s| (s.snapshot_id, s))
1487            .collect()
1488    }
1489}
1490
1491#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Hash)]
1492#[repr(u8)]
1493/// Iceberg format version
1494pub enum FormatVersion {
1495    /// Iceberg spec version 1
1496    V1 = 1u8,
1497    /// Iceberg spec version 2
1498    V2 = 2u8,
1499    /// Iceberg spec version 3
1500    V3 = 3u8,
1501}
1502
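// Format versions compare by their numeric value, so V1 < V2 < V3.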
1503impl PartialOrd for FormatVersion {
1504    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1505        Some(self.cmp(other))
1506    }
1507}
1508
1509impl Ord for FormatVersion {
1510    fn cmp(&self, other: &Self) -> Ordering {
1511        (*self as u8).cmp(&(*other as u8))
1512    }
1513}
1514
1515impl Display for FormatVersion {
1516    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1517        match self {
1518            FormatVersion::V1 => write!(f, "v1"),
1519            FormatVersion::V2 => write!(f, "v2"),
1520            FormatVersion::V3 => write!(f, "v3"),
1521        }
1522    }
1523}
1524
1525#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1526#[serde(rename_all = "kebab-case")]
1527 /// An entry in the metadata log; records a previous metadata file of the table
1528pub struct MetadataLog {
1529    /// Location of the metadata file.
1530    pub metadata_file: String,
1531    /// Timestamp in milliseconds from the unix epoch when the new metadata was created
1532    pub timestamp_ms: i64,
1533}
1534
1535#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
1536#[serde(rename_all = "kebab-case")]
1537 /// An entry in the snapshot log, recording when a snapshot was made.
1538pub struct SnapshotLog {
1539    /// Id of the snapshot.
1540    pub snapshot_id: i64,
1541    /// Timestamp in milliseconds from the unix epoch when the snapshot was made
1542    pub timestamp_ms: i64,
1543}
1544
1545impl SnapshotLog {
1546    /// Returns the timestamp as a `DateTime<Utc>` with millisecond precision
1547    pub fn timestamp(self) -> Result<DateTime<Utc>> {
1548        timestamp_ms_to_utc(self.timestamp_ms)
1549    }
1550
1551    /// Returns the timestamp in milliseconds
1552    #[inline]
1553    pub fn timestamp_ms(&self) -> i64 {
1554        self.timestamp_ms
1555    }
1556}
1557
1558#[cfg(test)]
1559mod tests {
1560    use std::collections::HashMap;
1561    use std::fs;
1562    use std::io::Write as _;
1563    use std::sync::Arc;
1564
1565    use anyhow::Result;
1566    use base64::Engine as _;
1567    use pretty_assertions::assert_eq;
1568    use tempfile::TempDir;
1569    use uuid::Uuid;
1570
1571    use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
1572    use crate::io::FileIOBuilder;
1573    use crate::spec::table_metadata::TableMetadata;
1574    use crate::spec::{
1575        BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1576        PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1577        SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1578        Summary, Transform, Type, UnboundPartitionField,
1579    };
1580    use crate::{ErrorKind, TableCreation};
1581
1582    fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
1583        let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
1584        assert_eq!(desered_type, expected_type);
1585
1586        let sered_json = serde_json::to_string(&expected_type).unwrap();
1587        let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
1588
1589        assert_eq!(parsed_json_value, desered_type);
1590    }
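
    // A minimal sketch, not part of the original test suite: checks that SnapshotLog
    // exposes its raw millisecond timestamp and converts it to a chrono DateTime
    // with millisecond precision.
    #[test]
    fn test_snapshot_log_timestamp_sketch() {
        let log = SnapshotLog {
            snapshot_id: 638933773299822130,
            timestamp_ms: 1662532818843,
        };
        assert_eq!(log.timestamp_ms(), 1662532818843);
        assert_eq!(log.timestamp().unwrap().timestamp_millis(), 1662532818843);
    }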
1591
1592    fn get_test_table_metadata(file_name: &str) -> TableMetadata {
1593        let path = format!("testdata/table_metadata/{file_name}");
1594        let metadata: String = fs::read_to_string(path).unwrap();
1595
1596        serde_json::from_str(&metadata).unwrap()
1597    }
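
    // A minimal sketch, not part of the original test suite: FormatVersion orders
    // by its numeric value and displays as "v1"/"v2"/"v3".
    #[test]
    fn test_format_version_order_and_display_sketch() {
        assert!(FormatVersion::V1 < FormatVersion::V2);
        assert!(FormatVersion::V2 < FormatVersion::V3);
        assert_eq!(FormatVersion::V1.to_string(), "v1");
        assert_eq!(FormatVersion::V3.to_string(), "v3");
    }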
1598
1599    #[test]
1600    fn test_table_data_v2() {
1601        let data = r#"
1602            {
1603                "format-version" : 2,
1604                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1605                "location": "s3://b/wh/data.db/table",
1606                "last-sequence-number" : 1,
1607                "last-updated-ms": 1515100955770,
1608                "last-column-id": 1,
1609                "schemas": [
1610                    {
1611                        "schema-id" : 1,
1612                        "type" : "struct",
1613                        "fields" :[
1614                            {
1615                                "id": 1,
1616                                "name": "struct_name",
1617                                "required": true,
1618                                "type": "fixed[1]"
1619                            },
1620                            {
1621                                "id": 4,
1622                                "name": "ts",
1623                                "required": true,
1624                                "type": "timestamp"
1625                            }
1626                        ]
1627                    }
1628                ],
1629                "current-schema-id" : 1,
1630                "partition-specs": [
1631                    {
1632                        "spec-id": 0,
1633                        "fields": [
1634                            {
1635                                "source-id": 4,
1636                                "field-id": 1000,
1637                                "name": "ts_day",
1638                                "transform": "day"
1639                            }
1640                        ]
1641                    }
1642                ],
1643                "default-spec-id": 0,
1644                "last-partition-id": 1000,
1645                "properties": {
1646                    "commit.retry.num-retries": "1"
1647                },
1648                "metadata-log": [
1649                    {
1650                        "metadata-file": "s3://bucket/.../v1.json",
1651                        "timestamp-ms": 1515100
1652                    }
1653                ],
1654                "refs": {},
1655                "sort-orders": [
1656                    {
1657                    "order-id": 0,
1658                    "fields": []
1659                    }
1660                ],
1661                "default-sort-order-id": 0
1662            }
1663        "#;
1664
1665        let schema = Schema::builder()
1666            .with_schema_id(1)
1667            .with_fields(vec![
1668                Arc::new(NestedField::required(
1669                    1,
1670                    "struct_name",
1671                    Type::Primitive(PrimitiveType::Fixed(1)),
1672                )),
1673                Arc::new(NestedField::required(
1674                    4,
1675                    "ts",
1676                    Type::Primitive(PrimitiveType::Timestamp),
1677                )),
1678            ])
1679            .build()
1680            .unwrap();
1681
1682        let partition_spec = PartitionSpec::builder(schema.clone())
1683            .with_spec_id(0)
1684            .add_unbound_field(UnboundPartitionField {
1685                name: "ts_day".to_string(),
1686                transform: Transform::Day,
1687                source_id: 4,
1688                field_id: Some(1000),
1689            })
1690            .unwrap()
1691            .build()
1692            .unwrap();
1693
1694        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1695        let expected = TableMetadata {
1696            format_version: FormatVersion::V2,
1697            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1698            location: "s3://b/wh/data.db/table".to_string(),
1699            last_updated_ms: 1515100955770,
1700            last_column_id: 1,
1701            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1702            current_schema_id: 1,
1703            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1704            default_partition_type,
1705            default_spec: partition_spec.into(),
1706            last_partition_id: 1000,
1707            default_sort_order_id: 0,
1708            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1709            snapshots: HashMap::default(),
1710            current_snapshot_id: None,
1711            last_sequence_number: 1,
1712            properties: HashMap::from_iter(vec![(
1713                "commit.retry.num-retries".to_string(),
1714                "1".to_string(),
1715            )]),
1716            snapshot_log: Vec::new(),
1717            metadata_log: vec![MetadataLog {
1718                metadata_file: "s3://bucket/.../v1.json".to_string(),
1719                timestamp_ms: 1515100,
1720            }],
1721            refs: HashMap::new(),
1722            statistics: HashMap::new(),
1723            partition_statistics: HashMap::new(),
1724            encryption_keys: HashMap::new(),
1725            next_row_id: INITIAL_ROW_ID,
1726        };
1727
1728        let expected_json_value = serde_json::to_value(&expected).unwrap();
1729        check_table_metadata_serde(data, expected);
1730
1731        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1732        assert_eq!(json_value, expected_json_value);
1733    }
1734
1735    #[test]
1736    fn test_table_data_v3() {
1737        let data = r#"
1738            {
1739                "format-version" : 3,
1740                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
1741                "location": "s3://b/wh/data.db/table",
1742                "last-sequence-number" : 1,
1743                "last-updated-ms": 1515100955770,
1744                "last-column-id": 1,
1745                "next-row-id": 5,
1746                "schemas": [
1747                    {
1748                        "schema-id" : 1,
1749                        "type" : "struct",
1750                        "fields" :[
1751                            {
1752                                "id": 4,
1753                                "name": "ts",
1754                                "required": true,
1755                                "type": "timestamp"
1756                            }
1757                        ]
1758                    }
1759                ],
1760                "current-schema-id" : 1,
1761                "partition-specs": [
1762                    {
1763                        "spec-id": 0,
1764                        "fields": [
1765                            {
1766                                "source-id": 4,
1767                                "field-id": 1000,
1768                                "name": "ts_day",
1769                                "transform": "day"
1770                            }
1771                        ]
1772                    }
1773                ],
1774                "default-spec-id": 0,
1775                "last-partition-id": 1000,
1776                "properties": {
1777                    "commit.retry.num-retries": "1"
1778                },
1779                "metadata-log": [
1780                    {
1781                        "metadata-file": "s3://bucket/.../v1.json",
1782                        "timestamp-ms": 1515100
1783                    }
1784                ],
1785                "refs": {},
1786                "snapshots" : [ {
1787                    "snapshot-id" : 1,
1788                    "timestamp-ms" : 1662532818843,
1789                    "sequence-number" : 0,
1790                    "first-row-id" : 0,
1791                    "added-rows" : 4,
1792                    "key-id" : "key1",
1793                    "summary" : {
1794                        "operation" : "append"
1795                    },
1796                    "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1797                    "schema-id" : 0
1798                    }
1799                ],
1800                "encryption-keys": [
1801                    {
1802                        "key-id": "key1",
1803                        "encrypted-by-id": "KMS",
1804                        "encrypted-key-metadata": "c29tZS1lbmNyeXB0aW9uLWtleQ==",
1805                        "properties": {
1806                            "p1": "v1"
1807                        }
1808                    }
1809                ],
1810                "sort-orders": [
1811                    {
1812                    "order-id": 0,
1813                    "fields": []
1814                    }
1815                ],
1816                "default-sort-order-id": 0
1817            }
1818        "#;
1819
1820        let schema = Schema::builder()
1821            .with_schema_id(1)
1822            .with_fields(vec![Arc::new(NestedField::required(
1823                4,
1824                "ts",
1825                Type::Primitive(PrimitiveType::Timestamp),
1826            ))])
1827            .build()
1828            .unwrap();
1829
1830        let partition_spec = PartitionSpec::builder(schema.clone())
1831            .with_spec_id(0)
1832            .add_unbound_field(UnboundPartitionField {
1833                name: "ts_day".to_string(),
1834                transform: Transform::Day,
1835                source_id: 4,
1836                field_id: Some(1000),
1837            })
1838            .unwrap()
1839            .build()
1840            .unwrap();
1841
1842        let snapshot = Snapshot::builder()
1843            .with_snapshot_id(1)
1844            .with_timestamp_ms(1662532818843)
1845            .with_sequence_number(0)
1846            .with_row_range(0, 4)
1847            .with_encryption_key_id(Some("key1".to_string()))
1848            .with_summary(Summary {
1849                operation: Operation::Append,
1850                additional_properties: HashMap::new(),
1851            })
1852            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
1853            .with_schema_id(0)
1854            .build();
1855
1856        let encryption_key = EncryptedKey::builder()
1857            .key_id("key1".to_string())
1858            .encrypted_by_id("KMS".to_string())
1859            .encrypted_key_metadata(
1860                base64::prelude::BASE64_STANDARD
1861                    .decode("c29tZS1lbmNyeXB0aW9uLWtleQ==")
1862                    .unwrap(),
1863            )
1864            .properties(HashMap::from_iter(vec![(
1865                "p1".to_string(),
1866                "v1".to_string(),
1867            )]))
1868            .build();
1869
1870        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
1871        let expected = TableMetadata {
1872            format_version: FormatVersion::V3,
1873            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
1874            location: "s3://b/wh/data.db/table".to_string(),
1875            last_updated_ms: 1515100955770,
1876            last_column_id: 1,
1877            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
1878            current_schema_id: 1,
1879            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
1880            default_partition_type,
1881            default_spec: partition_spec.into(),
1882            last_partition_id: 1000,
1883            default_sort_order_id: 0,
1884            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
1885            snapshots: HashMap::from_iter(vec![(1, snapshot.into())]),
1886            current_snapshot_id: None,
1887            last_sequence_number: 1,
1888            properties: HashMap::from_iter(vec![(
1889                "commit.retry.num-retries".to_string(),
1890                "1".to_string(),
1891            )]),
1892            snapshot_log: Vec::new(),
1893            metadata_log: vec![MetadataLog {
1894                metadata_file: "s3://bucket/.../v1.json".to_string(),
1895                timestamp_ms: 1515100,
1896            }],
1897            refs: HashMap::new(),
1898            statistics: HashMap::new(),
1899            partition_statistics: HashMap::new(),
1900            encryption_keys: HashMap::from_iter(vec![("key1".to_string(), encryption_key)]),
1901            next_row_id: 5,
1902        };
1903
1904        let expected_json_value = serde_json::to_value(&expected).unwrap();
1905        check_table_metadata_serde(data, expected);
1906
1907        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
1908        assert_eq!(json_value, expected_json_value);
1909    }
1910
1911    #[test]
1912    fn test_table_data_v1() {
1913        let data = r#"
1914        {
1915            "format-version" : 1,
1916            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
1917            "location" : "/home/iceberg/warehouse/nyc/taxis",
1918            "last-updated-ms" : 1662532818843,
1919            "last-column-id" : 5,
1920            "schema" : {
1921              "type" : "struct",
1922              "schema-id" : 0,
1923              "fields" : [ {
1924                "id" : 1,
1925                "name" : "vendor_id",
1926                "required" : false,
1927                "type" : "long"
1928              }, {
1929                "id" : 2,
1930                "name" : "trip_id",
1931                "required" : false,
1932                "type" : "long"
1933              }, {
1934                "id" : 3,
1935                "name" : "trip_distance",
1936                "required" : false,
1937                "type" : "float"
1938              }, {
1939                "id" : 4,
1940                "name" : "fare_amount",
1941                "required" : false,
1942                "type" : "double"
1943              }, {
1944                "id" : 5,
1945                "name" : "store_and_fwd_flag",
1946                "required" : false,
1947                "type" : "string"
1948              } ]
1949            },
1950            "partition-spec" : [ {
1951              "name" : "vendor_id",
1952              "transform" : "identity",
1953              "source-id" : 1,
1954              "field-id" : 1000
1955            } ],
1956            "last-partition-id" : 1000,
1957            "default-sort-order-id" : 0,
1958            "sort-orders" : [ {
1959              "order-id" : 0,
1960              "fields" : [ ]
1961            } ],
1962            "properties" : {
1963              "owner" : "root"
1964            },
1965            "current-snapshot-id" : 638933773299822130,
1966            "refs" : {
1967              "main" : {
1968                "snapshot-id" : 638933773299822130,
1969                "type" : "branch"
1970              }
1971            },
1972            "snapshots" : [ {
1973              "snapshot-id" : 638933773299822130,
1974              "timestamp-ms" : 1662532818843,
1975              "sequence-number" : 0,
1976              "summary" : {
1977                "operation" : "append",
1978                "spark.app.id" : "local-1662532784305",
1979                "added-data-files" : "4",
1980                "added-records" : "4",
1981                "added-files-size" : "6001"
1982              },
1983              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
1984              "schema-id" : 0
1985            } ],
1986            "snapshot-log" : [ {
1987              "timestamp-ms" : 1662532818843,
1988              "snapshot-id" : 638933773299822130
1989            } ],
1990            "metadata-log" : [ {
1991              "timestamp-ms" : 1662532805245,
1992              "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
1993            } ]
1994          }
1995        "#;
1996
1997        let schema = Schema::builder()
1998            .with_fields(vec![
1999                Arc::new(NestedField::optional(
2000                    1,
2001                    "vendor_id",
2002                    Type::Primitive(PrimitiveType::Long),
2003                )),
2004                Arc::new(NestedField::optional(
2005                    2,
2006                    "trip_id",
2007                    Type::Primitive(PrimitiveType::Long),
2008                )),
2009                Arc::new(NestedField::optional(
2010                    3,
2011                    "trip_distance",
2012                    Type::Primitive(PrimitiveType::Float),
2013                )),
2014                Arc::new(NestedField::optional(
2015                    4,
2016                    "fare_amount",
2017                    Type::Primitive(PrimitiveType::Double),
2018                )),
2019                Arc::new(NestedField::optional(
2020                    5,
2021                    "store_and_fwd_flag",
2022                    Type::Primitive(PrimitiveType::String),
2023                )),
2024            ])
2025            .build()
2026            .unwrap();
2027
2028        let schema = Arc::new(schema);
2029        let partition_spec = PartitionSpec::builder(schema.clone())
2030            .with_spec_id(0)
2031            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
2032            .unwrap()
2033            .build()
2034            .unwrap();
2035
2036        let sort_order = SortOrder::builder()
2037            .with_order_id(0)
2038            .build_unbound()
2039            .unwrap();
2040
2041        let snapshot = Snapshot::builder()
2042            .with_snapshot_id(638933773299822130)
2043            .with_timestamp_ms(1662532818843)
2044            .with_sequence_number(0)
2045            .with_schema_id(0)
2046            .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
2047            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
2048            .build();
2049
2050        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2051        let expected = TableMetadata {
2052            format_version: FormatVersion::V1,
2053            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
2054            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
2055            last_updated_ms: 1662532818843,
2056            last_column_id: 5,
2057            schemas: HashMap::from_iter(vec![(0, schema)]),
2058            current_schema_id: 0,
2059            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2060            default_partition_type,
2061            default_spec: Arc::new(partition_spec),
2062            last_partition_id: 1000,
2063            default_sort_order_id: 0,
2064            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
2065            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
2066            current_snapshot_id: Some(638933773299822130),
2067            last_sequence_number: 0,
2068            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
2069            snapshot_log: vec![SnapshotLog {
2070                snapshot_id: 638933773299822130,
2071                timestamp_ms: 1662532818843,
2072            }],
2073            metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }],
2074            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]),
2075            statistics: HashMap::new(),
2076            partition_statistics: HashMap::new(),
2077            encryption_keys: HashMap::new(),
2078            next_row_id: INITIAL_ROW_ID,
2079        };
2080
2081        check_table_metadata_serde(data, expected);
2082    }
2083
2084    #[test]
2085    fn test_table_data_v2_no_snapshots() {
2086        let data = r#"
2087        {
2088            "format-version" : 2,
2089            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2090            "location": "s3://b/wh/data.db/table",
2091            "last-sequence-number" : 1,
2092            "last-updated-ms": 1515100955770,
2093            "last-column-id": 1,
2094            "schemas": [
2095                {
2096                    "schema-id" : 1,
2097                    "type" : "struct",
2098                    "fields" :[
2099                        {
2100                            "id": 1,
2101                            "name": "struct_name",
2102                            "required": true,
2103                            "type": "fixed[1]"
2104                        }
2105                    ]
2106                }
2107            ],
2108            "current-schema-id" : 1,
2109            "partition-specs": [
2110                {
2111                    "spec-id": 0,
2112                    "fields": []
2113                }
2114            ],
2115            "refs": {},
2116            "default-spec-id": 0,
2117            "last-partition-id": 1000,
2118            "metadata-log": [
2119                {
2120                    "metadata-file": "s3://bucket/.../v1.json",
2121                    "timestamp-ms": 1515100
2122                }
2123            ],
2124            "sort-orders": [
2125                {
2126                "order-id": 0,
2127                "fields": []
2128                }
2129            ],
2130            "default-sort-order-id": 0
2131        }
2132        "#;
2133
2134        let schema = Schema::builder()
2135            .with_schema_id(1)
2136            .with_fields(vec![Arc::new(NestedField::required(
2137                1,
2138                "struct_name",
2139                Type::Primitive(PrimitiveType::Fixed(1)),
2140            ))])
2141            .build()
2142            .unwrap();
2143
2144        let partition_spec = PartitionSpec::builder(schema.clone())
2145            .with_spec_id(0)
2146            .build()
2147            .unwrap();
2148
2149        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2150        let expected = TableMetadata {
2151            format_version: FormatVersion::V2,
2152            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
2153            location: "s3://b/wh/data.db/table".to_string(),
2154            last_updated_ms: 1515100955770,
2155            last_column_id: 1,
2156            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
2157            current_schema_id: 1,
2158            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2159            default_partition_type,
2160            default_spec: partition_spec.into(),
2161            last_partition_id: 1000,
2162            default_sort_order_id: 0,
2163            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2164            snapshots: HashMap::default(),
2165            current_snapshot_id: None,
2166            last_sequence_number: 1,
2167            properties: HashMap::new(),
2168            snapshot_log: Vec::new(),
2169            metadata_log: vec![MetadataLog {
2170                metadata_file: "s3://bucket/.../v1.json".to_string(),
2171                timestamp_ms: 1515100,
2172            }],
2173            refs: HashMap::new(),
2174            statistics: HashMap::new(),
2175            partition_statistics: HashMap::new(),
2176            encryption_keys: HashMap::new(),
2177            next_row_id: INITIAL_ROW_ID,
2178        };
2179
2180        let expected_json_value = serde_json::to_value(&expected).unwrap();
2181        check_table_metadata_serde(data, expected);
2182
2183        let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
2184        assert_eq!(json_value, expected_json_value);
2185    }
2186
2187    #[test]
2188    fn test_current_snapshot_id_must_match_main_branch() {
2189        let data = r#"
2190        {
2191            "format-version" : 2,
2192            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2193            "location": "s3://b/wh/data.db/table",
2194            "last-sequence-number" : 1,
2195            "last-updated-ms": 1515100955770,
2196            "last-column-id": 1,
2197            "schemas": [
2198                {
2199                    "schema-id" : 1,
2200                    "type" : "struct",
2201                    "fields" :[
2202                        {
2203                            "id": 1,
2204                            "name": "struct_name",
2205                            "required": true,
2206                            "type": "fixed[1]"
2207                        },
2208                        {
2209                            "id": 4,
2210                            "name": "ts",
2211                            "required": true,
2212                            "type": "timestamp"
2213                        }
2214                    ]
2215                }
2216            ],
2217            "current-schema-id" : 1,
2218            "partition-specs": [
2219                {
2220                    "spec-id": 0,
2221                    "fields": [
2222                        {
2223                            "source-id": 4,
2224                            "field-id": 1000,
2225                            "name": "ts_day",
2226                            "transform": "day"
2227                        }
2228                    ]
2229                }
2230            ],
2231            "default-spec-id": 0,
2232            "last-partition-id": 1000,
2233            "properties": {
2234                "commit.retry.num-retries": "1"
2235            },
2236            "metadata-log": [
2237                {
2238                    "metadata-file": "s3://bucket/.../v1.json",
2239                    "timestamp-ms": 1515100
2240                }
2241            ],
2242            "sort-orders": [
2243                {
2244                "order-id": 0,
2245                "fields": []
2246                }
2247            ],
2248            "default-sort-order-id": 0,
2249            "current-snapshot-id" : 1,
2250            "refs" : {
2251              "main" : {
2252                "snapshot-id" : 2,
2253                "type" : "branch"
2254              }
2255            },
2256            "snapshots" : [ {
2257              "snapshot-id" : 1,
2258              "timestamp-ms" : 1662532818843,
2259              "sequence-number" : 0,
2260              "summary" : {
2261                "operation" : "append",
2262                "spark.app.id" : "local-1662532784305",
2263                "added-data-files" : "4",
2264                "added-records" : "4",
2265                "added-files-size" : "6001"
2266              },
2267              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2268              "schema-id" : 0
2269            },
2270            {
2271              "snapshot-id" : 2,
2272              "timestamp-ms" : 1662532818844,
2273              "sequence-number" : 0,
2274              "summary" : {
2275                "operation" : "append",
2276                "spark.app.id" : "local-1662532784305",
2277                "added-data-files" : "4",
2278                "added-records" : "4",
2279                "added-files-size" : "6001"
2280              },
2281              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2282              "schema-id" : 0
2283            } ]
2284        }
2285    "#;
2286
2287        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2288        assert!(
2289            err.to_string()
2290                .contains("Current snapshot id does not match main branch")
2291        );
2292    }
2293
2294    #[test]
2295    fn test_main_without_current() {
2296        let data = r#"
2297        {
2298            "format-version" : 2,
2299            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2300            "location": "s3://b/wh/data.db/table",
2301            "last-sequence-number" : 1,
2302            "last-updated-ms": 1515100955770,
2303            "last-column-id": 1,
2304            "schemas": [
2305                {
2306                    "schema-id" : 1,
2307                    "type" : "struct",
2308                    "fields" :[
2309                        {
2310                            "id": 1,
2311                            "name": "struct_name",
2312                            "required": true,
2313                            "type": "fixed[1]"
2314                        },
2315                        {
2316                            "id": 4,
2317                            "name": "ts",
2318                            "required": true,
2319                            "type": "timestamp"
2320                        }
2321                    ]
2322                }
2323            ],
2324            "current-schema-id" : 1,
2325            "partition-specs": [
2326                {
2327                    "spec-id": 0,
2328                    "fields": [
2329                        {
2330                            "source-id": 4,
2331                            "field-id": 1000,
2332                            "name": "ts_day",
2333                            "transform": "day"
2334                        }
2335                    ]
2336                }
2337            ],
2338            "default-spec-id": 0,
2339            "last-partition-id": 1000,
2340            "properties": {
2341                "commit.retry.num-retries": "1"
2342            },
2343            "metadata-log": [
2344                {
2345                    "metadata-file": "s3://bucket/.../v1.json",
2346                    "timestamp-ms": 1515100
2347                }
2348            ],
2349            "sort-orders": [
2350                {
2351                "order-id": 0,
2352                "fields": []
2353                }
2354            ],
2355            "default-sort-order-id": 0,
2356            "refs" : {
2357              "main" : {
2358                "snapshot-id" : 1,
2359                "type" : "branch"
2360              }
2361            },
2362            "snapshots" : [ {
2363              "snapshot-id" : 1,
2364              "timestamp-ms" : 1662532818843,
2365              "sequence-number" : 0,
2366              "summary" : {
2367                "operation" : "append",
2368                "spark.app.id" : "local-1662532784305",
2369                "added-data-files" : "4",
2370                "added-records" : "4",
2371                "added-files-size" : "6001"
2372              },
2373              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2374              "schema-id" : 0
2375            } ]
2376        }
2377    "#;
2378
2379        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2380        assert!(
2381            err.to_string()
2382                .contains("Current snapshot is not set, but main branch exists")
2383        );
2384    }
2385
2386    #[test]
2387    fn test_branch_snapshot_missing() {
2388        let data = r#"
2389        {
2390            "format-version" : 2,
2391            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
2392            "location": "s3://b/wh/data.db/table",
2393            "last-sequence-number" : 1,
2394            "last-updated-ms": 1515100955770,
2395            "last-column-id": 1,
2396            "schemas": [
2397                {
2398                    "schema-id" : 1,
2399                    "type" : "struct",
2400                    "fields" :[
2401                        {
2402                            "id": 1,
2403                            "name": "struct_name",
2404                            "required": true,
2405                            "type": "fixed[1]"
2406                        },
2407                        {
2408                            "id": 4,
2409                            "name": "ts",
2410                            "required": true,
2411                            "type": "timestamp"
2412                        }
2413                    ]
2414                }
2415            ],
2416            "current-schema-id" : 1,
2417            "partition-specs": [
2418                {
2419                    "spec-id": 0,
2420                    "fields": [
2421                        {
2422                            "source-id": 4,
2423                            "field-id": 1000,
2424                            "name": "ts_day",
2425                            "transform": "day"
2426                        }
2427                    ]
2428                }
2429            ],
2430            "default-spec-id": 0,
2431            "last-partition-id": 1000,
2432            "properties": {
2433                "commit.retry.num-retries": "1"
2434            },
2435            "metadata-log": [
2436                {
2437                    "metadata-file": "s3://bucket/.../v1.json",
2438                    "timestamp-ms": 1515100
2439                }
2440            ],
2441            "sort-orders": [
2442                {
2443                "order-id": 0,
2444                "fields": []
2445                }
2446            ],
2447            "default-sort-order-id": 0,
2448            "refs" : {
2449              "main" : {
2450                "snapshot-id" : 1,
2451                "type" : "branch"
2452              },
2453              "foo" : {
2454                "snapshot-id" : 2,
2455                "type" : "branch"
2456              }
2457            },
2458            "snapshots" : [ {
2459              "snapshot-id" : 1,
2460              "timestamp-ms" : 1662532818843,
2461              "sequence-number" : 0,
2462              "summary" : {
2463                "operation" : "append",
2464                "spark.app.id" : "local-1662532784305",
2465                "added-data-files" : "4",
2466                "added-records" : "4",
2467                "added-files-size" : "6001"
2468              },
2469              "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
2470              "schema-id" : 0
2471            } ]
2472        }
2473    "#;
2474
2475        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2476        assert!(
2477            err.to_string().contains(
2478                "Snapshot for reference foo does not exist in the existing snapshots list"
2479            )
2480        );
2481    }
2482
2483    #[test]
2484    fn test_v2_wrong_max_snapshot_sequence_number() {
2485        let data = r#"
2486        {
2487            "format-version": 2,
2488            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2489            "location": "s3://bucket/test/location",
2490            "last-sequence-number": 1,
2491            "last-updated-ms": 1602638573590,
2492            "last-column-id": 3,
2493            "current-schema-id": 0,
2494            "schemas": [
2495                {
2496                    "type": "struct",
2497                    "schema-id": 0,
2498                    "fields": [
2499                        {
2500                            "id": 1,
2501                            "name": "x",
2502                            "required": true,
2503                            "type": "long"
2504                        }
2505                    ]
2506                }
2507            ],
2508            "default-spec-id": 0,
2509            "partition-specs": [
2510                {
2511                    "spec-id": 0,
2512                    "fields": []
2513                }
2514            ],
2515            "last-partition-id": 1000,
2516            "default-sort-order-id": 0,
2517            "sort-orders": [
2518                {
2519                    "order-id": 0,
2520                    "fields": []
2521                }
2522            ],
2523            "properties": {},
2524            "current-snapshot-id": 3055729675574597004,
2525            "snapshots": [
2526                {
2527                    "snapshot-id": 3055729675574597004,
2528                    "timestamp-ms": 1555100955770,
2529                    "sequence-number": 4,
2530                    "summary": {
2531                        "operation": "append"
2532                    },
2533                    "manifest-list": "s3://a/b/2.avro",
2534                    "schema-id": 0
2535                }
2536            ],
2537            "statistics": [],
2538            "snapshot-log": [],
2539            "metadata-log": []
2540        }
2541    "#;
2542
2543        let err = serde_json::from_str::<TableMetadata>(data).unwrap_err();
2544        assert!(err.to_string().contains(
2545            "Invalid snapshot with id 3055729675574597004 and sequence number 4 greater than last sequence number 1"
2546        ));
2547
2548        // Raise last-sequence-number to 4 (equal to the snapshot's sequence number) - should work
2549        let data = data.replace(
2550            r#""last-sequence-number": 1,"#,
2551            r#""last-sequence-number": 4,"#,
2552        );
2553        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2554        assert_eq!(metadata.last_sequence_number, 4);
2555
2556        // Raise last-sequence-number to 5 - should also work
2557        let data = data.replace(
2558            r#""last-sequence-number": 4,"#,
2559            r#""last-sequence-number": 5,"#,
2560        );
2561        let metadata = serde_json::from_str::<TableMetadata>(data.as_str()).unwrap();
2562        assert_eq!(metadata.last_sequence_number, 5);
2563    }
2564
2565    #[test]
2566    fn test_statistic_files() {
2567        let data = r#"
2568        {
2569            "format-version": 2,
2570            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2571            "location": "s3://bucket/test/location",
2572            "last-sequence-number": 34,
2573            "last-updated-ms": 1602638573590,
2574            "last-column-id": 3,
2575            "current-schema-id": 0,
2576            "schemas": [
2577                {
2578                    "type": "struct",
2579                    "schema-id": 0,
2580                    "fields": [
2581                        {
2582                            "id": 1,
2583                            "name": "x",
2584                            "required": true,
2585                            "type": "long"
2586                        }
2587                    ]
2588                }
2589            ],
2590            "default-spec-id": 0,
2591            "partition-specs": [
2592                {
2593                    "spec-id": 0,
2594                    "fields": []
2595                }
2596            ],
2597            "last-partition-id": 1000,
2598            "default-sort-order-id": 0,
2599            "sort-orders": [
2600                {
2601                    "order-id": 0,
2602                    "fields": []
2603                }
2604            ],
2605            "properties": {},
2606            "current-snapshot-id": 3055729675574597004,
2607            "snapshots": [
2608                {
2609                    "snapshot-id": 3055729675574597004,
2610                    "timestamp-ms": 1555100955770,
2611                    "sequence-number": 1,
2612                    "summary": {
2613                        "operation": "append"
2614                    },
2615                    "manifest-list": "s3://a/b/2.avro",
2616                    "schema-id": 0
2617                }
2618            ],
2619            "statistics": [
2620                {
2621                    "snapshot-id": 3055729675574597004,
2622                    "statistics-path": "s3://a/b/stats.puffin",
2623                    "file-size-in-bytes": 413,
2624                    "file-footer-size-in-bytes": 42,
2625                    "blob-metadata": [
2626                        {
2627                            "type": "ndv",
2628                            "snapshot-id": 3055729675574597004,
2629                            "sequence-number": 1,
2630                            "fields": [
2631                                1
2632                            ]
2633                        }
2634                    ]
2635                }
2636            ],
2637            "snapshot-log": [],
2638            "metadata-log": []
2639        }
2640    "#;
2641
2642        let schema = Schema::builder()
2643            .with_schema_id(0)
2644            .with_fields(vec![Arc::new(NestedField::required(
2645                1,
2646                "x",
2647                Type::Primitive(PrimitiveType::Long),
2648            ))])
2649            .build()
2650            .unwrap();
2651        let partition_spec = PartitionSpec::builder(schema.clone())
2652            .with_spec_id(0)
2653            .build()
2654            .unwrap();
2655        let snapshot = Snapshot::builder()
2656            .with_snapshot_id(3055729675574597004)
2657            .with_timestamp_ms(1555100955770)
2658            .with_sequence_number(1)
2659            .with_manifest_list("s3://a/b/2.avro")
2660            .with_schema_id(0)
2661            .with_summary(Summary {
2662                operation: Operation::Append,
2663                additional_properties: HashMap::new(),
2664            })
2665            .build();
2666
2667        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2668        let expected = TableMetadata {
2669            format_version: FormatVersion::V2,
2670            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2671            location: "s3://bucket/test/location".to_string(),
2672            last_updated_ms: 1602638573590,
2673            last_column_id: 3,
2674            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2675            current_schema_id: 0,
2676            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2677            default_partition_type,
2678            default_spec: Arc::new(partition_spec),
2679            last_partition_id: 1000,
2680            default_sort_order_id: 0,
2681            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2682            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
2683            current_snapshot_id: Some(3055729675574597004),
2684            last_sequence_number: 34,
2685            properties: HashMap::new(),
2686            snapshot_log: Vec::new(),
2687            metadata_log: Vec::new(),
2688            statistics: HashMap::from_iter(vec![(3055729675574597004, StatisticsFile {
2689                snapshot_id: 3055729675574597004,
2690                statistics_path: "s3://a/b/stats.puffin".to_string(),
2691                file_size_in_bytes: 413,
2692                file_footer_size_in_bytes: 42,
2693                key_metadata: None,
2694                blob_metadata: vec![BlobMetadata {
2695                    snapshot_id: 3055729675574597004,
2696                    sequence_number: 1,
2697                    fields: vec![1],
2698                    r#type: "ndv".to_string(),
2699                    properties: HashMap::new(),
2700                }],
2701            })]),
2702            partition_statistics: HashMap::new(),
2703            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
2704                snapshot_id: 3055729675574597004,
2705                retention: SnapshotRetention::Branch {
2706                    min_snapshots_to_keep: None,
2707                    max_snapshot_age_ms: None,
2708                    max_ref_age_ms: None,
2709                },
2710            })]),
2711            encryption_keys: HashMap::new(),
2712            next_row_id: INITIAL_ROW_ID,
2713        };
2714
2715        check_table_metadata_serde(data, expected);
2716    }
2717
2718    #[test]
2719    fn test_partition_statistics_file() {
2720        let data = r#"
2721        {
2722            "format-version": 2,
2723            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
2724            "location": "s3://bucket/test/location",
2725            "last-sequence-number": 34,
2726            "last-updated-ms": 1602638573590,
2727            "last-column-id": 3,
2728            "current-schema-id": 0,
2729            "schemas": [
2730                {
2731                    "type": "struct",
2732                    "schema-id": 0,
2733                    "fields": [
2734                        {
2735                            "id": 1,
2736                            "name": "x",
2737                            "required": true,
2738                            "type": "long"
2739                        }
2740                    ]
2741                }
2742            ],
2743            "default-spec-id": 0,
2744            "partition-specs": [
2745                {
2746                    "spec-id": 0,
2747                    "fields": []
2748                }
2749            ],
2750            "last-partition-id": 1000,
2751            "default-sort-order-id": 0,
2752            "sort-orders": [
2753                {
2754                    "order-id": 0,
2755                    "fields": []
2756                }
2757            ],
2758            "properties": {},
2759            "current-snapshot-id": 3055729675574597004,
2760            "snapshots": [
2761                {
2762                    "snapshot-id": 3055729675574597004,
2763                    "timestamp-ms": 1555100955770,
2764                    "sequence-number": 1,
2765                    "summary": {
2766                        "operation": "append"
2767                    },
2768                    "manifest-list": "s3://a/b/2.avro",
2769                    "schema-id": 0
2770                }
2771            ],
2772            "partition-statistics": [
2773                {
2774                    "snapshot-id": 3055729675574597004,
2775                    "statistics-path": "s3://a/b/partition-stats.parquet",
2776                    "file-size-in-bytes": 43
2777                }
2778            ],
2779            "snapshot-log": [],
2780            "metadata-log": []
2781        }
2782        "#;
2783
2784        let schema = Schema::builder()
2785            .with_schema_id(0)
2786            .with_fields(vec![Arc::new(NestedField::required(
2787                1,
2788                "x",
2789                Type::Primitive(PrimitiveType::Long),
2790            ))])
2791            .build()
2792            .unwrap();
2793        let partition_spec = PartitionSpec::builder(schema.clone())
2794            .with_spec_id(0)
2795            .build()
2796            .unwrap();
2797        let snapshot = Snapshot::builder()
2798            .with_snapshot_id(3055729675574597004)
2799            .with_timestamp_ms(1555100955770)
2800            .with_sequence_number(1)
2801            .with_manifest_list("s3://a/b/2.avro")
2802            .with_schema_id(0)
2803            .with_summary(Summary {
2804                operation: Operation::Append,
2805                additional_properties: HashMap::new(),
2806            })
2807            .build();
2808
2809        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2810        let expected = TableMetadata {
2811            format_version: FormatVersion::V2,
2812            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2813            location: "s3://bucket/test/location".to_string(),
2814            last_updated_ms: 1602638573590,
2815            last_column_id: 3,
2816            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2817            current_schema_id: 0,
2818            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2819            default_spec: Arc::new(partition_spec),
2820            default_partition_type,
2821            last_partition_id: 1000,
2822            default_sort_order_id: 0,
2823            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
2824            snapshots: HashMap::from_iter(vec![(3055729675574597004, Arc::new(snapshot))]),
2825            current_snapshot_id: Some(3055729675574597004),
2826            last_sequence_number: 34,
2827            properties: HashMap::new(),
2828            snapshot_log: Vec::new(),
2829            metadata_log: Vec::new(),
2830            statistics: HashMap::new(),
2831            partition_statistics: HashMap::from_iter(vec![(
2832                3055729675574597004,
2833                PartitionStatisticsFile {
2834                    snapshot_id: 3055729675574597004,
2835                    statistics_path: "s3://a/b/partition-stats.parquet".to_string(),
2836                    file_size_in_bytes: 43,
2837                },
2838            )]),
2839            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
2840                snapshot_id: 3055729675574597004,
2841                retention: SnapshotRetention::Branch {
2842                    min_snapshots_to_keep: None,
2843                    max_snapshot_age_ms: None,
2844                    max_ref_age_ms: None,
2845                },
2846            })]),
2847            encryption_keys: HashMap::new(),
2848            next_row_id: INITIAL_ROW_ID,
2849        };
2850
2851        check_table_metadata_serde(data, expected);
2852    }
2853
2854    #[test]
2855    fn test_invalid_table_uuid() -> Result<()> {
2856        let data = r#"
2857            {
2858                "format-version" : 2,
2859                "table-uuid": "xxxx"
2860            }
2861        "#;
2862        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2863        Ok(())
2864    }
2865
2866    #[test]
2867    fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
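        // Metadata that contains only a "format-version" field is incomplete and must fail to deserialize.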
2868        let data = r#"
2869            {
2870                "format-version" : 1
2871            }
2872        "#;
2873        assert!(serde_json::from_str::<TableMetadata>(data).is_err());
2874        Ok(())
2875    }
2876
2877    #[test]
2878    fn test_table_metadata_v3_valid_minimal() {
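        // The minimal v3 fixture covers v3 additions such as next-row-id and column default values.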
2879        let metadata_str =
2880            fs::read_to_string("testdata/table_metadata/TableMetadataV3ValidMinimal.json").unwrap();
2881
2882        let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_str).unwrap();
2883        assert_eq!(table_metadata.format_version, FormatVersion::V3);
2884
2885        let schema = Schema::builder()
2886            .with_schema_id(0)
2887            .with_fields(vec![
2888                Arc::new(
2889                    NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long))
2890                        .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(1)))
2891                        .with_write_default(Literal::Primitive(PrimitiveLiteral::Long(1))),
2892                ),
2893                Arc::new(
2894                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
2895                        .with_doc("comment"),
2896                ),
2897                Arc::new(NestedField::required(
2898                    3,
2899                    "z",
2900                    Type::Primitive(PrimitiveType::Long),
2901                )),
2902            ])
2903            .build()
2904            .unwrap();
2905
2906        let partition_spec = PartitionSpec::builder(schema.clone())
2907            .with_spec_id(0)
2908            .add_unbound_field(UnboundPartitionField {
2909                name: "x".to_string(),
2910                transform: Transform::Identity,
2911                source_id: 1,
2912                field_id: Some(1000),
2913            })
2914            .unwrap()
2915            .build()
2916            .unwrap();
2917
2918        let sort_order = SortOrder::builder()
2919            .with_order_id(3)
2920            .with_sort_field(SortField {
2921                source_id: 2,
2922                transform: Transform::Identity,
2923                direction: SortDirection::Ascending,
2924                null_order: NullOrder::First,
2925            })
2926            .with_sort_field(SortField {
2927                source_id: 3,
2928                transform: Transform::Bucket(4),
2929                direction: SortDirection::Descending,
2930                null_order: NullOrder::Last,
2931            })
2932            .build_unbound()
2933            .unwrap();
2934
2935        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
2936        let expected = TableMetadata {
2937            format_version: FormatVersion::V3,
2938            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
2939            location: "s3://bucket/test/location".to_string(),
2940            last_updated_ms: 1602638573590,
2941            last_column_id: 3,
2942            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
2943            current_schema_id: 0,
2944            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
2945            default_spec: Arc::new(partition_spec),
2946            default_partition_type,
2947            last_partition_id: 1000,
2948            default_sort_order_id: 3,
2949            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
2950            snapshots: HashMap::default(),
2951            current_snapshot_id: None,
2952            last_sequence_number: 34,
2953            properties: HashMap::new(),
2954            snapshot_log: Vec::new(),
2955            metadata_log: Vec::new(),
2956            refs: HashMap::new(),
2957            statistics: HashMap::new(),
2958            partition_statistics: HashMap::new(),
2959            encryption_keys: HashMap::new(),
2960            next_row_id: 0, // v3-specific field from the JSON
2961        };
2962
2963        check_table_metadata_serde(&metadata_str, expected);
2964    }
2965
2966    #[test]
2967    fn test_table_metadata_v2_file_valid() {
2968        let metadata =
2969            fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap();
2970
2971        let schema1 = Schema::builder()
2972            .with_schema_id(0)
2973            .with_fields(vec![Arc::new(NestedField::required(
2974                1,
2975                "x",
2976                Type::Primitive(PrimitiveType::Long),
2977            ))])
2978            .build()
2979            .unwrap();
2980
2981        let schema2 = Schema::builder()
2982            .with_schema_id(1)
2983            .with_fields(vec![
2984                Arc::new(NestedField::required(
2985                    1,
2986                    "x",
2987                    Type::Primitive(PrimitiveType::Long),
2988                )),
2989                Arc::new(
2990                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
2991                        .with_doc("comment"),
2992                ),
2993                Arc::new(NestedField::required(
2994                    3,
2995                    "z",
2996                    Type::Primitive(PrimitiveType::Long),
2997                )),
2998            ])
2999            .with_identifier_field_ids(vec![1, 2])
3000            .build()
3001            .unwrap();
3002
3003        let partition_spec = PartitionSpec::builder(schema2.clone())
3004            .with_spec_id(0)
3005            .add_unbound_field(UnboundPartitionField {
3006                name: "x".to_string(),
3007                transform: Transform::Identity,
3008                source_id: 1,
3009                field_id: Some(1000),
3010            })
3011            .unwrap()
3012            .build()
3013            .unwrap();
3014
3015        let sort_order = SortOrder::builder()
3016            .with_order_id(3)
3017            .with_sort_field(SortField {
3018                source_id: 2,
3019                transform: Transform::Identity,
3020                direction: SortDirection::Ascending,
3021                null_order: NullOrder::First,
3022            })
3023            .with_sort_field(SortField {
3024                source_id: 3,
3025                transform: Transform::Bucket(4),
3026                direction: SortDirection::Descending,
3027                null_order: NullOrder::Last,
3028            })
3029            .build_unbound()
3030            .unwrap();
3031
3032        let snapshot1 = Snapshot::builder()
3033            .with_snapshot_id(3051729675574597004)
3034            .with_timestamp_ms(1515100955770)
3035            .with_sequence_number(0)
3036            .with_manifest_list("s3://a/b/1.avro")
3037            .with_summary(Summary {
3038                operation: Operation::Append,
3039                additional_properties: HashMap::new(),
3040            })
3041            .build();
3042
3043        let snapshot2 = Snapshot::builder()
3044            .with_snapshot_id(3055729675574597004)
3045            .with_parent_snapshot_id(Some(3051729675574597004))
3046            .with_timestamp_ms(1555100955770)
3047            .with_sequence_number(1)
3048            .with_schema_id(1)
3049            .with_manifest_list("s3://a/b/2.avro")
3050            .with_summary(Summary {
3051                operation: Operation::Append,
3052                additional_properties: HashMap::new(),
3053            })
3054            .build();
3055
3056        let default_partition_type = partition_spec.partition_type(&schema2).unwrap();
3057        let expected = TableMetadata {
3058            format_version: FormatVersion::V2,
3059            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
3060            location: "s3://bucket/test/location".to_string(),
3061            last_updated_ms: 1602638573590,
3062            last_column_id: 3,
3063            schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
3064            current_schema_id: 1,
3065            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3066            default_spec: Arc::new(partition_spec),
3067            default_partition_type,
3068            last_partition_id: 1000,
3069            default_sort_order_id: 3,
3070            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
3071            snapshots: HashMap::from_iter(vec![
3072                (3051729675574597004, Arc::new(snapshot1)),
3073                (3055729675574597004, Arc::new(snapshot2)),
3074            ]),
3075            current_snapshot_id: Some(3055729675574597004),
3076            last_sequence_number: 34,
3077            properties: HashMap::new(),
3078            snapshot_log: vec![
3079                SnapshotLog {
3080                    snapshot_id: 3051729675574597004,
3081                    timestamp_ms: 1515100955770,
3082                },
3083                SnapshotLog {
3084                    snapshot_id: 3055729675574597004,
3085                    timestamp_ms: 1555100955770,
3086                },
3087            ],
3088            metadata_log: Vec::new(),
3089            refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference {
3090                snapshot_id: 3055729675574597004,
3091                retention: SnapshotRetention::Branch {
3092                    min_snapshots_to_keep: None,
3093                    max_snapshot_age_ms: None,
3094                    max_ref_age_ms: None,
3095                },
3096            })]),
3097            statistics: HashMap::new(),
3098            partition_statistics: HashMap::new(),
3099            encryption_keys: HashMap::new(),
3100            next_row_id: INITIAL_ROW_ID,
3101        };
3102
3103        check_table_metadata_serde(&metadata, expected);
3104    }
3105
3106    #[test]
3107    fn test_table_metadata_v2_file_valid_minimal() {
3108        let metadata =
3109            fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap();
3110
3111        let schema = Schema::builder()
3112            .with_schema_id(0)
3113            .with_fields(vec![
3114                Arc::new(NestedField::required(
3115                    1,
3116                    "x",
3117                    Type::Primitive(PrimitiveType::Long),
3118                )),
3119                Arc::new(
3120                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
3121                        .with_doc("comment"),
3122                ),
3123                Arc::new(NestedField::required(
3124                    3,
3125                    "z",
3126                    Type::Primitive(PrimitiveType::Long),
3127                )),
3128            ])
3129            .build()
3130            .unwrap();
3131
3132        let partition_spec = PartitionSpec::builder(schema.clone())
3133            .with_spec_id(0)
3134            .add_unbound_field(UnboundPartitionField {
3135                name: "x".to_string(),
3136                transform: Transform::Identity,
3137                source_id: 1,
3138                field_id: Some(1000),
3139            })
3140            .unwrap()
3141            .build()
3142            .unwrap();
3143
3144        let sort_order = SortOrder::builder()
3145            .with_order_id(3)
3146            .with_sort_field(SortField {
3147                source_id: 2,
3148                transform: Transform::Identity,
3149                direction: SortDirection::Ascending,
3150                null_order: NullOrder::First,
3151            })
3152            .with_sort_field(SortField {
3153                source_id: 3,
3154                transform: Transform::Bucket(4),
3155                direction: SortDirection::Descending,
3156                null_order: NullOrder::Last,
3157            })
3158            .build_unbound()
3159            .unwrap();
3160
3161        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
3162        let expected = TableMetadata {
3163            format_version: FormatVersion::V2,
3164            table_uuid: Uuid::parse_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap(),
3165            location: "s3://bucket/test/location".to_string(),
3166            last_updated_ms: 1602638573590,
3167            last_column_id: 3,
3168            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
3169            current_schema_id: 0,
3170            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3171            default_partition_type,
3172            default_spec: Arc::new(partition_spec),
3173            last_partition_id: 1000,
3174            default_sort_order_id: 3,
3175            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
3176            snapshots: HashMap::default(),
3177            current_snapshot_id: None,
3178            last_sequence_number: 34,
3179            properties: HashMap::new(),
3180            snapshot_log: vec![],
3181            metadata_log: Vec::new(),
3182            refs: HashMap::new(),
3183            statistics: HashMap::new(),
3184            partition_statistics: HashMap::new(),
3185            encryption_keys: HashMap::new(),
3186            next_row_id: INITIAL_ROW_ID,
3187        };
3188
3189        check_table_metadata_serde(&metadata, expected);
3190    }
3191
3192    #[test]
3193    fn test_table_metadata_v1_file_valid() {
3194        let metadata =
3195            fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap();
3196
3197        let schema = Schema::builder()
3198            .with_schema_id(0)
3199            .with_fields(vec![
3200                Arc::new(NestedField::required(
3201                    1,
3202                    "x",
3203                    Type::Primitive(PrimitiveType::Long),
3204                )),
3205                Arc::new(
3206                    NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long))
3207                        .with_doc("comment"),
3208                ),
3209                Arc::new(NestedField::required(
3210                    3,
3211                    "z",
3212                    Type::Primitive(PrimitiveType::Long),
3213                )),
3214            ])
3215            .build()
3216            .unwrap();
3217
3218        let partition_spec = PartitionSpec::builder(schema.clone())
3219            .with_spec_id(0)
3220            .add_unbound_field(UnboundPartitionField {
3221                name: "x".to_string(),
3222                transform: Transform::Identity,
3223                source_id: 1,
3224                field_id: Some(1000),
3225            })
3226            .unwrap()
3227            .build()
3228            .unwrap();
3229
3230        let default_partition_type = partition_spec.partition_type(&schema).unwrap();
3231        let expected = TableMetadata {
3232            format_version: FormatVersion::V1,
3233            table_uuid: Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap(),
3234            location: "s3://bucket/test/location".to_string(),
3235            last_updated_ms: 1602638573874,
3236            last_column_id: 3,
3237            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
3238            current_schema_id: 0,
3239            partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into())]),
3240            default_spec: Arc::new(partition_spec),
3241            default_partition_type,
3242            last_partition_id: 0,
3243            default_sort_order_id: 0,
3244            // An unsorted sort order is added during deserialization for V2 compatibility
3245            sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]),
3246            snapshots: HashMap::new(),
3247            current_snapshot_id: None,
3248            last_sequence_number: 0,
3249            properties: HashMap::new(),
3250            snapshot_log: vec![],
3251            metadata_log: Vec::new(),
3252            refs: HashMap::new(),
3253            statistics: HashMap::new(),
3254            partition_statistics: HashMap::new(),
3255            encryption_keys: HashMap::new(),
3256            next_row_id: INITIAL_ROW_ID,
3257        };
3258
3259        check_table_metadata_serde(&metadata, expected);
3260    }
3261
3262    #[test]
3263    fn test_table_metadata_v1_compat() {
3264        let metadata =
3265            fs::read_to_string("testdata/table_metadata/TableMetadataV1Compat.json").unwrap();
3266
3267        // Deserialize the JSON to verify the compatibility fixture parses
3268        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3269            .expect("Failed to deserialize TableMetadataV1Compat.json");
3270
3271        // Verify some key fields match
3272        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3273        assert_eq!(
3274            desered_type.uuid(),
3275            Uuid::parse_str("3276010d-7b1d-488c-98d8-9025fc4fde6b").unwrap()
3276        );
3277        assert_eq!(
3278            desered_type.location(),
3279            "s3://bucket/warehouse/iceberg/glue.db/table_name"
3280        );
3281        assert_eq!(desered_type.last_updated_ms(), 1727773114005);
3282        assert_eq!(desered_type.current_schema_id(), 0);
3283    }
3284
3285    #[test]
3286    fn test_table_metadata_v1_schemas_without_current_id() {
3287        let metadata = fs::read_to_string(
3288            "testdata/table_metadata/TableMetadataV1SchemasWithoutCurrentId.json",
3289        )
3290        .unwrap();
3291
3292        // Deserialize the JSON - this should succeed by using the 'schema' field instead of 'schemas'
3293        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3294            .expect("Failed to deserialize TableMetadataV1SchemasWithoutCurrentId.json");
3295
3296        // Verify it used the 'schema' field
3297        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3298        assert_eq!(
3299            desered_type.uuid(),
3300            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3301        );
3302
3303        // Get the schema and verify it has the expected fields
3304        let schema = desered_type.current_schema();
3305        assert_eq!(schema.as_struct().fields().len(), 3);
3306        assert_eq!(schema.as_struct().fields()[0].name, "x");
3307        assert_eq!(schema.as_struct().fields()[1].name, "y");
3308        assert_eq!(schema.as_struct().fields()[2].name, "z");
3309    }
3310
3311    #[test]
3312    fn test_table_metadata_v1_no_valid_schema() {
3313        let metadata =
3314            fs::read_to_string("testdata/table_metadata/TableMetadataV1NoValidSchema.json")
3315                .unwrap();
3316
3317        // Deserialize the JSON - this should fail because neither schemas + current_schema_id nor schema is valid
3318        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3319
3320        assert!(desered.is_err());
3321        let error_message = desered.unwrap_err().to_string();
3322        assert!(
3323            error_message.contains("No valid schema configuration found"),
3324            "Expected error about no valid schema configuration, got: {error_message}"
3325        );
3326    }
3327
3328    #[test]
3329    fn test_table_metadata_v1_partition_specs_without_default_id() {
3330        let metadata = fs::read_to_string(
3331            "testdata/table_metadata/TableMetadataV1PartitionSpecsWithoutDefaultId.json",
3332        )
3333        .unwrap();
3334
3335        // Deserialize the JSON - this should succeed by inferring default_spec_id as the max spec ID
3336        let desered_type: TableMetadata = serde_json::from_str(&metadata)
3337            .expect("Failed to deserialize TableMetadataV1PartitionSpecsWithoutDefaultId.json");
3338
3339        // Verify basic metadata
3340        assert_eq!(desered_type.format_version(), FormatVersion::V1);
3341        assert_eq!(
3342            desered_type.uuid(),
3343            Uuid::parse_str("d20125c8-7284-442c-9aea-15fee620737c").unwrap()
3344        );
3345
3346        // Verify partition specs
3347        assert_eq!(desered_type.default_partition_spec_id(), 2); // Should pick the largest spec ID (2)
3348        assert_eq!(desered_type.partition_specs.len(), 2);
3349
3350        // Verify the default spec has the expected fields
3351        let default_spec = &desered_type.default_spec;
3352        assert_eq!(default_spec.spec_id(), 2);
3353        assert_eq!(default_spec.fields().len(), 1);
3354        assert_eq!(default_spec.fields()[0].name, "y");
3355        assert_eq!(default_spec.fields()[0].transform, Transform::Identity);
3356        assert_eq!(default_spec.fields()[0].source_id, 2);
3357    }
3358
3359    #[test]
3360    fn test_table_metadata_v2_schema_not_found() {
3361        let metadata =
3362            fs::read_to_string("testdata/table_metadata/TableMetadataV2CurrentSchemaNotFound.json")
3363                .unwrap();
3364
3365        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3366
3367        assert_eq!(
3368            desered.unwrap_err().to_string(),
3369            "DataInvalid => No schema exists with the current schema id 2."
3370        )
3371    }
3372
3373    #[test]
3374    fn test_table_metadata_v2_missing_sort_order() {
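        // When required v2 fields are missing, serde falls back to the generic untagged-enum error;
        // this and the following tests assert that message.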
3375        let metadata =
3376            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSortOrder.json")
3377                .unwrap();
3378
3379        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3380
3381        assert_eq!(
3382            desered.unwrap_err().to_string(),
3383            "data did not match any variant of untagged enum TableMetadataEnum"
3384        )
3385    }
3386
3387    #[test]
3388    fn test_table_metadata_v2_missing_partition_specs() {
3389        let metadata =
3390            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingPartitionSpecs.json")
3391                .unwrap();
3392
3393        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3394
3395        assert_eq!(
3396            desered.unwrap_err().to_string(),
3397            "data did not match any variant of untagged enum TableMetadataEnum"
3398        )
3399    }
3400
3401    #[test]
3402    fn test_table_metadata_v2_missing_last_partition_id() {
3403        let metadata = fs::read_to_string(
3404            "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json",
3405        )
3406        .unwrap();
3407
3408        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3409
3410        assert_eq!(
3411            desered.unwrap_err().to_string(),
3412            "data did not match any variant of untagged enum TableMetadataEnum"
3413        )
3414    }
3415
3416    #[test]
3417    fn test_table_metadata_v2_missing_schemas() {
3418        let metadata =
3419            fs::read_to_string("testdata/table_metadata/TableMetadataV2MissingSchemas.json")
3420                .unwrap();
3421
3422        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3423
3424        assert_eq!(
3425            desered.unwrap_err().to_string(),
3426            "data did not match any variant of untagged enum TableMetadataEnum"
3427        )
3428    }
3429
3430    #[test]
3431    fn test_table_metadata_v2_unsupported_version() {
3432        let metadata =
3433            fs::read_to_string("testdata/table_metadata/TableMetadataUnsupportedVersion.json")
3434                .unwrap();
3435
3436        let desered: Result<TableMetadata, serde_json::Error> = serde_json::from_str(&metadata);
3437
3438        assert_eq!(
3439            desered.unwrap_err().to_string(),
3440            "data did not match any variant of untagged enum TableMetadataEnum"
3441        )
3442    }
3443
3444    #[test]
3445    fn test_order_of_format_version() {
3446        assert!(FormatVersion::V1 < FormatVersion::V2);
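        // V3 sorts above V2 as well.
        assert!(FormatVersion::V2 < FormatVersion::V3);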
3447        assert_eq!(FormatVersion::V1, FormatVersion::V1);
3448        assert_eq!(FormatVersion::V2, FormatVersion::V2);
3449    }
3450
3451    #[test]
3452    fn test_default_partition_spec() {
3453        let default_spec_id = 1234;
3454        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3455        let partition_spec = PartitionSpec::unpartition_spec();
3456        table_meta_data.default_spec = partition_spec.clone().into();
3457        table_meta_data
3458            .partition_specs
3459            .insert(default_spec_id, Arc::new(partition_spec));
3460
3461        assert_eq!(
3462            table_meta_data.default_partition_spec().as_ref(),
3463            table_meta_data
3464                .partition_spec_by_id(default_spec_id)
3465                .unwrap()
3466                .as_ref()
3467        );
3469    }
3470    #[test]
3471    fn test_default_sort_order() {
3472        let default_sort_order_id = 1234;
3473        let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json");
3474        table_meta_data.default_sort_order_id = default_sort_order_id;
3475        table_meta_data
3476            .sort_orders
3477            .insert(default_sort_order_id, Arc::new(SortOrder::default()));
3478
3479        assert_eq!(
3480            table_meta_data.default_sort_order(),
3481            table_meta_data
3482                .sort_orders
3483                .get(&default_sort_order_id)
3484                .unwrap()
3485        )
3486    }
3487
3488    #[test]
3489    fn test_table_metadata_builder_from_table_creation() {
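        // Building from a TableCreation should yield defaults: schema 0 with no fields,
        // the unpartitioned spec 0, and the unsorted order 0.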
3490        let table_creation = TableCreation::builder()
3491            .location("s3://db/table".to_string())
3492            .name("table".to_string())
3493            .properties(HashMap::new())
3494            .schema(Schema::builder().build().unwrap())
3495            .build();
3496        let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
3497            .unwrap()
3498            .build()
3499            .unwrap()
3500            .metadata;
3501        assert_eq!(table_metadata.location, "s3://db/table");
3502        assert_eq!(table_metadata.schemas.len(), 1);
3503        assert_eq!(
3504            table_metadata
3505                .schemas
3506                .get(&0)
3507                .unwrap()
3508                .as_struct()
3509                .fields()
3510                .len(),
3511            0
3512        );
3513        assert_eq!(table_metadata.properties.len(), 0);
3514        assert_eq!(
3515            table_metadata.partition_specs,
3516            HashMap::from([(
3517                0,
3518                Arc::new(
3519                    PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
3520                        .with_spec_id(0)
3521                        .build()
3522                        .unwrap()
3523                )
3524            )])
3525        );
3526        assert_eq!(
3527            table_metadata.sort_orders,
3528            HashMap::from([(
3529                0,
3530                Arc::new(SortOrder {
3531                    order_id: 0,
3532                    fields: vec![]
3533                })
3534            )])
3535        );
3536    }
3537
3538    #[tokio::test]
3539    async fn test_table_metadata_read_write() {
3540        // Create a temporary directory for our test
3541        let temp_dir = TempDir::new().unwrap();
3542        let temp_path = temp_dir.path().to_str().unwrap();
3543
3544        // Create a FileIO instance
3545        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3546
3547        // Load an existing table metadata fixture from the test files
3548        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3549
3550        // Define the metadata location
3551        let metadata_location = format!("{temp_path}/metadata.json");
3552
3553        // Write the metadata
3554        original_metadata
3555            .write_to(&file_io, &metadata_location)
3556            .await
3557            .unwrap();
3558
3559        // Verify the file exists
3560        assert!(fs::metadata(&metadata_location).is_ok());
3561
3562        // Read the metadata back
3563        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
3564            .await
3565            .unwrap();
3566
3567        // Verify the metadata matches
3568        assert_eq!(read_metadata, original_metadata);
3569    }
3570
3571    #[tokio::test]
3572    async fn test_table_metadata_read_compressed() {
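        // Writing gzip-compressed metadata and reading it back verifies that read_from
        // detects and decompresses gzip content.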
3573        let temp_dir = TempDir::new().unwrap();
3574        let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
3575
3576        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
3577        let json = serde_json::to_string(&original_metadata).unwrap();
3578
3579        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
3580        encoder.write_all(json.as_bytes()).unwrap();
3581        std::fs::write(&metadata_location, encoder.finish().unwrap())
3582            .expect("failed to write metadata");
3583
3584        // Read the metadata back
3585        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3586        let metadata_location = metadata_location.to_str().unwrap();
3587        let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
3588            .await
3589            .unwrap();
3590
3591        // Verify the metadata matches
3592        assert_eq!(read_metadata, original_metadata);
3593    }
3594
3595    #[tokio::test]
3596    async fn test_table_metadata_read_nonexistent_file() {
3597        // Create a FileIO instance
3598        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
3599
3600        // Try to read a non-existent file
3601        let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;
3602
3603        // Verify it returns an error
3604        assert!(result.is_err());
3605    }
3606
3607    #[test]
3608    fn test_partition_name_exists() {
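        // partition_name_exists should match partition field names across all specs,
        // not schema column names.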
3609        let schema = Schema::builder()
3610            .with_fields(vec![
3611                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3612                NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
3613                    .into(),
3614            ])
3615            .build()
3616            .unwrap();
3617
3618        let spec1 = PartitionSpec::builder(schema.clone())
3619            .with_spec_id(1)
3620            .add_partition_field("data", "data_partition", Transform::Identity)
3621            .unwrap()
3622            .build()
3623            .unwrap();
3624
3625        let spec2 = PartitionSpec::builder(schema.clone())
3626            .with_spec_id(2)
3627            .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
3628            .unwrap()
3629            .build()
3630            .unwrap();
3631
3632        // Build metadata with these specs
3633        let metadata = TableMetadataBuilder::new(
3634            schema,
3635            spec1.clone().into_unbound(),
3636            SortOrder::unsorted_order(),
3637            "s3://test/location".to_string(),
3638            FormatVersion::V2,
3639            HashMap::new(),
3640        )
3641        .unwrap()
3642        .add_partition_spec(spec2.into_unbound())
3643        .unwrap()
3644        .build()
3645        .unwrap()
3646        .metadata;
3647
3648        assert!(metadata.partition_name_exists("data_partition"));
3649        assert!(metadata.partition_name_exists("partition_bucket"));
3650
3651        assert!(!metadata.partition_name_exists("nonexistent_field"));
3652        assert!(!metadata.partition_name_exists("data")); // schema field name, not partition field name
3653        assert!(!metadata.partition_name_exists(""));
3654    }
3655
3656    #[test]
3657    fn test_partition_name_exists_empty_specs() {
3658        // Create metadata with only the unpartitioned spec, i.e. a spec with no partition fields
3659        let schema = Schema::builder()
3660            .with_fields(vec![
3661                NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
3662            ])
3663            .build()
3664            .unwrap();
3665
3666        let metadata = TableMetadataBuilder::new(
3667            schema,
3668            PartitionSpec::unpartition_spec().into_unbound(),
3669            SortOrder::unsorted_order(),
3670            "s3://test/location".to_string(),
3671            FormatVersion::V2,
3672            HashMap::new(),
3673        )
3674        .unwrap()
3675        .build()
3676        .unwrap()
3677        .metadata;
3678
3679        assert!(!metadata.partition_name_exists("any_field"));
3680        assert!(!metadata.partition_name_exists("data"));
3681    }
3682
3683    #[test]
3684    fn test_name_exists_in_any_schema() {
3685        // Create multiple schemas with different fields
3686        let schema1 = Schema::builder()
3687            .with_schema_id(1)
3688            .with_fields(vec![
3689                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3690                NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
3691            ])
3692            .build()
3693            .unwrap();
3694
3695        let schema2 = Schema::builder()
3696            .with_schema_id(2)
3697            .with_fields(vec![
3698                NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
3699                NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
3700            ])
3701            .build()
3702            .unwrap();
3703
3704        let metadata = TableMetadataBuilder::new(
3705            schema1,
3706            PartitionSpec::unpartition_spec().into_unbound(),
3707            SortOrder::unsorted_order(),
3708            "s3://test/location".to_string(),
3709            FormatVersion::V2,
3710            HashMap::new(),
3711        )
3712        .unwrap()
3713        .add_current_schema(schema2)
3714        .unwrap()
3715        .build()
3716        .unwrap()
3717        .metadata;
3718
3719        assert!(metadata.name_exists_in_any_schema("field1")); // exists in both schemas
3720        assert!(metadata.name_exists_in_any_schema("field2")); // exists only in schema1 (historical)
3721        assert!(metadata.name_exists_in_any_schema("field3")); // exists only in schema2 (current)
3722
3723        assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
3724        assert!(!metadata.name_exists_in_any_schema("field4"));
3725        assert!(!metadata.name_exists_in_any_schema(""));
3726    }
3727
3728    #[test]
3729    fn test_name_exists_in_any_schema_empty_schemas() {
3730        let schema = Schema::builder().with_fields(vec![]).build().unwrap();
3731
3732        let metadata = TableMetadataBuilder::new(
3733            schema,
3734            PartitionSpec::unpartition_spec().into_unbound(),
3735            SortOrder::unsorted_order(),
3736            "s3://test/location".to_string(),
3737            FormatVersion::V2,
3738            HashMap::new(),
3739        )
3740        .unwrap()
3741        .build()
3742        .unwrap()
3743        .metadata;
3744
3745        assert!(!metadata.name_exists_in_any_schema("any_field"));
3746    }
3747
3748    #[test]
3749    fn test_helper_methods_multi_version_scenario() {
3750        // Test a realistic multi-version scenario
3751        let initial_schema = Schema::builder()
3752            .with_fields(vec![
3753                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3754                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3755                NestedField::required(
3756                    3,
3757                    "deprecated_field",
3758                    Type::Primitive(PrimitiveType::String),
3759                )
3760                .into(),
3761            ])
3762            .build()
3763            .unwrap();
3764
3765        let metadata = TableMetadataBuilder::new(
3766            initial_schema,
3767            PartitionSpec::unpartition_spec().into_unbound(),
3768            SortOrder::unsorted_order(),
3769            "s3://test/location".to_string(),
3770            FormatVersion::V2,
3771            HashMap::new(),
3772        )
3773        .unwrap();
3774
3775        let evolved_schema = Schema::builder()
3776            .with_fields(vec![
3777                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3778                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3779                NestedField::required(
3780                    3,
3781                    "deprecated_field",
3782                    Type::Primitive(PrimitiveType::String),
3783                )
3784                .into(),
3785                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3786                    .into(),
3787            ])
3788            .build()
3789            .unwrap();
3790
3791        // A third schema that drops the deprecated field; defined for illustration but never added to the metadata
3792        let _final_schema = Schema::builder()
3793            .with_fields(vec![
3794                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3795                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
3796                NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
3797                    .into(),
3798                NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
3799                    .into(),
3800            ])
3801            .build()
3802            .unwrap();
3803
3804        let final_metadata = metadata
3805            .add_current_schema(evolved_schema)
3806            .unwrap()
3807            .build()
3808            .unwrap()
3809            .metadata;
3810
3811        assert!(!final_metadata.partition_name_exists("nonexistent_partition")); // unpartitioned table
3812
3813        assert!(final_metadata.name_exists_in_any_schema("id")); // exists in both schemas
3814        assert!(final_metadata.name_exists_in_any_schema("name")); // exists in both schemas
3815        assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); // exists in both schemas
3816        assert!(final_metadata.name_exists_in_any_schema("new_field")); // only in current schema
3817        assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
3818    }
3819
3820    #[test]
3821    fn test_invalid_sort_order_id_zero_with_fields() {
3822        let metadata = r#"
3823        {
3824            "format-version": 2,
3825            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
3826            "location": "s3://bucket/test/location",
3827            "last-sequence-number": 111,
3828            "last-updated-ms": 1600000000000,
3829            "last-column-id": 3,
3830            "current-schema-id": 1,
3831            "schemas": [
3832                {
3833                    "type": "struct",
3834                    "schema-id": 1,
3835                    "fields": [
3836                        {"id": 1, "name": "x", "required": true, "type": "long"},
3837                        {"id": 2, "name": "y", "required": true, "type": "long"}
3838                    ]
3839                }
3840            ],
3841            "default-spec-id": 0,
3842            "partition-specs": [{"spec-id": 0, "fields": []}],
3843            "last-partition-id": 999,
3844            "default-sort-order-id": 0,
3845            "sort-orders": [
3846                {
3847                    "order-id": 0,
3848                    "fields": [
3849                        {
3850                            "transform": "identity",
3851                            "source-id": 1,
3852                            "direction": "asc",
3853                            "null-order": "nulls-first"
3854                        }
3855                    ]
3856                }
3857            ],
3858            "properties": {},
3859            "current-snapshot-id": -1,
3860            "snapshots": []
3861        }
3862        "#;
3863
3864        let result: Result<TableMetadata, serde_json::Error> = serde_json::from_str(metadata);
3865
3866        // Should fail because sort order ID 0 is reserved for unsorted order and cannot have fields
3867        assert!(
3868            result.is_err(),
3869            "Parsing should fail for sort order ID 0 with fields"
3870        );
3871    }
3872
3873    #[test]
3874    fn test_table_properties_with_defaults() {
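        // With no properties set, table_properties() should fall back to the documented defaults.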
3875        use crate::spec::TableProperties;
3876
3877        let schema = Schema::builder()
3878            .with_fields(vec![
3879                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3880            ])
3881            .build()
3882            .unwrap();
3883
3884        let metadata = TableMetadataBuilder::new(
3885            schema,
3886            PartitionSpec::unpartition_spec().into_unbound(),
3887            SortOrder::unsorted_order(),
3888            "s3://test/location".to_string(),
3889            FormatVersion::V2,
3890            HashMap::new(),
3891        )
3892        .unwrap()
3893        .build()
3894        .unwrap()
3895        .metadata;
3896
3897        let props = metadata.table_properties().unwrap();
3898
3899        assert_eq!(
3900            props.commit_num_retries,
3901            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
3902        );
3903        assert_eq!(
3904            props.write_target_file_size_bytes,
3905            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
3906        );
3907    }
3908
3909    #[test]
3910    fn test_table_properties_with_custom_values() {
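        // Explicitly set property values should override the defaults.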
3911        use crate::spec::TableProperties;
3912
3913        let schema = Schema::builder()
3914            .with_fields(vec![
3915                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3916            ])
3917            .build()
3918            .unwrap();
3919
3920        let properties = HashMap::from([
3921            (
3922                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
3923                "10".to_string(),
3924            ),
3925            (
3926                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
3927                "1024".to_string(),
3928            ),
3929        ]);
3930
3931        let metadata = TableMetadataBuilder::new(
3932            schema,
3933            PartitionSpec::unpartition_spec().into_unbound(),
3934            SortOrder::unsorted_order(),
3935            "s3://test/location".to_string(),
3936            FormatVersion::V2,
3937            properties,
3938        )
3939        .unwrap()
3940        .build()
3941        .unwrap()
3942        .metadata;
3943
3944        let props = metadata.table_properties().unwrap();
3945
3946        assert_eq!(props.commit_num_retries, 10);
3947        assert_eq!(props.write_target_file_size_bytes, 1024);
3948    }
3949
3950    #[test]
3951    fn test_table_properties_with_invalid_value() {
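        // A non-numeric value for a numeric property should surface as a DataInvalid error.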
3952        let schema = Schema::builder()
3953            .with_fields(vec![
3954                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
3955            ])
3956            .build()
3957            .unwrap();
3958
3959        let properties = HashMap::from([(
3960            "commit.retry.num-retries".to_string(),
3961            "not_a_number".to_string(),
3962        )]);
3963
3964        let metadata = TableMetadataBuilder::new(
3965            schema,
3966            PartitionSpec::unpartition_spec().into_unbound(),
3967            SortOrder::unsorted_order(),
3968            "s3://test/location".to_string(),
3969            FormatVersion::V2,
3970            properties,
3971        )
3972        .unwrap()
3973        .build()
3974        .unwrap()
3975        .metadata;
3976
3977        let err = metadata.table_properties().unwrap_err();
3978        assert_eq!(err.kind(), ErrorKind::DataInvalid);
3979        assert!(err.message().contains("Invalid table properties"));
3980    }
3981}