iceberg/spec/
manifest_list.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! ManifestList for Iceberg.
19
20use std::collections::HashMap;
21use std::str::FromStr;
22
23use apache_avro::types::Value;
24use apache_avro::{Reader, Writer, from_value};
25use bytes::Bytes;
26pub use serde_bytes::ByteBuf;
27use serde_derive::{Deserialize, Serialize};
28
29use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2};
30use self::_serde::{ManifestFileV1, ManifestFileV2};
31use super::{FormatVersion, Manifest};
32use crate::error::Result;
33use crate::io::{FileIO, OutputFile};
34use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3;
35use crate::spec::manifest_list::_serde::ManifestFileV3;
36use crate::{Error, ErrorKind};
37
38/// Placeholder for sequence number. The field with this value must be replaced with the actual sequence number before it write.
39pub const UNASSIGNED_SEQUENCE_NUMBER: i64 = -1;
40
41/// Snapshots are embedded in table metadata, but the list of manifests for a
42/// snapshot are stored in a separate manifest list file.
43///
44/// A new manifest list is written for each attempt to commit a snapshot
45/// because the list of manifests always changes to produce a new snapshot.
46/// When a manifest list is written, the (optimistic) sequence number of the
47/// snapshot is written for all new manifest files tracked by the list.
48///
49/// A manifest list includes summary metadata that can be used to avoid
50/// scanning all of the manifests in a snapshot when planning a table scan.
51/// This includes the number of added, existing, and deleted files, and a
52/// summary of values for each field of the partition spec used to write the
53/// manifest.
54#[derive(Debug, Clone, PartialEq)]
55pub struct ManifestList {
56    /// Entries in a manifest list.
57    entries: Vec<ManifestFile>,
58}
59
60impl ManifestList {
61    /// Parse manifest list from bytes.
62    pub fn parse_with_version(bs: &[u8], version: FormatVersion) -> Result<ManifestList> {
63        match version {
64            FormatVersion::V1 => {
65                let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?;
66                let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
67                from_value::<_serde::ManifestListV1>(&values)?.try_into()
68            }
69            FormatVersion::V2 => {
70                let reader = Reader::new(bs)?;
71                let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
72                from_value::<_serde::ManifestListV2>(&values)?.try_into()
73            }
74            FormatVersion::V3 => {
75                let reader = Reader::new(bs)?;
76                let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
77                from_value::<_serde::ManifestListV3>(&values)?.try_into()
78            }
79        }
80    }
81
82    /// Get the entries in the manifest list.
83    pub fn entries(&self) -> &[ManifestFile] {
84        &self.entries
85    }
86
87    /// Take ownership of the entries in the manifest list, consuming it
88    pub fn consume_entries(self) -> impl IntoIterator<Item = ManifestFile> {
89        Box::new(self.entries.into_iter())
90    }
91}
92
93/// A manifest list writer.
94pub struct ManifestListWriter {
95    format_version: FormatVersion,
96    output_file: OutputFile,
97    avro_writer: Writer<'static, Vec<u8>>,
98    sequence_number: i64,
99    snapshot_id: i64,
100    next_row_id: Option<u64>,
101}
102
103impl std::fmt::Debug for ManifestListWriter {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        f.debug_struct("ManifestListWriter")
106            .field("format_version", &self.format_version)
107            .field("output_file", &self.output_file)
108            .field("avro_writer", &self.avro_writer.schema())
109            .finish_non_exhaustive()
110    }
111}
112
113impl ManifestListWriter {
114    /// Get the next row ID that will be assigned to the next data manifest added.
115    pub fn next_row_id(&self) -> Option<u64> {
116        self.next_row_id
117    }
118
119    /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
120    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
121        let mut metadata = HashMap::from_iter([
122            ("snapshot-id".to_string(), snapshot_id.to_string()),
123            ("format-version".to_string(), "1".to_string()),
124        ]);
125        if let Some(parent_snapshot_id) = parent_snapshot_id {
126            metadata.insert(
127                "parent-snapshot-id".to_string(),
128                parent_snapshot_id.to_string(),
129            );
130        }
131        Self::new(
132            FormatVersion::V1,
133            output_file,
134            metadata,
135            0,
136            snapshot_id,
137            None,
138        )
139    }
140
141    /// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
142    pub fn v2(
143        output_file: OutputFile,
144        snapshot_id: i64,
145        parent_snapshot_id: Option<i64>,
146        sequence_number: i64,
147    ) -> Self {
148        let mut metadata = HashMap::from_iter([
149            ("snapshot-id".to_string(), snapshot_id.to_string()),
150            ("sequence-number".to_string(), sequence_number.to_string()),
151            ("format-version".to_string(), "2".to_string()),
152        ]);
153        metadata.insert(
154            "parent-snapshot-id".to_string(),
155            parent_snapshot_id
156                .map(|v| v.to_string())
157                .unwrap_or("null".to_string()),
158        );
159        Self::new(
160            FormatVersion::V2,
161            output_file,
162            metadata,
163            sequence_number,
164            snapshot_id,
165            None,
166        )
167    }
168
169    /// Construct a v3 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
170    pub fn v3(
171        output_file: OutputFile,
172        snapshot_id: i64,
173        parent_snapshot_id: Option<i64>,
174        sequence_number: i64,
175        first_row_id: Option<u64>, // Always None for delete manifests
176    ) -> Self {
177        let mut metadata = HashMap::from_iter([
178            ("snapshot-id".to_string(), snapshot_id.to_string()),
179            ("sequence-number".to_string(), sequence_number.to_string()),
180            ("format-version".to_string(), "3".to_string()),
181        ]);
182        metadata.insert(
183            "parent-snapshot-id".to_string(),
184            parent_snapshot_id
185                .map(|v| v.to_string())
186                .unwrap_or("null".to_string()),
187        );
188        metadata.insert(
189            "first-row-id".to_string(),
190            first_row_id
191                .map(|v| v.to_string())
192                .unwrap_or("null".to_string()),
193        );
194        Self::new(
195            FormatVersion::V3,
196            output_file,
197            metadata,
198            sequence_number,
199            snapshot_id,
200            first_row_id,
201        )
202    }
203
204    fn new(
205        format_version: FormatVersion,
206        output_file: OutputFile,
207        metadata: HashMap<String, String>,
208        sequence_number: i64,
209        snapshot_id: i64,
210        first_row_id: Option<u64>,
211    ) -> Self {
212        let avro_schema = match format_version {
213            FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1,
214            FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2,
215            FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3,
216        };
217        let mut avro_writer = Writer::new(avro_schema, Vec::new());
218        for (key, value) in metadata {
219            avro_writer
220                .add_user_metadata(key, value)
221                .expect("Avro metadata should be added to the writer before the first record.");
222        }
223        Self {
224            format_version,
225            output_file,
226            avro_writer,
227            sequence_number,
228            snapshot_id,
229            next_row_id: first_row_id,
230        }
231    }
232
233    /// Append manifests to be written.
234    ///
235    /// If V3 Manifests are added and the `first_row_id` of any data manifest is unassigned,
236    /// it will be assigned based on the `next_row_id` of the writer, and the `next_row_id` of the writer will be updated accordingly.
237    /// If `first_row_id` is already assigned, it will be validated against the `next_row_id` of the writer.
238    pub fn add_manifests(&mut self, manifests: impl Iterator<Item = ManifestFile>) -> Result<()> {
239        match self.format_version {
240            FormatVersion::V1 => {
241                for manifest in manifests {
242                    let manifests: ManifestFileV1 = manifest.try_into()?;
243                    self.avro_writer.append_ser(manifests)?;
244                }
245            }
246            FormatVersion::V2 | FormatVersion::V3 => {
247                for mut manifest in manifests {
248                    self.assign_sequence_numbers(&mut manifest)?;
249
250                    if self.format_version == FormatVersion::V2 {
251                        let manifest_entry: ManifestFileV2 = manifest.try_into()?;
252                        self.avro_writer.append_ser(manifest_entry)?;
253                    } else if self.format_version == FormatVersion::V3 {
254                        self.assign_first_row_id(&mut manifest)?;
255                        let manifest_entry: ManifestFileV3 = manifest.try_into()?;
256                        self.avro_writer.append_ser(manifest_entry)?;
257                    }
258                }
259            }
260        }
261        Ok(())
262    }
263
264    /// Write the manifest list to the output file.
265    pub async fn close(self) -> Result<()> {
266        let data = self.avro_writer.into_inner()?;
267        let mut writer = self.output_file.writer().await?;
268        writer.write(Bytes::from(data)).await?;
269        writer.close().await?;
270        Ok(())
271    }
272
273    /// Assign sequence numbers to manifest if they are unassigned
274    fn assign_sequence_numbers(&self, manifest: &mut ManifestFile) -> Result<()> {
275        if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
276            if manifest.added_snapshot_id != self.snapshot_id {
277                return Err(Error::new(
278                    ErrorKind::DataInvalid,
279                    format!(
280                        "Found unassigned sequence number for a manifest from snapshot {}.",
281                        manifest.added_snapshot_id
282                    ),
283                ));
284            }
285            manifest.sequence_number = self.sequence_number;
286        }
287
288        if manifest.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
289            if manifest.added_snapshot_id != self.snapshot_id {
290                return Err(Error::new(
291                    ErrorKind::DataInvalid,
292                    format!(
293                        "Found unassigned sequence number for a manifest from snapshot {}.",
294                        manifest.added_snapshot_id
295                    ),
296                ));
297            }
298            manifest.min_sequence_number = self.sequence_number;
299        }
300
301        Ok(())
302    }
303
304    /// Returns number of newly assigned first-row-ids, if any.
305    fn assign_first_row_id(&mut self, manifest: &mut ManifestFile) -> Result<()> {
306        match manifest.content {
307            ManifestContentType::Data => {
308                match (self.next_row_id, manifest.first_row_id) {
309                    (Some(_), Some(_)) => {
310                        // Case: Manifest with already assigned first row ID.
311                        // No need to increase next_row_id, as this manifest is already assigned.
312                    }
313                    (None, Some(manifest_first_row_id)) => {
314                        // Case: Assigned first row ID for data manifest, but the writer does not have a next-row-id assigned.
315                        return Err(Error::new(
316                            ErrorKind::Unexpected,
317                            format!(
318                                "Found invalid first-row-id assignment for Manifest {}. Writer does not have a next-row-id assigned, but the manifest has first-row-id assigned to {}.",
319                                manifest.manifest_path, manifest_first_row_id,
320                            ),
321                        ));
322                    }
323                    (Some(writer_next_row_id), None) => {
324                        // Case: Unassigned first row ID for data manifest. This is either a new
325                        // manifest, or a manifest from a pre-v3 snapshot. We need to assign one.
326                        let (existing_rows_count, added_rows_count) =
327                            require_row_counts_in_manifest(manifest)?;
328                        manifest.first_row_id = Some(writer_next_row_id);
329
330                        self.next_row_id = writer_next_row_id
331                        .checked_add(existing_rows_count)
332                        .and_then(|sum| sum.checked_add(added_rows_count))
333                        .ok_or_else(|| {
334                            Error::new(
335                                ErrorKind::DataInvalid,
336                                format!(
337                                    "Row ID overflow when computing next row ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count: {existing_rows_count}, Added Rows Count: {added_rows_count}",
338                                    manifest.manifest_path
339                                ),
340                            )
341                        }).map(Some)?;
342                    }
343                    (None, None) => {
344                        // Case: Table without row lineage. No action needed.
345                    }
346                }
347            }
348            ManifestContentType::Deletes => {
349                // Deletes never have a first-row-id assigned.
350                manifest.first_row_id = None;
351            }
352        };
353
354        Ok(())
355    }
356}
357
358fn require_row_counts_in_manifest(manifest: &ManifestFile) -> Result<(u64, u64)> {
359    let existing_rows_count = manifest.existing_rows_count.ok_or_else(|| {
360        Error::new(
361            ErrorKind::DataInvalid,
362            format!(
363                "Cannot include a Manifest without existing-rows-count to a table with row lineage enabled. Manifest path: {}",
364                manifest.manifest_path,
365            ),
366        )
367    })?;
368    let added_rows_count = manifest.added_rows_count.ok_or_else(|| {
369        Error::new(
370            ErrorKind::DataInvalid,
371            format!(
372                "Cannot include a Manifest without added-rows-count to a table with row lineage enabled. Manifest path: {}",
373                manifest.manifest_path,
374            ),
375        )
376    })?;
377    Ok((existing_rows_count, added_rows_count))
378}
379
380/// This is a helper module that defines the schema field of the manifest list entry.
381mod _const_schema {
382    use std::sync::Arc;
383
384    use apache_avro::Schema as AvroSchema;
385    use once_cell::sync::Lazy;
386
387    use crate::avro::schema_to_avro_schema;
388    use crate::spec::{
389        ListType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type,
390    };
391
392    static MANIFEST_PATH: Lazy<NestedFieldRef> = {
393        Lazy::new(|| {
394            Arc::new(NestedField::required(
395                500,
396                "manifest_path",
397                Type::Primitive(PrimitiveType::String),
398            ))
399        })
400    };
401    static MANIFEST_LENGTH: Lazy<NestedFieldRef> = {
402        Lazy::new(|| {
403            Arc::new(NestedField::required(
404                501,
405                "manifest_length",
406                Type::Primitive(PrimitiveType::Long),
407            ))
408        })
409    };
410    static PARTITION_SPEC_ID: Lazy<NestedFieldRef> = {
411        Lazy::new(|| {
412            Arc::new(NestedField::required(
413                502,
414                "partition_spec_id",
415                Type::Primitive(PrimitiveType::Int),
416            ))
417        })
418    };
419    static CONTENT: Lazy<NestedFieldRef> = {
420        Lazy::new(|| {
421            Arc::new(NestedField::required(
422                517,
423                "content",
424                Type::Primitive(PrimitiveType::Int),
425            ))
426        })
427    };
428    static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
429        Lazy::new(|| {
430            Arc::new(NestedField::required(
431                515,
432                "sequence_number",
433                Type::Primitive(PrimitiveType::Long),
434            ))
435        })
436    };
437    static MIN_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
438        Lazy::new(|| {
439            Arc::new(NestedField::required(
440                516,
441                "min_sequence_number",
442                Type::Primitive(PrimitiveType::Long),
443            ))
444        })
445    };
446    static ADDED_SNAPSHOT_ID: Lazy<NestedFieldRef> = {
447        Lazy::new(|| {
448            Arc::new(NestedField::required(
449                503,
450                "added_snapshot_id",
451                Type::Primitive(PrimitiveType::Long),
452            ))
453        })
454    };
455    static ADDED_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
456        Lazy::new(|| {
457            Arc::new(NestedField::required(
458                504,
459                "added_files_count",
460                Type::Primitive(PrimitiveType::Int),
461            ))
462        })
463    };
464    static ADDED_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
465        Lazy::new(|| {
466            Arc::new(NestedField::optional(
467                504,
468                "added_data_files_count",
469                Type::Primitive(PrimitiveType::Int),
470            ))
471        })
472    };
473    static EXISTING_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
474        Lazy::new(|| {
475            Arc::new(NestedField::required(
476                505,
477                "existing_files_count",
478                Type::Primitive(PrimitiveType::Int),
479            ))
480        })
481    };
482    static EXISTING_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
483        Lazy::new(|| {
484            Arc::new(NestedField::optional(
485                505,
486                "existing_data_files_count",
487                Type::Primitive(PrimitiveType::Int),
488            ))
489        })
490    };
491    static DELETED_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
492        Lazy::new(|| {
493            Arc::new(NestedField::required(
494                506,
495                "deleted_files_count",
496                Type::Primitive(PrimitiveType::Int),
497            ))
498        })
499    };
500    static DELETED_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
501        Lazy::new(|| {
502            Arc::new(NestedField::optional(
503                506,
504                "deleted_data_files_count",
505                Type::Primitive(PrimitiveType::Int),
506            ))
507        })
508    };
509    static ADDED_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
510        Lazy::new(|| {
511            Arc::new(NestedField::required(
512                512,
513                "added_rows_count",
514                Type::Primitive(PrimitiveType::Long),
515            ))
516        })
517    };
518    static ADDED_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
519        Lazy::new(|| {
520            Arc::new(NestedField::optional(
521                512,
522                "added_rows_count",
523                Type::Primitive(PrimitiveType::Long),
524            ))
525        })
526    };
527    static EXISTING_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
528        Lazy::new(|| {
529            Arc::new(NestedField::required(
530                513,
531                "existing_rows_count",
532                Type::Primitive(PrimitiveType::Long),
533            ))
534        })
535    };
536    static EXISTING_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
537        Lazy::new(|| {
538            Arc::new(NestedField::optional(
539                513,
540                "existing_rows_count",
541                Type::Primitive(PrimitiveType::Long),
542            ))
543        })
544    };
545    static DELETED_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
546        Lazy::new(|| {
547            Arc::new(NestedField::required(
548                514,
549                "deleted_rows_count",
550                Type::Primitive(PrimitiveType::Long),
551            ))
552        })
553    };
554    static DELETED_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
555        Lazy::new(|| {
556            Arc::new(NestedField::optional(
557                514,
558                "deleted_rows_count",
559                Type::Primitive(PrimitiveType::Long),
560            ))
561        })
562    };
563    static PARTITIONS: Lazy<NestedFieldRef> = {
564        Lazy::new(|| {
565            // element type
566            let fields = vec![
567                Arc::new(NestedField::required(
568                    509,
569                    "contains_null",
570                    Type::Primitive(PrimitiveType::Boolean),
571                )),
572                Arc::new(NestedField::optional(
573                    518,
574                    "contains_nan",
575                    Type::Primitive(PrimitiveType::Boolean),
576                )),
577                Arc::new(NestedField::optional(
578                    510,
579                    "lower_bound",
580                    Type::Primitive(PrimitiveType::Binary),
581                )),
582                Arc::new(NestedField::optional(
583                    511,
584                    "upper_bound",
585                    Type::Primitive(PrimitiveType::Binary),
586                )),
587            ];
588            let element_field = Arc::new(NestedField::required(
589                508,
590                "r_508",
591                Type::Struct(StructType::new(fields)),
592            ));
593            Arc::new(NestedField::optional(
594                507,
595                "partitions",
596                Type::List(ListType { element_field }),
597            ))
598        })
599    };
600    static KEY_METADATA: Lazy<NestedFieldRef> = {
601        Lazy::new(|| {
602            Arc::new(NestedField::optional(
603                519,
604                "key_metadata",
605                Type::Primitive(PrimitiveType::Binary),
606            ))
607        })
608    };
609    static FIRST_ROW_ID: Lazy<NestedFieldRef> = {
610        Lazy::new(|| {
611            Arc::new(NestedField::optional(
612                520,
613                "first_row_id",
614                Type::Primitive(PrimitiveType::Long),
615            ))
616        })
617    };
618
619    static V1_SCHEMA: Lazy<Schema> = {
620        Lazy::new(|| {
621            let fields = vec![
622                MANIFEST_PATH.clone(),
623                MANIFEST_LENGTH.clone(),
624                PARTITION_SPEC_ID.clone(),
625                ADDED_SNAPSHOT_ID.clone(),
626                ADDED_FILES_COUNT_V1.clone().to_owned(),
627                EXISTING_FILES_COUNT_V1.clone(),
628                DELETED_FILES_COUNT_V1.clone(),
629                ADDED_ROWS_COUNT_V1.clone(),
630                EXISTING_ROWS_COUNT_V1.clone(),
631                DELETED_ROWS_COUNT_V1.clone(),
632                PARTITIONS.clone(),
633                KEY_METADATA.clone(),
634            ];
635            Schema::builder().with_fields(fields).build().unwrap()
636        })
637    };
638
639    static V2_SCHEMA: Lazy<Schema> = {
640        Lazy::new(|| {
641            let fields = vec![
642                MANIFEST_PATH.clone(),
643                MANIFEST_LENGTH.clone(),
644                PARTITION_SPEC_ID.clone(),
645                CONTENT.clone(),
646                SEQUENCE_NUMBER.clone(),
647                MIN_SEQUENCE_NUMBER.clone(),
648                ADDED_SNAPSHOT_ID.clone(),
649                ADDED_FILES_COUNT_V2.clone(),
650                EXISTING_FILES_COUNT_V2.clone(),
651                DELETED_FILES_COUNT_V2.clone(),
652                ADDED_ROWS_COUNT_V2.clone(),
653                EXISTING_ROWS_COUNT_V2.clone(),
654                DELETED_ROWS_COUNT_V2.clone(),
655                PARTITIONS.clone(),
656                KEY_METADATA.clone(),
657            ];
658            Schema::builder().with_fields(fields).build().unwrap()
659        })
660    };
661
662    static V3_SCHEMA: Lazy<Schema> = {
663        Lazy::new(|| {
664            let fields = vec![
665                MANIFEST_PATH.clone(),
666                MANIFEST_LENGTH.clone(),
667                PARTITION_SPEC_ID.clone(),
668                CONTENT.clone(),
669                SEQUENCE_NUMBER.clone(),
670                MIN_SEQUENCE_NUMBER.clone(),
671                ADDED_SNAPSHOT_ID.clone(),
672                ADDED_FILES_COUNT_V2.clone(),
673                EXISTING_FILES_COUNT_V2.clone(),
674                DELETED_FILES_COUNT_V2.clone(),
675                ADDED_ROWS_COUNT_V2.clone(),
676                EXISTING_ROWS_COUNT_V2.clone(),
677                DELETED_ROWS_COUNT_V2.clone(),
678                PARTITIONS.clone(),
679                KEY_METADATA.clone(),
680                FIRST_ROW_ID.clone(),
681            ];
682            Schema::builder().with_fields(fields).build().unwrap()
683        })
684    };
685
686    pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V1: Lazy<AvroSchema> =
687        Lazy::new(|| schema_to_avro_schema("manifest_file", &V1_SCHEMA).unwrap());
688
689    pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V2: Lazy<AvroSchema> =
690        Lazy::new(|| schema_to_avro_schema("manifest_file", &V2_SCHEMA).unwrap());
691
692    pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V3: Lazy<AvroSchema> =
693        Lazy::new(|| schema_to_avro_schema("manifest_file", &V3_SCHEMA).unwrap());
694}
695
696/// Entry in a manifest list.
697#[derive(Debug, PartialEq, Clone, Eq, Hash)]
698pub struct ManifestFile {
699    /// field: 500
700    ///
701    /// Location of the manifest file
702    pub manifest_path: String,
703    /// field: 501
704    ///
705    /// Length of the manifest file in bytes
706    pub manifest_length: i64,
707    /// field: 502
708    ///
709    /// ID of a partition spec used to write the manifest; must be listed
710    /// in table metadata partition-specs
711    pub partition_spec_id: i32,
712    /// field: 517
713    ///
714    /// The type of files tracked by the manifest, either data or delete
715    /// files; 0 for all v1 manifests
716    pub content: ManifestContentType,
717    /// field: 515
718    ///
719    /// The sequence number when the manifest was added to the table; use 0
720    /// when reading v1 manifest lists
721    pub sequence_number: i64,
722    /// field: 516
723    ///
724    /// The minimum data sequence number of all live data or delete files in
725    /// the manifest; use 0 when reading v1 manifest lists
726    pub min_sequence_number: i64,
727    /// field: 503
728    ///
729    /// ID of the snapshot where the manifest file was added
730    pub added_snapshot_id: i64,
731    /// field: 504
732    ///
733    /// Number of entries in the manifest that have status ADDED, when null
734    /// this is assumed to be non-zero
735    pub added_files_count: Option<u32>,
736    /// field: 505
737    ///
738    /// Number of entries in the manifest that have status EXISTING (0),
739    /// when null this is assumed to be non-zero
740    pub existing_files_count: Option<u32>,
741    /// field: 506
742    ///
743    /// Number of entries in the manifest that have status DELETED (2),
744    /// when null this is assumed to be non-zero
745    pub deleted_files_count: Option<u32>,
746    /// field: 512
747    ///
748    /// Number of rows in all of files in the manifest that have status
749    /// ADDED, when null this is assumed to be non-zero
750    pub added_rows_count: Option<u64>,
751    /// field: 513
752    ///
753    /// Number of rows in all of files in the manifest that have status
754    /// EXISTING, when null this is assumed to be non-zero
755    pub existing_rows_count: Option<u64>,
756    /// field: 514
757    ///
758    /// Number of rows in all of files in the manifest that have status
759    /// DELETED, when null this is assumed to be non-zero
760    pub deleted_rows_count: Option<u64>,
761    /// field: 507
762    /// element_field: 508
763    ///
764    /// A list of field summaries for each partition field in the spec. Each
765    /// field in the list corresponds to a field in the manifest file’s
766    /// partition spec.
767    pub partitions: Option<Vec<FieldSummary>>,
768    /// field: 519
769    ///
770    /// Implementation-specific key metadata for encryption
771    pub key_metadata: Option<Vec<u8>>,
772    /// field 520
773    ///
774    /// The starting _row_id to assign to rows added by ADDED data files
775    pub first_row_id: Option<u64>,
776}
777
778impl ManifestFile {
779    /// Checks if the manifest file has any added files.
780    pub fn has_added_files(&self) -> bool {
781        self.added_files_count.map(|c| c > 0).unwrap_or(true)
782    }
783
784    /// Checks whether this manifest contains entries with DELETED status.
785    pub fn has_deleted_files(&self) -> bool {
786        self.deleted_files_count.map(|c| c > 0).unwrap_or(true)
787    }
788
789    /// Checks if the manifest file has any existed files.
790    pub fn has_existing_files(&self) -> bool {
791        self.existing_files_count.map(|c| c > 0).unwrap_or(true)
792    }
793}
794
795/// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
796#[derive(Debug, PartialEq, Clone, Copy, Eq, Hash, Default)]
797pub enum ManifestContentType {
798    /// The manifest content is data.
799    #[default]
800    Data = 0,
801    /// The manifest content is deletes.
802    Deletes = 1,
803}
804
805impl FromStr for ManifestContentType {
806    type Err = Error;
807
808    fn from_str(s: &str) -> Result<Self> {
809        match s {
810            "data" => Ok(ManifestContentType::Data),
811            "deletes" => Ok(ManifestContentType::Deletes),
812            _ => Err(Error::new(
813                ErrorKind::DataInvalid,
814                format!("Invalid manifest content type: {s}"),
815            )),
816        }
817    }
818}
819
820impl std::fmt::Display for ManifestContentType {
821    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822        match self {
823            ManifestContentType::Data => write!(f, "data"),
824            ManifestContentType::Deletes => write!(f, "deletes"),
825        }
826    }
827}
828
829impl TryFrom<i32> for ManifestContentType {
830    type Error = Error;
831
832    fn try_from(value: i32) -> std::result::Result<Self, Self::Error> {
833        match value {
834            0 => Ok(ManifestContentType::Data),
835            1 => Ok(ManifestContentType::Deletes),
836            _ => Err(Error::new(
837                crate::ErrorKind::DataInvalid,
838                format!("Invalid manifest content type. Expected 0 or 1, got {value}"),
839            )),
840        }
841    }
842}
843
844impl ManifestFile {
845    /// Load [`Manifest`].
846    ///
847    /// This method will also initialize inherited values of [`ManifestEntry`], such as `sequence_number`.
848    pub async fn load_manifest(&self, file_io: &FileIO) -> Result<Manifest> {
849        let avro = file_io.new_input(&self.manifest_path)?.read().await?;
850
851        let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro)?;
852
853        // Let entries inherit values from the manifest list entry.
854        for entry in &mut entries {
855            entry.inherit_data(self);
856        }
857
858        Ok(Manifest::new(metadata, entries))
859    }
860}
861
862/// Field summary for partition field in the spec.
863///
864/// Each field in the list corresponds to a field in the manifest file’s partition spec.
865#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Hash)]
866pub struct FieldSummary {
867    /// field: 509
868    ///
869    /// Whether the manifest contains at least one partition with a null
870    /// value for the field
871    pub contains_null: bool,
872    /// field: 518
873    /// Whether the manifest contains at least one partition with a NaN
874    /// value for the field
875    pub contains_nan: Option<bool>,
876    /// field: 510
877    /// The minimum value for the field in the manifests
878    /// partitions.
879    pub lower_bound: Option<ByteBuf>,
880    /// field: 511
881    /// The maximum value for the field in the manifests
882    /// partitions.
883    pub upper_bound: Option<ByteBuf>,
884}
885
886/// This is a helper module that defines types to help with serialization/deserialization.
887/// For deserialization the input first gets read into either the [ManifestFileV1] or [ManifestFileV2] struct
888/// and then converted into the [ManifestFile] struct. Serialization works the other way around.
889/// [ManifestFileV1] and [ManifestFileV2] are internal struct that are only used for serialization and deserialization.
890pub(super) mod _serde {
891    pub use serde_bytes::ByteBuf;
892    use serde_derive::{Deserialize, Serialize};
893
894    use super::ManifestFile;
895    use crate::Error;
896    use crate::error::Result;
897    use crate::spec::FieldSummary;
898
899    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
900    #[serde(transparent)]
901    pub(crate) struct ManifestListV3 {
902        entries: Vec<ManifestFileV3>,
903    }
904
905    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
906    #[serde(transparent)]
907    pub(crate) struct ManifestListV2 {
908        entries: Vec<ManifestFileV2>,
909    }
910
911    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
912    #[serde(transparent)]
913    pub(crate) struct ManifestListV1 {
914        entries: Vec<ManifestFileV1>,
915    }
916
917    impl ManifestListV3 {
918        /// Converts the [ManifestListV3] into a [ManifestList].
919        pub fn try_into(self) -> Result<super::ManifestList> {
920            Ok(super::ManifestList {
921                entries: self
922                    .entries
923                    .into_iter()
924                    .map(|v| v.try_into())
925                    .collect::<Result<Vec<_>>>()?,
926            })
927        }
928    }
929
930    impl TryFrom<super::ManifestList> for ManifestListV3 {
931        type Error = Error;
932
933        fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
934            Ok(Self {
935                entries: value
936                    .entries
937                    .into_iter()
938                    .map(|v| v.try_into())
939                    .collect::<std::result::Result<Vec<_>, _>>()?,
940            })
941        }
942    }
943
944    impl ManifestListV2 {
945        /// Converts the [ManifestListV2] into a [ManifestList].
946        pub fn try_into(self) -> Result<super::ManifestList> {
947            Ok(super::ManifestList {
948                entries: self
949                    .entries
950                    .into_iter()
951                    .map(|v| v.try_into())
952                    .collect::<Result<Vec<_>>>()?,
953            })
954        }
955    }
956
957    impl TryFrom<super::ManifestList> for ManifestListV2 {
958        type Error = Error;
959
960        fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
961            Ok(Self {
962                entries: value
963                    .entries
964                    .into_iter()
965                    .map(|v| v.try_into())
966                    .collect::<std::result::Result<Vec<_>, _>>()?,
967            })
968        }
969    }
970
971    impl ManifestListV1 {
972        /// Converts the [ManifestListV1] into a [ManifestList].
973        pub fn try_into(self) -> Result<super::ManifestList> {
974            Ok(super::ManifestList {
975                entries: self
976                    .entries
977                    .into_iter()
978                    .map(|v| v.try_into())
979                    .collect::<Result<Vec<_>>>()?,
980            })
981        }
982    }
983
984    impl TryFrom<super::ManifestList> for ManifestListV1 {
985        type Error = Error;
986
987        fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
988            Ok(Self {
989                entries: value
990                    .entries
991                    .into_iter()
992                    .map(|v| v.try_into())
993                    .collect::<std::result::Result<Vec<_>, _>>()?,
994            })
995        }
996    }
997
998    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
999    pub(super) struct ManifestFileV1 {
1000        pub manifest_path: String,
1001        pub manifest_length: i64,
1002        pub partition_spec_id: i32,
1003        pub added_snapshot_id: i64,
1004        pub added_data_files_count: Option<i32>,
1005        pub existing_data_files_count: Option<i32>,
1006        pub deleted_data_files_count: Option<i32>,
1007        pub added_rows_count: Option<i64>,
1008        pub existing_rows_count: Option<i64>,
1009        pub deleted_rows_count: Option<i64>,
1010        pub partitions: Option<Vec<FieldSummary>>,
1011        pub key_metadata: Option<ByteBuf>,
1012    }
1013
1014    // Aliases were added to fields that were renamed in Iceberg  1.5.0 (https://github.com/apache/iceberg/pull/5338), in order to support both conventions/versions.
1015    // In the current implementation deserialization is done using field names, and therefore these fields may appear as either.
1016    // see issue that raised this here: https://github.com/apache/iceberg-rust/issues/338
1017    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
1018    pub(super) struct ManifestFileV2 {
1019        pub manifest_path: String,
1020        pub manifest_length: i64,
1021        pub partition_spec_id: i32,
1022        #[serde(default = "v2_default_content_for_v1")]
1023        pub content: i32,
1024        #[serde(default = "v2_default_sequence_number_for_v1")]
1025        pub sequence_number: i64,
1026        #[serde(default = "v2_default_min_sequence_number_for_v1")]
1027        pub min_sequence_number: i64,
1028        pub added_snapshot_id: i64,
1029        #[serde(alias = "added_data_files_count", alias = "added_files_count")]
1030        pub added_files_count: i32,
1031        #[serde(alias = "existing_data_files_count", alias = "existing_files_count")]
1032        pub existing_files_count: i32,
1033        #[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")]
1034        pub deleted_files_count: i32,
1035        pub added_rows_count: i64,
1036        pub existing_rows_count: i64,
1037        pub deleted_rows_count: i64,
1038        pub partitions: Option<Vec<FieldSummary>>,
1039        pub key_metadata: Option<ByteBuf>,
1040    }
1041
1042    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
1043    pub(super) struct ManifestFileV3 {
1044        pub manifest_path: String,
1045        pub manifest_length: i64,
1046        pub partition_spec_id: i32,
1047        #[serde(default = "v2_default_content_for_v1")]
1048        pub content: i32,
1049        #[serde(default = "v2_default_sequence_number_for_v1")]
1050        pub sequence_number: i64,
1051        #[serde(default = "v2_default_min_sequence_number_for_v1")]
1052        pub min_sequence_number: i64,
1053        pub added_snapshot_id: i64,
1054        #[serde(alias = "added_data_files_count", alias = "added_files_count")]
1055        pub added_files_count: i32,
1056        #[serde(alias = "existing_data_files_count", alias = "existing_files_count")]
1057        pub existing_files_count: i32,
1058        #[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")]
1059        pub deleted_files_count: i32,
1060        pub added_rows_count: i64,
1061        pub existing_rows_count: i64,
1062        pub deleted_rows_count: i64,
1063        pub partitions: Option<Vec<FieldSummary>>,
1064        pub key_metadata: Option<ByteBuf>,
1065        pub first_row_id: Option<u64>,
1066    }
1067
1068    impl ManifestFileV3 {
1069        /// Converts the [ManifestFileV3] into a [ManifestFile].
1070        pub fn try_into(self) -> Result<ManifestFile> {
1071            let manifest_file = ManifestFile {
1072                manifest_path: self.manifest_path,
1073                manifest_length: self.manifest_length,
1074                partition_spec_id: self.partition_spec_id,
1075                content: self.content.try_into()?,
1076                sequence_number: self.sequence_number,
1077                min_sequence_number: self.min_sequence_number,
1078                added_snapshot_id: self.added_snapshot_id,
1079                added_files_count: Some(self.added_files_count.try_into()?),
1080                existing_files_count: Some(self.existing_files_count.try_into()?),
1081                deleted_files_count: Some(self.deleted_files_count.try_into()?),
1082                added_rows_count: Some(self.added_rows_count.try_into()?),
1083                existing_rows_count: Some(self.existing_rows_count.try_into()?),
1084                deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
1085                partitions: self.partitions,
1086                key_metadata: self.key_metadata.map(|b| b.into_vec()),
1087                first_row_id: self.first_row_id,
1088            };
1089
1090            Ok(manifest_file)
1091        }
1092    }
1093
1094    impl ManifestFileV2 {
1095        /// Converts the [ManifestFileV2] into a [ManifestFile].
1096        pub fn try_into(self) -> Result<ManifestFile> {
1097            Ok(ManifestFile {
1098                manifest_path: self.manifest_path,
1099                manifest_length: self.manifest_length,
1100                partition_spec_id: self.partition_spec_id,
1101                content: self.content.try_into()?,
1102                sequence_number: self.sequence_number,
1103                min_sequence_number: self.min_sequence_number,
1104                added_snapshot_id: self.added_snapshot_id,
1105                added_files_count: Some(self.added_files_count.try_into()?),
1106                existing_files_count: Some(self.existing_files_count.try_into()?),
1107                deleted_files_count: Some(self.deleted_files_count.try_into()?),
1108                added_rows_count: Some(self.added_rows_count.try_into()?),
1109                existing_rows_count: Some(self.existing_rows_count.try_into()?),
1110                deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
1111                partitions: self.partitions,
1112                key_metadata: self.key_metadata.map(|b| b.into_vec()),
1113                first_row_id: None,
1114            })
1115        }
1116    }
1117
1118    fn v2_default_content_for_v1() -> i32 {
1119        super::ManifestContentType::Data as i32
1120    }
1121
1122    fn v2_default_sequence_number_for_v1() -> i64 {
1123        0
1124    }
1125
1126    fn v2_default_min_sequence_number_for_v1() -> i64 {
1127        0
1128    }
1129
1130    impl ManifestFileV1 {
1131        /// Converts the [ManifestFileV1] into a [ManifestFile].
1132        pub fn try_into(self) -> Result<ManifestFile> {
1133            Ok(ManifestFile {
1134                manifest_path: self.manifest_path,
1135                manifest_length: self.manifest_length,
1136                partition_spec_id: self.partition_spec_id,
1137                added_snapshot_id: self.added_snapshot_id,
1138                added_files_count: self
1139                    .added_data_files_count
1140                    .map(TryInto::try_into)
1141                    .transpose()?,
1142                existing_files_count: self
1143                    .existing_data_files_count
1144                    .map(TryInto::try_into)
1145                    .transpose()?,
1146                deleted_files_count: self
1147                    .deleted_data_files_count
1148                    .map(TryInto::try_into)
1149                    .transpose()?,
1150                added_rows_count: self.added_rows_count.map(TryInto::try_into).transpose()?,
1151                existing_rows_count: self
1152                    .existing_rows_count
1153                    .map(TryInto::try_into)
1154                    .transpose()?,
1155                deleted_rows_count: self.deleted_rows_count.map(TryInto::try_into).transpose()?,
1156                partitions: self.partitions,
1157                key_metadata: self.key_metadata.map(|b| b.into_vec()),
1158                // as ref: https://iceberg.apache.org/spec/#partitioning
1159                // use 0 when reading v1 manifest lists
1160                content: super::ManifestContentType::Data,
1161                sequence_number: 0,
1162                min_sequence_number: 0,
1163                first_row_id: None,
1164            })
1165        }
1166    }
1167
1168    fn convert_to_serde_key_metadata(key_metadata: Option<Vec<u8>>) -> Option<ByteBuf> {
1169        match key_metadata {
1170            Some(metadata) if !metadata.is_empty() => Some(ByteBuf::from(metadata)),
1171            _ => None,
1172        }
1173    }
1174
1175    impl TryFrom<ManifestFile> for ManifestFileV3 {
1176        type Error = Error;
1177
1178        fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
1179            let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
1180            Ok(Self {
1181                manifest_path: value.manifest_path,
1182                manifest_length: value.manifest_length,
1183                partition_spec_id: value.partition_spec_id,
1184                content: value.content as i32,
1185                sequence_number: value.sequence_number,
1186                min_sequence_number: value.min_sequence_number,
1187                added_snapshot_id: value.added_snapshot_id,
1188                added_files_count: value
1189                    .added_files_count
1190                    .ok_or_else(|| {
1191                        Error::new(
1192                            crate::ErrorKind::DataInvalid,
1193                            "added_data_files_count in ManifestFileV3 is required",
1194                        )
1195                    })?
1196                    .try_into()?,
1197                existing_files_count: value
1198                    .existing_files_count
1199                    .ok_or_else(|| {
1200                        Error::new(
1201                            crate::ErrorKind::DataInvalid,
1202                            "existing_data_files_count in ManifestFileV3 is required",
1203                        )
1204                    })?
1205                    .try_into()?,
1206                deleted_files_count: value
1207                    .deleted_files_count
1208                    .ok_or_else(|| {
1209                        Error::new(
1210                            crate::ErrorKind::DataInvalid,
1211                            "deleted_data_files_count in ManifestFileV3 is required",
1212                        )
1213                    })?
1214                    .try_into()?,
1215                added_rows_count: value
1216                    .added_rows_count
1217                    .ok_or_else(|| {
1218                        Error::new(
1219                            crate::ErrorKind::DataInvalid,
1220                            "added_rows_count in ManifestFileV3 is required",
1221                        )
1222                    })?
1223                    .try_into()?,
1224                existing_rows_count: value
1225                    .existing_rows_count
1226                    .ok_or_else(|| {
1227                        Error::new(
1228                            crate::ErrorKind::DataInvalid,
1229                            "existing_rows_count in ManifestFileV3 is required",
1230                        )
1231                    })?
1232                    .try_into()?,
1233                deleted_rows_count: value
1234                    .deleted_rows_count
1235                    .ok_or_else(|| {
1236                        Error::new(
1237                            crate::ErrorKind::DataInvalid,
1238                            "deleted_rows_count in ManifestFileV3 is required",
1239                        )
1240                    })?
1241                    .try_into()?,
1242                partitions: value.partitions,
1243                key_metadata,
1244                first_row_id: value.first_row_id,
1245            })
1246        }
1247    }
1248
1249    impl TryFrom<ManifestFile> for ManifestFileV2 {
1250        type Error = Error;
1251
1252        fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
1253            let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
1254            Ok(Self {
1255                manifest_path: value.manifest_path,
1256                manifest_length: value.manifest_length,
1257                partition_spec_id: value.partition_spec_id,
1258                content: value.content as i32,
1259                sequence_number: value.sequence_number,
1260                min_sequence_number: value.min_sequence_number,
1261                added_snapshot_id: value.added_snapshot_id,
1262                added_files_count: value
1263                    .added_files_count
1264                    .ok_or_else(|| {
1265                        Error::new(
1266                            crate::ErrorKind::DataInvalid,
1267                            "added_data_files_count in ManifestFileV2 should be require",
1268                        )
1269                    })?
1270                    .try_into()?,
1271                existing_files_count: value
1272                    .existing_files_count
1273                    .ok_or_else(|| {
1274                        Error::new(
1275                            crate::ErrorKind::DataInvalid,
1276                            "existing_data_files_count in ManifestFileV2 should be require",
1277                        )
1278                    })?
1279                    .try_into()?,
1280                deleted_files_count: value
1281                    .deleted_files_count
1282                    .ok_or_else(|| {
1283                        Error::new(
1284                            crate::ErrorKind::DataInvalid,
1285                            "deleted_data_files_count in ManifestFileV2 should be require",
1286                        )
1287                    })?
1288                    .try_into()?,
1289                added_rows_count: value
1290                    .added_rows_count
1291                    .ok_or_else(|| {
1292                        Error::new(
1293                            crate::ErrorKind::DataInvalid,
1294                            "added_rows_count in ManifestFileV2 should be require",
1295                        )
1296                    })?
1297                    .try_into()?,
1298                existing_rows_count: value
1299                    .existing_rows_count
1300                    .ok_or_else(|| {
1301                        Error::new(
1302                            crate::ErrorKind::DataInvalid,
1303                            "existing_rows_count in ManifestFileV2 should be require",
1304                        )
1305                    })?
1306                    .try_into()?,
1307                deleted_rows_count: value
1308                    .deleted_rows_count
1309                    .ok_or_else(|| {
1310                        Error::new(
1311                            crate::ErrorKind::DataInvalid,
1312                            "deleted_rows_count in ManifestFileV2 should be require",
1313                        )
1314                    })?
1315                    .try_into()?,
1316                partitions: value.partitions,
1317                key_metadata,
1318            })
1319        }
1320    }
1321
1322    impl TryFrom<ManifestFile> for ManifestFileV1 {
1323        type Error = Error;
1324
1325        fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
1326            let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
1327            Ok(Self {
1328                manifest_path: value.manifest_path,
1329                manifest_length: value.manifest_length,
1330                partition_spec_id: value.partition_spec_id,
1331                added_snapshot_id: value.added_snapshot_id,
1332                added_data_files_count: value
1333                    .added_files_count
1334                    .map(TryInto::try_into)
1335                    .transpose()?,
1336                existing_data_files_count: value
1337                    .existing_files_count
1338                    .map(TryInto::try_into)
1339                    .transpose()?,
1340                deleted_data_files_count: value
1341                    .deleted_files_count
1342                    .map(TryInto::try_into)
1343                    .transpose()?,
1344                added_rows_count: value.added_rows_count.map(TryInto::try_into).transpose()?,
1345                existing_rows_count: value
1346                    .existing_rows_count
1347                    .map(TryInto::try_into)
1348                    .transpose()?,
1349                deleted_rows_count: value
1350                    .deleted_rows_count
1351                    .map(TryInto::try_into)
1352                    .transpose()?,
1353                partitions: value.partitions,
1354                key_metadata,
1355            })
1356        }
1357    }
1358}
1359
1360#[cfg(test)]
1361mod test {
1362    use std::fs;
1363
1364    use apache_avro::{Reader, Schema};
1365    use tempfile::TempDir;
1366
1367    use super::_serde::ManifestListV2;
1368    use crate::io::FileIOBuilder;
1369    use crate::spec::manifest_list::_serde::{ManifestListV1, ManifestListV3};
1370    use crate::spec::{
1371        Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, ManifestListWriter,
1372        UNASSIGNED_SEQUENCE_NUMBER,
1373    };
1374
1375    #[tokio::test]
1376    async fn test_parse_manifest_list_v1() {
1377        let manifest_list = ManifestList {
1378            entries: vec![
1379                ManifestFile {
1380                    manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1381                    manifest_length: 5806,
1382                    partition_spec_id: 0,
1383                    content: ManifestContentType::Data,
1384                    sequence_number: 0,
1385                    min_sequence_number: 0,
1386                    added_snapshot_id: 1646658105718557341,
1387                    added_files_count: Some(3),
1388                    existing_files_count: Some(0),
1389                    deleted_files_count: Some(0),
1390                    added_rows_count: Some(3),
1391                    existing_rows_count: Some(0),
1392                    deleted_rows_count: Some(0),
1393                    partitions: Some(vec![]),
1394                    key_metadata: None,
1395                    first_row_id: None,
1396                }
1397            ]
1398        };
1399
1400        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1401
1402        let tmp_dir = TempDir::new().unwrap();
1403        let file_name = "simple_manifest_list_v1.avro";
1404        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
1405
1406        let mut writer = ManifestListWriter::v1(
1407            file_io.new_output(full_path.clone()).unwrap(),
1408            1646658105718557341,
1409            Some(1646658105718557341),
1410        );
1411
1412        writer
1413            .add_manifests(manifest_list.entries.clone().into_iter())
1414            .unwrap();
1415        writer.close().await.unwrap();
1416
1417        let bs = fs::read(full_path).expect("read_file must succeed");
1418
1419        let parsed_manifest_list =
1420            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
1421
1422        assert_eq!(manifest_list, parsed_manifest_list);
1423    }
1424
1425    #[tokio::test]
1426    async fn test_parse_manifest_list_v2() {
1427        let manifest_list = ManifestList {
1428            entries: vec![
1429                ManifestFile {
1430                    manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1431                    manifest_length: 6926,
1432                    partition_spec_id: 1,
1433                    content: ManifestContentType::Data,
1434                    sequence_number: 1,
1435                    min_sequence_number: 1,
1436                    added_snapshot_id: 377075049360453639,
1437                    added_files_count: Some(1),
1438                    existing_files_count: Some(0),
1439                    deleted_files_count: Some(0),
1440                    added_rows_count: Some(3),
1441                    existing_rows_count: Some(0),
1442                    deleted_rows_count: Some(0),
1443                    partitions: Some(
1444                        vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1445                    ),
1446                    key_metadata: None,
1447                    first_row_id: None,
1448                },
1449                ManifestFile {
1450                    manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
1451                    manifest_length: 6926,
1452                    partition_spec_id: 2,
1453                    content: ManifestContentType::Data,
1454                    sequence_number: 1,
1455                    min_sequence_number: 1,
1456                    added_snapshot_id: 377075049360453639,
1457                    added_files_count: Some(1),
1458                    existing_files_count: Some(0),
1459                    deleted_files_count: Some(0),
1460                    added_rows_count: Some(3),
1461                    existing_rows_count: Some(0),
1462                    deleted_rows_count: Some(0),
1463                    partitions: Some(
1464                        vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}]
1465                    ),
1466                    key_metadata: None,
1467                    first_row_id: None,
1468                }
1469            ]
1470        };
1471
1472        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1473
1474        let tmp_dir = TempDir::new().unwrap();
1475        let file_name = "simple_manifest_list_v1.avro";
1476        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
1477
1478        let mut writer = ManifestListWriter::v2(
1479            file_io.new_output(full_path.clone()).unwrap(),
1480            1646658105718557341,
1481            Some(1646658105718557341),
1482            1,
1483        );
1484
1485        writer
1486            .add_manifests(manifest_list.entries.clone().into_iter())
1487            .unwrap();
1488        writer.close().await.unwrap();
1489
1490        let bs = fs::read(full_path).expect("read_file must succeed");
1491
1492        let parsed_manifest_list =
1493            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1494
1495        assert_eq!(manifest_list, parsed_manifest_list);
1496    }
1497
1498    #[tokio::test]
1499    async fn test_parse_manifest_list_v3() {
1500        let manifest_list = ManifestList {
1501            entries: vec![
1502                ManifestFile {
1503                    manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1504                    manifest_length: 6926,
1505                    partition_spec_id: 1,
1506                    content: ManifestContentType::Data,
1507                    sequence_number: 1,
1508                    min_sequence_number: 1,
1509                    added_snapshot_id: 377075049360453639,
1510                    added_files_count: Some(1),
1511                    existing_files_count: Some(0),
1512                    deleted_files_count: Some(0),
1513                    added_rows_count: Some(3),
1514                    existing_rows_count: Some(0),
1515                    deleted_rows_count: Some(0),
1516                    partitions: Some(
1517                        vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1518                    ),
1519                    key_metadata: None,
1520                    first_row_id: Some(10),
1521                },
1522                ManifestFile {
1523                    manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
1524                    manifest_length: 6926,
1525                    partition_spec_id: 2,
1526                    content: ManifestContentType::Data,
1527                    sequence_number: 1,
1528                    min_sequence_number: 1,
1529                    added_snapshot_id: 377075049360453639,
1530                    added_files_count: Some(1),
1531                    existing_files_count: Some(0),
1532                    deleted_files_count: Some(0),
1533                    added_rows_count: Some(3),
1534                    existing_rows_count: Some(0),
1535                    deleted_rows_count: Some(0),
1536                    partitions: Some(
1537                        vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}]
1538                    ),
1539                    key_metadata: None,
1540                    first_row_id: Some(13),
1541                }
1542            ]
1543        };
1544
1545        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1546
1547        let tmp_dir = TempDir::new().unwrap();
1548        let file_name = "simple_manifest_list_v3.avro";
1549        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
1550
1551        let mut writer = ManifestListWriter::v3(
1552            file_io.new_output(full_path.clone()).unwrap(),
1553            377075049360453639,
1554            Some(377075049360453639),
1555            1,
1556            Some(10),
1557        );
1558
1559        writer
1560            .add_manifests(manifest_list.entries.clone().into_iter())
1561            .unwrap();
1562        writer.close().await.unwrap();
1563
1564        let bs = fs::read(full_path).expect("read_file must succeed");
1565
1566        let parsed_manifest_list =
1567            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
1568
1569        assert_eq!(manifest_list, parsed_manifest_list);
1570    }
1571
1572    #[test]
1573    fn test_serialize_manifest_list_v1() {
1574        let manifest_list:ManifestListV1 = ManifestList {
1575            entries: vec![ManifestFile {
1576                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1577                manifest_length: 5806,
1578                partition_spec_id: 0,
1579                content: ManifestContentType::Data,
1580                sequence_number: 0,
1581                min_sequence_number: 0,
1582                added_snapshot_id: 1646658105718557341,
1583                added_files_count: Some(3),
1584                existing_files_count: Some(0),
1585                deleted_files_count: Some(0),
1586                added_rows_count: Some(3),
1587                existing_rows_count: Some(0),
1588                deleted_rows_count: Some(0),
1589                partitions: None,
1590                key_metadata: None,
1591                first_row_id: None,
1592            }]
1593        }.try_into().unwrap();
1594        let result = serde_json::to_string(&manifest_list).unwrap();
1595        assert_eq!(
1596            result,
1597            r#"[{"manifest_path":"/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro","manifest_length":5806,"partition_spec_id":0,"added_snapshot_id":1646658105718557341,"added_data_files_count":3,"existing_data_files_count":0,"deleted_data_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":null,"key_metadata":null}]"#
1598        );
1599    }
1600
1601    #[test]
1602    fn test_serialize_manifest_list_v2() {
1603        let manifest_list:ManifestListV2 = ManifestList {
1604            entries: vec![ManifestFile {
1605                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1606                manifest_length: 6926,
1607                partition_spec_id: 1,
1608                content: ManifestContentType::Data,
1609                sequence_number: 1,
1610                min_sequence_number: 1,
1611                added_snapshot_id: 377075049360453639,
1612                added_files_count: Some(1),
1613                existing_files_count: Some(0),
1614                deleted_files_count: Some(0),
1615                added_rows_count: Some(3),
1616                existing_rows_count: Some(0),
1617                deleted_rows_count: Some(0),
1618                partitions: Some(
1619                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1620                ),
1621                key_metadata: None,
1622                first_row_id: None,
1623            }]
1624        }.try_into().unwrap();
1625        let result = serde_json::to_string(&manifest_list).unwrap();
1626        assert_eq!(
1627            result,
1628            r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":1,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_files_count":1,"existing_files_count":0,"deleted_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"upper_bound":[1,0,0,0,0,0,0,0]}],"key_metadata":null}]"#
1629        );
1630    }
1631
1632    #[test]
1633    fn test_serialize_manifest_list_v3() {
1634        let manifest_list: ManifestListV3 = ManifestList {
1635            entries: vec![ManifestFile {
1636                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1637                manifest_length: 6926,
1638                partition_spec_id: 1,
1639                content: ManifestContentType::Data,
1640                sequence_number: 1,
1641                min_sequence_number: 1,
1642                added_snapshot_id: 377075049360453639,
1643                added_files_count: Some(1),
1644                existing_files_count: Some(0),
1645                deleted_files_count: Some(0),
1646                added_rows_count: Some(3),
1647                existing_rows_count: Some(0),
1648                deleted_rows_count: Some(0),
1649                partitions: Some(
1650                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1651                ),
1652                key_metadata: None,
1653                first_row_id: Some(10),
1654            }]
1655        }.try_into().unwrap();
1656        let result = serde_json::to_string(&manifest_list).unwrap();
1657        assert_eq!(
1658            result,
1659            r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":1,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_files_count":1,"existing_files_count":0,"deleted_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"upper_bound":[1,0,0,0,0,0,0,0]}],"key_metadata":null,"first_row_id":10}]"#
1660        );
1661    }
1662
1663    #[tokio::test]
1664    async fn test_manifest_list_writer_v1() {
1665        let expected_manifest_list = ManifestList {
1666            entries: vec![ManifestFile {
1667                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1668                manifest_length: 5806,
1669                partition_spec_id: 1,
1670                content: ManifestContentType::Data,
1671                sequence_number: 0,
1672                min_sequence_number: 0,
1673                added_snapshot_id: 1646658105718557341,
1674                added_files_count: Some(3),
1675                existing_files_count: Some(0),
1676                deleted_files_count: Some(0),
1677                added_rows_count: Some(3),
1678                existing_rows_count: Some(0),
1679                deleted_rows_count: Some(0),
1680                partitions: Some(
1681                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}],
1682                ),
1683                key_metadata: None,
1684                first_row_id: None,
1685            }]
1686        };
1687
1688        let temp_dir = TempDir::new().unwrap();
1689        let path = temp_dir.path().join("manifest_list_v1.avro");
1690        let io = FileIOBuilder::new_fs_io().build().unwrap();
1691        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1692
1693        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
1694        writer
1695            .add_manifests(expected_manifest_list.entries.clone().into_iter())
1696            .unwrap();
1697        writer.close().await.unwrap();
1698
1699        let bs = fs::read(path).unwrap();
1700
1701        let manifest_list =
1702            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
1703        assert_eq!(manifest_list, expected_manifest_list);
1704
1705        temp_dir.close().unwrap();
1706    }
1707
1708    #[tokio::test]
1709    async fn test_manifest_list_writer_v2() {
1710        let snapshot_id = 377075049360453639;
1711        let seq_num = 1;
1712        let mut expected_manifest_list = ManifestList {
1713            entries: vec![ManifestFile {
1714                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1715                manifest_length: 6926,
1716                partition_spec_id: 1,
1717                content: ManifestContentType::Data,
1718                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1719                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1720                added_snapshot_id: snapshot_id,
1721                added_files_count: Some(1),
1722                existing_files_count: Some(0),
1723                deleted_files_count: Some(0),
1724                added_rows_count: Some(3),
1725                existing_rows_count: Some(0),
1726                deleted_rows_count: Some(0),
1727                partitions: Some(
1728                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1729                ),
1730                key_metadata: None,
1731                first_row_id: None,
1732            }]
1733        };
1734
1735        let temp_dir = TempDir::new().unwrap();
1736        let path = temp_dir.path().join("manifest_list_v2.avro");
1737        let io = FileIOBuilder::new_fs_io().build().unwrap();
1738        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1739
1740        let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
1741        writer
1742            .add_manifests(expected_manifest_list.entries.clone().into_iter())
1743            .unwrap();
1744        writer.close().await.unwrap();
1745
1746        let bs = fs::read(path).unwrap();
1747        let manifest_list =
1748            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1749        expected_manifest_list.entries[0].sequence_number = seq_num;
1750        expected_manifest_list.entries[0].min_sequence_number = seq_num;
1751        assert_eq!(manifest_list, expected_manifest_list);
1752
1753        temp_dir.close().unwrap();
1754    }
1755
1756    #[tokio::test]
1757    async fn test_manifest_list_writer_v3() {
1758        let snapshot_id = 377075049360453639;
1759        let seq_num = 1;
1760        let mut expected_manifest_list = ManifestList {
1761            entries: vec![ManifestFile {
1762                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1763                manifest_length: 6926,
1764                partition_spec_id: 1,
1765                content: ManifestContentType::Data,
1766                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1767                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1768                added_snapshot_id: snapshot_id,
1769                added_files_count: Some(1),
1770                existing_files_count: Some(0),
1771                deleted_files_count: Some(0),
1772                added_rows_count: Some(3),
1773                existing_rows_count: Some(0),
1774                deleted_rows_count: Some(0),
1775                partitions: Some(
1776                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1777                ),
1778                key_metadata: None,
1779                first_row_id: Some(10),
1780            }]
1781        };
1782
1783        let temp_dir = TempDir::new().unwrap();
1784        let path = temp_dir.path().join("manifest_list_v2.avro");
1785        let io = FileIOBuilder::new_fs_io().build().unwrap();
1786        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1787
1788        let mut writer =
1789            ManifestListWriter::v3(output_file, snapshot_id, Some(0), seq_num, Some(10));
1790        writer
1791            .add_manifests(expected_manifest_list.entries.clone().into_iter())
1792            .unwrap();
1793        writer.close().await.unwrap();
1794
1795        let bs = fs::read(path).unwrap();
1796        let manifest_list =
1797            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
1798        expected_manifest_list.entries[0].sequence_number = seq_num;
1799        expected_manifest_list.entries[0].min_sequence_number = seq_num;
1800        expected_manifest_list.entries[0].first_row_id = Some(10);
1801        assert_eq!(manifest_list, expected_manifest_list);
1802
1803        temp_dir.close().unwrap();
1804    }
1805
1806    #[tokio::test]
1807    async fn test_manifest_list_writer_v1_as_v2() {
1808        let expected_manifest_list = ManifestList {
1809            entries: vec![ManifestFile {
1810                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1811                manifest_length: 5806,
1812                partition_spec_id: 1,
1813                content: ManifestContentType::Data,
1814                sequence_number: 0,
1815                min_sequence_number: 0,
1816                added_snapshot_id: 1646658105718557341,
1817                added_files_count: Some(3),
1818                existing_files_count: Some(0),
1819                deleted_files_count: Some(0),
1820                added_rows_count: Some(3),
1821                existing_rows_count: Some(0),
1822                deleted_rows_count: Some(0),
1823                partitions: Some(
1824                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1825                ),
1826                key_metadata: None,
1827                first_row_id: None,
1828            }]
1829        };
1830
1831        let temp_dir = TempDir::new().unwrap();
1832        let path = temp_dir.path().join("manifest_list_v1.avro");
1833        let io = FileIOBuilder::new_fs_io().build().unwrap();
1834        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1835
1836        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
1837        writer
1838            .add_manifests(expected_manifest_list.entries.clone().into_iter())
1839            .unwrap();
1840        writer.close().await.unwrap();
1841
1842        let bs = fs::read(path).unwrap();
1843
1844        let manifest_list =
1845            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1846        assert_eq!(manifest_list, expected_manifest_list);
1847
1848        temp_dir.close().unwrap();
1849    }
1850
1851    #[tokio::test]
1852    async fn test_manifest_list_writer_v1_as_v3() {
1853        let expected_manifest_list = ManifestList {
1854            entries: vec![ManifestFile {
1855                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1856                manifest_length: 5806,
1857                partition_spec_id: 1,
1858                content: ManifestContentType::Data,
1859                sequence_number: 0,
1860                min_sequence_number: 0,
1861                added_snapshot_id: 1646658105718557341,
1862                added_files_count: Some(3),
1863                existing_files_count: Some(0),
1864                deleted_files_count: Some(0),
1865                added_rows_count: Some(3),
1866                existing_rows_count: Some(0),
1867                deleted_rows_count: Some(0),
1868                partitions: Some(
1869                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1870                ),
1871                key_metadata: None,
1872                first_row_id: None,
1873            }]
1874        };
1875
1876        let temp_dir = TempDir::new().unwrap();
1877        let path = temp_dir.path().join("manifest_list_v1.avro");
1878        let io = FileIOBuilder::new_fs_io().build().unwrap();
1879        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1880
1881        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
1882        writer
1883            .add_manifests(expected_manifest_list.entries.clone().into_iter())
1884            .unwrap();
1885        writer.close().await.unwrap();
1886
1887        let bs = fs::read(path).unwrap();
1888
1889        let manifest_list =
1890            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
1891        assert_eq!(manifest_list, expected_manifest_list);
1892
1893        temp_dir.close().unwrap();
1894    }
1895
1896    #[tokio::test]
1897    async fn test_manifest_list_writer_v2_as_v3() {
1898        let snapshot_id = 377075049360453639;
1899        let seq_num = 1;
1900        let mut expected_manifest_list = ManifestList {
1901            entries: vec![ManifestFile {
1902                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1903                manifest_length: 6926,
1904                partition_spec_id: 1,
1905                content: ManifestContentType::Data,
1906                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1907                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1908                added_snapshot_id: snapshot_id,
1909                added_files_count: Some(1),
1910                existing_files_count: Some(0),
1911                deleted_files_count: Some(0),
1912                added_rows_count: Some(3),
1913                existing_rows_count: Some(0),
1914                deleted_rows_count: Some(0),
1915                partitions: Some(
1916                    vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1917                ),
1918                key_metadata: None,
1919                first_row_id: None,
1920            }]
1921        };
1922
1923        let temp_dir = TempDir::new().unwrap();
1924        let path = temp_dir.path().join("manifest_list_v2.avro");
1925        let io = FileIOBuilder::new_fs_io().build().unwrap();
1926        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1927
1928        let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
1929        writer
1930            .add_manifests(expected_manifest_list.entries.clone().into_iter())
1931            .unwrap();
1932        writer.close().await.unwrap();
1933
1934        let bs = fs::read(path).unwrap();
1935
1936        let manifest_list =
1937            ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
1938        expected_manifest_list.entries[0].sequence_number = seq_num;
1939        expected_manifest_list.entries[0].min_sequence_number = seq_num;
1940        assert_eq!(manifest_list, expected_manifest_list);
1941
1942        temp_dir.close().unwrap();
1943    }
1944
1945    #[tokio::test]
1946    async fn test_manifest_list_v2_deserializer_aliases() {
1947        // reading avro manifest file generated by iceberg 1.4.0
1948        let avro_1_path = "testdata/manifests_lists/manifest-list-v2-1.avro";
1949        let bs_1 = fs::read(avro_1_path).unwrap();
1950        let avro_1_fields = read_avro_schema_fields_as_str(bs_1.clone()).await;
1951        assert_eq!(
1952            avro_1_fields,
1953            "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_data_files_count, existing_data_files_count, deleted_data_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
1954        );
1955        // reading avro manifest file generated by iceberg 1.5.0
1956        let avro_2_path = "testdata/manifests_lists/manifest-list-v2-2.avro";
1957        let bs_2 = fs::read(avro_2_path).unwrap();
1958        let avro_2_fields = read_avro_schema_fields_as_str(bs_2.clone()).await;
1959        assert_eq!(
1960            avro_2_fields,
1961            "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_files_count, existing_files_count, deleted_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
1962        );
1963        // deserializing both files to ManifestList struct
1964        let _manifest_list_1 =
1965            ManifestList::parse_with_version(&bs_1, crate::spec::FormatVersion::V2).unwrap();
1966        let _manifest_list_2 =
1967            ManifestList::parse_with_version(&bs_2, crate::spec::FormatVersion::V2).unwrap();
1968    }
1969
1970    async fn read_avro_schema_fields_as_str(bs: Vec<u8>) -> String {
1971        let reader = Reader::new(&bs[..]).unwrap();
1972        let schema = reader.writer_schema();
1973        let fields: String = match schema {
1974            Schema::Record(record) => record
1975                .fields
1976                .iter()
1977                .map(|field| field.name.clone())
1978                .collect::<Vec<String>>()
1979                .join(", "),
1980            _ => "".to_string(),
1981        };
1982        fields
1983    }
1984
1985    #[test]
1986    fn test_manifest_content_type_default() {
1987        assert_eq!(ManifestContentType::default(), ManifestContentType::Data);
1988    }
1989
1990    #[test]
1991    fn test_manifest_content_type_default_value() {
1992        assert_eq!(ManifestContentType::default() as i32, 0);
1993    }
1994
1995    #[test]
1996    fn test_manifest_file_v1_to_v2_projection() {
1997        use crate::spec::manifest_list::_serde::ManifestFileV1;
1998
1999        // Create a V1 manifest file object (without V2 fields)
2000        let v1_manifest = ManifestFileV1 {
2001            manifest_path: "/test/manifest.avro".to_string(),
2002            manifest_length: 5806,
2003            partition_spec_id: 0,
2004            added_snapshot_id: 1646658105718557341,
2005            added_data_files_count: Some(3),
2006            existing_data_files_count: Some(0),
2007            deleted_data_files_count: Some(0),
2008            added_rows_count: Some(3),
2009            existing_rows_count: Some(0),
2010            deleted_rows_count: Some(0),
2011            partitions: None,
2012            key_metadata: None,
2013        };
2014
2015        // Convert V1 to V2 - this should apply defaults for missing V2 fields
2016        let v2_manifest: ManifestFile = v1_manifest.try_into().unwrap();
2017
2018        // Verify V1→V2 projection defaults are applied correctly
2019        assert_eq!(
2020            v2_manifest.content,
2021            ManifestContentType::Data,
2022            "V1 manifest content should default to Data (0)"
2023        );
2024        assert_eq!(
2025            v2_manifest.sequence_number, 0,
2026            "V1 manifest sequence_number should default to 0"
2027        );
2028        assert_eq!(
2029            v2_manifest.min_sequence_number, 0,
2030            "V1 manifest min_sequence_number should default to 0"
2031        );
2032
2033        // Verify other fields are preserved correctly
2034        assert_eq!(v2_manifest.manifest_path, "/test/manifest.avro");
2035        assert_eq!(v2_manifest.manifest_length, 5806);
2036        assert_eq!(v2_manifest.partition_spec_id, 0);
2037        assert_eq!(v2_manifest.added_snapshot_id, 1646658105718557341);
2038        assert_eq!(v2_manifest.added_files_count, Some(3));
2039        assert_eq!(v2_manifest.existing_files_count, Some(0));
2040        assert_eq!(v2_manifest.deleted_files_count, Some(0));
2041        assert_eq!(v2_manifest.added_rows_count, Some(3));
2042        assert_eq!(v2_manifest.existing_rows_count, Some(0));
2043        assert_eq!(v2_manifest.deleted_rows_count, Some(0));
2044        assert_eq!(v2_manifest.partitions, None);
2045        assert_eq!(v2_manifest.key_metadata, None);
2046    }
2047}