iceberg/spec/manifest/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::min;
19
20use apache_avro::{Writer as AvroWriter, to_value};
21use bytes::Bytes;
22use itertools::Itertools;
23use serde_json::to_vec;
24
25use super::{
26    Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType,
27    UNASSIGNED_SEQUENCE_NUMBER,
28};
29use crate::error::Result;
30use crate::io::OutputFile;
31use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2};
32use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2};
33use crate::spec::{
34    DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata,
35    ManifestStatus, PrimitiveLiteral, SchemaRef, StructType, UNASSIGNED_SNAPSHOT_ID,
36};
37use crate::{Error, ErrorKind};
38
39/// The builder used to create a [`ManifestWriter`].
40pub struct ManifestWriterBuilder {
41    output: OutputFile,
42    snapshot_id: Option<i64>,
43    key_metadata: Option<Vec<u8>>,
44    schema: SchemaRef,
45    partition_spec: PartitionSpec,
46}
47
48impl ManifestWriterBuilder {
49    /// Create a new builder.
50    pub fn new(
51        output: OutputFile,
52        snapshot_id: Option<i64>,
53        key_metadata: Option<Vec<u8>>,
54        schema: SchemaRef,
55        partition_spec: PartitionSpec,
56    ) -> Self {
57        Self {
58            output,
59            snapshot_id,
60            key_metadata,
61            schema,
62            partition_spec,
63        }
64    }
65
66    /// Build a [`ManifestWriter`] for format version 1.
67    pub fn build_v1(self) -> ManifestWriter {
68        let metadata = ManifestMetadata::builder()
69            .schema_id(self.schema.schema_id())
70            .schema(self.schema)
71            .partition_spec(self.partition_spec)
72            .format_version(FormatVersion::V1)
73            .content(ManifestContentType::Data)
74            .build();
75        ManifestWriter::new(
76            self.output,
77            self.snapshot_id,
78            self.key_metadata,
79            metadata,
80            None,
81        )
82    }
83
84    /// Build a [`ManifestWriter`] for format version 2, data content.
85    pub fn build_v2_data(self) -> ManifestWriter {
86        let metadata = ManifestMetadata::builder()
87            .schema_id(self.schema.schema_id())
88            .schema(self.schema)
89            .partition_spec(self.partition_spec)
90            .format_version(FormatVersion::V2)
91            .content(ManifestContentType::Data)
92            .build();
93        ManifestWriter::new(
94            self.output,
95            self.snapshot_id,
96            self.key_metadata,
97            metadata,
98            None,
99        )
100    }
101
102    /// Build a [`ManifestWriter`] for format version 2, deletes content.
103    pub fn build_v2_deletes(self) -> ManifestWriter {
104        let metadata = ManifestMetadata::builder()
105            .schema_id(self.schema.schema_id())
106            .schema(self.schema)
107            .partition_spec(self.partition_spec)
108            .format_version(FormatVersion::V2)
109            .content(ManifestContentType::Deletes)
110            .build();
111        ManifestWriter::new(
112            self.output,
113            self.snapshot_id,
114            self.key_metadata,
115            metadata,
116            None,
117        )
118    }
119
120    /// Build a [`ManifestWriter`] for format version 2, data content.
121    pub fn build_v3_data(self) -> ManifestWriter {
122        let metadata = ManifestMetadata::builder()
123            .schema_id(self.schema.schema_id())
124            .schema(self.schema)
125            .partition_spec(self.partition_spec)
126            .format_version(FormatVersion::V3)
127            .content(ManifestContentType::Data)
128            .build();
129        ManifestWriter::new(
130            self.output,
131            self.snapshot_id,
132            self.key_metadata,
133            metadata,
134            // First row id is assigned by the [`ManifestListWriter`] when the manifest
135            // is added to the list.
136            None,
137        )
138    }
139
140    /// Build a [`ManifestWriter`] for format version 3, deletes content.
141    pub fn build_v3_deletes(self) -> ManifestWriter {
142        let metadata = ManifestMetadata::builder()
143            .schema_id(self.schema.schema_id())
144            .schema(self.schema)
145            .partition_spec(self.partition_spec)
146            .format_version(FormatVersion::V3)
147            .content(ManifestContentType::Deletes)
148            .build();
149        ManifestWriter::new(
150            self.output,
151            self.snapshot_id,
152            self.key_metadata,
153            metadata,
154            None,
155        )
156    }
157}
158
159/// A manifest writer.
160pub struct ManifestWriter {
161    output: OutputFile,
162
163    snapshot_id: Option<i64>,
164
165    added_files: u32,
166    added_rows: u64,
167    existing_files: u32,
168    existing_rows: u64,
169    deleted_files: u32,
170    deleted_rows: u64,
171    first_row_id: Option<u64>,
172
173    min_seq_num: Option<i64>,
174
175    key_metadata: Option<Vec<u8>>,
176
177    manifest_entries: Vec<ManifestEntry>,
178
179    metadata: ManifestMetadata,
180}
181
182impl ManifestWriter {
183    /// Create a new manifest writer.
184    pub(crate) fn new(
185        output: OutputFile,
186        snapshot_id: Option<i64>,
187        key_metadata: Option<Vec<u8>>,
188        metadata: ManifestMetadata,
189        first_row_id: Option<u64>,
190    ) -> Self {
191        Self {
192            output,
193            snapshot_id,
194            added_files: 0,
195            added_rows: 0,
196            existing_files: 0,
197            existing_rows: 0,
198            deleted_files: 0,
199            deleted_rows: 0,
200            first_row_id,
201            min_seq_num: None,
202            key_metadata,
203            manifest_entries: Vec::new(),
204            metadata,
205        }
206    }
207
208    fn construct_partition_summaries(
209        &mut self,
210        partition_type: &StructType,
211    ) -> Result<Vec<FieldSummary>> {
212        let mut field_stats: Vec<_> = partition_type
213            .fields()
214            .iter()
215            .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
216            .collect();
217        for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
218            for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
219                let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
220                stat.update(primitive_literal)?;
221            }
222        }
223        Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
224    }
225
226    fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
227        match self.metadata.content {
228            ManifestContentType::Data => {
229                if data_file.content != DataContentType::Data {
230                    return Err(Error::new(
231                        ErrorKind::DataInvalid,
232                        format!(
233                            "Date file at path {} with manifest content type `data`, should have DataContentType `Data`, but has `{:?}`",
234                            data_file.file_path(),
235                            data_file.content
236                        ),
237                    ));
238                }
239            }
240            ManifestContentType::Deletes => {
241                if data_file.content != DataContentType::EqualityDeletes
242                    && data_file.content != DataContentType::PositionDeletes
243                {
244                    return Err(Error::new(
245                        ErrorKind::DataInvalid,
246                        format!(
247                            "Date file at path {} with manifest content type `deletes`, should have DataContentType `Data`, but has `{:?}`",
248                            data_file.file_path(),
249                            data_file.content
250                        ),
251                    ));
252                }
253            }
254        }
255        Ok(())
256    }
257
258    /// Add a new manifest entry. This method will update following status of the entry:
259    /// - Update the entry status to `Added`
260    /// - Set the snapshot id to the current snapshot id
261    /// - Set the sequence number to `None` if it is invalid(smaller than 0)
262    /// - Set the file sequence number to `None`
263    pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
264        self.check_data_file(&entry.data_file)?;
265        if entry.sequence_number().is_some_and(|n| n >= 0) {
266            entry.status = ManifestStatus::Added;
267            entry.snapshot_id = self.snapshot_id;
268            entry.file_sequence_number = None;
269        } else {
270            entry.status = ManifestStatus::Added;
271            entry.snapshot_id = self.snapshot_id;
272            entry.sequence_number = None;
273            entry.file_sequence_number = None;
274        };
275        self.add_entry_inner(entry)?;
276        Ok(())
277    }
278
279    /// Add file as an added entry with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence
280    /// number will be the provided data sequence number. The entry's file sequence number will be
281    /// assigned at commit.
282    pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
283        self.check_data_file(&data_file)?;
284        let entry = ManifestEntry {
285            status: ManifestStatus::Added,
286            snapshot_id: self.snapshot_id,
287            sequence_number: (sequence_number >= 0).then_some(sequence_number),
288            file_sequence_number: None,
289            data_file,
290        };
291        self.add_entry_inner(entry)?;
292        Ok(())
293    }
294
295    /// Add a delete manifest entry. This method will update following status of the entry:
296    /// - Update the entry status to `Deleted`
297    /// - Set the snapshot id to the current snapshot id
298    ///
299    /// # TODO
300    /// Remove this allow later
301    #[allow(dead_code)]
302    pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
303        self.check_data_file(&entry.data_file)?;
304        entry.status = ManifestStatus::Deleted;
305        entry.snapshot_id = self.snapshot_id;
306        self.add_entry_inner(entry)?;
307        Ok(())
308    }
309
310    /// Add a file as delete manifest entry. The entry's snapshot ID will be this manifest's snapshot ID.
311    /// However, the original data and file sequence numbers of the file must be preserved when
312    /// the file is marked as deleted.
313    pub fn add_delete_file(
314        &mut self,
315        data_file: DataFile,
316        sequence_number: i64,
317        file_sequence_number: Option<i64>,
318    ) -> Result<()> {
319        self.check_data_file(&data_file)?;
320        let entry = ManifestEntry {
321            status: ManifestStatus::Deleted,
322            snapshot_id: self.snapshot_id,
323            sequence_number: Some(sequence_number),
324            file_sequence_number,
325            data_file,
326        };
327        self.add_entry_inner(entry)?;
328        Ok(())
329    }
330
331    /// Add an existing manifest entry. This method will update following status of the entry:
332    /// - Update the entry status to `Existing`
333    ///
334    /// # TODO
335    /// Remove this allow later
336    #[allow(dead_code)]
337    pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
338        self.check_data_file(&entry.data_file)?;
339        entry.status = ManifestStatus::Existing;
340        self.add_entry_inner(entry)?;
341        Ok(())
342    }
343
344    /// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
345    /// which were assigned at commit, must be preserved when adding an existing entry.
346    pub fn add_existing_file(
347        &mut self,
348        data_file: DataFile,
349        snapshot_id: i64,
350        sequence_number: i64,
351        file_sequence_number: Option<i64>,
352    ) -> Result<()> {
353        self.check_data_file(&data_file)?;
354        let entry = ManifestEntry {
355            status: ManifestStatus::Existing,
356            snapshot_id: Some(snapshot_id),
357            sequence_number: Some(sequence_number),
358            file_sequence_number,
359            data_file,
360        };
361        self.add_entry_inner(entry)?;
362        Ok(())
363    }
364
365    fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
366        // Check if the entry has sequence number
367        if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
368            && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
369        {
370            return Err(Error::new(
371                ErrorKind::DataInvalid,
372                "Manifest entry with status Existing or Deleted should have sequence number",
373            ));
374        }
375
376        // Update the statistics
377        match entry.status {
378            ManifestStatus::Added => {
379                self.added_files += 1;
380                self.added_rows += entry.data_file.record_count;
381            }
382            ManifestStatus::Deleted => {
383                self.deleted_files += 1;
384                self.deleted_rows += entry.data_file.record_count;
385            }
386            ManifestStatus::Existing => {
387                self.existing_files += 1;
388                self.existing_rows += entry.data_file.record_count;
389            }
390        }
391        if entry.is_alive() {
392            if let Some(seq_num) = entry.sequence_number {
393                self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
394            }
395        }
396        self.manifest_entries.push(entry);
397        Ok(())
398    }
399
400    /// Write manifest file and return it.
401    pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
402        // Create the avro writer
403        let partition_type = self
404            .metadata
405            .partition_spec
406            .partition_type(&self.metadata.schema)?;
407        let table_schema = &self.metadata.schema;
408        let avro_schema = match self.metadata.format_version {
409            FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
410            // Manifest schema did not change between V2 and V3
411            FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
412        };
413        let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
414        avro_writer.add_user_metadata(
415            "schema".to_string(),
416            to_vec(table_schema).map_err(|err| {
417                Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
418                    .with_source(err)
419            })?,
420        )?;
421        avro_writer.add_user_metadata(
422            "schema-id".to_string(),
423            table_schema.schema_id().to_string(),
424        )?;
425        avro_writer.add_user_metadata(
426            "partition-spec".to_string(),
427            to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
428                Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
429                    .with_source(err)
430            })?,
431        )?;
432        avro_writer.add_user_metadata(
433            "partition-spec-id".to_string(),
434            self.metadata.partition_spec.spec_id().to_string(),
435        )?;
436        avro_writer.add_user_metadata(
437            "format-version".to_string(),
438            (self.metadata.format_version as u8).to_string(),
439        )?;
440        if self.metadata.format_version == FormatVersion::V2 {
441            avro_writer
442                .add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
443        }
444
445        let partition_summary = self.construct_partition_summaries(&partition_type)?;
446        // Write manifest entries
447        for entry in std::mem::take(&mut self.manifest_entries) {
448            let value = match self.metadata.format_version {
449                FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
450                    .resolve(&avro_schema)?,
451                // Manifest entry format did not change between V2 and V3
452                FormatVersion::V2 | FormatVersion::V3 => {
453                    to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
454                        .resolve(&avro_schema)?
455                }
456            };
457
458            avro_writer.append(value)?;
459        }
460
461        let content = avro_writer.into_inner()?;
462        let length = content.len();
463        self.output.write(Bytes::from(content)).await?;
464
465        Ok(ManifestFile {
466            manifest_path: self.output.location().to_string(),
467            manifest_length: length as i64,
468            partition_spec_id: self.metadata.partition_spec.spec_id(),
469            content: self.metadata.content,
470            // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with
471            // real sequence number in `ManifestListWriter`.
472            sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
473            min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
474            added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
475            added_files_count: Some(self.added_files),
476            existing_files_count: Some(self.existing_files),
477            deleted_files_count: Some(self.deleted_files),
478            added_rows_count: Some(self.added_rows),
479            existing_rows_count: Some(self.existing_rows),
480            deleted_rows_count: Some(self.deleted_rows),
481            partitions: Some(partition_summary),
482            key_metadata: self.key_metadata,
483            first_row_id: self.first_row_id,
484        })
485    }
486}
487
488struct PartitionFieldStats {
489    partition_type: PrimitiveType,
490
491    contains_null: bool,
492    contains_nan: Option<bool>,
493    lower_bound: Option<Datum>,
494    upper_bound: Option<Datum>,
495}
496
497impl PartitionFieldStats {
498    pub(crate) fn new(partition_type: PrimitiveType) -> Self {
499        Self {
500            partition_type,
501            contains_null: false,
502            contains_nan: Some(false),
503            upper_bound: None,
504            lower_bound: None,
505        }
506    }
507
508    pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
509        let Some(value) = value else {
510            self.contains_null = true;
511            return Ok(());
512        };
513        if !self.partition_type.compatible(&value) {
514            return Err(Error::new(
515                ErrorKind::DataInvalid,
516                "value is not compatible with type",
517            ));
518        }
519        let value = Datum::new(self.partition_type.clone(), value);
520
521        if value.is_nan() {
522            self.contains_nan = Some(true);
523            return Ok(());
524        }
525
526        self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| {
527            if value < original {
528                value.clone()
529            } else {
530                original
531            }
532        }));
533        self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| {
534            if value > original { value } else { original }
535        }));
536
537        Ok(())
538    }
539
540    pub(crate) fn finish(self) -> FieldSummary {
541        FieldSummary {
542            contains_null: self.contains_null,
543            contains_nan: self.contains_nan,
544            upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()),
545            lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()),
546        }
547    }
548}
549
550#[cfg(test)]
551mod tests {
552    use std::collections::HashMap;
553    use std::fs;
554    use std::sync::Arc;
555
556    use tempfile::TempDir;
557
558    use super::*;
559    use crate::io::FileIOBuilder;
560    use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type};
561
562    #[tokio::test]
563    async fn test_add_delete_existing() {
564        let schema = Arc::new(
565            Schema::builder()
566                .with_fields(vec![
567                    Arc::new(NestedField::optional(
568                        1,
569                        "id",
570                        Type::Primitive(PrimitiveType::Int),
571                    )),
572                    Arc::new(NestedField::optional(
573                        2,
574                        "name",
575                        Type::Primitive(PrimitiveType::String),
576                    )),
577                ])
578                .build()
579                .unwrap(),
580        );
581        let metadata = ManifestMetadata {
582            schema_id: 0,
583            schema: schema.clone(),
584            partition_spec: PartitionSpec::builder(schema)
585                .with_spec_id(0)
586                .build()
587                .unwrap(),
588            content: ManifestContentType::Data,
589            format_version: FormatVersion::V2,
590        };
591        let mut entries = vec![
592                ManifestEntry {
593                    status: ManifestStatus::Added,
594                    snapshot_id: None,
595                    sequence_number: Some(1),
596                    file_sequence_number: Some(1),
597                    data_file: DataFile {
598                        content: DataContentType::Data,
599                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
600                        file_format: DataFileFormat::Parquet,
601                        partition: Struct::empty(),
602                        record_count: 1,
603                        file_size_in_bytes: 5442,
604                        column_sizes: HashMap::from([(1, 61), (2, 73)]),
605                        value_counts: HashMap::from([(1, 1), (2, 1)]),
606                        null_value_counts: HashMap::from([(1, 0), (2, 0)]),
607                        nan_value_counts: HashMap::new(),
608                        lower_bounds: HashMap::new(),
609                        upper_bounds: HashMap::new(),
610                        key_metadata: Some(Vec::new()),
611                        split_offsets: vec![4],
612                        equality_ids: None,
613                        sort_order_id: None,
614                        partition_spec_id: 0,
615                        first_row_id: None,
616                        referenced_data_file: None,
617                        content_offset: None,
618                        content_size_in_bytes: None,
619                    },
620                },
621                ManifestEntry {
622                    status: ManifestStatus::Deleted,
623                    snapshot_id: Some(1),
624                    sequence_number: Some(1),
625                    file_sequence_number: Some(1),
626                    data_file: DataFile {
627                        content: DataContentType::Data,
628                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
629                        file_format: DataFileFormat::Parquet,
630                        partition: Struct::empty(),
631                        record_count: 1,
632                        file_size_in_bytes: 5442,
633                        column_sizes: HashMap::from([(1, 61), (2, 73)]),
634                        value_counts: HashMap::from([(1, 1), (2, 1)]),
635                        null_value_counts: HashMap::from([(1, 0), (2, 0)]),
636                        nan_value_counts: HashMap::new(),
637                        lower_bounds: HashMap::new(),
638                        upper_bounds: HashMap::new(),
639                        key_metadata: Some(Vec::new()),
640                        split_offsets: vec![4],
641                        equality_ids: None,
642                        sort_order_id: None,
643                        partition_spec_id: 0,
644                        first_row_id: None,
645                        referenced_data_file: None,
646                        content_offset: None,
647                        content_size_in_bytes: None,
648                    },
649                },
650                ManifestEntry {
651                    status: ManifestStatus::Existing,
652                    snapshot_id: Some(1),
653                    sequence_number: Some(1),
654                    file_sequence_number: Some(1),
655                    data_file: DataFile {
656                        content: DataContentType::Data,
657                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
658                        file_format: DataFileFormat::Parquet,
659                        partition: Struct::empty(),
660                        record_count: 1,
661                        file_size_in_bytes: 5442,
662                        column_sizes: HashMap::from([(1, 61), (2, 73)]),
663                        value_counts: HashMap::from([(1, 1), (2, 1)]),
664                        null_value_counts: HashMap::from([(1, 0), (2, 0)]),
665                        nan_value_counts: HashMap::new(),
666                        lower_bounds: HashMap::new(),
667                        upper_bounds: HashMap::new(),
668                        key_metadata: Some(Vec::new()),
669                        split_offsets: vec![4],
670                        equality_ids: None,
671                        sort_order_id: None,
672                        partition_spec_id: 0,
673                        first_row_id: None,
674                        referenced_data_file: None,
675                        content_offset: None,
676                        content_size_in_bytes: None,
677                    },
678                },
679            ];
680
681        // write manifest to file
682        let tmp_dir = TempDir::new().unwrap();
683        let path = tmp_dir.path().join("test_manifest.avro");
684        let io = FileIOBuilder::new_fs_io().build().unwrap();
685        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
686        let mut writer = ManifestWriterBuilder::new(
687            output_file,
688            Some(3),
689            None,
690            metadata.schema.clone(),
691            metadata.partition_spec.clone(),
692        )
693        .build_v2_data();
694        writer.add_entry(entries[0].clone()).unwrap();
695        writer.add_delete_entry(entries[1].clone()).unwrap();
696        writer.add_existing_entry(entries[2].clone()).unwrap();
697        writer.write_manifest_file().await.unwrap();
698
699        // read back the manifest file and check the content
700        let actual_manifest =
701            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
702                .unwrap();
703
704        // The snapshot id is assigned when the entry is added and delete to the manifest. Existing entries are keep original.
705        entries[0].snapshot_id = Some(3);
706        entries[1].snapshot_id = Some(3);
707        // file sequence number is assigned to None when the entry is added and delete to the manifest.
708        entries[0].file_sequence_number = None;
709        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
710    }
711}